dodd869 commited on
Commit
e59ecdb
·
verified ·
1 Parent(s): e93e14d

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +33 -195
app.py CHANGED
@@ -1,206 +1,44 @@
1
- import os
2
- import json
3
- import glob
4
- import shutil
5
- import subprocess
6
- import time
7
- import re
8
- import requests
9
- from pathlib import Path
10
  import gradio as gr
11
- from ytmusicapi import YTMusic
12
- from mutagen.id3 import ID3, TIT2, TPE1, TALB, TCON, TYER, TRCK, TPE2, USLT, APIC, COMM
13
- from mutagen.mp3 import MP3
14
-
15
- TMP = Path("/tmp")
16
- WGCF_BIN = TMP / "wgcf"
17
- WARP_CONF= TMP / "wgcf-account.toml"
18
- WG_CONF = TMP / "wgcf-profile.conf"
19
- ytm = YTMusic()
20
-
21
- def safe(s):
22
- return re.sub(r'[\\/:"*?<>|]+', "", s).strip()
23
-
24
- def install_tools():
25
- import urllib.request
26
- if not WGCF_BIN.exists():
27
- url = "https://github.com/ViRb3/wgcf/releases/download/v2.2.29/wgcf_2.2.29_linux_amd64"
28
- urllib.request.urlretrieve(url, WGCF_BIN)
29
- os.chmod(WGCF_BIN, 0o755)
30
-
31
- def start_warp():
32
- import urllib.request, gzip, shutil, os, subprocess, time
33
-
34
- wgcf = TMP / "wgcf"
35
- if not wgcf.exists():
36
- url = "https://github.com/ViRb3/wgcf/releases/download/v2.2.20/wgcf_2.2.20_linux_amd64"
37
- urllib.request.urlretrieve(url, wgcf)
38
- os.chmod(wgcf, 0o755)
39
-
40
- toml = TMP / "wgcf-account.toml"
41
- conf = TMP / "wgcf-profile.conf"
42
-
43
- # Skip registration if toml already exists
44
- if not toml.exists():
45
- max_attempts = 3
46
- for attempt in range(max_attempts):
47
- try:
48
- env = os.environ.copy()
49
- env["WGCF_ENDPOINT"] = "162.159.193.1" # Alternative endpoint
50
- result = subprocess.run(
51
- [wgcf, "register", "--accept-tos"],
52
- cwd=TMP,
53
- capture_output=True,
54
- text=True,
55
- env=env
56
- )
57
- if result.returncode == 0:
58
- break
59
- else:
60
- print(f"Attempt {attempt + 1} failed: {result.stderr}")
61
- # Check if account file was created despite non-zero exit code
62
- if toml.exists():
63
- print("Account file created despite error, proceeding...")
64
- break
65
- if attempt < max_attempts - 1:
66
- time.sleep(5) # Wait before retrying
67
- except subprocess.CalledProcessError as e:
68
- print(f"Attempt {attempt + 1} failed with error: {e.stderr}")
69
- # Check if account file was created despite exception
70
- if toml.exists():
71
- print("Account file created despite exception, proceeding...")
72
- break
73
- if attempt == max_attempts - 1:
74
- if not conf.exists():
75
- raise RuntimeError("Failed to register wgcf after multiple attempts and no existing profile found")
76
- time.sleep(5)
77
- else:
78
- # If all attempts failed but profile exists, proceed
79
- if not conf.exists():
80
- raise RuntimeError("Failed to register wgcf and no existing profile found")
81
- else:
82
- # Account file already exists, update it if needed
83
- try:
84
- subprocess.run([wgcf, "update"], cwd=TMP, check=True)
85
- except subprocess.CalledProcessError as e:
86
- print(f"Warning: Failed to update existing account: {e}")
87
-
88
- # Generate profile if not exists
89
- if not conf.exists():
90
- subprocess.run([wgcf, "generate"], cwd=TMP, check=True)
91
-
92
- wggo = TMP / "wireguard-go"
93
- if not wggo.exists():
94
- gz = TMP / "wggo.gz"
95
- urllib.request.urlretrieve(
96
- "https://github.com/WireGuard/wireguard-go/releases/download/0.0.20230223/wireguard-go-linux-amd64.tar.gz", gz
97
- )
98
- with gzip.open(gz, "rb") as f_in:
99
- with open(wggo, "wb") as f_out:
100
- shutil.copyfileobj(f_in, f_out)
101
- os.chmod(wggo, 0o755)
102
- gz.unlink(missing_ok=True)
103
-
104
- # Start wireguard-go with the profile
105
- subprocess.Popen([wggo, "wgcf0"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
106
- time.sleep(3)
107
-
108
- def ytdlp_fast(url, out):
109
- subprocess.run([
110
- "yt-dlp", "-f", "bestaudio/best", "--extract-audio",
111
- "--audio-format", "mp3", "--audio-quality", "320K",
112
- "--concurrent-fragments", "16", "--external-downloader", "aria2c",
113
- "--external-downloader-args", "aria2c:-x16 -s16 -j16 -k1M",
114
- "--fragment-retries", "infinite", "--skip-unavailable-fragments",
115
- "--no-part", "--no-cache-dir", "--force-ipv4",
116
- "-o", out, url
117
- ], check=True)
118
-
119
- def write_tags(fn, title, artist, album, year, cover, lyrics, genre="", track="", album_artist="", description=""):
120
- audio = MP3(fn, ID3=ID3)
121
- if audio.tags is None:
122
- audio.add_tags()
123
- tags = audio.tags
124
- tags.add(TIT2(encoding=3, text=title))
125
- tags.add(TPE1(encoding=3, text=artist))
126
- tags.add(TALB(encoding=3, text=album))
127
- for v, cls, val in [(genre, TCON, genre), (year, TYER, year), (track, TRCK, track), (album_artist, TPE2, album_artist)]:
128
- if val:
129
- tags.add(cls(encoding=3, text=val))
130
- if lyrics:
131
- tags.add(USLT(encoding=3, lang="eng", desc="", text=lyrics))
132
- if description:
133
- tags.add(COMM(encoding=3, lang="eng", desc="desc", text=description))
134
- if cover:
135
- try:
136
- img = requests.get(cover, timeout=15).content
137
- tags.add(APIC(encoding=3, mime="image/jpeg", type=3, desc="Cover", data=img))
138
- except Exception:
139
- pass
140
- audio.save(v2_version=3)
141
-
142
- def download_yt(url):
143
- url = url.strip()
144
- if not url.startswith("http"):
145
- url = "https://" + url
146
- subprocess.run([
147
- "yt-dlp", "--skip-download", "--write-info-json", "--write-thumbnail",
148
- "-o", str(TMP / "tmp"), url
149
- ], check=True)
150
- info_file = next(TMP.glob("tmp*.json"), None)
151
- if not info_file:
152
- raise RuntimeError("info missing")
153
- info = json.loads(info_file.read_text(encoding="utf8"))
154
- vid, title, author = info["id"], info.get("title") or info.get("fulltitle") or "Unknown", info.get("artist") or info.get("uploader") or "Unknown"
155
- upload_date, extractor, description = info.get("upload_date", ""), info.get("extractor", ""), info.get("description", "")
156
- album, album_artist, genre, track, year, lyrics = "Single", "", "", "0", "", ""
157
- if "music.youtube" in url or extractor.startswith("youtube:music"):
158
- album = info.get("album") or "Single"
159
- album_artist = info.get("album_artist", "")
160
- track = str(info.get("track_number", 0))
161
- year = str(info.get("release_date") or upload_date)
162
- genre = info.get("genre", "") or info.get("category", "")
163
- try:
164
- pl = ytm.get_watch_playlist(videoId=vid)
165
- if pl.get("lyrics"):
166
- lyr = ytm.get_lyrics(pl["lyrics"])
167
- if lyr and lyr.get("lyrics"):
168
- lyrics = lyr["lyrics"]
169
- except Exception:
170
- pass
171
- else:
172
- year = upload_date
173
- base_name = safe(f"{title} [{vid}]")
174
- out_mp3 = TMP / f"{base_name}.mp3"
175
- ytdlp_fast(url, str(TMP / f"{base_name}.%(ext)s"))
176
- candidates = list(TMP.glob(f"{base_name}*.mp3"))
177
- if not candidates:
178
- raise RuntimeError("mp3 missing")
179
- shutil.move(str(candidates[0]), str(out_mp3))
180
- cover = None
181
- for ext in ("jpg", "jpeg", "webp", "png"):
182
- thumb = TMP / f"tmp.{ext}"
183
- if thumb.exists():
184
- cover = str(thumb)
185
- break
186
- write_tags(str(out_mp3), title, author, album, year, cover, lyrics, genre, track, album_artist, description)
187
- return out_mp3
188
 
189
  def gradio_fn(url):
 
 
190
  try:
191
- path = download_yt(url)
192
- return str(path), path.name
193
- except Exception as e:
194
- return None, f"Error: {e}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
195
 
196
- start_warp()
 
 
197
 
198
  with gr.Blocks(title="YT/YT-Music Downloader") as demo:
199
  gr.Markdown("### Paste YouTube / YouTube-Music link → 320 k MP3 with tags")
200
  with gr.Row():
201
- inp = gr.Textbox(label="URL", placeholder="https://www.youtube.com/watch?v=...")
202
  btn = gr.Button("Download", variant="primary")
203
- out = gr.File(label="MP3")
204
- btn.click(gradio_fn, inputs=inp, outputs=[out, out])
205
 
206
- demo.launch(server_name="0.0.0.0", server_port=7860)
 
 
 
 
 
 
 
 
 
 
1
  import gradio as gr
2
+ import requests
3
+ import io
4
+ from urllib.parse import urlparse, unquote
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5
 
6
  def gradio_fn(url):
7
+ if not url:
8
+ return None
9
  try:
10
+ api_url = f"https://eolithic-daniele-nonchivalrously.ngrok-free.app/dl?url={url}"
11
+ response = requests.get(api_url)
12
+ response.raise_for_status()
13
+ data = response.json()
14
+
15
+ mp3_url = data.get('download_link')
16
+ if not mp3_url:
17
+ return None
18
+
19
+ mp3_response = requests.get(mp3_url)
20
+ mp3_response.raise_for_status()
21
+
22
+ parsed_url = urlparse(mp3_url)
23
+ filename = unquote(os.path.basename(parsed_url.path))
24
+ if not filename or '.' not in filename:
25
+ filename = "audio.mp3" # fallback
26
+
27
+ audio_bytes = io.BytesIO(mp3_response.content)
28
+ audio_bytes.seek(0)
29
+
30
+ return audio_bytes, filename
31
 
32
+ except Exception as e:
33
+ print(f"Error: {e}")
34
+ return None
35
 
36
  with gr.Blocks(title="YT/YT-Music Downloader") as demo:
37
  gr.Markdown("### Paste YouTube / YouTube-Music link → 320 k MP3 with tags")
38
  with gr.Row():
39
+ inp = gr.Textbox(label="URL", placeholder=" https://www.youtube.com/watch?v=...")
40
  btn = gr.Button("Download", variant="primary")
41
+ out = gr.File(label="MP3")
42
+ btn.click(gradio_fn, inputs=inp, outputs=out)
43
 
44
+ demo.launch()