Update downloader.py

This commit is contained in:
aadit 2025-10-22 01:26:44 +05:30
parent 46f520aab2
commit 1160d360b4

303
downloader.py Normal file
View file

@@ -0,0 +1,303 @@
#!/usr/bin/env python3
import re
import json
import subprocess
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from joblib import Parallel, delayed
def prompt(message: str, default=None, input_type=str):
    """Ask the user for a value, falling back to *default* on empty input.

    The answer is stripped of surrounding whitespace and converted with
    *input_type*. The question is repeated until a usable answer is given:
    an empty answer without a default prints "This field is required.", and
    a ``ValueError`` from the conversion prints an "Invalid input" hint.

    Args:
        message: Text shown to the user.
        default: Value used when the user just presses Enter. It is shown in
            square brackets and is also passed through *input_type*.
        input_type: Callable that converts the answer; its ``__name__`` is
            used in the "invalid input" message.

    Returns:
        The converted answer (or converted default).
    """
    has_default = default is not None
    suffix = f" [{default}]" if has_default else ""
    label = f"{message}{suffix}: "

    while True:
        try:
            raw = input(label).strip()
            if raw:
                return input_type(raw)
            if has_default:
                return input_type(default)
            print("This field is required.")
        except ValueError:
            print(f"Invalid input. Please enter a valid {input_type.__name__}.")
def prompt_bool(message: str, default=True):
    """Ask a yes/no question and return the answer as a bool.

    Accepts ``y``/``yes`` and ``n``/``no`` in any letter case. An empty
    answer returns *default* unchanged; anything else prints a hint and the
    question is asked again.

    Args:
        message: Text shown to the user.
        default: Value returned when the user just presses Enter; it also
            decides whether the hint reads ``Y/n`` or ``y/N``.
    """
    hint = "Y/n" if default else "y/N"
    question = f"{message} [{hint}]: "
    answers = {"y": True, "yes": True, "n": False, "no": False}

    while True:
        reply = input(question).strip().lower()
        if reply == "":
            return default
        if reply in answers:
            return answers[reply]
        print("Please enter 'y' or 'n'.")
def _prompt_positive_int(message: str, default: str) -> int:
    """Prompt for an integer and keep asking until the answer is at least 1."""
    while True:
        value = prompt(message, default=default, input_type=int)
        if value >= 1:
            return value
        print("Please enter a whole number of 1 or more.")


def get_config():
    """Interactively gather configuration.

    Asks for the playlist URL and album name (both required), then for the
    optional tagging/output settings and the concurrency settings, prints a
    summary and asks for confirmation.

    Returns:
        dict with the keys ``playlist_url``, ``album``, ``artist``,
        ``album_artist``, ``disc`` (all ``str``), ``outdir``
        (``pathlib.Path``), ``start_track``, ``download_workers``,
        ``tag_workers`` and ``fragments`` (all ``int``).

    Raises:
        SystemExit: with status 0 when the user declines the summary.
    """
    banner = "=" * 60

    print("\n" + banner)
    print("YouTube Playlist Downloader")
    print(banner + "\n")

    # Required fields
    playlist_url = prompt("Enter playlist URL")
    album = prompt("Enter album name")

    # Optional fields with defaults
    print("\nOptional settings (press Enter for defaults):\n")
    artist = prompt("Artist name", default="Various Artists")
    album_artist = prompt("Album artist", default=artist)
    outdir = Path(prompt("Output directory", default="downloads"))
    disc = prompt("Disc number", default="1")
    start_track = prompt("Starting track number", default="1", input_type=int)

    # Advanced options. These counts size worker pools later on, so values
    # below 1 are rejected here instead of failing after all prompts.
    print("\nAdvanced options (press Enter for defaults):\n")
    download_workers = _prompt_positive_int(
        "Max concurrent downloads", default="6"
    )
    tag_workers = _prompt_positive_int(
        "Max concurrent tagging jobs", default="8"
    )
    fragments = _prompt_positive_int(
        "Concurrent HLS/DASH fragments per video", default="8"
    )

    # Confirmation
    print("\n" + banner)
    print("Configuration Summary:")
    print(banner)
    print(f"Playlist URL: {playlist_url}")
    print(f"Album: {album}")
    print(f"Artist: {artist}")
    print(f"Album Artist: {album_artist}")
    print(f"Output Directory: {outdir}")
    print(f"Disc Number: {disc}")
    print(f"Starting Track: {start_track}")
    print(f"Download Workers: {download_workers}")
    print(f"Tag Workers: {tag_workers}")
    print(f"Fragment Concurrency: {fragments}")
    print(banner + "\n")

    if not prompt_bool("Proceed with these settings?", default=True):
        print("Cancelled.")
        # The builtin-looking exit() is injected by the `site` module for
        # interactive use and may be missing; SystemExit always works.
        raise SystemExit(0)

    return {
        "playlist_url": playlist_url,
        "album": album,
        "artist": artist,
        "album_artist": album_artist,
        "outdir": outdir,
        "disc": disc,
        "start_track": start_track,
        "download_workers": download_workers,
        "tag_workers": tag_workers,
        "fragments": fragments,
    }
# Leading "Bongo Cat - " channel prefix (case-insensitive, flexible spacing).
_BONGO_PREFIX_RE = re.compile(r"(?i)^\s*Bongo\s+Cat\s*-\s*")
# Trailing "(Cover Version) 🎧" marker.
_COVER_SUFFIX_RE = re.compile(r"\s*\(Cover Version\)\s*🎧\s*$")


def clean_title(title: str) -> str:
    """Return *title* without the channel prefix, cover marker and edge dots.

    Removes a leading ``Bongo Cat -`` prefix and a trailing
    ``(Cover Version) 🎧`` marker, then strips surrounding whitespace and
    dots. The result may be an empty string if nothing else remains.
    """
    t = _BONGO_PREFIX_RE.sub("", title)
    t = _COVER_SUFFIX_RE.sub("", t)

    # Strip whitespace and dots repeatedly until nothing changes: a single
    # strip().strip(".") pass leaves the whitespace that sat next to a
    # removed dot (e.g. "Song ." -> "Song ").
    previous = None
    while previous != t:
        previous = t
        t = t.strip().strip(".")
    return t
def run_json(cmd):
    """Run *cmd* and return its standard output parsed as JSON.

    Both output streams are captured as text. A non-zero exit status raises
    ``subprocess.CalledProcessError`` (``check=True``); output that is not
    valid JSON raises ``json.JSONDecodeError``.
    """
    completed = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        check=True,
    )
    return json.loads(completed.stdout)
def get_playlist_entries(url: str):
    """Return the video ids of the playlist at *url*, in the order reported.

    Runs ``yt-dlp -J --flat-playlist`` and reads the ``id`` of every item in
    the ``entries`` list of the resulting JSON. A result without an
    ``entries`` key yields an empty list.
    """
    command = ["yt-dlp", "-J", "--flat-playlist", url]
    playlist = run_json(command)
    return [entry["id"] for entry in playlist.get("entries", [])]
def get_title(video_id: str) -> str:
    """Return the title yt-dlp reports for *video_id*.

    Falls back to *video_id* itself when the JSON metadata has no ``title``
    key.
    """
    watch_url = f"https://www.youtube.com/watch?v={video_id}"
    info = run_json(["yt-dlp", "-J", watch_url])
    return info.get("title", video_id)
def download_one(
    idx: int,
    vid: str,
    cleaned_title: str,
    outdir: Path,
    concurrent_fragments: int,
):
    """Download one video's audio as Opus into *outdir* using yt-dlp.

    The file is named ``"<idx, zero-padded to 3> - <title>.<ext>"``.

    Args:
        idx: 1-based position used as the filename prefix.
        vid: YouTube video id.
        cleaned_title: Title used in the filename. Path separators and NUL
            are replaced with ``_`` so the title cannot create
            subdirectories.
        outdir: Directory the file is written to.
        concurrent_fragments: Value passed to ``--concurrent-fragments``.

    Returns:
        Path of the resulting ``.opus`` file, or of a ``.ogg`` file with the
        same base name, or ``None`` if neither exists after the download.

    Raises:
        subprocess.CalledProcessError: if yt-dlp exits with a non-zero status.
    """
    # The title becomes literal text inside a path; "/" or "\" would be
    # treated as directory separators and the lookup below would then miss
    # the file.
    safe_title = re.sub(r"[\\/\x00]", "_", cleaned_title)
    out_base = f"{idx:03d} - {safe_title}"

    # yt-dlp output templates use %-style fields, so every literal "%" in
    # the directory or title must be doubled; only "%(ext)s" is a real field.
    out_tmpl = str(outdir / out_base).replace("%", "%%") + ".%(ext)s"

    url = f"https://www.youtube.com/watch?v={vid}"
    cmd = [
        "yt-dlp",
        url,
        "--extract-audio",
        "--audio-format",
        "opus",
        "--audio-quality",
        "0",
        "--embed-thumbnail",
        # --add-metadata is an alias of --embed-metadata, so one is enough.
        "--embed-metadata",
        "--concurrent-fragments",
        str(concurrent_fragments),
        "-o",
        out_tmpl,
    ]
    subprocess.run(cmd, check=True)

    # Prefer the .opus result; accept a same-named .ogg as a fallback.
    for suffix in (".opus", ".ogg"):
        candidate = outdir / f"{out_base}{suffix}"
        if candidate.exists():
            return candidate
    return None
def set_tags(
    path: Path,
    title: str,
    track_number: int,
    artist: str,
    album: str,
    album_artist: str,
    disc_number: str,
    track_total=None,
):
    """Write album tags into an Ogg Opus/Vorbis file in place using mutagen.

    The file is opened as Ogg Opus. If that fails and the suffix is not
    ``.opus``, it is opened as Ogg Vorbis instead; for ``.opus`` files the
    error propagates.

    Args:
        path: Audio file to tag.
        title: Value for ``TITLE``.
        track_number: Value for ``TRACKNUMBER`` (stored as a string).
        artist: Value for ``ARTIST``.
        album: Value for ``ALBUM``.
        album_artist: Value for ``ALBUMARTIST``.
        disc_number: Value for ``DISCNUMBER``.
        track_total: Optional value for ``TRACKTOTAL``. When ``None`` (the
            default) any existing ``TRACKTOTAL`` tag is removed rather than
            written as an empty string.

    Raises:
        mutagen.MutagenError: if the file cannot be opened or saved.
    """
    from mutagen import MutagenError
    from mutagen.oggopus import OggOpus
    from mutagen.oggvorbis import OggVorbis

    try:
        audio = OggOpus(path)
    except MutagenError:
        # Only non-.opus files get the Vorbis fallback.
        if path.suffix.lower() == ".opus":
            raise
        audio = OggVorbis(path)

    if audio.tags is None:
        audio.add_tags()

    audio["TITLE"] = [title]
    audio["ARTIST"] = [artist]
    audio["ALBUM"] = [album]
    audio["ALBUMARTIST"] = [album_artist]
    audio["DISCNUMBER"] = [disc_number]
    audio["TRACKNUMBER"] = [str(track_number)]
    audio["COMPILATION"] = ["0"]

    # An empty "TRACKTOTAL=" comment carries no information; either store a
    # real total or make sure no stale value is left behind.
    if track_total is not None:
        audio["TRACKTOTAL"] = [str(track_total)]
    elif "TRACKTOTAL" in audio:
        del audio["TRACKTOTAL"]

    audio.save()
def main():
    """Run the interactive playlist download: configure, fetch, download, tag.

    Steps:
        1. Gather settings with ``get_config`` and create the output directory.
        2. Fetch the playlist's video ids.
        3. Probe every video's title in parallel and clean it; a failed probe
           or an empty cleaned title falls back to ``track_<index>``.
        4. Download all tracks in parallel; a failed download is reported and
           leaves ``None`` in that track's result slot.
        5. Tag every downloaded file in parallel; a failed tagging is reported.

    Track numbers written to the tags start at the configured
    ``start_track`` and follow playlist order.
    """
    config = get_config()
    outdir = config["outdir"]
    # parents=True so a nested path such as "music/album" works on first run.
    outdir.mkdir(parents=True, exist_ok=True)

    print("Fetching playlist order...")
    vids = get_playlist_entries(config["playlist_url"])
    total = len(vids)
    if total == 0:
        print("No entries found.")
        return

    # ThreadPoolExecutor raises ValueError for max_workers < 1.
    download_workers = max(1, config["download_workers"])
    # joblib's Parallel does not accept n_jobs == 0; other values are passed
    # through unchanged.
    tag_workers = config["tag_workers"] or 1

    print("Probing titles (parallel)...")
    titles = [None] * total
    with ThreadPoolExecutor(max_workers=download_workers) as ex:
        futs = {
            ex.submit(get_title, vid): i for i, vid in enumerate(vids, start=1)
        }
        for fut in as_completed(futs):
            i = futs[fut]
            try:
                titles[i - 1] = fut.result()
            except Exception as e:
                # Per-track boundary: one failed probe must not stop the run.
                titles[i - 1] = f"track_{i}"
                print(f"Title probe failed for index {i}: {e}")

    # The trailing "or" covers titles that are empty after cleaning, which
    # would otherwise give filenames like "001 - .opus".
    cleaned = [
        clean_title(t or f"track_{i}") or f"track_{i}"
        for i, t in enumerate(titles, start=1)
    ]

    print("Downloading (parallel)...")
    results = [None] * total

    def dl_task(i, vid, ct):
        print(f"[{i}/{total}] {ct}")
        try:
            return download_one(
                i,
                vid,
                ct,
                outdir,
                config["fragments"],
            )
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers yt-dlp failing to start (e.g. not on PATH).
            print(f"Download failed [{i}]: {e}")
            return None

    with ThreadPoolExecutor(max_workers=download_workers) as ex:
        futs = {
            ex.submit(dl_task, i, v, c): i
            for i, (v, c) in enumerate(zip(vids, cleaned), start=1)
        }
        for fut in as_completed(futs):
            i = futs[fut]
            results[i - 1] = fut.result()

    print("Tagging (parallel, in-place, no ffmpeg)...")

    def tag_task(i, p, ct):
        if p is None or not Path(p).exists():
            return
        track_num = config["start_track"] + (i - 1)
        try:
            set_tags(
                Path(p),
                ct,
                track_num,
                config["artist"],
                config["album"],
                config["album_artist"],
                config["disc"],
            )
        except Exception as e:
            # Per-track boundary: report and continue with the other files.
            print(f"Tagging failed [{i}]: {e}")

    Parallel(n_jobs=tag_workers, prefer="threads")(
        delayed(tag_task)(i, p, c)
        for i, (p, c) in enumerate(zip(results, cleaned), start=1)
    )

    missing = [str(i) for i, p in enumerate(results, start=1) if p is None]
    if missing:
        print(
            f"{len(missing)} of {total} track(s) were not downloaded: "
            + ", ".join(missing)
        )
    print(f"\n✓ Done. Files in: {config['outdir']}")
# Script entry point: start the interactive downloader only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    main()