randydev commited on
Commit
6d78308
1 Parent(s): 6965fc4

Upload 2 files

Browse files
Files changed (2) hide show
  1. driver.py +151 -0
  2. main.py +29 -0
driver.py ADDED
@@ -0,0 +1,151 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import datetime
2
+ import json
3
+ import os
4
+ import random
5
+ import re
6
+ import time
7
+ import urllib.parse
8
+ from urllib.parse import quote_plus
9
+
10
+ import httpx
11
+ import requests
12
+ from pytz import country_names, country_timezones, timezone
13
+ from selenium import webdriver
14
+ from selenium.webdriver.chrome.options import Options
15
+ from selenium.webdriver.chrome.service import Service
16
+ from selenium.webdriver.common.by import By
17
+ from selenium.webdriver.support.expected_conditions import presence_of_element_located
18
+ from selenium.webdriver.support.wait import WebDriverWait
19
+
20
+ class YoutubeDriver:
21
+ def __init__(self, search_terms: str, max_results: int = 5):
22
+ self.base_url = "https://youtube.com/results?search_query={0}"
23
+ self.search_terms = search_terms
24
+ self.max_results = max_results
25
+ self.videos = self._search()
26
+
27
+ def _search(self):
28
+ encoded_search = urllib.parse.quote_plus(self.search_terms)
29
+ response = requests.get(self.base_url.format(encoded_search)).text
30
+
31
+ while "ytInitialData" not in response:
32
+ response = requests.get(self.base_url.format(encoded_search)).text
33
+
34
+ results = self._parse_html(response)
35
+
36
+ if self.max_results is not None and len(results) > self.max_results:
37
+ return results[: self.max_results]
38
+
39
+ return results
40
+
41
+ def _parse_html(self, response: str):
42
+ results = []
43
+ start = response.index("ytInitialData") + len("ytInitialData") + 3
44
+ end = response.index("};", start) + 1
45
+ json_str = response[start:end]
46
+ data = json.loads(json_str)
47
+
48
+ videos = data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"][
49
+ "sectionListRenderer"
50
+ ]["contents"][0]["itemSectionRenderer"]["contents"]
51
+
52
+ for video in videos:
53
+ res = {}
54
+ if "videoRenderer" in video.keys():
55
+ video_data = video.get("videoRenderer", {})
56
+ _id = video_data.get("videoId", None)
57
+
58
+ res["id"] = _id
59
+ res["thumbnail"] = f"https://i.ytimg.com/vi/{_id}/hqdefault.jpg"
60
+ res["title"] = (
61
+ video_data.get("title", {}).get("runs", [[{}]])[0].get("text", None)
62
+ )
63
+ res["channel"] = (
64
+ video_data.get("longBylineText", {})
65
+ .get("runs", [[{}]])[0]
66
+ .get("text", None)
67
+ )
68
+ res["duration"] = video_data.get("lengthText", {}).get("simpleText", 0)
69
+ res["views"] = video_data.get("viewCountText", {}).get(
70
+ "simpleText", "Unknown"
71
+ )
72
+ res["publish_time"] = video_data.get("publishedTimeText", {}).get(
73
+ "simpleText", "Unknown"
74
+ )
75
+ res["url_suffix"] = (
76
+ video_data.get("navigationEndpoint", {})
77
+ .get("commandMetadata", {})
78
+ .get("webCommandMetadata", {})
79
+ .get("url", None)
80
+ )
81
+
82
+ results.append(res)
83
+ return results
84
+
85
+ def to_dict(self, clear_cache=True) -> list[dict]:
86
+ result = self.videos
87
+ if clear_cache:
88
+ self.videos = []
89
+ return result
90
+
91
+ @staticmethod
92
+ def check_url(url: str) -> tuple[bool, str]:
93
+ if "&" in url:
94
+ url = url[: url.index("&")]
95
+
96
+ if "?si=" in url:
97
+ url = url[: url.index("?si=")]
98
+
99
+ youtube_regex = (
100
+ r"(https?://)?(www\.)?"
101
+ r"(youtube|youtu|youtube-nocookie)\.(com|be)/"
102
+ r'(video|embed|shorts/|watch\?v=|v/|e/|u/\\w+/|\\w+/)?([^"&?\\s]{11})'
103
+ )
104
+ match = re.match(youtube_regex, url)
105
+ if match:
106
+ return True, match.group(6)
107
+ else:
108
+ return False, "Invalid YouTube URL!"
109
+
110
+ @staticmethod
111
+ def song_options() -> dict:
112
+ return {
113
+ "format": "bestaudio",
114
+ "addmetadata": True,
115
+ "key": "FFmpegMetadata",
116
+ "prefer_ffmpeg": True,
117
+ "geo_bypass": True,
118
+ "nocheckcertificate": True,
119
+ "postprocessors": [
120
+ {
121
+ "key": "FFmpegExtractAudio",
122
+ "preferredcodec": "mp3",
123
+ "preferredquality": "480",
124
+ }
125
+ ],
126
+ "cookiefile": "cookies.txt",
127
+ "outtmpl": "%(id)s",
128
+ "quiet": True,
129
+ "logtostderr": False,
130
+ }
131
+
132
+ @staticmethod
133
+ def video_options() -> dict:
134
+ return {
135
+ "format": "best",
136
+ "addmetadata": True,
137
+ "key": "FFmpegMetadata",
138
+ "prefer_ffmpeg": True,
139
+ "geo_bypass": True,
140
+ "nocheckcertificate": True,
141
+ "postprocessors": [
142
+ {
143
+ "key": "FFmpegVideoConvertor",
144
+ "preferedformat": "mp4",
145
+ }
146
+ ],
147
+ "cookiefile": "cookies.txt",
148
+ "outtmpl": "%(id)s.mp4",
149
+ "quiet": True,
150
+ "logtostderr": False,
151
+ }
main.py CHANGED
@@ -85,6 +85,7 @@ import logging
85
  import functions as code
86
  from fluxai import router as fluxai_router
87
  from whisper import router as whisper_router
 
88
 
89
  logging.basicConfig(level=logging.ERROR)
90
  logging.basicConfig(level=logging.INFO)
@@ -150,6 +151,34 @@ Please modify your search terms and/or try again later thank you for your unders
150
 
151
  UPLOAD_DIRECTORY = "./uploads"
152
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
153
  @app.post("/uploadfile/")
154
  async def upload_file(file: UploadFile = File(...)):
155
  try:
 
85
  import functions as code
86
  from fluxai import router as fluxai_router
87
  from whisper import router as whisper_router
88
+ from driver import YoutubeDriver
89
 
90
  logging.basicConfig(level=logging.ERROR)
91
  logging.basicConfig(level=logging.INFO)
 
151
 
152
  UPLOAD_DIRECTORY = "./uploads"
153
 
154
+ class YouTubeBase(BaseModel):
155
+ link: str
156
+
157
+ @app.post("/akeno/youtube-video")
158
+ async def youtube_video(payload: YouTubeBase):
159
+ status, url = YoutubeDriver.check_url(payload.link)
160
+ if not status:
161
+ return SuccessResponse(
162
+ status="False",
163
+ randydev={"error": url}
164
+ )
165
+ try:
166
+ with YoutubeDL(YoutubeDriver.video_options()) as ytdl:
167
+ yt_data = ytdl.extract_info(url, download=True)
168
+ yt_file = ytdl.prepare_filename(yt_data)
169
+ if not os.path.exists(yt_file):
170
+ return SuccessResponse(
171
+ status="False",
172
+ randydev={"error": "Video download failed"}
173
+ )
174
+ file_stream = open(yt_file, "rb")
175
+ return StreamingResponse(file_stream, media_type="video/mp4")
176
+ except Exception as e:
177
+ return SuccessResponse(
178
+ status="False",
179
+ randydev={"error": f"An error occurred: {str(e)}"}
180
+ )
181
+
182
  @app.post("/uploadfile/")
183
  async def upload_file(file: UploadFile = File(...)):
184
  try: