303 lines
No EOL
7.9 KiB
Python
303 lines
No EOL
7.9 KiB
Python
#!/usr/bin/env python3
|
|
import re
|
|
import json
|
|
import subprocess
|
|
from pathlib import Path
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from joblib import Parallel, delayed
|
|
|
|
|
|
def prompt(message: str, default=None, input_type=str):
|
|
"""Prompt user with optional default value."""
|
|
if default is not None:
|
|
msg = f"{message} [{default}]: "
|
|
else:
|
|
msg = f"{message}: "
|
|
|
|
while True:
|
|
try:
|
|
value = input(msg).strip()
|
|
if not value:
|
|
if default is not None:
|
|
return input_type(default)
|
|
print("This field is required.")
|
|
continue
|
|
return input_type(value)
|
|
except ValueError:
|
|
print(f"Invalid input. Please enter a valid {input_type.__name__}.")
|
|
|
|
|
|
def prompt_bool(message: str, default=True):
|
|
"""Prompt user for yes/no."""
|
|
default_str = "Y/n" if default else "y/N"
|
|
while True:
|
|
value = input(f"{message} [{default_str}]: ").strip().lower()
|
|
if not value:
|
|
return default
|
|
if value in ("y", "yes"):
|
|
return True
|
|
if value in ("n", "no"):
|
|
return False
|
|
print("Please enter 'y' or 'n'.")
|
|
|
|
|
|
def get_config():
|
|
"""Interactively gather configuration."""
|
|
print("\n" + "=" * 60)
|
|
print("YouTube Playlist Downloader")
|
|
print("=" * 60 + "\n")
|
|
|
|
# Required fields
|
|
playlist_url = prompt("Enter playlist URL")
|
|
|
|
album = prompt("Enter album name")
|
|
|
|
# Optional fields with defaults
|
|
print("\nOptional settings (press Enter for defaults):\n")
|
|
|
|
artist = prompt("Artist name", default="Various Artists")
|
|
|
|
album_artist = prompt("Album artist", default=artist)
|
|
|
|
outdir = Path(prompt("Output directory", default="downloads"))
|
|
|
|
disc = prompt("Disc number", default="1")
|
|
|
|
start_track = prompt("Starting track number", default="1", input_type=int)
|
|
|
|
# Advanced options
|
|
print("\nAdvanced options (press Enter for defaults):\n")
|
|
|
|
download_workers = prompt(
|
|
"Max concurrent downloads", default="6", input_type=int
|
|
)
|
|
|
|
tag_workers = prompt(
|
|
"Max concurrent tagging jobs", default="8", input_type=int
|
|
)
|
|
|
|
fragments = prompt(
|
|
"Concurrent HLS/DASH fragments per video",
|
|
default="8",
|
|
input_type=int,
|
|
)
|
|
|
|
# Confirmation
|
|
print("\n" + "=" * 60)
|
|
print("Configuration Summary:")
|
|
print("=" * 60)
|
|
print(f"Playlist URL: {playlist_url}")
|
|
print(f"Album: {album}")
|
|
print(f"Artist: {artist}")
|
|
print(f"Album Artist: {album_artist}")
|
|
print(f"Output Directory: {outdir}")
|
|
print(f"Disc Number: {disc}")
|
|
print(f"Starting Track: {start_track}")
|
|
print(f"Download Workers: {download_workers}")
|
|
print(f"Tag Workers: {tag_workers}")
|
|
print(f"Fragment Concurrency: {fragments}")
|
|
print("=" * 60 + "\n")
|
|
|
|
proceed = prompt_bool("Proceed with these settings?", default=True)
|
|
if not proceed:
|
|
print("Cancelled.")
|
|
exit(0)
|
|
|
|
return {
|
|
"playlist_url": playlist_url,
|
|
"album": album,
|
|
"artist": artist,
|
|
"album_artist": album_artist,
|
|
"outdir": outdir,
|
|
"disc": disc,
|
|
"start_track": start_track,
|
|
"download_workers": download_workers,
|
|
"tag_workers": tag_workers,
|
|
"fragments": fragments,
|
|
}
|
|
|
|
|
|
def clean_title(title: str) -> str:
|
|
t = re.sub(r"(?i)^\s*Bongo\s+Cat\s*-\s*", "", title)
|
|
t = re.sub(r"\s*\(Cover Version\)\s*🎧\s*$", "", t)
|
|
return t.strip().strip(".")
|
|
|
|
|
|
def run_json(cmd):
|
|
p = subprocess.run(cmd, check=True, capture_output=True, text=True)
|
|
return json.loads(p.stdout)
|
|
|
|
|
|
def get_playlist_entries(url: str):
|
|
data = run_json(["yt-dlp", "-J", "--flat-playlist", url])
|
|
entries = data.get("entries", [])
|
|
return [e["id"] for e in entries]
|
|
|
|
|
|
def get_title(video_id: str) -> str:
|
|
meta = run_json(
|
|
["yt-dlp", "-J", f"https://www.youtube.com/watch?v={video_id}"]
|
|
)
|
|
return meta.get("title", video_id)
|
|
|
|
|
|
def download_one(
|
|
idx: int,
|
|
vid: str,
|
|
cleaned_title: str,
|
|
outdir: Path,
|
|
concurrent_fragments: int,
|
|
):
|
|
out_base = f"{idx:03d} - {cleaned_title}"
|
|
out_tmpl = str(outdir / f"{out_base}.%(ext)s")
|
|
url = f"https://www.youtube.com/watch?v={vid}"
|
|
cmd = [
|
|
"yt-dlp",
|
|
url,
|
|
"--extract-audio",
|
|
"--audio-format",
|
|
"opus",
|
|
"--audio-quality",
|
|
"0",
|
|
"--embed-thumbnail",
|
|
"--embed-metadata",
|
|
"--add-metadata",
|
|
"--concurrent-fragments",
|
|
str(concurrent_fragments),
|
|
"-o",
|
|
out_tmpl,
|
|
]
|
|
subprocess.run(cmd, check=True)
|
|
opus = outdir / f"{out_base}.opus"
|
|
if opus.exists():
|
|
return opus
|
|
ogg = outdir / f"{out_base}.ogg"
|
|
return ogg if ogg.exists() else None
|
|
|
|
|
|
def set_tags(
|
|
path: Path,
|
|
title: str,
|
|
track_number: int,
|
|
artist: str,
|
|
album: str,
|
|
album_artist: str,
|
|
disc_number: str,
|
|
):
|
|
from mutagen.oggopus import OggOpus
|
|
from mutagen.oggvorbis import OggVorbis
|
|
|
|
audio = None
|
|
if path.suffix.lower() == ".opus":
|
|
audio = OggOpus(path)
|
|
else:
|
|
try:
|
|
audio = OggOpus(path)
|
|
except Exception:
|
|
audio = OggVorbis(path)
|
|
|
|
if audio.tags is None:
|
|
audio.add_tags()
|
|
|
|
audio["TITLE"] = [title]
|
|
audio["ARTIST"] = [artist]
|
|
audio["ALBUM"] = [album]
|
|
audio["ALBUMARTIST"] = [album_artist]
|
|
audio["DISCNUMBER"] = [disc_number]
|
|
audio["TRACKNUMBER"] = [str(track_number)]
|
|
audio["TRACKTOTAL"] = [""]
|
|
audio["COMPILATION"] = ["0"]
|
|
|
|
audio.save()
|
|
|
|
|
|
def main():
|
|
config = get_config()
|
|
|
|
config["outdir"].mkdir(exist_ok=True)
|
|
|
|
print("Fetching playlist order...")
|
|
vids = get_playlist_entries(config["playlist_url"])
|
|
total = len(vids)
|
|
if total == 0:
|
|
print("No entries found.")
|
|
return
|
|
|
|
print("Probing titles (parallel)...")
|
|
with ThreadPoolExecutor(
|
|
max_workers=config["download_workers"]
|
|
) as ex:
|
|
futs = {
|
|
ex.submit(get_title, vid): i for i, vid in enumerate(vids, start=1)
|
|
}
|
|
titles = [None] * total
|
|
for fut in as_completed(futs):
|
|
i = futs[fut]
|
|
try:
|
|
titles[i - 1] = fut.result()
|
|
except Exception as e:
|
|
titles[i - 1] = f"track_{i}"
|
|
print(f"Title probe failed for index {i}: {e}")
|
|
|
|
cleaned = [
|
|
clean_title(t or f"track_{i}")
|
|
for i, t in enumerate(titles, start=1)
|
|
]
|
|
|
|
print("Downloading (parallel)...")
|
|
results = [None] * total
|
|
|
|
def dl_task(i, vid, ct):
|
|
print(f"[{i}/{total}] {ct}")
|
|
try:
|
|
return download_one(
|
|
i,
|
|
vid,
|
|
ct,
|
|
config["outdir"],
|
|
config["fragments"],
|
|
)
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Download failed [{i}]: {e}")
|
|
return None
|
|
|
|
with ThreadPoolExecutor(
|
|
max_workers=config["download_workers"]
|
|
) as ex:
|
|
futs = {
|
|
ex.submit(dl_task, i, v, c): i
|
|
for i, (v, c) in enumerate(zip(vids, cleaned), start=1)
|
|
}
|
|
for fut in as_completed(futs):
|
|
i = futs[fut]
|
|
results[i - 1] = fut.result()
|
|
|
|
print("Tagging (parallel, in-place, no ffmpeg)...")
|
|
|
|
def tag_task(i, p, ct):
|
|
if p is None or not Path(p).exists():
|
|
return
|
|
track_num = config["start_track"] + (i - 1)
|
|
try:
|
|
set_tags(
|
|
Path(p),
|
|
ct,
|
|
track_num,
|
|
config["artist"],
|
|
config["album"],
|
|
config["album_artist"],
|
|
config["disc"],
|
|
)
|
|
except Exception as e:
|
|
print(f"Tagging failed [{i}]: {e}")
|
|
|
|
Parallel(n_jobs=config["tag_workers"], prefer="threads")(
|
|
delayed(tag_task)(i, p, c)
|
|
for i, (p, c) in enumerate(zip(results, cleaned), start=1)
|
|
)
|
|
|
|
print(f"\n✓ Done. Files in: {config['outdir']}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |