commit 46f520aab2edd872ad69b0c1ce1a54793276180b Author: aadit Date: Wed Oct 22 01:23:47 2025 +0530 Add downer.py diff --git a/downer.py b/downer.py new file mode 100644 index 0000000..eb87e53 --- /dev/null +++ b/downer.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python3 +import re +import json +import subprocess +from pathlib import Path +from concurrent.futures import ThreadPoolExecutor, as_completed +from joblib import Parallel, delayed + + +def prompt(message: str, default=None, input_type=str): + """Prompt user with optional default value.""" + if default is not None: + msg = f"{message} [{default}]: " + else: + msg = f"{message}: " + + while True: + try: + value = input(msg).strip() + if not value: + if default is not None: + return input_type(default) + print("This field is required.") + continue + return input_type(value) + except ValueError: + print(f"Invalid input. Please enter a valid {input_type.__name__}.") + + +def prompt_bool(message: str, default=True): + """Prompt user for yes/no.""" + default_str = "Y/n" if default else "y/N" + while True: + value = input(f"{message} [{default_str}]: ").strip().lower() + if not value: + return default + if value in ("y", "yes"): + return True + if value in ("n", "no"): + return False + print("Please enter 'y' or 'n'.") + + +def get_config(): + """Interactively gather configuration.""" + print("\n" + "=" * 60) + print("YouTube Playlist Downloader") + print("=" * 60 + "\n") + + # Required fields + playlist_url = prompt("Enter playlist URL") + + album = prompt("Enter album name") + + # Optional fields with defaults + print("\nOptional settings (press Enter for defaults):\n") + + artist = prompt("Artist name", default="Various Artists") + + album_artist = prompt("Album artist", default=artist) + + outdir = Path(prompt("Output directory", default="downloads")) + + disc = prompt("Disc number", default="1") + + start_track = prompt("Starting track number", default="1", input_type=int) + + # Advanced options + print("\nAdvanced options (press Enter for defaults):\n") + + download_workers = prompt( + "Max concurrent downloads", default="6", input_type=int + ) + + tag_workers = prompt( + "Max concurrent tagging jobs", default="8", input_type=int + ) + + fragments = prompt( + "Concurrent HLS/DASH fragments per video", + default="8", + input_type=int, + ) + + # Confirmation + print("\n" + "=" * 60) + print("Configuration Summary:") + print("=" * 60) + print(f"Playlist URL: {playlist_url}") + print(f"Album: {album}") + print(f"Artist: {artist}") + print(f"Album Artist: {album_artist}") + print(f"Output Directory: {outdir}") + print(f"Disc Number: {disc}") + print(f"Starting Track: {start_track}") + print(f"Download Workers: {download_workers}") + print(f"Tag Workers: {tag_workers}") + print(f"Fragment Concurrency: {fragments}") + print("=" * 60 + "\n") + + proceed = prompt_bool("Proceed with these settings?", default=True) + if not proceed: + print("Cancelled.") + exit(0) + + return { + "playlist_url": playlist_url, + "album": album, + "artist": artist, + "album_artist": album_artist, + "outdir": outdir, + "disc": disc, + "start_track": start_track, + "download_workers": download_workers, + "tag_workers": tag_workers, + "fragments": fragments, + } + + +def clean_title(title: str) -> str: + t = re.sub(r"(?i)^\s*Bongo\s+Cat\s*-\s*", "", title) + t = re.sub(r"\s*\(Cover Version\)\s*šŸŽ§\s*$", "", t) + return t.strip().strip(".") + + +def run_json(cmd): + p = subprocess.run(cmd, check=True, capture_output=True, text=True) + return json.loads(p.stdout) + + +def get_playlist_entries(url: str): + data = run_json(["yt-dlp", "-J", "--flat-playlist", url]) + entries = data.get("entries", []) + return [e["id"] for e in entries] + + +def get_title(video_id: str) -> str: + meta = run_json( + ["yt-dlp", "-J", f"https://www.youtube.com/watch?v={video_id}"] + ) + return meta.get("title", video_id) + + +def download_one( + idx: int, + vid: str, + cleaned_title: str, + outdir: Path, + concurrent_fragments: int, +): + out_base = f"{idx:03d} - {cleaned_title}" + out_tmpl = str(outdir / f"{out_base}.%(ext)s") + url = f"https://www.youtube.com/watch?v={vid}" + cmd = [ + "yt-dlp", + url, + "--extract-audio", + "--audio-format", + "opus", + "--audio-quality", + "0", + "--embed-thumbnail", + "--embed-metadata", + "--add-metadata", + "--concurrent-fragments", + str(concurrent_fragments), + "-o", + out_tmpl, + ] + subprocess.run(cmd, check=True) + opus = outdir / f"{out_base}.opus" + if opus.exists(): + return opus + ogg = outdir / f"{out_base}.ogg" + return ogg if ogg.exists() else None + + +def set_tags( + path: Path, + title: str, + track_number: int, + artist: str, + album: str, + album_artist: str, + disc_number: str, +): + from mutagen.oggopus import OggOpus + from mutagen.oggvorbis import OggVorbis + + audio = None + if path.suffix.lower() == ".opus": + audio = OggOpus(path) + else: + try: + audio = OggOpus(path) + except Exception: + audio = OggVorbis(path) + + if audio.tags is None: + audio.add_tags() + + audio["TITLE"] = [title] + audio["ARTIST"] = [artist] + audio["ALBUM"] = [album] + audio["ALBUMARTIST"] = [album_artist] + audio["DISCNUMBER"] = [disc_number] + audio["TRACKNUMBER"] = [str(track_number)] + audio["TRACKTOTAL"] = [""] + audio["COMPILATION"] = ["0"] + + audio.save() + + +def main(): + config = get_config() + + config["outdir"].mkdir(exist_ok=True) + + print("Fetching playlist order...") + vids = get_playlist_entries(config["playlist_url"]) + total = len(vids) + if total == 0: + print("No entries found.") + return + + print("Probing titles (parallel)...") + with ThreadPoolExecutor( + max_workers=config["download_workers"] + ) as ex: + futs = { + ex.submit(get_title, vid): i for i, vid in enumerate(vids, start=1) + } + titles = [None] * total + for fut in as_completed(futs): + i = futs[fut] + try: + titles[i - 1] = fut.result() + except Exception as e: + titles[i - 1] = f"track_{i}" + print(f"Title probe failed for index {i}: {e}") + + cleaned = [ + clean_title(t or f"track_{i}") + for i, t in enumerate(titles, start=1) + ] + + print("Downloading (parallel)...") + results = [None] * total + + def dl_task(i, vid, ct): + print(f"[{i}/{total}] {ct}") + try: + return download_one( + i, + vid, + ct, + config["outdir"], + config["fragments"], + ) + except subprocess.CalledProcessError as e: + print(f"Download failed [{i}]: {e}") + return None + + with ThreadPoolExecutor( + max_workers=config["download_workers"] + ) as ex: + futs = { + ex.submit(dl_task, i, v, c): i + for i, (v, c) in enumerate(zip(vids, cleaned), start=1) + } + for fut in as_completed(futs): + i = futs[fut] + results[i - 1] = fut.result() + + print("Tagging (parallel, in-place, no ffmpeg)...") + + def tag_task(i, p, ct): + if p is None or not Path(p).exists(): + return + track_num = config["start_track"] + (i - 1) + try: + set_tags( + Path(p), + ct, + track_num, + config["artist"], + config["album"], + config["album_artist"], + config["disc"], + ) + except Exception as e: + print(f"Tagging failed [{i}]: {e}") + + Parallel(n_jobs=config["tag_workers"], prefer="threads")( + delayed(tag_task)(i, p, c) + for i, (p, c) in enumerate(zip(results, cleaned), start=1) + ) + + print(f"\nāœ“ Done. Files in: {config['outdir']}") + + +if __name__ == "__main__": + main() \ No newline at end of file