|
import streamlit as st |
|
from streamlit_player import st_player |
|
import requests |
|
from bs4 import BeautifulSoup |
|
from urllib.parse import unquote |
|
from typing import Optional, Tuple, Dict, List |
|
|
|
|
|
from utils import Utilities, VidSrcError, NoSourcesFound |
|
from sources.vidplay import VidplayExtractor |
|
from sources.filemoon import FilemoonExtractor |
|
|
|
# Provider names offered in the "Select Source" dropdown; get_streams matches
# the chosen name against these exact strings to pick an extractor.
SUPPORTED_SOURCES = [
    "Vidplay",
    "Filemoon",
]
|
|
|
class VidSrcExtractor:
    """Resolve playable streams for a title through vidsrc.to.

    The extractor loads the vidsrc.to embed page for a movie/episode, reads
    the list of providers the site offers for it, decrypts the URL of the
    provider selected via ``source_name`` and hands that URL to the matching
    provider extractor (``VidplayExtractor`` or ``FilemoonExtractor``).

    Keyword Args:
        source_name: Provider title to resolve (e.g. ``"Vidplay"``).
        fetch_subtitles: Forwarded to the provider extractor's
            ``resolve_source`` call.
    """

    BASE_URL = "https://vidsrc.to"
    # Key passed to Utilities.decode_data when decrypting source URLs.
    DEFAULT_KEY = "WXrUARXb1aDLaZjI"
    PROVIDER_URL = "https://vidplay.online"
    TMDB_BASE_URL = "https://www.themoviedb.org"
    USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0"
    # Seconds to wait on any single HTTP request; without a timeout a stalled
    # server would block the caller forever.
    REQUEST_TIMEOUT = 15

    def __init__(self, **kwargs) -> None:
        self.source_name = kwargs.get("source_name")
        self.fetch_subtitles = kwargs.get("fetch_subtitles")

    def decrypt_source_url(self, source_url: str) -> str:
        """Turn the encrypted ``url`` field of a source response into a plain URL.

        The value is passed through ``Utilities.decode_base64_url_safe``, then
        ``Utilities.decode_data`` with ``DEFAULT_KEY``, decoded as UTF-8 and
        finally percent-decoded.
        """
        encoded = Utilities.decode_base64_url_safe(source_url)
        decoded = Utilities.decode_data(VidSrcExtractor.DEFAULT_KEY, encoded)
        decoded_text = decoded.decode("utf-8")
        return unquote(decoded_text)

    def get_source_url(self, source_id: str) -> str:
        """Fetch and decrypt the provider URL for ``source_id``.

        Raises:
            VidSrcError: If the request does not return HTTP 200, or the
                response carries no ``result.url`` value to decrypt.
        """
        req = requests.get(
            f"{VidSrcExtractor.BASE_URL}/ajax/embed/source/{source_id}",
            timeout=VidSrcExtractor.REQUEST_TIMEOUT,
        )
        if req.status_code != 200:
            error_msg = f"Couldnt fetch {req.url}, status code: {req.status_code}..."
            raise VidSrcError(error_msg)

        result = req.json().get("result")
        # "result" may be absent or not an object; fail with a clear error
        # instead of passing None into the decryption helpers.
        encrypted_source_url = result.get("url") if isinstance(result, dict) else None
        if not encrypted_source_url:
            raise VidSrcError(f"No source url found in response from {req.url}...")

        return self.decrypt_source_url(encrypted_source_url)

    def get_sources(self, data_id: str) -> Dict:
        """Return a ``{provider title: source id}`` mapping for ``data_id``.

        Raises:
            VidSrcError: If the request does not return HTTP 200, or the
                response's ``result`` field is not a list of sources.
        """
        req = requests.get(
            f"{VidSrcExtractor.BASE_URL}/ajax/embed/episode/{data_id}/sources",
            timeout=VidSrcExtractor.REQUEST_TIMEOUT,
        )
        if req.status_code != 200:
            error_msg = f"Couldnt fetch {req.url}, status code: {req.status_code}..."
            raise VidSrcError(error_msg)

        result = req.json().get("result")
        if not isinstance(result, list):
            # Iterating None (TypeError) or a string would be meaningless here.
            raise VidSrcError(f"Unexpected sources response from {req.url}...")

        return {video.get("title"): video.get("id") for video in result}

    def get_streams(self, media_type: str, media_id: str, season: Optional[str], episode: Optional[str]) -> Tuple[Optional[List], Optional[Dict], Optional[str]]:
        """Resolve streams for a movie or episode using the configured source.

        Args:
            media_type: Path segment used in the embed URL (the UI passes
                ``"movie"`` or ``"tv"``).
            media_id: IMDb/TMDb code of the title.
            season: Season number; only used when ``episode`` is also given.
            episode: Episode number; only used when ``season`` is also given.

        Returns:
            Whatever the provider extractor's ``resolve_source`` returns
            (annotated as streams, subtitles, source url), or
            ``(None, None, None)`` when the embed page, its ``data-id``, or
            the requested source is unavailable or unsupported.

        Raises:
            VidSrcError: Propagated from ``get_sources`` / ``get_source_url``.
        """
        url = f"{VidSrcExtractor.BASE_URL}/embed/{media_type}/{media_id}"
        if season and episode:
            url += f"/{season}/{episode}"

        print(f"[>] Requesting {url}...")
        req = requests.get(url, timeout=VidSrcExtractor.REQUEST_TIMEOUT)
        if req.status_code != 200:
            print(f"[VidSrcExtractor] Couldnt fetch \"{req.url}\", status code: {req.status_code}\n[VidSrcExtractor] \"{self.source_name}\" likely doesnt have the requested media...")
            return None, None, None

        # The first anchor carrying a data-id attribute identifies the title
        # for the sources endpoint.
        soup = BeautifulSoup(req.text, "html.parser")
        sources_code = soup.find("a", {"data-id": True})
        if not sources_code:
            print("[VidSrcExtractor] Could not fetch data-id, this could be due to an invalid imdb/tmdb code...")
            return None, None, None

        sources_code = sources_code.get("data-id")
        sources = self.get_sources(sources_code)
        source = sources.get(self.source_name)
        if not source:
            available_sources = ", ".join(list(sources.keys()))
            print(f"[VidSrcExtractor] No source found for \"{self.source_name}\"\nAvailable Sources: {available_sources}")
            return None, None, None

        source_url = self.get_source_url(source)

        if self.source_name == "Vidplay":
            print(f"[>] Fetching source for \"{self.source_name}\"...")

            extractor = VidplayExtractor()
            return extractor.resolve_source(url=source_url, fetch_subtitles=self.fetch_subtitles, provider_url=VidSrcExtractor.PROVIDER_URL)

        elif self.source_name == "Filemoon":
            print(f"[>] Fetching source for \"{self.source_name}\"...")

            if self.fetch_subtitles:
                print(f"[VidSrcExtractor] \"{self.source_name}\" doesnt provide subtitles...")

            extractor = FilemoonExtractor()
            return extractor.resolve_source(url=source_url, fetch_subtitles=self.fetch_subtitles, provider_url=VidSrcExtractor.PROVIDER_URL)

        else:
            print(f"[VidSrcExtractor] Sorry, this doesnt currently support \"{self.source_name}\" :(\n[VidSrcExtractor] (if you create an issue and ask really nicely ill maybe look into reversing it though)...")
            return None, None, None

    @staticmethod
    def _parse_tmdb_href(href: str) -> Optional[Tuple[str, str]]:
        """Split a TMDB result href into ``(media_type, tmdb_id)``.

        ``"/movie/603-the-matrix?language=en-US"`` becomes ``("movie", "603")``.
        Returns ``None`` when the href has fewer than two path segments.
        """
        # Drop any query string, then the surrounding slashes.
        path = href.partition("?")[0].strip("/")
        segments = path.split("/")
        if len(segments) < 2 or not segments[0] or not segments[1]:
            return None
        # The id segment may carry a "-slug" suffix; keep only the id.
        return segments[0], segments[1].partition("-")[0]

    def query_tmdb(self, query: str) -> Dict:
        """Search themoviedb.org and return the parsed results.

        Returns:
            A dict keyed by ``"<index>. <title> (<release date>)"`` whose values
            are ``{"media_type": ..., "tmdb_id": ...}``. Result entries that
            lack a link, a title element or a parseable href are skipped
            (their index is still consumed).
        """
        # NOTE(review): spaces are replaced with "+" before requests encodes the
        # params, so the server presumably receives a literal "+" - confirm that
        # this is intended.
        req = requests.get(
            f"{VidSrcExtractor.TMDB_BASE_URL}/search",
            params={"query": query.replace(" ", "+").lower()},
            headers={"user-agent": VidSrcExtractor.USER_AGENT},
            timeout=VidSrcExtractor.REQUEST_TIMEOUT,
        )
        soup = BeautifulSoup(req.text, "html.parser")
        results = {}

        for index, data in enumerate(soup.find_all("div", {"class": "details"}), start=1):
            result = data.find("a", {"class": "result"})
            if result is None:
                # A details block without a result link cannot be resolved.
                continue

            title = result.find()
            if not title:
                continue

            title = title.text
            release_date = data.find("span", {"class": "release_date"})
            release_date = release_date.text if release_date else "1 January, 1970"

            url = result.get("href")
            if not url:
                continue

            parsed = VidSrcExtractor._parse_tmdb_href(url)
            if parsed is None:
                continue

            result_type, result_id = parsed
            results[f"{index}. {title} ({release_date})"] = {"media_type": result_type, "tmdb_id": result_id}

        return results
|
|
|
# ---------------------------------------------------------------------------
# Streamlit front end: collect the title and source from the sidebar, optionally
# search TMDB for an id, then resolve and play a stream.
# ---------------------------------------------------------------------------
st.title("Vidsrc.to Resolver (Hugging Face Space)")

# Source / provider options.
source_name = st.sidebar.selectbox("Select Source", SUPPORTED_SOURCES)
fetch_subtitles = st.sidebar.checkbox("Fetch Subtitles")
# NOTE: this value is collected but not used anywhere below.
default_subtitles = st.sidebar.text_input("Default Subtitles (Optional)")

# Title selection; season/episode inputs only exist for TV.
media_type = st.sidebar.radio("Media Type", ["Movie", "Tv"])
media_id = st.sidebar.text_input("IMDb/TMDb Code")
season = st.sidebar.text_input("Season Number (Optional)") if media_type == "Tv" else None
episode = st.sidebar.text_input("Episode Number (Optional)") if media_type == "Tv" else None

# Optional TMDB lookup so the user can find the id to paste above.
query = st.sidebar.text_input("Search Media (Optional)")
if query:
    vse = VidSrcExtractor(source_name=source_name, fetch_subtitles=fetch_subtitles)
    try:
        search_results = vse.query_tmdb(query)
    except Exception as e:
        # UI boundary: report the failure instead of crashing the page.
        st.sidebar.error(f"Search failed: {e}")
        search_results = {}

    st.sidebar.write("Search Results:")
    # Keys returned by query_tmdb are already numbered ("1. Title (date)"),
    # so they are written as-is rather than numbered a second time.
    for label, info in search_results.items():
        st.sidebar.write(label)
        st.sidebar.write(f" Type: {info['media_type']}")
        st.sidebar.write(f" ID: {info['tmdb_id']}")

if st.button("Play"):
    # Forget streams from a previous title before resolving a new one.
    st.session_state.pop("streams", None)

    if not media_id:
        st.error("Please enter an IMDb/TMDb code.")
        st.stop()

    vse = VidSrcExtractor(source_name=source_name, fetch_subtitles=fetch_subtitles)

    try:
        streams, subtitles, source_url = vse.get_streams(
            media_type.lower(), media_id, season, episode
        )
    except NoSourcesFound as e:
        st.error(e)
    except Exception as e:
        st.error(f"An error occurred: {e}")
    else:
        if streams:
            # Keep the streams across reruns: st.button is only True on the
            # run triggered by the click, so a stream picker rendered inside
            # this branch would vanish as soon as the user changed it.
            st.session_state["streams"] = list(streams)
        else:
            st.error(f"Could not find sources for {source_name}.")

# Render the player outside the button branch so it survives widget reruns.
available_streams = st.session_state.get("streams")
if available_streams:
    if len(available_streams) > 1:
        stream = st.selectbox("Select Stream", available_streams)
    else:
        stream = available_streams[0]

    st_player(stream, height=400)