369 lines
12 KiB
Python
369 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Flask server for YouTube Concert Splitter
|
|
Downloads YouTube videos and splits them into tracks based on a setlist.
|
|
"""
|
|
from flask import Flask, request, render_template, jsonify
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import yt_dlp
|
|
from pydub import AudioSegment
|
|
from mutagen.easyid3 import EasyID3
|
|
|
|
app = Flask(__name__)
|
|
app.secret_key = os.environ.get('SECRET_KEY', 'your_secret_key_here_change_this')
|
|
|
|
# Configuration
|
|
DOWNLOAD_FOLDER = os.environ.get('DOWNLOAD_FOLDER', 'youtube')
|
|
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
|
|
|
|
|
|
def sanitize(s: str) -> str:
|
|
"""Sanitize string for use as filename."""
|
|
s = s.strip()
|
|
s = re.sub(r'[\\/:"*?<>|]+', '', s)
|
|
s = re.sub(r"[^A-Za-z0-9 _\-]", '_', s)
|
|
return s
|
|
|
|
|
|
def parse_timestamp(ts: str) -> int:
|
|
"""Parse timestamp string (HH:MM:SS or MM:SS) to milliseconds."""
|
|
parts = ts.strip().split(':')
|
|
try:
|
|
if len(parts) == 3:
|
|
h, m, s = parts
|
|
elif len(parts) == 2:
|
|
h = 0
|
|
m, s = parts
|
|
else:
|
|
raise ValueError(f"Invalid timestamp format: '{ts}'")
|
|
return (int(h) * 3600 + int(m) * 60 + int(s)) * 1000
|
|
except ValueError as e:
|
|
raise ValueError(f"Cannot parse timestamp '{ts}': {e}")
|
|
|
|
|
|
def download_youtube_audio(url: str, output_folder: str):
|
|
"""Download YouTube video and convert to MP3 at 320kbps with enhanced anti-blocking measures."""
|
|
|
|
# Check if cookies file exists
|
|
cookies_file = 'cookies.txt'
|
|
has_cookies = os.path.isfile(cookies_file)
|
|
if has_cookies:
|
|
print(f"✓ Using cookies from {cookies_file}")
|
|
else:
|
|
print("⚠️ No cookies.txt found - download may fail for some videos")
|
|
print(" See COOKIES_INSTRUCTIONS.txt for how to add cookies")
|
|
|
|
# Enhanced options to bypass YouTube restrictions
|
|
ydl_opts = {
|
|
'format': 'bestaudio/best',
|
|
'outtmpl': f'{output_folder}/%(title)s.%(ext)s',
|
|
'noplaylist': True,
|
|
'quiet': False,
|
|
'no_warnings': False,
|
|
'extract_flat': False,
|
|
'ignoreerrors': False,
|
|
|
|
# Use cookies if available
|
|
'cookiefile': cookies_file if has_cookies else None,
|
|
|
|
# Anti-blocking measures
|
|
'nocheckcertificate': True,
|
|
'geo_bypass': True,
|
|
'age_limit': None,
|
|
|
|
# Better headers to mimic a real browser
|
|
'http_headers': {
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
|
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
|
|
'Accept-Language': 'en-us,en;q=0.5',
|
|
'Accept-Encoding': 'gzip,deflate',
|
|
'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
|
|
'Connection': 'keep-alive',
|
|
},
|
|
|
|
# Extractor specific arguments for YouTube
|
|
'extractor_args': {
|
|
'youtube': {
|
|
'player_client': ['android', 'web'],
|
|
'player_skip': ['webpage', 'configs'],
|
|
}
|
|
},
|
|
|
|
# Post-processing
|
|
'postprocessors': [{
|
|
'key': 'FFmpegExtractAudio',
|
|
'preferredcodec': 'mp3',
|
|
'preferredquality': '320',
|
|
}],
|
|
|
|
# Retry options
|
|
'retries': 10,
|
|
'fragment_retries': 10,
|
|
'skip_unavailable_fragments': True,
|
|
}
|
|
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
|
info = ydl.extract_info(url, download=True)
|
|
base_name = ydl.prepare_filename(info)
|
|
|
|
# Determine the actual MP3 filename
|
|
mp3_path = os.path.splitext(base_name)[0] + '.mp3'
|
|
|
|
if not os.path.isfile(mp3_path):
|
|
raise FileNotFoundError(f"Downloaded MP3 not found: {mp3_path}")
|
|
|
|
return mp3_path, info
|
|
|
|
|
|
def parse_setlist(setlist_text: str):
|
|
"""Parse setlist text into list of (timestamp_ms, title) tuples."""
|
|
entries = []
|
|
lines = setlist_text.strip().split('\n')
|
|
|
|
for line in lines:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
# Match pattern: "TIMESTAMP TITLE"
|
|
m = re.match(r"(\d+:\d+(?::\d+)?)\s+(.+)", line)
|
|
if not m:
|
|
raise ValueError(f"Invalid setlist line format: '{line}'")
|
|
|
|
ts, title = m.groups()
|
|
try:
|
|
timestamp_ms = parse_timestamp(ts)
|
|
entries.append((timestamp_ms, title.strip()))
|
|
except ValueError as e:
|
|
raise ValueError(f"Error parsing line '{line}': {e}")
|
|
|
|
# Sort by timestamp
|
|
entries.sort(key=lambda x: x[0])
|
|
return entries
|
|
|
|
|
|
def split_audio(mp3_path: str, entries: list, album: str, artist: str, output_dir: str):
|
|
"""
|
|
Split audio file into tracks based on setlist entries.
|
|
Skip tracks that already exist.
|
|
Returns list of track info with status (created or skipped).
|
|
"""
|
|
print(f"Loading audio file: {mp3_path}")
|
|
audio = AudioSegment.from_file(mp3_path)
|
|
total_ms = len(audio)
|
|
|
|
track_results = []
|
|
created_count = 0
|
|
skipped_count = 0
|
|
|
|
for idx, (start_ms, title) in enumerate(entries, start=1):
|
|
# Determine end time
|
|
end_ms = entries[idx][0] if idx < len(entries) else total_ms
|
|
|
|
# Create filename
|
|
filename = f"{idx:02d} - {sanitize(title)}.mp3"
|
|
filepath = os.path.join(output_dir, filename)
|
|
|
|
# Check if file already exists
|
|
if os.path.isfile(filepath):
|
|
print(f"Skipping track {idx}/{len(entries)}: {filename} (already exists)")
|
|
track_results.append({
|
|
'filename': filename,
|
|
'status': 'skipped'
|
|
})
|
|
skipped_count += 1
|
|
continue
|
|
|
|
# Extract and export segment
|
|
print(f"Creating track {idx}/{len(entries)}: {filename}")
|
|
segment = audio[start_ms:end_ms]
|
|
|
|
# Export with 320kbps
|
|
segment.export(filepath, format='mp3', bitrate='320k')
|
|
|
|
# Add ID3 tags
|
|
try:
|
|
tags = EasyID3(filepath)
|
|
tags['title'] = title
|
|
tags['album'] = album
|
|
tags['artist'] = artist
|
|
tags['tracknumber'] = str(idx)
|
|
tags.save()
|
|
except Exception as e:
|
|
print(f"Warning: Could not add ID3 tags to {filename}: {e}")
|
|
|
|
track_results.append({
|
|
'filename': filename,
|
|
'status': 'created'
|
|
})
|
|
created_count += 1
|
|
|
|
return track_results, created_count, skipped_count
|
|
|
|
|
|
def set_permissions(directory: str):
|
|
"""Set directory permissions to 775 recursively."""
|
|
try:
|
|
subprocess.run(['chmod', '-R', '775', directory], check=True)
|
|
print(f"Set permissions 775 on {directory}")
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Warning: Could not set permissions: {e}")
|
|
except FileNotFoundError:
|
|
print("Warning: chmod command not found (might be on Windows)")
|
|
|
|
|
|
@app.route('/')
|
|
def index():
|
|
"""Render the main page."""
|
|
return render_template('index.html')
|
|
|
|
|
|
@app.route('/split', methods=['POST'])
|
|
def split_concert():
|
|
"""Handle the split request."""
|
|
try:
|
|
# Get form data
|
|
url = request.form.get('youtube_url', '').strip()
|
|
artist = request.form.get('artist', '').strip()
|
|
album = request.form.get('album', '').strip()
|
|
setlist_text = request.form.get('setlist', '').strip()
|
|
|
|
# Validate required inputs (setlist is OPTIONAL)
|
|
if not url:
|
|
return jsonify({'error': 'No YouTube URL provided'}), 400
|
|
|
|
if not artist:
|
|
return jsonify({'error': 'No artist name provided'}), 400
|
|
|
|
if not album:
|
|
return jsonify({'error': 'No album name provided'}), 400
|
|
|
|
# Sanitize album for directory name
|
|
album_sanitized = sanitize(album)
|
|
|
|
# Create output directory directly in DOWNLOAD_FOLDER
|
|
output_dir = os.path.join(DOWNLOAD_FOLDER, album_sanitized)
|
|
|
|
# Create directory only if it doesn't exist
|
|
if not os.path.exists(output_dir):
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
print(f"✓ Created album directory: {output_dir}")
|
|
else:
|
|
print(f"✓ Album directory already exists: {output_dir}")
|
|
|
|
# Download audio
|
|
print(f"Downloading audio from: {url}")
|
|
mp3_path, info = download_youtube_audio(url, DOWNLOAD_FOLDER)
|
|
|
|
# DECISION POINT: Empty setlist = Single song mode
|
|
if not setlist_text:
|
|
print("📀 Single song mode: No setlist provided")
|
|
|
|
# Use video title as track name
|
|
track_title = info.get('title', 'Unknown Track')
|
|
filename = f"01 - {sanitize(track_title)}.mp3"
|
|
filepath = os.path.join(output_dir, filename)
|
|
|
|
# Check if file already exists
|
|
if os.path.isfile(filepath):
|
|
print(f"⊘ Track already exists: {filename}")
|
|
os.remove(mp3_path) # Clean up downloaded file
|
|
|
|
track_results = [{
|
|
'filename': filename,
|
|
'status': 'skipped'
|
|
}]
|
|
created_count = 0
|
|
skipped_count = 1
|
|
else:
|
|
# Move and rename the file (no splitting needed)
|
|
print(f"✓ Creating single track: {filename}")
|
|
|
|
# Load audio to re-export with proper tags
|
|
audio = AudioSegment.from_file(mp3_path)
|
|
audio.export(filepath, format='mp3', bitrate='320k')
|
|
|
|
# Add ID3 tags
|
|
try:
|
|
tags = EasyID3(filepath)
|
|
tags['title'] = track_title
|
|
tags['album'] = album
|
|
tags['artist'] = artist
|
|
tags['tracknumber'] = '1'
|
|
tags.save()
|
|
print(f"✓ Added ID3 tags")
|
|
except Exception as e:
|
|
print(f"⚠️ Warning: Could not add ID3 tags: {e}")
|
|
|
|
# Clean up original
|
|
os.remove(mp3_path)
|
|
|
|
track_results = [{
|
|
'filename': filename,
|
|
'status': 'created'
|
|
}]
|
|
created_count = 1
|
|
skipped_count = 0
|
|
|
|
else:
|
|
# SPLIT MODE: Parse setlist and split audio
|
|
print(f"✂️ Split mode: Processing setlist with {len(setlist_text.splitlines())} lines")
|
|
|
|
# Parse setlist for splitting
|
|
try:
|
|
entries = parse_setlist(setlist_text)
|
|
except ValueError as e:
|
|
os.remove(mp3_path) # Clean up downloaded file
|
|
return jsonify({'error': f'Setlist parsing error: {str(e)}'}), 400
|
|
|
|
if not entries:
|
|
os.remove(mp3_path) # Clean up downloaded file
|
|
return jsonify({'error': 'No valid tracks found in setlist'}), 400
|
|
|
|
# Split audio into tracks (skipping existing ones)
|
|
print(f"Splitting into {len(entries)} tracks...")
|
|
track_results, created_count, skipped_count = split_audio(
|
|
mp3_path, entries, album, artist, output_dir
|
|
)
|
|
|
|
# Clean up original MP3
|
|
print(f"Removing original file: {mp3_path}")
|
|
os.remove(mp3_path)
|
|
|
|
# Set permissions
|
|
set_permissions(output_dir)
|
|
|
|
# Return success response with detailed track information
|
|
return jsonify({
|
|
'success': True,
|
|
'album': album,
|
|
'artist': artist,
|
|
'total_tracks': len(track_results),
|
|
'created_count': created_count,
|
|
'skipped_count': skipped_count,
|
|
'tracks': track_results,
|
|
'output_dir': output_dir
|
|
})
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error: {str(e)}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
if __name__ == '__main__':
|
|
# Get configuration from environment variables
|
|
host = os.environ.get('FLASK_HOST', '0.0.0.0')
|
|
port = int(os.environ.get('FLASK_PORT', 5000))
|
|
debug = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true'
|
|
|
|
print(f"Starting YouTube Concert Splitter on {host}:{port}")
|
|
print(f"Music directory: {DOWNLOAD_FOLDER}")
|
|
|
|
# Run on all interfaces, configurable port
|
|
# app.run(host=host, port=port, debug=debug)
|
|
app.run(host="0.0.0.0", port=port, debug=debug)
|
|
|