Spaces:
Running
Running
import datetime | |
import json | |
import random | |
import re | |
import time | |
import urllib.parse | |
from urllib.parse import quote_plus | |
import httpx | |
import requests | |
from pytz import country_names, country_timezones, timezone | |
from selenium import webdriver | |
from selenium.webdriver.chrome.options import Options | |
from selenium.webdriver.chrome.service import Service | |
from selenium.webdriver.common.by import By | |
from Hellbot.core import ENV, Config, db | |
from .formatter import format_text | |
class ChromeDriver: | |
def __init__(self) -> None: | |
self.carbon_theme = [ | |
"3024-night", | |
"a11y-dark", | |
"blackboard", | |
"base16-dark", | |
"base16-light", | |
"cobalt", | |
"duotone-dark", | |
"hopscotch", | |
"lucario", | |
"material", | |
"monokai", | |
"night-owl", | |
"nord", | |
"oceanic-next", | |
"one-light", | |
"one-dark", | |
"panda-syntax", | |
"paraiso-dark", | |
"seti", | |
"shades-of-purple", | |
"solarized+dark", | |
"solarized+light", | |
"synthwave-84", | |
"twilight", | |
"verminal", | |
"vscode", | |
"yeti", | |
"zenburn", | |
] | |
def get(self): | |
if not Config.CHROME_BIN: | |
return ( | |
None, | |
"ChromeBinaryErr: No binary path found! Install Chromium or Google Chrome.", | |
) | |
try: | |
options = Options() | |
options.binary_location = Config.CHROME_BIN | |
options.add_argument("--disable-dev-shm-usage") | |
options.add_argument("--ignore-certificate-errors") | |
options.add_argument("--disable-gpu") | |
options.add_argument("--headless=new") | |
options.add_argument("--test-type") | |
options.add_argument("--no-sandbox") | |
options.add_argument("--window-size=1920x1080") | |
options.add_experimental_option( | |
"prefs", {"download.default_directory": "./"} | |
) | |
service = Service(Config.CHROME_DRIVER) | |
driver = webdriver.Chrome(options, service) | |
return driver, None | |
except Exception as e: | |
return None, f"ChromeDriverErr: {e}" | |
def close(self, driver: webdriver.Chrome): | |
driver.close() | |
driver.quit() | |
def get_random_carbon(self) -> str: | |
url = "https://carbon.now.sh/?l=auto" | |
url += f"&t={random.choice(self.carbon_theme)}" | |
url += f"&bg=rgba%28{random.randint(1, 255)}%2C{random.randint(1, 255)}%2C{random.randint(1, 255)}%2C1%29" | |
url += "&code=" | |
return url | |
async def generate_carbon( | |
self, driver: webdriver.Chrome, code: str, is_random: bool = False | |
) -> str: | |
filename = f"{round(time.time())}" | |
BASE_URL = ( | |
self.get_random_carbon | |
if is_random | |
else "https://carbon.now.sh/?l=auto&code=" | |
) | |
driver.get(BASE_URL + format_text(quote_plus(code))) | |
driver.command_executor._commands["send_command"] = ( | |
"POST", | |
"/session/$sessionId/chromium/send_command", | |
) | |
params = { | |
"cmd": "Page.setDownloadBehavior", | |
"params": {"behavior": "allow", "downloadPath": Config.DWL_DIR}, | |
} | |
driver.execute("send_command", params) | |
driver.find_element(By.XPATH, "//button[@id='export-menu']").click() | |
driver.find_element(By.XPATH, "//input[@title='filename']").send_keys(filename) | |
driver.find_element(By.XPATH, "//button[@id='export-png']").click() | |
return f"{Config.DWL_DIR}/{filename}.png" | |
class ClimateDriver: | |
def __init__(self) -> None: | |
self.weather_api = "https://api.openweathermap.org/data/2.5/weather?lat={0}&lon={1}&appid={2}&units=metric" | |
self.location_api = ( | |
"https://api.openweathermap.org/geo/1.0/direct?q={0}&limit=1&appid={1}" | |
) | |
self.pollution_api = "http://api.openweathermap.org/data/2.5/air_pollution?lat={0}&lon={1}&appid={2}" | |
self.AQI_DICT = { | |
1: "Good", | |
2: "Fair", | |
3: "Moderate", | |
4: "Poor", | |
5: "Very Poor", | |
} | |
async def fetchLocation(self, city: str, apiKey: str): | |
response = httpx.get(self.location_api.format(city, apiKey)) | |
if response.status_code == 200: | |
data = response.json() | |
if data: | |
return data[0]["lat"], data[0]["lon"] | |
return None, None | |
async def fetchWeather(self, city: str, apiKey: str): | |
lattitude, longitude = await self.fetchLocation(city, apiKey) | |
if not lattitude and not longitude: | |
return None | |
response = httpx.get(self.weather_api.format(lattitude, longitude, apiKey)) | |
if response.status_code == 200: | |
return response.json() | |
return None | |
async def fetchAirPollution(self, city: str, apiKey: str): | |
lattitude, longitude = await self.fetchLocation(city, apiKey) | |
if not lattitude and not longitude: | |
return None | |
response = httpx.get(self.pollution_api.format(lattitude, longitude, apiKey)) | |
if response.status_code == 200: | |
return response.json() | |
return None | |
async def getTime(self, timestamp: int) -> str: | |
tz = await db.get_env(ENV.time_zone) or "Asia/Kolkata" | |
tz = timezone(tz) | |
return datetime.datetime.fromtimestamp(timestamp, tz=tz).strftime("%I:%M %p") | |
def getCountry(self, country_code: str) -> str: | |
return country_names.get(country_code, "Unknown") | |
def getCountryTimezone(self, country_code: str) -> str: | |
timezones = country_timezones.get(country_code, []) | |
if timezones: | |
return ", ".join(timezones) | |
return "Unknown" | |
def getWindData(self, windSpeed: str, windDegree: str) -> str: | |
dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"] | |
ix = round(windDegree / (360.00 / len(dirs))) | |
kmph = str(float(windSpeed) * 3.6) + " km/h" | |
return f"[{dirs[ix % len(dirs)]}] {kmph}" | |
class YoutubeDriver: | |
def __init__(self, search_terms: str, max_results: int = 5): | |
self.base_url = "https://youtube.com/results?search_query={0}" | |
self.search_terms = search_terms | |
self.max_results = max_results | |
self.videos = self._search() | |
def _search(self): | |
encoded_search = urllib.parse.quote_plus(self.search_terms) | |
response = requests.get(self.base_url.format(encoded_search)).text | |
while "ytInitialData" not in response: | |
response = requests.get(self.base_url.format(encoded_search)).text | |
results = self._parse_html(response) | |
if self.max_results is not None and len(results) > self.max_results: | |
return results[: self.max_results] | |
return results | |
def _parse_html(self, response: str): | |
results = [] | |
start = response.index("ytInitialData") + len("ytInitialData") + 3 | |
end = response.index("};", start) + 1 | |
json_str = response[start:end] | |
data = json.loads(json_str) | |
videos = data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"][ | |
"sectionListRenderer" | |
]["contents"][0]["itemSectionRenderer"]["contents"] | |
for video in videos: | |
res = {} | |
if "videoRenderer" in video.keys(): | |
video_data = video.get("videoRenderer", {}) | |
_id = video_data.get("videoId", None) | |
res["id"] = _id | |
res["thumbnail"] = f"https://i.ytimg.com/vi/{_id}/hqdefault.jpg" | |
res["title"] = ( | |
video_data.get("title", {}).get("runs", [[{}]])[0].get("text", None) | |
) | |
res["channel"] = ( | |
video_data.get("longBylineText", {}) | |
.get("runs", [[{}]])[0] | |
.get("text", None) | |
) | |
res["duration"] = video_data.get("lengthText", {}).get("simpleText", 0) | |
res["views"] = video_data.get("viewCountText", {}).get( | |
"simpleText", "Unknown" | |
) | |
res["publish_time"] = video_data.get("publishedTimeText", {}).get( | |
"simpleText", "Unknown" | |
) | |
res["url_suffix"] = ( | |
video_data.get("navigationEndpoint", {}) | |
.get("commandMetadata", {}) | |
.get("webCommandMetadata", {}) | |
.get("url", None) | |
) | |
results.append(res) | |
return results | |
def to_dict(self, clear_cache=True) -> list[dict]: | |
result = self.videos | |
if clear_cache: | |
self.videos = [] | |
return result | |
def check_url(url: str) -> tuple[bool, str]: | |
if "&" in url: | |
url = url[: url.index("&")] | |
if "?si=" in url: | |
url = url[: url.index("?si=")] | |
youtube_regex = ( | |
r"(https?://)?(www\.)?" | |
r"(youtube|youtu|youtube-nocookie)\.(com|be)/" | |
r'(video|embed|shorts/|watch\?v=|v/|e/|u/\\w+/|\\w+/)?([^"&?\\s]{11})' | |
) | |
match = re.match(youtube_regex, url) | |
if match: | |
return True, match.group(6) | |
else: | |
return False, "Invalid YouTube URL!" | |
def song_options() -> dict: | |
return { | |
"format": "bestaudio", | |
"addmetadata": True, | |
"key": "FFmpegMetadata", | |
"prefer_ffmpeg": True, | |
"geo_bypass": True, | |
"nocheckcertificate": True, | |
"postprocessors": [ | |
{ | |
"key": "FFmpegExtractAudio", | |
"preferredcodec": "mp3", | |
"preferredquality": "480", | |
} | |
], | |
"outtmpl": "%(id)s", | |
"quiet": True, | |
"logtostderr": False, | |
} | |
def video_options() -> dict: | |
return { | |
"format": "best", | |
"addmetadata": True, | |
"key": "FFmpegMetadata", | |
"prefer_ffmpeg": True, | |
"geo_bypass": True, | |
"nocheckcertificate": True, | |
"postprocessors": [ | |
{ | |
"key": "FFmpegVideoConvertor", | |
"preferedformat": "mp4", | |
} | |
], | |
"outtmpl": "%(id)s.mp4", | |
"quiet": True, | |
"logtostderr": False, | |
} | |
Driver = ChromeDriver() | |
Climate = ClimateDriver() | |