#!/usr/bin/env python3
"""
Flask server for YouTube Concert Splitter with async job processing
Downloads YouTube videos and splits them into tracks based on a setlist.
"""

import json
import os
import re
import subprocess
import sys
import threading
import traceback
import uuid
from datetime import datetime

import yt_dlp
from flask import Flask, request, render_template, jsonify, Response
from mutagen.easyid3 import EasyID3
from pydub import AudioSegment

app = Flask(__name__)
# Falls back to a placeholder when SECRET_KEY is not set in the environment;
# set SECRET_KEY explicitly for any real deployment.
app.secret_key = os.environ.get('SECRET_KEY', 'your_secret_key_here_change_this')

# Configuration
DOWNLOAD_FOLDER = os.environ.get('DOWNLOAD_FOLDER', 'youtube')
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)

# Job storage (in production, use Redis or database).
# Maps job_id -> dict with 'status', 'progress' and, once finished,
# either 'result' or 'error'. Written by worker threads, read by /status.
jobs = {}


def sanitize(s: str) -> str:
    """Sanitize string for use as filename.

    Surrounding whitespace is stripped, characters that are illegal in
    file names on common platforms are removed, and every remaining
    character outside ``A-Za-z0-9 _-`` is replaced with an underscore.
    """
    s = s.strip()
    s = re.sub(r'[\\/:"*?<>|]+', '', s)
    s = re.sub(r"[^A-Za-z0-9 _\-]", '_', s)
    return s


def parse_timestamp(ts: str) -> int:
    """Parse timestamp string (HH:MM:SS or MM:SS) to milliseconds.

    Raises:
        ValueError: if *ts* does not have two or three colon-separated
            parts, or if any part is not an integer.
    """
    parts = ts.strip().split(':')
    try:
        if len(parts) == 3:
            h, m, s = parts
        elif len(parts) == 2:
            h = 0
            m, s = parts
        else:
            raise ValueError(f"Invalid timestamp format: '{ts}'")
        return (int(h) * 3600 + int(m) * 60 + int(s)) * 1000
    except ValueError as e:
        # Re-raise with the offending input, keeping the original cause.
        raise ValueError(f"Cannot parse timestamp '{ts}': {e}") from e


def download_youtube_audio(url: str, output_folder: str, job_id: str):
    """Download YouTube video and convert to MP3 at 320kbps.

    Updates ``jobs[job_id]`` with status/progress while running.

    Returns:
        A ``(mp3_path, info)`` tuple: the path of the converted MP3 inside
        *output_folder* and the info dict returned by yt-dlp.

    Raises:
        FileNotFoundError: if the expected MP3 is missing after download.
    """
    jobs[job_id]['status'] = 'downloading'
    jobs[job_id]['progress'] = 'Downloading video from YouTube...'

    # An optional cookies.txt in the working directory is passed to yt-dlp.
    cookies_file = 'cookies.txt'
    has_cookies = os.path.isfile(cookies_file)

    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': f'{output_folder}/%(title)s.%(ext)s',
        'noplaylist': True,
        'quiet': False,
        'no_warnings': False,
        'extract_flat': False,
        'ignoreerrors': False,
        'cookiefile': cookies_file if has_cookies else None,
        # NOTE(review): certificate verification is disabled here; confirm
        # this is intentional for the deployment environment.
        'nocheckcertificate': True,
        'geo_bypass': True,
        'age_limit': None,
        'http_headers': {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-us,en;q=0.5',
            'Accept-Encoding': 'gzip,deflate',
            'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
            'Connection': 'keep-alive',
        },
        'extractor_args': {
            'youtube': {
                'player_client': ['android', 'web'],
                'player_skip': ['webpage', 'configs'],
            }
        },
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '320',
        }],
        'retries': 10,
        'fragment_retries': 10,
        'skip_unavailable_fragments': True,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        base_name = ydl.prepare_filename(info)

    # prepare_filename() reports the pre-conversion extension; the
    # FFmpegExtractAudio postprocessor leaves an .mp3 with the same stem.
    mp3_path = os.path.splitext(base_name)[0] + '.mp3'
    if not os.path.isfile(mp3_path):
        raise FileNotFoundError(f"Downloaded MP3 not found: {mp3_path}")

    jobs[job_id]['progress'] = 'Download complete. Processing audio...'
    return mp3_path, info


def parse_setlist(setlist_text: str):
    """Parse setlist text into list of (timestamp_ms, title) tuples.

    Each non-blank line must look like ``MM:SS Title`` or
    ``HH:MM:SS Title``. The result is sorted by timestamp.

    Raises:
        ValueError: if a non-blank line does not match that format or its
            timestamp cannot be parsed.
    """
    entries = []
    for line in setlist_text.strip().split('\n'):
        line = line.strip()
        if not line:
            continue

        m = re.match(r"(\d+:\d+(?::\d+)?)\s+(.+)", line)
        if not m:
            raise ValueError(f"Invalid setlist line format: '{line}'")

        ts, title = m.groups()
        try:
            timestamp_ms = parse_timestamp(ts)
        except ValueError as e:
            raise ValueError(f"Error parsing line '{line}': {e}") from e
        entries.append((timestamp_ms, title.strip()))

    entries.sort(key=lambda x: x[0])
    return entries


def _write_id3_tags(filepath: str, title: str, album: str, artist: str,
                    track_number: int) -> None:
    """Write title/album/artist/tracknumber ID3 tags to *filepath*."""
    tags = EasyID3(filepath)
    tags['title'] = title
    tags['album'] = album
    tags['artist'] = artist
    tags['tracknumber'] = str(track_number)
    tags.save()


def split_audio(mp3_path: str, entries: list, album: str, artist: str,
                output_dir: str, job_id: str):
    """Split audio file into tracks based on setlist entries.

    Each entry's track runs from its timestamp to the next entry's
    timestamp (or to the end of the audio for the last entry). Tracks whose
    output file already exists are skipped, not overwritten.

    Returns:
        ``(track_results, created_count, skipped_count)`` where
        *track_results* is a list of ``{'filename', 'status'}`` dicts.
    """
    jobs[job_id]['status'] = 'splitting'
    jobs[job_id]['progress'] = 'Loading audio file...'

    audio = AudioSegment.from_file(mp3_path)
    total_ms = len(audio)

    track_results = []
    created_count = 0
    skipped_count = 0

    for idx, (start_ms, title) in enumerate(entries, start=1):
        jobs[job_id]['progress'] = f'Processing track {idx}/{len(entries)}: {title}'

        # idx is 1-based, so entries[idx] is the *next* entry.
        end_ms = entries[idx][0] if idx < len(entries) else total_ms

        filename = f"{idx:02d} - {sanitize(title)}.mp3"
        filepath = os.path.join(output_dir, filename)

        if os.path.isfile(filepath):
            track_results.append({'filename': filename, 'status': 'skipped'})
            skipped_count += 1
            continue

        segment = audio[start_ms:end_ms]
        segment.export(filepath, format='mp3', bitrate='320k')

        # Tagging is best-effort: a tagging failure must not lose the track.
        try:
            _write_id3_tags(filepath, title, album, artist, idx)
        except Exception as e:
            print(f"Warning: Could not add ID3 tags to {filename}: {e}", flush=True)

        track_results.append({'filename': filename, 'status': 'created'})
        created_count += 1

    return track_results, created_count, skipped_count


def set_permissions(directory: str):
    """Set directory permissions to 775 recursively (best-effort).

    Failures (chmod missing or returning non-zero) are logged and ignored
    so that a permissions problem never fails an otherwise finished job.
    """
    try:
        subprocess.run(['chmod', '-R', '775', directory], check=True)
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        print(f"Warning: Could not set permissions on {directory}: {e}", flush=True)


def _export_single_track(mp3_path: str, info: dict, album: str, artist: str,
                         output_dir: str, job_id: str):
    """Export the whole download as track 01 of the album.

    Returns the same ``(track_results, created_count, skipped_count)``
    shape as :func:`split_audio`.
    """
    jobs[job_id]['status'] = 'processing'
    jobs[job_id]['progress'] = 'Creating single track...'

    track_title = info.get('title', 'Unknown Track')
    filename = f"01 - {sanitize(track_title)}.mp3"
    filepath = os.path.join(output_dir, filename)

    if os.path.isfile(filepath):
        return [{'filename': filename, 'status': 'skipped'}], 0, 1

    audio = AudioSegment.from_file(mp3_path)
    audio.export(filepath, format='mp3', bitrate='320k')

    try:
        _write_id3_tags(filepath, track_title, album, artist, 1)
    except Exception as e:
        print(f"Warning: Could not add ID3 tags: {e}", flush=True)

    return [{'filename': filename, 'status': 'created'}], 1, 0


def process_job(job_id: str, url: str, artist: str, album: str, setlist_text: str):
    """Background job processor.

    Downloads the audio, then either exports it as a single track (empty
    *setlist_text*) or splits it according to the setlist. The outcome is
    recorded in ``jobs[job_id]`` as status ``'completed'`` with a
    ``'result'`` dict, or ``'failed'`` with an ``'error'`` message. This
    function never raises; it is the top-level boundary of a worker thread.
    """
    mp3_path = None
    try:
        # Validate the setlist before downloading so a malformed setlist
        # fails fast instead of after a full download.
        entries = parse_setlist(setlist_text) if setlist_text else []
        if setlist_text and not entries:
            raise ValueError('No valid tracks found in setlist')

        output_dir = os.path.join(DOWNLOAD_FOLDER, sanitize(album))
        os.makedirs(output_dir, exist_ok=True)

        # Download audio
        mp3_path, info = download_youtube_audio(url, DOWNLOAD_FOLDER, job_id)

        # Single song mode or split mode
        if not setlist_text:
            track_results, created_count, skipped_count = _export_single_track(
                mp3_path, info, album, artist, output_dir, job_id
            )
        else:
            track_results, created_count, skipped_count = split_audio(
                mp3_path, entries, album, artist, output_dir, job_id
            )

        # Set permissions
        set_permissions(output_dir)

        # Update job with results
        jobs[job_id]['status'] = 'completed'
        jobs[job_id]['progress'] = 'Processing complete!'
        jobs[job_id]['result'] = {
            'success': True,
            'album': album,
            'artist': artist,
            'total_tracks': len(track_results),
            'created_count': created_count,
            'skipped_count': skipped_count,
            'tracks': track_results,
            'output_dir': output_dir
        }

    except Exception as e:
        print(f"Job {job_id} failed: {str(e)}", flush=True)
        traceback.print_exc()
        jobs[job_id]['status'] = 'failed'
        jobs[job_id]['error'] = str(e)

    finally:
        # Remove the intermediate full-length download on success *and*
        # failure so failed jobs do not leave large files behind.
        if mp3_path is not None and os.path.isfile(mp3_path):
            try:
                os.remove(mp3_path)
            except OSError as e:
                print(f"Warning: Could not remove {mp3_path}: {e}", flush=True)


@app.route('/')
def index():
    """Render the main page."""
    return render_template('index.html')


@app.route('/split', methods=['POST'])
def split_concert():
    """Submit a split job and return job ID immediately.

    Reads ``youtube_url``, ``artist``, ``album`` and optional ``setlist``
    from the form. Responds 400 when a required field is missing, 500 on
    an unexpected error, otherwise JSON with the new ``job_id``.
    """
    try:
        url = request.form.get('youtube_url', '').strip()
        artist = request.form.get('artist', '').strip()
        album = request.form.get('album', '').strip()
        setlist_text = request.form.get('setlist', '').strip()

        if not url:
            return jsonify({'error': 'No YouTube URL provided'}), 400
        if not artist:
            return jsonify({'error': 'No artist name provided'}), 400
        if not album:
            return jsonify({'error': 'No album name provided'}), 400

        # Create job
        job_id = str(uuid.uuid4())
        jobs[job_id] = {
            'status': 'queued',
            'progress': 'Job queued...',
            'created_at': datetime.now().isoformat(),
            'url': url,
            'artist': artist,
            'album': album
        }

        # Start background thread
        thread = threading.Thread(
            target=process_job,
            args=(job_id, url, artist, album, setlist_text)
        )
        thread.daemon = True
        thread.start()

        print(f"✓ Job {job_id} created and started", flush=True)

        return jsonify({
            'success': True,
            'job_id': job_id
        })

    except Exception as e:
        print(f"Error creating job: {str(e)}", flush=True)
        return jsonify({'error': str(e)}), 500


# The URL must declare <job_id>; without it Flask calls the view with no
# arguments and get_status(job_id) fails with a TypeError.
@app.route('/status/<job_id>', methods=['GET'])
def get_status(job_id):
    """Get job status and results.

    Responds 404 for an unknown *job_id*. Otherwise returns the job's
    status and progress, plus ``result`` when completed or ``error`` when
    failed.
    """
    job = jobs.get(job_id)
    if job is None:
        return jsonify({'error': 'Job not found'}), 404

    response = {
        'status': job['status'],
        'progress': job.get('progress', '')
    }

    if job['status'] == 'completed':
        response['result'] = job.get('result', {})
    elif job['status'] == 'failed':
        response['error'] = job.get('error', 'Unknown error')

    return jsonify(response)


if __name__ == '__main__':
    host = os.environ.get('FLASK_HOST', '0.0.0.0')
    port = int(os.environ.get('FLASK_PORT', 5000))
    debug = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true'

    print(f"Starting YouTube Concert Splitter on {host}:{port}", flush=True)
    print(f"Music directory: {DOWNLOAD_FOLDER}", flush=True)

    # Bind to the configured host (previously hard-coded, ignoring FLASK_HOST).
    app.run(host=host, port=port, debug=debug, threaded=True)