# Spaces:
# Sleeping
# Sleeping
import json
import re
from typing import Any, Dict, List, Optional

import requests
from bs4 import BeautifulSoup

from helpers.functions import Utils


class TwoEmbed:
    """Resolve a direct video URL for a title through 2embed.cc / uqloads.xyz.

    The flow implemented here is:

    1. fetch the 2embed.cc embed page and read the ``data-src`` of its first
       ``<iframe>`` to obtain a stream id (:meth:`get_stream_id`);
    2. fetch ``base_url + stream_id`` and look for a packed
       ``eval(function(p,a,c,k,e,d)...`` script (:meth:`extract_video_source`);
    3. substitute the packed tokens back and read the first ``http`` URL out
       of the script's ``var links=...`` JSON object.

    Attributes:
        base_url: Prefix the stream id is appended to.
        headers: ``Origin``/``Referer`` headers sent with every request.
        video_data_list: Results collected so far; :meth:`get_stream` appends
            to it and returns this same list, so entries accumulate across
            calls on one instance.
        utils: ``Utils`` helper; only its ``is_accessible`` method is used.
    """

    # Seconds to wait for the remote hosts; without a timeout ``requests``
    # may block forever on a stalled connection.
    REQUEST_TIMEOUT = 15

    # Prefix identifying a packed script.
    _PACKER_PREFIX = "eval(function(p,a,c,k,e,d)"

    # Digit alphabet for token encoding: 0-9, a-z, then A-Z (radix up to 62).
    _BASE_DIGITS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

    # One whole "word" token of the packed payload.
    _TOKEN_RE = re.compile(r"\b\w+\b")

    def __init__(self):
        self.base_url = "https://uqloads.xyz/e/"
        self.headers = {
            'Origin': 'https://uqloads.xyz',
            'Referer': 'https://streamsrcs.2embed.cc/',
        }
        self.video_data_list = []
        self.utils = Utils()

    def get_stream_id(self, imdb_id, media_type, title, year, season=None, episode=None) -> Optional[str]:
        """Fetch the 2embed.cc embed page and return the stream id it points to.

        Args:
            imdb_id: Identifier appended to the embed URL (must be a string).
            media_type: ``"tv"`` selects the season/episode URL form when both
                ``season`` and ``episode`` are given; anything else uses the
                plain ``imdb_id`` URL.
            title: Accepted but not used by this method.
            year: Accepted but not used by this method.
            season: Season number for TV lookups.
            episode: Episode number for TV lookups.

        Returns:
            The value of the ``?id=`` parameter in the first iframe's
            ``data-src``, or ``None`` when the page does not answer with
            status 200 or no such id is present.
        """
        url = "https://www.2embed.cc/embed/"
        if media_type == "tv" and season is not None and episode is not None:
            url += f"{imdb_id}/season-{season}-episode-{episode}"
        else:
            url += imdb_id
        response = requests.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            return None
        soup = BeautifulSoup(response.text, 'html.parser')
        iframe = soup.find('iframe')
        data_src = iframe.get('data-src') if iframe else None
        return self._parse_stream_id(data_src)

    @staticmethod
    def _parse_stream_id(data_src: Optional[str]) -> Optional[str]:
        """Return the text between ``?id=`` and the next ``&``, or ``None``."""
        if not data_src:
            return None
        pieces = data_src.split("?id=")
        if len(pieces) < 2:
            # No "?id=" marker: report "not found" instead of raising IndexError.
            return None
        return pieces[1].split("&")[0] or None

    def int_to_base(self, x: int, base: int) -> str:
        """Return integer ``x`` written in radix ``base``.

        Digits are ``0-9``, ``a-z`` and then ``A-Z``, so every radix from 2
        to 62 is supported. Negative values get a leading ``-``.

        Raises:
            ValueError: If ``base`` is smaller than 2.
            IndexError: If a digit value does not fit the 62-character
                alphabet (only possible when ``base`` exceeds 62).
        """
        if base < 2:
            raise ValueError(f"base must be >= 2, got {base}")
        if x < 0:
            return '-' + self.int_to_base(-x, base)
        digits = self._BASE_DIGITS
        encoded = []
        while True:
            x, remainder = divmod(x, base)
            encoded.append(digits[remainder])
            if x == 0:
                break
        return ''.join(reversed(encoded))

    def js_obfuscation_replacer(self, p: str, a: int, c: int, k: List[str]) -> str:
        """Substitute the encoded tokens of packed payload ``p`` with keywords.

        Token ``int_to_base(i, a)`` is replaced by ``k[i]`` for every ``i``
        below ``c`` whose keyword is non-empty; other words are left alone.

        All substitutions happen in a single pass over the text. Replacing
        index by index would let a keyword that was just inserted be rewritten
        again when it happens to look like another token. The keyword is also
        inserted literally, so backslashes in it are not treated as regex
        replacement escapes.

        Args:
            p: Packed payload text.
            a: Radix used to encode token indices.
            c: Number of token indices to consider.
            k: Keyword table indexed by token value; may be shorter than ``c``.

        Returns:
            ``p`` with all known tokens replaced.
        """
        lookup = {}
        # Indices beyond the keyword table have nothing to substitute.
        for index in range(min(c, len(k))):
            keyword = k[index]
            if keyword:
                lookup[self.int_to_base(index, a)] = keyword
        if not lookup:
            return p
        return self._TOKEN_RE.sub(lambda m: lookup.get(m.group(0), m.group(0)), p)

    def get_pack(self, packed: str) -> Dict[str, Any]:
        """Split a packed script into payload, radix, count and keyword list.

        The script text is cut with plain string splits, so it must contain
        the markers ``}',`` (end of payload), ``,'|`` (start of a keyword
        string whose first keyword is empty) and ``'.split``.

        Returns:
            ``{"p": str, "a": int, "c": int, "k": list[str]}`` where ``k[0]``
            is always the empty string.

        Raises:
            IndexError: If one of the markers is missing.
            ValueError: If the text between the markers is not ``int,int``.
        """
        payload_split = packed.split("}',")
        # NOTE(review): the split removes "}'," but "'}" is appended (quote and
        # brace swapped). Kept as-is: the consumer in this class only slices
        # between "var links=" and ";jwplayer", so the tail is never read.
        p = payload_split[0] + "'}"
        k = "|" + packed.split(",'|")[1].split("'.split")[0]
        ac = payload_split[1].split(",'|")[0]
        a, c = map(int, ac.split(","))
        return {"p": p, "a": a, "c": c, "k": k.split("|")}

    @staticmethod
    def _first_http_url(links: Any) -> Optional[str]:
        """Return the first string starting with ``http`` among ``links`` values.

        Values are visited in dict order; a list value is searched item by
        item. Anything that is not a dict yields ``None``.
        """
        if not isinstance(links, dict):
            return None
        for value in links.values():
            if isinstance(value, list):
                for item in value:
                    if isinstance(item, str) and item.startswith("http"):
                        return item
            elif isinstance(value, str) and value.startswith("http"):
                return value
        return None

    def _stream_url_from_script(self, script_text: str) -> Optional[str]:
        """Unpack one packed script and return the stream URL it declares.

        Failures are printed and reported as ``None`` so the caller can move
        on to the next script.
        """
        try:
            pack = self.get_pack(script_text)
            deobfuscated = self.js_obfuscation_replacer(pack['p'], pack['a'], pack['c'], pack['k'])
        except (IndexError, ValueError) as e:
            print(f"[extract_video_source] Error unpacking script: {e}")
            return None
        try:
            json_text = deobfuscated.split("var links=")[1].split(";jwplayer")[0].strip()
            links = json.loads(json_text)
        except (IndexError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            print(f"[extract_video_source] Error parsing JSON: {e}")
            return None
        return self._first_http_url(links)

    def extract_video_source(self, html_content: str) -> Optional[str]:
        """Return the first stream URL found in the packed scripts of a page.

        Every ``<script>`` whose text starts with the packer prefix is
        unpacked; the first one that yields an ``http`` URL wins.

        Returns:
            The URL, or ``None`` when no script produces one.
        """
        soup = BeautifulSoup(html_content, 'html.parser')
        for script in soup.find_all('script'):
            script_text = script.string
            if not script_text or not script_text.strip().startswith(self._PACKER_PREFIX):
                continue
            stream_url = self._stream_url_from_script(script_text)
            if stream_url:
                return stream_url
        return None

    def _already_listed(self, source: str) -> bool:
        """Tell whether ``source`` is already stored as a ``videoUrl`` entry."""
        return any(entry.get('videoUrl') == source for entry in self.video_data_list)

    def get_stream(self, imdb_id, tmdb_id, media_type, title, year, season=None, episode=None) -> List[Dict[str, Any]]:
        """Look up a stream for the title and record it in ``video_data_list``.

        A source is appended only when it was found, ``utils.is_accessible``
        accepts it, and it is not already in the list. Any exception is
        printed and swallowed (best effort).

        Args:
            imdb_id: Identifier forwarded to :meth:`get_stream_id`.
            tmdb_id: Accepted but not used by this method.
            media_type: Forwarded to :meth:`get_stream_id`.
            title: Forwarded to :meth:`get_stream_id`.
            year: Forwarded to :meth:`get_stream_id`.
            season: Forwarded to :meth:`get_stream_id`.
            episode: Forwarded to :meth:`get_stream_id`.

        Returns:
            ``self.video_data_list`` (the same list object on every call);
            each entry has the keys ``videoSource``, ``videoUrl`` and
            ``videoHeaders``.
        """
        try:
            stream_id = self.get_stream_id(imdb_id, media_type, title, year, season, episode)
            if stream_id:
                url = self.base_url + stream_id
                response = requests.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
                source = self.extract_video_source(response.text)
                # Entries are stored under "videoUrl", so that is the key the
                # duplicate check has to compare against.
                if source and self.utils.is_accessible(source) and not self._already_listed(source):
                    self.video_data_list.append({
                        "videoSource": f"2EMBED_{len(self.video_data_list) + 1}",
                        "videoUrl": source,
                        "videoHeaders": {}
                    })
        except Exception as e:
            print(f"[scrape] Error: {e}")
        return self.video_data_list