Add downer.py
This commit is contained in:
commit
46f520aab2
1 changed files with 303 additions and 0 deletions
303
downer.py
Normal file
303
downer.py
Normal file
|
|
@ -0,0 +1,303 @@
|
|||
#!/usr/bin/env python3
|
||||
import re
|
||||
import json
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from joblib import Parallel, delayed
|
||||
|
||||
|
||||
def prompt(message: str, default=None, input_type=str):
|
||||
"""Prompt user with optional default value."""
|
||||
if default is not None:
|
||||
msg = f"{message} [{default}]: "
|
||||
else:
|
||||
msg = f"{message}: "
|
||||
|
||||
while True:
|
||||
try:
|
||||
value = input(msg).strip()
|
||||
if not value:
|
||||
if default is not None:
|
||||
return input_type(default)
|
||||
print("This field is required.")
|
||||
continue
|
||||
return input_type(value)
|
||||
except ValueError:
|
||||
print(f"Invalid input. Please enter a valid {input_type.__name__}.")
|
||||
|
||||
|
||||
def prompt_bool(message: str, default=True):
|
||||
"""Prompt user for yes/no."""
|
||||
default_str = "Y/n" if default else "y/N"
|
||||
while True:
|
||||
value = input(f"{message} [{default_str}]: ").strip().lower()
|
||||
if not value:
|
||||
return default
|
||||
if value in ("y", "yes"):
|
||||
return True
|
||||
if value in ("n", "no"):
|
||||
return False
|
||||
print("Please enter 'y' or 'n'.")
|
||||
|
||||
|
||||
def get_config():
|
||||
"""Interactively gather configuration."""
|
||||
print("\n" + "=" * 60)
|
||||
print("YouTube Playlist Downloader")
|
||||
print("=" * 60 + "\n")
|
||||
|
||||
# Required fields
|
||||
playlist_url = prompt("Enter playlist URL")
|
||||
|
||||
album = prompt("Enter album name")
|
||||
|
||||
# Optional fields with defaults
|
||||
print("\nOptional settings (press Enter for defaults):\n")
|
||||
|
||||
artist = prompt("Artist name", default="Various Artists")
|
||||
|
||||
album_artist = prompt("Album artist", default=artist)
|
||||
|
||||
outdir = Path(prompt("Output directory", default="downloads"))
|
||||
|
||||
disc = prompt("Disc number", default="1")
|
||||
|
||||
start_track = prompt("Starting track number", default="1", input_type=int)
|
||||
|
||||
# Advanced options
|
||||
print("\nAdvanced options (press Enter for defaults):\n")
|
||||
|
||||
download_workers = prompt(
|
||||
"Max concurrent downloads", default="6", input_type=int
|
||||
)
|
||||
|
||||
tag_workers = prompt(
|
||||
"Max concurrent tagging jobs", default="8", input_type=int
|
||||
)
|
||||
|
||||
fragments = prompt(
|
||||
"Concurrent HLS/DASH fragments per video",
|
||||
default="8",
|
||||
input_type=int,
|
||||
)
|
||||
|
||||
# Confirmation
|
||||
print("\n" + "=" * 60)
|
||||
print("Configuration Summary:")
|
||||
print("=" * 60)
|
||||
print(f"Playlist URL: {playlist_url}")
|
||||
print(f"Album: {album}")
|
||||
print(f"Artist: {artist}")
|
||||
print(f"Album Artist: {album_artist}")
|
||||
print(f"Output Directory: {outdir}")
|
||||
print(f"Disc Number: {disc}")
|
||||
print(f"Starting Track: {start_track}")
|
||||
print(f"Download Workers: {download_workers}")
|
||||
print(f"Tag Workers: {tag_workers}")
|
||||
print(f"Fragment Concurrency: {fragments}")
|
||||
print("=" * 60 + "\n")
|
||||
|
||||
proceed = prompt_bool("Proceed with these settings?", default=True)
|
||||
if not proceed:
|
||||
print("Cancelled.")
|
||||
exit(0)
|
||||
|
||||
return {
|
||||
"playlist_url": playlist_url,
|
||||
"album": album,
|
||||
"artist": artist,
|
||||
"album_artist": album_artist,
|
||||
"outdir": outdir,
|
||||
"disc": disc,
|
||||
"start_track": start_track,
|
||||
"download_workers": download_workers,
|
||||
"tag_workers": tag_workers,
|
||||
"fragments": fragments,
|
||||
}
|
||||
|
||||
|
||||
def clean_title(title: str) -> str:
|
||||
t = re.sub(r"(?i)^\s*Bongo\s+Cat\s*-\s*", "", title)
|
||||
t = re.sub(r"\s*\(Cover Version\)\s*🎧\s*$", "", t)
|
||||
return t.strip().strip(".")
|
||||
|
||||
|
||||
def run_json(cmd):
|
||||
p = subprocess.run(cmd, check=True, capture_output=True, text=True)
|
||||
return json.loads(p.stdout)
|
||||
|
||||
|
||||
def get_playlist_entries(url: str):
|
||||
data = run_json(["yt-dlp", "-J", "--flat-playlist", url])
|
||||
entries = data.get("entries", [])
|
||||
return [e["id"] for e in entries]
|
||||
|
||||
|
||||
def get_title(video_id: str) -> str:
|
||||
meta = run_json(
|
||||
["yt-dlp", "-J", f"https://www.youtube.com/watch?v={video_id}"]
|
||||
)
|
||||
return meta.get("title", video_id)
|
||||
|
||||
|
||||
def download_one(
|
||||
idx: int,
|
||||
vid: str,
|
||||
cleaned_title: str,
|
||||
outdir: Path,
|
||||
concurrent_fragments: int,
|
||||
):
|
||||
out_base = f"{idx:03d} - {cleaned_title}"
|
||||
out_tmpl = str(outdir / f"{out_base}.%(ext)s")
|
||||
url = f"https://www.youtube.com/watch?v={vid}"
|
||||
cmd = [
|
||||
"yt-dlp",
|
||||
url,
|
||||
"--extract-audio",
|
||||
"--audio-format",
|
||||
"opus",
|
||||
"--audio-quality",
|
||||
"0",
|
||||
"--embed-thumbnail",
|
||||
"--embed-metadata",
|
||||
"--add-metadata",
|
||||
"--concurrent-fragments",
|
||||
str(concurrent_fragments),
|
||||
"-o",
|
||||
out_tmpl,
|
||||
]
|
||||
subprocess.run(cmd, check=True)
|
||||
opus = outdir / f"{out_base}.opus"
|
||||
if opus.exists():
|
||||
return opus
|
||||
ogg = outdir / f"{out_base}.ogg"
|
||||
return ogg if ogg.exists() else None
|
||||
|
||||
|
||||
def set_tags(
|
||||
path: Path,
|
||||
title: str,
|
||||
track_number: int,
|
||||
artist: str,
|
||||
album: str,
|
||||
album_artist: str,
|
||||
disc_number: str,
|
||||
):
|
||||
from mutagen.oggopus import OggOpus
|
||||
from mutagen.oggvorbis import OggVorbis
|
||||
|
||||
audio = None
|
||||
if path.suffix.lower() == ".opus":
|
||||
audio = OggOpus(path)
|
||||
else:
|
||||
try:
|
||||
audio = OggOpus(path)
|
||||
except Exception:
|
||||
audio = OggVorbis(path)
|
||||
|
||||
if audio.tags is None:
|
||||
audio.add_tags()
|
||||
|
||||
audio["TITLE"] = [title]
|
||||
audio["ARTIST"] = [artist]
|
||||
audio["ALBUM"] = [album]
|
||||
audio["ALBUMARTIST"] = [album_artist]
|
||||
audio["DISCNUMBER"] = [disc_number]
|
||||
audio["TRACKNUMBER"] = [str(track_number)]
|
||||
audio["TRACKTOTAL"] = [""]
|
||||
audio["COMPILATION"] = ["0"]
|
||||
|
||||
audio.save()
|
||||
|
||||
|
||||
def main():
|
||||
config = get_config()
|
||||
|
||||
config["outdir"].mkdir(exist_ok=True)
|
||||
|
||||
print("Fetching playlist order...")
|
||||
vids = get_playlist_entries(config["playlist_url"])
|
||||
total = len(vids)
|
||||
if total == 0:
|
||||
print("No entries found.")
|
||||
return
|
||||
|
||||
print("Probing titles (parallel)...")
|
||||
with ThreadPoolExecutor(
|
||||
max_workers=config["download_workers"]
|
||||
) as ex:
|
||||
futs = {
|
||||
ex.submit(get_title, vid): i for i, vid in enumerate(vids, start=1)
|
||||
}
|
||||
titles = [None] * total
|
||||
for fut in as_completed(futs):
|
||||
i = futs[fut]
|
||||
try:
|
||||
titles[i - 1] = fut.result()
|
||||
except Exception as e:
|
||||
titles[i - 1] = f"track_{i}"
|
||||
print(f"Title probe failed for index {i}: {e}")
|
||||
|
||||
cleaned = [
|
||||
clean_title(t or f"track_{i}")
|
||||
for i, t in enumerate(titles, start=1)
|
||||
]
|
||||
|
||||
print("Downloading (parallel)...")
|
||||
results = [None] * total
|
||||
|
||||
def dl_task(i, vid, ct):
|
||||
print(f"[{i}/{total}] {ct}")
|
||||
try:
|
||||
return download_one(
|
||||
i,
|
||||
vid,
|
||||
ct,
|
||||
config["outdir"],
|
||||
config["fragments"],
|
||||
)
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Download failed [{i}]: {e}")
|
||||
return None
|
||||
|
||||
with ThreadPoolExecutor(
|
||||
max_workers=config["download_workers"]
|
||||
) as ex:
|
||||
futs = {
|
||||
ex.submit(dl_task, i, v, c): i
|
||||
for i, (v, c) in enumerate(zip(vids, cleaned), start=1)
|
||||
}
|
||||
for fut in as_completed(futs):
|
||||
i = futs[fut]
|
||||
results[i - 1] = fut.result()
|
||||
|
||||
print("Tagging (parallel, in-place, no ffmpeg)...")
|
||||
|
||||
def tag_task(i, p, ct):
|
||||
if p is None or not Path(p).exists():
|
||||
return
|
||||
track_num = config["start_track"] + (i - 1)
|
||||
try:
|
||||
set_tags(
|
||||
Path(p),
|
||||
ct,
|
||||
track_num,
|
||||
config["artist"],
|
||||
config["album"],
|
||||
config["album_artist"],
|
||||
config["disc"],
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Tagging failed [{i}]: {e}")
|
||||
|
||||
Parallel(n_jobs=config["tag_workers"], prefer="threads")(
|
||||
delayed(tag_task)(i, p, c)
|
||||
for i, (p, c) in enumerate(zip(results, cleaned), start=1)
|
||||
)
|
||||
|
||||
print(f"\n✓ Done. Files in: {config['outdir']}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue