#!/usr/bin/env python3
"""
Flask server for YouTube Concert Splitter
Downloads YouTube videos and splits them into tracks based on a setlist.
"""

import os
import re
import subprocess
import traceback

import yt_dlp
from flask import Flask, request, render_template, jsonify
from mutagen.easyid3 import EasyID3
from pydub import AudioSegment

app = Flask(__name__)
# NOTE(review): the fallback value is a placeholder; set SECRET_KEY in the
# environment for any real deployment.
app.secret_key = os.environ.get('SECRET_KEY', 'your_secret_key_here_change_this')

# Configuration
DOWNLOAD_FOLDER = os.environ.get('DOWNLOAD_FOLDER', 'youtube')
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)

# One setlist line: "TIMESTAMP TITLE", where TIMESTAMP is MM:SS or HH:MM:SS.
_SETLIST_LINE_RE = re.compile(r"(\d+:\d+(?::\d+)?)\s+(.+)")


def sanitize(s: str) -> str:
    """Sanitize string for use as filename.

    Surrounding whitespace is stripped, the characters ``\\ / : " * ? < > |``
    are removed, and every remaining character outside ``A-Za-z0-9 _-`` is
    replaced with an underscore. The result may be empty.
    """
    s = s.strip()
    s = re.sub(r'[\\/:"*?<>|]+', '', s)
    s = re.sub(r"[^A-Za-z0-9 _\-]", '_', s)
    return s


def parse_timestamp(ts: str) -> int:
    """Parse timestamp string (HH:MM:SS or MM:SS) to milliseconds.

    Raises:
        ValueError: If the string does not have two or three colon-separated
            fields, or a field is not an integer.
    """
    parts = ts.strip().split(':')
    try:
        if len(parts) == 3:
            h, m, s = parts
        elif len(parts) == 2:
            h = 0
            m, s = parts
        else:
            raise ValueError(f"Invalid timestamp format: '{ts}'")
        return (int(h) * 3600 + int(m) * 60 + int(s)) * 1000
    except ValueError as e:
        # Chain the cause so the original failure stays in the traceback.
        raise ValueError(f"Cannot parse timestamp '{ts}': {e}") from e


def download_youtube_audio(url: str, output_folder: str):
    """Download YouTube video and convert to MP3 at 320kbps.

    A ``cookies.txt`` file in the current working directory is passed to
    yt-dlp when present.

    Args:
        url: Video URL handed to yt-dlp.
        output_folder: Directory the file is written to, named after the
            video title.

    Returns:
        Tuple ``(mp3_path, info)`` where ``info`` is the metadata returned by
        ``YoutubeDL.extract_info``.

    Raises:
        FileNotFoundError: If the expected MP3 is not on disk after the
            download and post-processing finish.
    """
    # Check if cookies file exists
    cookies_file = 'cookies.txt'
    has_cookies = os.path.isfile(cookies_file)

    if has_cookies:
        print(f"✓ Using cookies from {cookies_file}")
    else:
        print("⚠️ No cookies.txt found - download may fail for some videos")
        print(" See COOKIES_INSTRUCTIONS.txt for how to add cookies")

    # Enhanced options to bypass YouTube restrictions
    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': f'{output_folder}/%(title)s.%(ext)s',
        'noplaylist': True,
        'quiet': False,
        'no_warnings': False,
        'extract_flat': False,
        'ignoreerrors': False,
        # Use cookies if available
        'cookiefile': cookies_file if has_cookies else None,
        # Anti-blocking measures
        # NOTE(review): 'nocheckcertificate' turns off TLS certificate
        # verification in yt-dlp; confirm this is still wanted.
        'nocheckcertificate': True,
        'geo_bypass': True,
        'age_limit': None,
        # Better headers to mimic a real browser
        'http_headers': {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-us,en;q=0.5',
            'Accept-Encoding': 'gzip,deflate',
            'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
            'Connection': 'keep-alive',
        },
        # Extractor specific arguments for YouTube
        'extractor_args': {
            'youtube': {
                'player_client': ['android', 'web'],
                'player_skip': ['webpage', 'configs'],
            }
        },
        # Post-processing
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '320',
        }],
        # Retry options
        'retries': 10,
        'fragment_retries': 10,
        'skip_unavailable_fragments': True,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        base_name = ydl.prepare_filename(info)

    # prepare_filename() reports the pre-conversion extension; the
    # post-processor leaves an .mp3 with the same stem.
    mp3_path = os.path.splitext(base_name)[0] + '.mp3'

    if not os.path.isfile(mp3_path):
        raise FileNotFoundError(f"Downloaded MP3 not found: {mp3_path}")

    return mp3_path, info


def parse_setlist(setlist_text: str):
    """Parse setlist text into list of (timestamp_ms, title) tuples.

    Blank lines are ignored. Every other line must look like
    ``TIMESTAMP TITLE``. The result is sorted by timestamp.

    Raises:
        ValueError: If a non-blank line does not match the expected format or
            its timestamp cannot be parsed.
    """
    entries = []

    for line in setlist_text.strip().split('\n'):
        line = line.strip()
        if not line:
            continue

        m = _SETLIST_LINE_RE.match(line)
        if not m:
            raise ValueError(f"Invalid setlist line format: '{line}'")

        ts, title = m.groups()
        try:
            timestamp_ms = parse_timestamp(ts)
        except ValueError as e:
            raise ValueError(f"Error parsing line '{line}': {e}") from e
        entries.append((timestamp_ms, title.strip()))

    # Sort by timestamp
    entries.sort(key=lambda x: x[0])
    return entries


def _write_id3_tags(filepath: str, title: str, album: str, artist: str,
                    track_number: int) -> None:
    """Write title/album/artist/track number ID3 tags; may raise on failure."""
    tags = EasyID3(filepath)
    tags['title'] = title
    tags['album'] = album
    tags['artist'] = artist
    tags['tracknumber'] = str(track_number)
    tags.save()


def split_audio(mp3_path: str, entries: list, album: str, artist: str, output_dir: str):
    """
    Split audio file into tracks based on setlist entries.
    Skip tracks that already exist.

    Args:
        mp3_path: Source audio file.
        entries: ``(start_ms, title)`` tuples in playback order, as produced
            by ``parse_setlist``.
        album: Album tag written to every created track.
        artist: Artist tag written to every created track.
        output_dir: Existing directory that receives the track files.

    Returns:
        Tuple ``(track_results, created_count, skipped_count)`` where
        ``track_results`` is a list of ``{'filename', 'status'}`` dicts with
        status ``'created'`` or ``'skipped'``.
    """
    print(f"Loading audio file: {mp3_path}")
    audio = AudioSegment.from_file(mp3_path)
    total_ms = len(audio)

    track_results = []
    created_count = 0
    skipped_count = 0

    for idx, (start_ms, title) in enumerate(entries, start=1):
        # idx is 1-based, so entries[idx] is the NEXT entry; the last track
        # runs to the end of the recording.
        end_ms = entries[idx][0] if idx < len(entries) else total_ms

        # Create filename
        filename = f"{idx:02d} - {sanitize(title)}.mp3"
        filepath = os.path.join(output_dir, filename)

        # Check if file already exists
        if os.path.isfile(filepath):
            print(f"Skipping track {idx}/{len(entries)}: {filename} (already exists)")
            track_results.append({'filename': filename, 'status': 'skipped'})
            skipped_count += 1
            continue

        # Extract and export segment
        print(f"Creating track {idx}/{len(entries)}: {filename}")
        segment = audio[start_ms:end_ms]

        # Export with 320kbps
        segment.export(filepath, format='mp3', bitrate='320k')

        # Tagging is best-effort: an untagged track is still a usable track.
        try:
            _write_id3_tags(filepath, title, album, artist, idx)
        except Exception as e:
            print(f"Warning: Could not add ID3 tags to {filename}: {e}")

        track_results.append({'filename': filename, 'status': 'created'})
        created_count += 1

    return track_results, created_count, skipped_count


def set_permissions(directory: str):
    """Set directory permissions to 775 recursively.

    Failures are reported on stdout and never raised.
    """
    try:
        subprocess.run(['chmod', '-R', '775', directory], check=True)
        print(f"Set permissions 775 on {directory}")
    except subprocess.CalledProcessError as e:
        print(f"Warning: Could not set permissions: {e}")
    except FileNotFoundError:
        print("Warning: chmod command not found (might be on Windows)")


def _remove_download(path: str) -> None:
    """Delete the intermediate download; warn instead of raising on failure."""
    print(f"Removing original file: {path}")
    try:
        os.remove(path)
    except OSError as e:
        print(f"Warning: Could not remove {path}: {e}")


def _save_single_track(mp3_path: str, info: dict, album: str, artist: str,
                       output_dir: str):
    """Store the whole download as track 01; same return shape as split_audio."""
    # Use video title as track name ('or' also covers an explicit None title)
    track_title = info.get('title') or 'Unknown Track'
    filename = f"01 - {sanitize(track_title)}.mp3"
    filepath = os.path.join(output_dir, filename)

    # Check if file already exists
    if os.path.isfile(filepath):
        print(f"⊘ Track already exists: {filename}")
        return [{'filename': filename, 'status': 'skipped'}], 0, 1

    print(f"✓ Creating single track: {filename}")

    # Load audio and re-export it at 320kbps under the track filename
    audio = AudioSegment.from_file(mp3_path)
    audio.export(filepath, format='mp3', bitrate='320k')

    # Add ID3 tags (best-effort)
    try:
        _write_id3_tags(filepath, track_title, album, artist, 1)
        print("✓ Added ID3 tags")
    except Exception as e:
        print(f"⚠️ Warning: Could not add ID3 tags: {e}")

    return [{'filename': filename, 'status': 'created'}], 1, 0


@app.route('/')
def index():
    """Render the main page."""
    return render_template('index.html')


@app.route('/split', methods=['POST'])
def split_concert():
    """Handle the split request.

    Form fields: ``youtube_url``, ``artist``, ``album`` (all required) and
    ``setlist`` (optional). Without a setlist the download is stored as a
    single track; with one it is split at the listed timestamps.

    Returns:
        JSON describing the created/skipped tracks on success, a JSON error
        with status 400 for invalid input, or status 500 for any other
        failure.
    """
    try:
        # Get form data
        url = request.form.get('youtube_url', '').strip()
        artist = request.form.get('artist', '').strip()
        album = request.form.get('album', '').strip()
        setlist_text = request.form.get('setlist', '').strip()

        # Validate required inputs (setlist is OPTIONAL)
        if not url:
            return jsonify({'error': 'No YouTube URL provided'}), 400
        if not artist:
            return jsonify({'error': 'No artist name provided'}), 400
        if not album:
            return jsonify({'error': 'No album name provided'}), 400

        # Sanitize album for directory name. An empty result would make the
        # output directory DOWNLOAD_FOLDER itself, so reject it.
        album_sanitized = sanitize(album)
        if not album_sanitized:
            return jsonify({'error': 'Album name contains no usable characters'}), 400

        # Validate the setlist BEFORE downloading so a malformed setlist does
        # not cost a full download.
        entries = None
        if setlist_text:
            print(f"✂️ Split mode: Processing setlist with {len(setlist_text.splitlines())} lines")
            try:
                entries = parse_setlist(setlist_text)
            except ValueError as e:
                return jsonify({'error': f'Setlist parsing error: {str(e)}'}), 400
            if not entries:
                return jsonify({'error': 'No valid tracks found in setlist'}), 400

        # Create output directory directly in DOWNLOAD_FOLDER
        output_dir = os.path.join(DOWNLOAD_FOLDER, album_sanitized)
        if not os.path.exists(output_dir):
            os.makedirs(output_dir, exist_ok=True)
            print(f"✓ Created album directory: {output_dir}")
        else:
            print(f"✓ Album directory already exists: {output_dir}")

        # Download audio
        print(f"Downloading audio from: {url}")
        mp3_path, info = download_youtube_audio(url, DOWNLOAD_FOLDER)

        # The intermediate download is removed whether processing succeeds or
        # raises, so failures do not leave large files behind.
        try:
            if entries is None:
                # Empty setlist = Single song mode
                print("📀 Single song mode: No setlist provided")
                track_results, created_count, skipped_count = _save_single_track(
                    mp3_path, info, album, artist, output_dir
                )
            else:
                # Split audio into tracks (skipping existing ones)
                print(f"Splitting into {len(entries)} tracks...")
                track_results, created_count, skipped_count = split_audio(
                    mp3_path, entries, album, artist, output_dir
                )
        finally:
            _remove_download(mp3_path)

        # Set permissions
        set_permissions(output_dir)

        # Return success response with detailed track information
        return jsonify({
            'success': True,
            'album': album,
            'artist': artist,
            'total_tracks': len(track_results),
            'created_count': created_count,
            'skipped_count': skipped_count,
            'tracks': track_results,
            'output_dir': output_dir
        })

    except Exception as e:
        # Top-level boundary: log the traceback and report the failure as JSON.
        print(f"❌ Error: {str(e)}")
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500


if __name__ == '__main__':
    # Get configuration from environment variables
    host = os.environ.get('FLASK_HOST', '0.0.0.0')
    port = int(os.environ.get('FLASK_PORT', 5000))
    debug = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true'

    print(f"Starting YouTube Concert Splitter on {host}:{port}")
    print(f"Music directory: {DOWNLOAD_FOLDER}")

    # Defaults to all interfaces; FLASK_HOST and FLASK_PORT override.
    app.run(host=host, port=port, debug=debug)