File size: 4,737 Bytes
bb58f2e
4b11d55
bb58f2e
4b11d55
b02246d
4b11d55
 
 
 
 
 
b02246d
65004e0
b02246d
3e6dcd1
 
 
b02246d
 
 
e1bd545
 
 
 
 
 
 
 
 
b02246d
 
 
 
 
 
 
 
4b11d55
 
 
 
 
 
 
 
 
 
 
b02246d
 
 
 
 
 
8de39ba
 
 
 
 
 
 
b02246d
 
bb58f2e
 
 
 
 
4b11d55
 
 
 
 
 
8de39ba
 
 
b02246d
 
 
8de39ba
 
 
4b11d55
 
8de39ba
4b11d55
b02246d
 
 
 
 
 
 
 
 
 
 
 
 
4b11d55
b02246d
 
 
 
 
 
bb58f2e
4b11d55
 
 
 
 
 
bb58f2e
 
 
 
 
 
 
 
4b11d55
 
 
 
 
 
 
bb58f2e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b02246d
 
 
 
 
 
 
 
 
 
 
 
 
3e6dcd1
b02246d
3e6dcd1
b02246d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import os
import re
import shutil
import requests
from tqdm import tqdm
from datetime import datetime
from zoneinfo import ZoneInfo
from tzlocal import get_localzone
from mutagen.mp3 import MP3
from mutagen.flac import FLAC, Picture
from mutagen.id3 import TIT2, TPE1, TALB, USLT, APIC

EN_US = os.getenv("LANG") != "zh_CN.UTF-8"
API_163 = os.getenv("api_music163")
API_KUWO_2 = os.getenv("api_kuwo_2")
API_KUWO_1 = os.getenv("api_kuwo_1")
KEY_KUWO_1 = os.getenv("apikey_kuwo_1")
API_QQ_2 = os.getenv("api_qmusic_2")
API_QQ_1 = os.getenv("api_qmusic_1")
KEY_QQ_1 = os.getenv("apikey_qmusic_1")
if not (
    API_163
    and API_KUWO_2
    and API_KUWO_1
    and KEY_KUWO_1
    and API_QQ_2
    and API_QQ_1
    and KEY_QQ_1
):
    print("请检查环境变量!")
    exit()

TIMEOUT = None
TMP_DIR = "./__pycache__"
HEADER = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36"
}


def timestamp(naive_time: datetime = None, target_tz=ZoneInfo("Asia/Shanghai")):
    if not naive_time:
        naive_time = datetime.now()

    local_tz = get_localzone()
    aware_local = naive_time.replace(tzinfo=local_tz)
    return aware_local.astimezone(target_tz).strftime("%Y-%m-%d %H:%M:%S")


def insert_meta(audio_in: str, title, artist, album, lyric, cover: str, audio_out=""):
    if audio_out:
        shutil.copyfile(audio_in, audio_out)
    else:
        audio_out = audio_in

    if cover:
        if cover.startswith("http"):
            cover = requests.get(cover).content
        else:
            with open(cover, "rb") as file:
                cover = file.read()

    if audio_out.endswith(".mp3"):
        audio = MP3(audio_out)
        if audio.tags:
            for tag_id in list(audio.tags.keys()):
                del audio.tags[tag_id]

        else:
            audio.add_tags()

        audio.tags.add(TIT2(encoding=3, text=title))
        audio.tags.add(TPE1(encoding=3, text=artist))
        audio.tags.add(TALB(encoding=3, text=album))
        audio.tags.add(USLT(encoding=3, desc="Lyrics", text=lyric))
        if cover:
            audio.tags.add(
                APIC(
                    encoding=3,  # UTF-8 编码
                    mime="image/jpeg",  # 图片的 MIME 类型
                    type=3,  # 封面图片
                    desc="Cover",
                    data=cover,
                )
            )

        audio.save()

    elif audio_out.endswith(".flac"):
        audio = FLAC(audio_out)
        audio.tags["TITLE"] = title
        audio.tags["ARTIST"] = artist
        audio.tags["ALBUM"] = album
        audio.tags["LYRICS"] = lyric
        audio.clear_pictures()
        if cover:
            picture = Picture()
            picture.type = 3  # 封面图片
            picture.mime = "image/jpeg"
            picture.data = cover
            audio.add_picture(picture)

        audio.save()

    return audio_out


def extract_fst_url(text):
    match = re.search(r'(https?://[^\s"]+)', text)
    if match:
        return match.group(1)
    else:
        return None


def extract_fst_int(text: str):
    match = re.search(r"\d+", text)
    if match:
        return str(int(match.group()))
    else:
        return None


def get_real_url(short_url):
    return requests.get(
        short_url,
        headers=HEADER,
        allow_redirects=True,
        timeout=TIMEOUT,
    ).url


def mk_dir(dirpath: str):
    if not os.path.exists(dirpath):
        os.makedirs(dirpath)


def rm_dir(dirpath: str):
    if os.path.exists(dirpath):
        shutil.rmtree(dirpath)


def clean_dir(dirpath: str):
    rm_dir(dirpath)
    os.makedirs(dirpath)


def download_file(
    id: int,
    url: str,
    title: str,
    artist: str,
    album: str,
    lyric: str,
    cover: str,
    cache: str,
):
    clean_dir(cache)
    fmt = url.split(".")[-1].split("?")[0]
    local_file = f"{cache}/{id}.{fmt}"
    response = requests.get(url, stream=True, verify=False)
    if response.status_code == 200:
        total_size = int(response.headers.get("Content-Length", 0)) + 1
        time_stamp = timestamp()
        progress_bar = tqdm(
            total=total_size,
            unit="B",
            unit_scale=True,
            desc=f"[{time_stamp}] {local_file}",
        )
        with open(local_file, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:  # 确保 chunk 不为空
                    f.write(chunk)  # 更新进度条
                    progress_bar.update(len(chunk))

        return insert_meta(local_file, title, artist, album, lyric, cover)

    else:
        raise ConnectionError(f"下载失败,状态码:{response.status_code}")