Spaces:
Paused
Paused
| from fastapi import FastAPI, HTTPException, Response, Request | |
| from fastapi.responses import StreamingResponse | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from urllib.parse import urlparse, parse_qs,unquote,quote | |
| import re | |
# ASGI application object that the handlers in this module belong to.
app = FastAPI()

# Browser-like User-Agent string sent with the outgoing HTTP requests.
User_Agent = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/125.0.0.0 Safari/537.36 OPR/111.0.0.0"
)

# Module-level placeholder for M3U8 content; starts out empty.
m3u8_content = None
def fetch_m3u8(url, timeout=30):
    """Fetch an M3U8 playlist through the relay and return its text.

    The request is not sent to ``url`` directly: ``url`` is appended to the
    relay prefix ``http://elmythus.serv00.net:2020/`` and the combined
    address is requested with the module-level ``User_Agent``.

    Args:
        url: Absolute URL of the playlist to fetch.
        timeout: Seconds to wait for the relay before giving up. Without a
            timeout ``requests`` waits indefinitely, which would stall the
            calling request handler.

    Returns:
        The response body decoded as text.

    Raises:
        requests.HTTPError: If the response has a 4xx/5xx status.
        requests.RequestException: On connection failures or timeouts.
    """
    response = requests.get(
        "http://elmythus.serv00.net:2020/" + url,
        headers={"User-Agent": User_Agent, "user-agent": User_Agent},
        timeout=timeout,
    )
    # Turn 4xx/5xx answers into exceptions instead of returning an error page.
    response.raise_for_status()
    return response.text
| #This is just for testing, ignore that endpoint and the func | |
# NOTE(review): this file defines clone2_m3u8 twice with the same body;
# confirm which definition is meant to be live.
async def clone2_m3u8(d:str,token:str,expires:str,h:str = None, b:str = None, request: Request = None):
    """Fetch a playlist and point its vixcloud playlist links at this instance.

    Args:
        d: Playlist URL without its query string.
        token: Value for the ``token`` query parameter.
        expires: Value for the ``expires`` query parameter.
        h: Optional value for the ``h`` query parameter (skipped when falsy).
        b: Optional value for the ``b`` query parameter (skipped when falsy).
        request: Incoming request, used to work out this instance's origin.

    Returns:
        A ``Response`` holding the rewritten playlist with media type
        ``application/vnd.apple.mpegurl``.

    Raises:
        HTTPException: 404 whenever any step fails.
    """
    try:
        params = [f'token={token}', f'expires={expires}']
        if h:
            params.append(f'h={h}')
        if b:
            params.append(f'b={b}')
        upstream_url = f'{d}?' + '&'.join(params)

        # Prefer the proxy-supplied scheme when the header is present.
        scheme = request.headers.get("x-forwarded-proto") or request.url.scheme
        instance_url = f"{scheme}://{request.url.netloc}"

        playlist = fetch_m3u8(upstream_url)
        # Nested playlist links are redirected to this instance's /clony/ path.
        rewritten = playlist.replace("https://vixcloud.co/playlist/", f"{instance_url}/clony/")
        return Response(content=rewritten, media_type='application/vnd.apple.mpegurl')
    except Exception as e:
        print(f"Failed to fetch M3U8 file: {e}")
        raise HTTPException(status_code=404, detail="M3U8 content not found")
| ''' | |
| @app.get("/clone/manifest.m3u8") | |
| async def clone_m3u8(d: str = None): | |
| if d: | |
| try: | |
| if "%3D" in d: | |
| d = unquote(d) | |
| m3u8_content = fetch_m3u8(d) | |
| return StreamingResponse(content=m3u8_content, media_type='application/vnd.apple.mpegurl') | |
| except requests.RequestException as e: | |
| print(f"Failed to fetch M3U8 file: {e}") | |
| raise HTTPException(status_code=404, detail="M3U8 content not found") | |
| ''' | |
# NOTE(review): an identical clone2_m3u8 is also defined earlier in this file;
# confirm which definition is meant to be live.
async def clone2_m3u8(d:str,token:str,expires:str,h:str = None, b:str = None, request: Request = None):
    """Return the playlist at ``d`` with its vixcloud links rewritten.

    The upstream URL is ``d`` plus ``token`` and ``expires`` query values,
    followed by ``h`` and ``b`` when they are truthy. The playlist is fetched
    with ``fetch_m3u8`` and every ``https://vixcloud.co/playlist/`` prefix is
    replaced by ``<scheme>://<host>/clony/`` for the current request.

    Returns:
        A ``Response`` with media type ``application/vnd.apple.mpegurl``.

    Raises:
        HTTPException: 404 if anything in the process raises.
    """
    try:
        target = f'{d}?token={token}&expires={expires}'
        for flag_name, flag_value in (("h", h), ("b", b)):
            if flag_value:
                target += f'&{flag_name}={flag_value}'

        # Use x-forwarded-proto when set, otherwise the request's own scheme.
        proto_header = request.headers.get("x-forwarded-proto")
        if proto_header:
            scheme = proto_header
        else:
            scheme = request.url.scheme
        instance_url = f"{scheme}://{request.url.netloc}"

        body = fetch_m3u8(target).replace(
            "https://vixcloud.co/playlist/", f"{instance_url}/clony/"
        )
        return Response(content=body, media_type='application/vnd.apple.mpegurl')
    except Exception as e:
        print(f"Failed to fetch M3U8 file: {e}")
        raise HTTPException(status_code=404, detail="M3U8 content not found")
async def clony_m3u8(segment: str, request: Request):
    """Fetch a vixcloud playlist and rewrite its content hosts.

    Args:
        segment: Path part appended to ``https://vixcloud.co/playlist/``.
        request: Incoming request; its query string is forwarded unchanged.

    Returns:
        A ``Response`` with media type ``application/vnd.apple.mpegurl`` in
        which every ``https://sc-b1-NN.scws-content.net`` origin (NN from 00
        to 30) is replaced by ``https://sc-u9-01.scws-content.net``.

    Raises:
        requests.RequestException: Propagated from ``fetch_m3u8``.
    """
    base_url = "https://vixcloud.co/playlist/"
    full_url = f"{base_url}{segment}?{request.query_params}"
    m3u8_content = fetch_m3u8(full_url)
    # The dots are escaped so they only match literal dots; unescaped they
    # would match any character and rewrite look-alike hosts as well.
    m3u8_content = re.sub(
        r"https://sc-b1-([0-2][0-9]|30)\.scws-content\.net",
        "https://sc-u9-01.scws-content.net",
        m3u8_content,
    )
    return Response(content=m3u8_content, media_type='application/vnd.apple.mpegurl')
async def get_key():
    """Fetch the vixcloud encryption key through the relay and return it.

    Returns:
        A ``Response`` carrying the key bytes and the upstream status code,
        with content type ``application/octet-stream`` and a wildcard
        ``access-control-allow-origin`` header. The upstream ``date`` header
        is forwarded when present.

    Raises:
        requests.RequestException: On connection failures or timeouts.
    """
    response = requests.get(
        "http://elmythus.serv00.net:2020/" + 'https://vixcloud.co/storage/enc.key',
        headers={"User-Agent": User_Agent, "user-agent": User_Agent},
        # Without a timeout a stalled relay would hang this handler forever.
        timeout=30,
    )
    # content-length is deliberately not copied from upstream: the header may
    # be missing, and it can disagree with the decoded body held in
    # response.content. Leaving it out lets Response compute it from the body.
    response_headers = {
        'content-type': 'application/octet-stream',
        'access-control-allow-origin': '*',
    }
    upstream_date = response.headers.get('date')
    if upstream_date is not None:
        response_headers['date'] = upstream_date
    return Response(
        response.content,
        response.status_code,
        response_headers,
    )
async def get_version():
    """Return the ``version`` value published in the site's page data.

    Requests ``https://streamingcommunity.prof/richiedi-un-titolo``, parses
    the HTML, and reads the ``version`` key from the JSON stored in the
    ``data-page`` attribute of the ``<div id="app">`` element.

    Returns:
        The extracted version, or the hard-coded fallback
        ``"65e52dcf34d64173542cd2dc6b8bb75b"`` if any step raises.
    """
    try:
        import json

        SC_DOMAIN = "prof"
        request_headers = {
            'Referer': f"https://streamingcommunity.{SC_DOMAIN}/",
            'Origin': f"https://streamingcommunity.{SC_DOMAIN}",
            'User-Agent': User_Agent,
            'user-agent': User_Agent,
        }
        page_url = f'https://streamingcommunity.{SC_DOMAIN}/richiedi-un-titolo'
        response = requests.get(
            page_url,
            headers=request_headers,
            allow_redirects=True,
            proxies={},
        )
        print(response)

        # The page data is a JSON document stored on the #app element.
        document = BeautifulSoup(response.text, "lxml")
        app_node = document.find("div", {"id": "app"})
        page_data = json.loads(app_node.get("data-page"))
        return page_data['version']
    except Exception as e:
        # Best effort: any failure falls back to a known version string.
        print("Couldn't find the version",e)
        return "65e52dcf34d64173542cd2dc6b8bb75b"
async def get_film():
    """Build the vixcloud playlist URL for the hard-coded title id ``9997``.

    Steps:
      1. Load the site's ``/iframe/<tid>`` page and read the ``src`` of its
         ``<iframe>`` element.
      2. Take the id that follows ``/embed/`` in that ``src``.
      3. Load the embed page through the relay and read ``token`` and
         ``expires`` from the first ``<script>`` inside ``<body>``.
      4. Assemble the playlist URL, adding ``&h=1`` / ``&b=1`` when the iframe
         ``src`` carries a ``canPlayFHD`` / ``b`` query parameter.

    Returns:
        A tuple ``(url, quoted_url)`` where ``quoted_url`` is ``quote(url)``.

    Raises:
        AttributeError: If the iframe, the script, the token or the expiry
            cannot be found in the fetched pages.
        IndexError: If the iframe ``src`` contains no ``/embed/`` part.
        requests.RequestException: On network failures.
    """
    tid = "9997"
    proxies = {}
    version = await get_version()
    SC_DOMAIN = "prof"

    # NOTE(review): the first request sends a streamingcommunity.buzz
    # Referer/Origin while targeting the SC_DOMAIN host -- confirm intended.
    random_headers = {
        'Referer': "https://streamingcommunity.buzz/",
        'Origin': "https://streamingcommunity.buzz",
        'x-inertia': "true",
        'x-inertia-version': version,
        'User-Agent': User_Agent,
        'user-agent': User_Agent,
    }

    # Access the iframe page and pull out the embed link.
    url = f'https://streamingcommunity.{SC_DOMAIN}/iframe/{tid}'
    response = requests.get(url, headers=random_headers, allow_redirects=True, proxies=proxies)
    iframe = BeautifulSoup(response.text, 'lxml').find('iframe').get("src")

    # The id sits between "/embed/" and the query string of the iframe src.
    vixid = iframe.split("/embed/")[1].split("?")[0]
    query_params = parse_qs(urlparse(iframe).query)

    # Only Referer and Origin change for the second request; the remaining
    # headers keep the values set above.
    random_headers['Referer'] = f"https://streamingcommunity.{SC_DOMAIN}/"
    random_headers['Origin'] = f"https://streamingcommunity.{SC_DOMAIN}"

    # Get the real token and expiry from the embed page, fetched via the relay.
    resp = requests.get(
        "http://elmythus.serv00.net:2020/" + iframe,
        headers=random_headers,
        allow_redirects=True,
        proxies=proxies,
    )
    # NOTE(review): debug output; the second print dumps the whole embed page,
    # token included, to stdout.
    print(resp)
    print(resp.text)
    soup = BeautifulSoup(resp.text, "lxml")
    script = soup.find("body").find("script").text
    token = re.search(r"'token':\s*'(\w+)'", script).group(1)
    expires = re.search(r"'expires':\s*'(\d+)'", script).group(1)
    # "quality" is intentionally not parsed: the value was never used, and a
    # page without that field made the lookup raise for no benefit.

    # Example url https://vixcloud.co/playlist/231315?b=1&token=bce060eec3dc9d1965a5d258dc78c964&expires=1728995040&rendition=1080p
    url = f'https://vixcloud.co/playlist/{vixid}.m3u8?token={token}&expires={expires}'
    if 'canPlayFHD' in query_params:
        url += "&h=1"
    if 'b' in query_params:
        url += "&b=1"

    url1 = quote(url)
    return url, url1
async def proxy_m3u8(segment: str, request: Request):
    """Relay a vixcloud playlist resource to the client.

    Args:
        segment: Path part appended to ``https://vixcloud.co/playlist/``.
        request: Incoming request; its query string is forwarded unchanged.

    Returns:
        A ``Response`` with the upstream body, the upstream status code, and
        the upstream ``Content-Type`` (``application/vnd.apple.mpegurl`` when
        upstream sends none).

    Raises:
        requests.RequestException: On connection failures or timeouts.
    """
    # Base URL of the original M3U8, followed by the requested segment.
    base_url = "https://vixcloud.co/playlist/"
    full_url = f"{base_url}{segment}?{request.query_params}"

    headers = {'User-Agent': 'Mozilla/5.0'}
    # A timeout keeps a stalled upstream from hanging this handler forever.
    resp = requests.get(full_url, headers=headers, timeout=30)

    # Forward the upstream status so an upstream error page is not presented
    # to the player as a successful (200) playlist.
    return Response(
        content=resp.content,
        status_code=resp.status_code,
        media_type=resp.headers.get('Content-Type', 'application/vnd.apple.mpegurl'),
    )
if __name__ == "__main__":
    # Start the app with uvicorn only when this file is run as a script.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )