import gradio as gr
from internetarchive import search_items, get_item
import requests
import time
import subprocess
import json
import re
from itertools import islice
from bs4 import BeautifulSoup
from requests.exceptions import ReadTimeout
# --- News-station filter ---
NEWS_FILTER = [
    r"\bcnn\b", r"\bfox\b", r"\bmsnbc\b", r"\bbbc\b", r"\breuters\b",
    r"\bal jazeera\b", r"\bbloomberg\b", r"\bsky news\b", r"\bcnbc\b",
    r"\babc news\b", r"\bnbc\b", r"\bcbs\b", r"\bpbs\b", r"\bnewsweek\b",
    r"\bthe guardian\b", r"\bvice news\b", r"\bpolitico\b", r"\bwashington post\b",
    r"\bnew york times\b", r"\bforbes\b", r"\btime\b", r"\busa today\b"
]
FILTER_REGEX = re.compile("|".join(f"({pat})" for pat in NEWS_FILTER), re.IGNORECASE)
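# Illustrative matches (the \b word boundaries keep e.g. "firefox" from
# tripping the r"\bfox\b" pattern):
#   FILTER_REGEX.search("CNN breaking news")      -> match, item skipped
#   FILTER_REGEX.search("backyard drone footage") -> None,  item kept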
# --- VirusTotal helper functions ---
def scan_url_vt(url, api_key, max_polls=24):
    """Submit a URL to VirusTotal and poll until the analysis completes.

    Returns True when no engine flags the URL as malicious, False otherwise.
    Raises requests.HTTPError on API errors, and TimeoutError if the
    analysis does not complete within max_polls * 5 seconds.
    """
    headers = {"x-apikey": api_key}
    resp = requests.post(
        "https://www.virustotal.com/api/v3/urls",
        headers=headers,
        data={"url": url},
        timeout=30,
    )
    resp.raise_for_status()
    analysis_id = resp.json()["data"]["id"]
    for _ in range(max_polls):
        time.sleep(5)
        status_resp = requests.get(
            f"https://www.virustotal.com/api/v3/analyses/{analysis_id}",
            headers=headers,
            timeout=30,
        )
        status_resp.raise_for_status()
        attr = status_resp.json()["data"]["attributes"]
        if attr.get("status") == "completed":
            stats = attr.get("stats", {})
            return stats.get("malicious", 0) == 0
    raise TimeoutError(f"VirusTotal analysis {analysis_id} did not complete")
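# Minimal usage sketch (hypothetical key; note VirusTotal's free public tier
# is rate-limited to roughly 4 requests/minute, so scanning many URLs is slow):
#   is_clean = scan_url_vt("https://archive.org/download/some-item/video.mp4",
#                          "YOUR_VT_API_KEY")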
# --- FFprobe metadata extraction ---
def extract_ffprobe_metadata(url_or_path):
    """Run ffprobe on a local file or URL and return its JSON metadata.

    Also derives a readable "computed_fps" for the first video stream
    from its avg_frame_rate fraction.
    """
    cmd = [
        "ffprobe", "-v", "error", "-print_format", "json",
        "-show_format", "-show_streams",
        url_or_path,
    ]
    out = subprocess.check_output(cmd, timeout=60)
    md = json.loads(out)
    for stream in md.get("streams", []):
        if stream.get("codec_type") == "video":
            avg_fr = stream.get("avg_frame_rate", "")
            if avg_fr and "/" in avg_fr:
                num, den = avg_fr.split("/")
                if den != "0":
                    stream["computed_fps"] = round(int(num) / int(den), 2)
            break
    return md
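# How computed_fps falls out of avg_frame_rate, for reference:
#   "30000/1001" -> 29.97 (NTSC)    "25/1" -> 25.0 (PAL)    "0/0" -> skipped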
# --- Scrape basic page metadata ---
def fetch_page_metadata(url):
    """Fetch a page and return its <title> plus any og:/twitter: meta tags."""
    try:
        resp = requests.get(url, timeout=5)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        meta = {"url": url, "title": soup.title.string if soup.title else None}
        for tag in soup.find_all("meta"):
            prop = tag.get("property") or tag.get("name")
            if prop and prop.startswith(("og:", "twitter:")):
                meta[prop] = tag.get("content")
        return meta
    except Exception as e:
        return {"url": url, "error": str(e)}
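# Shape of the returned dict, assuming a page that carries OpenGraph tags:
#   {"url": ..., "title": "...", "og:title": "...", "og:video": "...", ...}
# On any network or parse error the dict carries an "error" key instead.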
# --- Core search & scan logic ---
def fetch_clean_videos(keywords, api_key, scan_enabled):
    """Search archive.org for videos matching the keywords, skipping known
    news outlets and (optionally) URLs that fail a VirusTotal scan."""
    # Build the IA query: quote each comma-separated keyword so multi-word
    # entries are matched as phrases, not as independent terms.
    query = " OR ".join(f'"{kw.strip()}"' for kw in keywords.split(","))
    ia_query = f"mediatype:(movies) AND ({query})"
    # Robust search with exponential-backoff retries; islice caps the
    # results at 50 without pulling the full result set into memory.
    max_attempts = 3
    results = []
    for attempt in range(max_attempts):
        try:
            results = list(islice(search_items(ia_query), 50))
            break
        except ReadTimeout:
            if attempt < max_attempts - 1:
                time.sleep(2 ** attempt)
    clean_urls = []
    for res in results:
        title = str(res.get("title", "")).lower()
        # skip known news sources
        if FILTER_REGEX.search(title):
            continue
        identifier = res["identifier"]
        try:
            item = get_item(identifier)
        except Exception:
            continue
        for f in item.files:
            name = f.get("name", "").lower()
            # include common video file extensions
            if name.endswith((".mp4", ".m4v", ".mov", ".avi", ".mpg", ".mpeg", ".mkv", ".webm")):
                url = f"https://archive.org/download/{identifier}/{f['name']}"
                if scan_enabled and api_key:
                    try:
                        if not scan_url_vt(url, api_key):
                            continue
                    except Exception:
                        continue
                clean_urls.append(url)
    return clean_urls
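# Standalone usage sketch (no Gradio; assumes network access to archive.org):
#   urls = fetch_clean_videos("drone strike, military uav",
#                             api_key="", scan_enabled=False)
#   print(f"{len(urls)} candidate files")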
# --- Gradio UI setup ---
with gr.Blocks() as demo:
    gr.Markdown("# 📼 IA Scrape – Enhanced Archive Video Explorer")
    with gr.Row():
        kw_input = gr.Textbox(label="Search keywords", value="drone strike, military uav")
        vt_key_input = gr.Textbox(label="VirusTotal API Key", type="password")
        scan_toggle = gr.Checkbox(label="Enable VT scan", value=True)
        ffprobe_toggle = gr.Checkbox(label="Enable FFprobe metadata", value=False)
    run_btn = gr.Button("Search & Scan")
    url_dropdown = gr.Dropdown(label="Clean Video URLs", choices=[], interactive=True)
    video_player = gr.Video(label="Video Player")
    ia_meta_json = gr.JSON(label="► Raw IA Metadata")
    ffprobe_json = gr.JSON(label="► FFprobe Metadata")
    origins_json = gr.JSON(label="► Source-Origin Metadata")
    def search_and_populate(keywords, api_key, scan_enabled):
        urls = fetch_clean_videos(keywords, api_key, scan_enabled)
        return gr.update(choices=urls, value=urls[0] if urls else None)
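    # Note: gr.update(choices=..., value=...) repopulates the dropdown in
    # place, and Gradio fires a component's .change() listener on programmatic
    # value updates as well, so preselecting the first hit loads it immediately.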
    def update_all(selected_url, ff_on):
        """Assemble IA, ffprobe, and source-origin metadata for the
        selected video URL."""
        if not selected_url:
            return None, {}, {}, []
        # 1) IA metadata + files. Download URLs have the shape
        # https://archive.org/download/{identifier}/{filename}, so the
        # identifier is the fifth path segment.
        parts = selected_url.split("/")
        identifier = parts[4] if len(parts) > 4 else None
        raw_ia = {"identifier": identifier, "metadata": {}, "files": []}
        if identifier:
            try:
                item = get_item(identifier)
                raw_ia["metadata"] = item.metadata
                # List the key fields first so they lead each entry in the JSON panel.
                raw_ia["files"] = [
                    {"name": f.get("name"), "format": f.get("format"),
                     "size": f.get("size"), "md5": f.get("md5"),
                     **{k: v for k, v in f.items()
                        if k not in ("name", "format", "size", "md5")}}
                    for f in item.files
                ]
            except Exception:
                raw_ia["error"] = "could not fetch IA metadata"
        # 2) FFprobe metadata if toggled
        ff_md = {}
        if ff_on:
            try:
                ff_md = extract_ffprobe_metadata(selected_url)
            except Exception as e:
                ff_md = {"error": str(e)}
        # 3) Source-origin tracing
        origins = []
        source_url = None
        meta = raw_ia.get("metadata", {})
        # explicit fields
        for key, val in meta.items():
            if key.lower() in ("source", "originalurl"):
                source_url = val[0] if isinstance(val, list) else val
                break
        # fallback: external identifiers (e.g. "urn:youtube:VIDEOID")
        if not source_url:
            for key, val in meta.items():
                if key.lower().startswith("external-identifier"):
                    ext = val[0] if isinstance(val, list) else val
                    if "youtube" in ext:
                        vid = ext.split(":")[-1]
                        source_url = f"https://www.youtube.com/watch?v={vid}"
                    elif "vimeo" in ext:
                        vid = ext.split(":")[-1]
                        source_url = f"https://vimeo.com/{vid}"
                    break
        # fallback: first URL in the description, which IA may return
        # as either a string or a list of strings
        if not source_url:
            desc = meta.get("description", "")
            if isinstance(desc, list):
                desc = " ".join(desc)
            found = re.findall(r"https?://[^\s\"<]+", desc)
            if found:
                source_url = found[0]
        if source_url:
            origins.append(fetch_page_metadata(source_url))
        return selected_url, raw_ia, ff_md, origins
    run_btn.click(
        fn=search_and_populate,
        inputs=[kw_input, vt_key_input, scan_toggle],
        outputs=[url_dropdown]
    )
    url_dropdown.change(
        fn=update_all,
        inputs=[url_dropdown, ffprobe_toggle],
        outputs=[video_player, ia_meta_json, ffprobe_json, origins_json]
    )
if __name__ == "__main__":
    demo.launch()
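# To run locally (assumes this file is saved as app.py, ffprobe is on PATH,
# and gradio, internetarchive, requests, beautifulsoup4 are pip-installed):
#   python app.py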