#!/usr/bin/env python3
"""Interactively download a YouTube playlist as tagged audio files.

Workflow: prompt for settings, list the playlist with ``yt-dlp``, probe each
video's title, download the audio in parallel, then write album tags in
place with ``mutagen``.
"""
import json
import re
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Optional

# Title clean-up patterns, compiled once because clean_title runs per track.
_TITLE_PREFIX_RE = re.compile(r"^\s*Bongo\s+Cat\s*-\s*", re.IGNORECASE)
# \U0001F3A7 is the headphone emoji; written as an escape so the pattern
# survives a re-encoding of this source file.
_TITLE_SUFFIX_RE = re.compile(r"\s*\(Cover Version\)\s*\U0001F3A7\s*$")
# Characters that would split a title into several path components.
_PATH_UNSAFE_RE = re.compile(r"[\\/\x00]")


def prompt(message: str, default=None, input_type=str):
    """Prompt the user until a valid value is entered.

    Args:
        message: Text shown to the user.
        default: Value used when the user just presses Enter. When ``None``
            the field is required and the prompt repeats on empty input.
        input_type: Callable converting the raw string (or the default);
            it must raise ``ValueError`` to reject a value.

    Returns:
        The converted value.
    """
    if default is not None:
        msg = f"{message} [{default}]: "
    else:
        msg = f"{message}: "
    # Helper converters such as ``_positive_int`` carry underscores in their
    # name; builtin names like ``int`` are unaffected by this clean-up.
    type_name = (
        getattr(input_type, "__name__", "value").replace("_", " ").strip()
    )

    while True:
        value = input(msg).strip()
        try:
            if value:
                return input_type(value)
            if default is not None:
                return input_type(default)
        except ValueError:
            print(f"Invalid input. Please enter a valid {type_name}.")
            continue
        print("This field is required.")


def prompt_bool(message: str, default=True):
    """Ask a yes/no question and return the answer as a bool.

    Empty input returns *default*; anything other than y/yes/n/no
    (case-insensitive) repeats the question.
    """
    default_str = "Y/n" if default else "y/N"
    answers = {"y": True, "yes": True, "n": False, "no": False}
    while True:
        value = input(f"{message} [{default_str}]: ").strip().lower()
        if not value:
            return default
        if value in answers:
            return answers[value]
        print("Please enter 'y' or 'n'.")


def _positive_int(value) -> int:
    """Convert *value* to an int, raising ``ValueError`` for values below 1."""
    number = int(value)
    if number < 1:
        raise ValueError(f"expected a positive integer, got {number}")
    return number


def get_config():
    """Interactively gather the run configuration.

    Returns:
        A dict with the keys ``playlist_url``, ``album``, ``artist``,
        ``album_artist``, ``outdir`` (a ``Path``), ``disc``, ``start_track``,
        ``download_workers``, ``tag_workers`` and ``fragments``.

    Raises:
        SystemExit: With status 0 when the user declines the summary.
    """
    print("\n" + "=" * 60)
    print("YouTube Playlist Downloader")
    print("=" * 60 + "\n")

    # Required fields
    playlist_url = prompt("Enter playlist URL")
    album = prompt("Enter album name")

    # Optional fields with defaults
    print("\nOptional settings (press Enter for defaults):\n")
    artist = prompt("Artist name", default="Various Artists")
    album_artist = prompt("Album artist", default=artist)
    outdir = Path(prompt("Output directory", default="downloads"))
    disc = prompt("Disc number", default="1")
    start_track = prompt("Starting track number", default="1", input_type=int)

    # Advanced options. These sizes feed thread pools, which reject values
    # below 1, so they are validated at the prompt.
    print("\nAdvanced options (press Enter for defaults):\n")
    download_workers = prompt(
        "Max concurrent downloads", default="6", input_type=_positive_int
    )
    tag_workers = prompt(
        "Max concurrent tagging jobs", default="8", input_type=_positive_int
    )
    fragments = prompt(
        "Concurrent HLS/DASH fragments per video",
        default="8",
        input_type=_positive_int,
    )

    # Confirmation
    print("\n" + "=" * 60)
    print("Configuration Summary:")
    print("=" * 60)
    print(f"Playlist URL: {playlist_url}")
    print(f"Album: {album}")
    print(f"Artist: {artist}")
    print(f"Album Artist: {album_artist}")
    print(f"Output Directory: {outdir}")
    print(f"Disc Number: {disc}")
    print(f"Starting Track: {start_track}")
    print(f"Download Workers: {download_workers}")
    print(f"Tag Workers: {tag_workers}")
    print(f"Fragment Concurrency: {fragments}")
    print("=" * 60 + "\n")

    proceed = prompt_bool("Proceed with these settings?", default=True)
    if not proceed:
        print("Cancelled.")
        # SystemExit instead of the ``exit`` helper, which only exists when
        # the ``site`` module has been loaded.
        raise SystemExit(0)

    return {
        "playlist_url": playlist_url,
        "album": album,
        "artist": artist,
        "album_artist": album_artist,
        "outdir": outdir,
        "disc": disc,
        "start_track": start_track,
        "download_workers": download_workers,
        "tag_workers": tag_workers,
        "fragments": fragments,
    }


def clean_title(title: str) -> str:
    """Strip channel decoration from a video title.

    Removes a leading ``Bongo Cat -`` (case-insensitive) and a trailing
    ``(Cover Version)`` followed by the headphone emoji, then trims
    surrounding whitespace and dots.
    """
    t = _TITLE_PREFIX_RE.sub("", title)
    t = _TITLE_SUFFIX_RE.sub("", t)
    return t.strip().strip(".")


def run_json(cmd):
    """Run *cmd* and return its stdout parsed as JSON.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero.
        json.JSONDecodeError: If stdout is not valid JSON.
    """
    p = subprocess.run(cmd, check=True, capture_output=True, text=True)
    return json.loads(p.stdout)


def get_playlist_entries(url: str):
    """Return the video ids of the playlist at *url*, in playlist order."""
    data = run_json(["yt-dlp", "-J", "--flat-playlist", url])
    # "entries" may be missing or null, and an entry may lack an id;
    # skip those instead of raising.
    entries = data.get("entries") or []
    return [e["id"] for e in entries if e and e.get("id")]


def get_title(video_id: str) -> str:
    """Return the title yt-dlp reports for *video_id*.

    Falls back to *video_id* when the metadata has no ``title`` key.
    """
    meta = run_json(
        ["yt-dlp", "-J", f"https://www.youtube.com/watch?v={video_id}"]
    )
    return meta.get("title", video_id)


def _safe_filename(name: str) -> str:
    """Replace path separators and NULs so *name* is one path component."""
    return _PATH_UNSAFE_RE.sub("_", name)


def download_one(
    idx: int,
    vid: str,
    cleaned_title: str,
    outdir: Path,
    concurrent_fragments: int,
) -> Optional[Path]:
    """Download one video's audio with yt-dlp.

    Args:
        idx: 1-based position, used as a zero-padded file name prefix.
        vid: YouTube video id.
        cleaned_title: Title used for the file name.
        outdir: Directory the file is written to.
        concurrent_fragments: Value passed to ``--concurrent-fragments``.

    Returns:
        Path of the resulting ``.opus`` (or ``.ogg``) file, or ``None`` when
        neither exists after the download.

    Raises:
        subprocess.CalledProcessError: If yt-dlp exits non-zero.
    """
    out_base = f"{idx:03d} - {_safe_filename(cleaned_title)}"
    # yt-dlp expands %-fields in the -o template, so literal percent signs
    # from the title or directory must be doubled.
    out_tmpl = str(outdir / out_base).replace("%", "%%") + ".%(ext)s"
    url = f"https://www.youtube.com/watch?v={vid}"

    cmd = [
        "yt-dlp",
        url,
        "--extract-audio",
        "--audio-format",
        "opus",
        "--audio-quality",
        "0",
        "--embed-thumbnail",
        "--embed-metadata",
        "--add-metadata",
        "--concurrent-fragments",
        str(concurrent_fragments),
        "-o",
        out_tmpl,
    ]
    subprocess.run(cmd, check=True)

    for ext in (".opus", ".ogg"):
        candidate = outdir / f"{out_base}{ext}"
        if candidate.exists():
            return candidate
    return None


def set_tags(
    path: Path,
    title: str,
    track_number: int,
    artist: str,
    album: str,
    album_artist: str,
    disc_number: str,
):
    """Write album tags into the Ogg file at *path*, in place.

    Files with an ``.opus`` suffix are opened as Ogg Opus; any other suffix
    is tried as Ogg Opus first and as Ogg Vorbis if that fails.
    """
    # Imported lazily so the module can be imported without mutagen.
    from mutagen import MutagenError
    from mutagen.oggopus import OggOpus
    from mutagen.oggvorbis import OggVorbis

    if path.suffix.lower() == ".opus":
        audio = OggOpus(path)
    else:
        try:
            audio = OggOpus(path)
        except MutagenError:
            audio = OggVorbis(path)

    if audio.tags is None:
        audio.add_tags()

    audio["TITLE"] = [title]
    audio["ARTIST"] = [artist]
    audio["ALBUM"] = [album]
    audio["ALBUMARTIST"] = [album_artist]
    audio["DISCNUMBER"] = [disc_number]
    audio["TRACKNUMBER"] = [str(track_number)]
    # NOTE(review): written as an empty value; the album's real track total
    # is not known here -- confirm whether the tag should be omitted instead.
    audio["TRACKTOTAL"] = [""]
    audio["COMPILATION"] = ["0"]
    audio.save()


def main():
    """Run the interactive download-and-tag workflow."""
    config = get_config()
    outdir = config["outdir"]
    # parents=True so a nested output path such as "music/album" works.
    outdir.mkdir(parents=True, exist_ok=True)

    print("Fetching playlist order...")
    try:
        vids = get_playlist_entries(config["playlist_url"])
    except subprocess.CalledProcessError as e:
        # run_json captures yt-dlp's output, so surface its error text here.
        detail = (e.stderr or "").strip()
        raise SystemExit(f"Could not read playlist: {detail or e}") from e
    except (OSError, json.JSONDecodeError) as e:
        raise SystemExit(f"Could not read playlist: {e}") from e

    total = len(vids)
    if not vids:
        print("No entries found.")
        return

    print("Probing titles (parallel)...")
    titles = [None] * total
    with ThreadPoolExecutor(max_workers=config["download_workers"]) as ex:
        futs = {
            ex.submit(get_title, vid): i
            for i, vid in enumerate(vids, start=1)
        }
        for fut in as_completed(futs):
            i = futs[fut]
            try:
                titles[i - 1] = fut.result()
            except Exception as e:
                # Best effort: a failed probe only costs the nice file name.
                titles[i - 1] = f"track_{i}"
                print(f"Title probe failed for index {i}: {e}")

    # The second fallback covers titles that clean down to an empty string.
    cleaned = [
        clean_title(t or f"track_{i}") or f"track_{i}"
        for i, t in enumerate(titles, start=1)
    ]

    print("Downloading (parallel)...")
    results = [None] * total

    def dl_task(i, vid, ct):
        print(f"[{i}/{total}] {ct}")
        try:
            return download_one(i, vid, ct, outdir, config["fragments"])
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing yt-dlp binary; one failed track must
            # not abort the remaining downloads.
            print(f"Download failed [{i}]: {e}")
            return None

    with ThreadPoolExecutor(max_workers=config["download_workers"]) as ex:
        futs = {
            ex.submit(dl_task, i, v, c): i
            for i, (v, c) in enumerate(zip(vids, cleaned), start=1)
        }
        for fut in as_completed(futs):
            results[futs[fut] - 1] = fut.result()

    print("Tagging (parallel, in-place, no ffmpeg)...")

    def tag_task(i, p, ct):
        if p is None or not Path(p).exists():
            return
        track_num = config["start_track"] + (i - 1)
        try:
            set_tags(
                Path(p),
                ct,
                track_num,
                config["artist"],
                config["album"],
                config["album_artist"],
                config["disc"],
            )
        except Exception as e:
            # Per-file boundary: report the failure and keep tagging.
            print(f"Tagging failed [{i}]: {e}")

    with ThreadPoolExecutor(max_workers=config["tag_workers"]) as ex:
        # list() drains the iterator so every task has finished before the
        # final message is printed.
        list(ex.map(tag_task, range(1, total + 1), results, cleaned))

    print(f"\n\u2713 Done. Files in: {config['outdir']}")


if __name__ == "__main__":
    main()