| |
| import shutil |
| import tempfile |
| import requests |
| from fastapi import FastAPI, Query, HTTPException |
| from fastapi.responses import FileResponse |
| import os |
| from urllib.parse import urlparse |
|
|
app = FastAPI(title="Telegram-Friendly Video Proxy")

# Extensions that are kept for the served filename; anything else becomes ".mp4".
_VIDEO_EXTENSIONS = frozenset({".mp4", ".mkv", ".webm", ".mov", ".avi"})

# Size of the chunks streamed from the upstream server to disk.
_DOWNLOAD_CHUNK_SIZE = 1024 * 1024


def _remove_quietly(path: str) -> None:
    """Delete *path*, tolerating the case where it has already been removed."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


@app.get("/download")
def download_video(url: str = Query(..., description="Public video URL")):
    """
    Download a video from a public URL and serve it back as a raw file.

    The upstream body is streamed to a temporary file, which is then returned
    as an ``application/octet-stream`` attachment named ``video<ext>``. The
    temporary file is removed by a background task attached to the response.

    Args:
        url: Absolute ``http``/``https`` URL of the video to fetch.

    Returns:
        A ``FileResponse`` wrapping the downloaded file.

    Raises:
        HTTPException: 400 when *url* is not an http(s) URL with a host;
            500 when the download or the file write fails.
    """
    # Function-scope import so this block does not depend on the file-level
    # import list; fastapi itself is already a dependency of this module.
    from fastapi import BackgroundTasks

    parsed = urlparse(url)
    # Exact scheme match: ``startswith("http")`` would also accept schemes
    # such as "httpx". A missing host can never be fetched, so reject it too.
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise HTTPException(status_code=400, detail="Invalid URL")

    # NOTE(security): the server fetches whatever host the caller supplies,
    # including loopback/private addresses (SSRF). Restrict allowed hosts
    # before exposing this endpoint to untrusted clients.

    _, ext = os.path.splitext(parsed.path)
    if ext.lower() not in _VIDEO_EXTENSIONS:
        ext = ".mp4"

    # mkstemp creates the file atomically; the deprecated mktemp only returns
    # a name that another process could claim before it is opened.
    fd, tmp_path = tempfile.mkstemp(suffix=ext)

    try:
        with os.fdopen(fd, "wb") as out_file:
            with requests.get(url, stream=True, timeout=60) as upstream:
                upstream.raise_for_status()
                # iter_content undoes any Content-Encoding (gzip/deflate);
                # copying from ``upstream.raw`` would store the encoded bytes.
                for chunk in upstream.iter_content(chunk_size=_DOWNLOAD_CHUNK_SIZE):
                    out_file.write(chunk)

        # Without this task every successful request would leave its
        # temporary file on disk.
        cleanup = BackgroundTasks()
        cleanup.add_task(_remove_quietly, tmp_path)

        return FileResponse(
            tmp_path,
            media_type="application/octet-stream",
            filename=f"video{ext}",
            headers={"Content-Disposition": f'attachment; filename="video{ext}"'},
            background=cleanup,
        )

    except Exception as exc:
        # Top-level boundary of the endpoint: drop the partial file and
        # report the failure to the client, keeping the original cause chained.
        _remove_quietly(tmp_path)
        raise HTTPException(
            status_code=500, detail=f"Failed to download file: {repr(exc)}"
        ) from exc