Spaces:
Sleeping
Sleeping

Refactor Embed class by removing unused Host header; clean up NetFree class by deleting VideoData dataclass; remove unused import in VidSrc class; add requirements file with dependencies.
2b8e50d
import re | |
import json | |
import requests | |
from helpers.functions import Utils | |
class Embed: | |
def __init__(self): | |
self.base_url = "https://embed.su" | |
self.headers = { | |
"Referer": "https://embed.su", | |
"Origin": "https://embed.su", | |
} | |
self.videoDataList = [] | |
self.utils = Utils() | |
def build_url(self, tmdb_id, media_type, season=None, episode=None): | |
if media_type == "movie": | |
return f"{self.base_url}/embed/movie/{tmdb_id}" | |
elif media_type == "tv": | |
return f"{self.base_url}/embed/tv/{tmdb_id}/{season}/{episode}" | |
else: | |
raise ValueError("Invalid media type. Must be 'movie' or 'tv'.") | |
def extract_sctream_data(self, data): | |
data = data.split("atob(`")[1].split("`));")[0] | |
return data | |
def get_stream_ids(self, raw_data): | |
encoded_data = self.extract_sctream_data(raw_data) | |
encoded_hash = json.loads(self.utils.b64_decode(encoded_data))['hash'] | |
decoded_hash = self.utils.b64_decode(encoded_hash).split(".") | |
stream_data_list = json.loads(self.utils.b64_decode(decoded_hash[1]+decoded_hash[0])) | |
stream_id_list = [] | |
for stream_data in stream_data_list: | |
stream_id_list.append(stream_data['hash']) | |
return stream_id_list | |
def get_individual_streams(self, data): | |
resolutions = [] | |
stream_urls = [] | |
data = requests.get(data, headers=self.headers).text | |
for line in data.splitlines(): | |
match_res = re.search(r'RESOLUTION=(\d{3,4})x(\d{3,4})', line) | |
if match_res: | |
resolutions.append(f"{match_res.group(2)}") | |
match_url = re.search(r'^/api/.*.png$', line) | |
if match_url: | |
stream_urls.append(f"{self.base_url}{match_url.group(0).replace('.png', '.m3u8')}") | |
return resolutions, stream_urls | |
def get_stream(self, tmdb_id, imdb_id, media_type, title, year, season=None, episode=None): | |
final_url = self.build_url(tmdb_id=tmdb_id, media_type=media_type, season=season, episode=episode) | |
raw_data = requests.get(final_url, headers=self.headers).text | |
stream_id_list = self.get_stream_ids(raw_data) | |
for stream_id in stream_id_list: | |
stream_url = f"{self.base_url}/api/e/{stream_id}" | |
response = requests.get(stream_url, headers=self.headers) | |
if response.status_code == 200 and 'source' in response.json(): | |
data = response.json() | |
resolutions, stream_urls = self.get_individual_streams(data['source']) | |
for (resolution, stream_url) in zip(resolutions, stream_urls): | |
self.videoDataList.append( | |
{ | |
"videoSource": f"EMBED_{len(self.videoDataList)+1} ({resolution})", | |
"videoUrl": stream_url, | |
"videoHeaders": self.headers, | |
} | |
) | |
else: | |
continue | |
return self.videoDataList | |
if __name__ == "__main__": | |
auto_embed = Embed() | |
media_type = "movie" | |
tmdb_id = "822119" | |
imdb_id = "tt14513804" | |
title = "Captain America: Brave New World" | |
year = "2025" | |
season = None | |
episode = None | |
try: | |
encoded_data = auto_embed.get_streams(tmdb_id=tmdb_id, imdb_id=imdb_id, media_type=media_type, title=title, year=year, season=season, episode=episode) | |
print(json.dumps(encoded_data, indent=4)) | |
except Exception as e: | |
print(f"Error: {e}") |