"""FastAPI app that uploads files through replicate.com and mirrors them.

Uploaded files are pushed to the upload URL handed out by replicate.com's
upload API and then served back through this app's ``/rbxg/`` proxy route,
which streams the content from ``replicate.delivery``.
"""

import asyncio
import time
from typing import Dict
from urllib.parse import quote

import requests
from fastapi import FastAPI, File, HTTPException, Request, UploadFile
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Page fetched to obtain the session/CSRF cookies, also sent as the Referer.
_REPLICATE_PAGE_URL = 'https://replicate.com/levelsio/neon-tokyo'
_USER_AGENT = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
)

# (connect, read) timeouts in seconds. Without a timeout `requests` can wait
# forever on a stalled peer, pinning a worker thread indefinitely.
_REQUEST_TIMEOUT = (10, 60)
_UPLOAD_TIMEOUT = (10, 300)

# Connection-level headers that describe the upstream hop, not ours; a proxy
# must not forward them.
_HOP_BY_HOP_HEADERS = frozenset({
    'connection',
    'keep-alive',
    'proxy-authenticate',
    'proxy-authorization',
    'te',
    'trailer',
    'trailers',
    'transfer-encoding',
    'upgrade',
})

HTML_CONTENT = """ Radd PRO Uploader

Radd PRO Uploader

or drag and drop file here/paste image

Allowed file types: .zip, .mp4, .txt, .mp3, all image types, .pdf
"""


@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the uploader page."""
    return HTML_CONTENT


@app.post("/upload")
async def handle_upload(file: UploadFile = File(...)):
    """Upload ``file`` via replicate.com and return its mirrored URL.

    Returns:
        JSON ``{"url": "/rbxg/<path>"}`` pointing at this app's proxy route.

    Raises:
        HTTPException: 400 when no file/filename was supplied; 500 when the
            cookies cannot be obtained, the upload cannot be initiated, the
            upload fails after all retries, or the serving URL returned by
            the upload API is missing or not in the expected ``/pbxt/`` form.
    """
    if not file:
        raise HTTPException(status_code=400, detail="No file part")
    # `not` also covers a missing (None) filename, not just the empty string.
    if not file.filename:
        raise HTTPException(status_code=400, detail="No selected file")

    cookies = await get_cookies()
    if 'csrftoken' not in cookies or 'sessionid' not in cookies:
        raise HTTPException(status_code=500, detail="Failed to obtain necessary cookies")

    # Avoid sending the literal string "None" when the client gave no type.
    content_type = file.content_type or 'application/octet-stream'

    try:
        upload_result = await initiate_upload(cookies, file.filename, content_type)
    except Exception as exc:
        # initiate_upload already logged the cause; report it as a clean 500.
        raise HTTPException(status_code=500, detail="Failed to initiate upload") from exc
    if not isinstance(upload_result, dict) or 'upload_url' not in upload_result:
        raise HTTPException(status_code=500, detail="Failed to initiate upload")

    # The whole payload is kept in memory so that retries can resend it.
    file_content = await file.read()
    upload_success = await retry_upload(upload_result['upload_url'], file_content, content_type)
    if not upload_success:
        raise HTTPException(status_code=500, detail="File upload failed after multiple attempts")

    serving_url = upload_result.get('serving_url')
    if not isinstance(serving_url, str):
        raise HTTPException(status_code=500, detail="Upload response did not include a serving URL")
    # Everything after the first '/pbxt/' is the path the mirror route expects.
    _, marker, mirrored_path = serving_url.partition('/pbxt/')
    if not marker or not mirrored_path:
        raise HTTPException(status_code=500, detail="Unexpected serving URL format")

    return JSONResponse(content={"url": f"/rbxg/{mirrored_path}"})


@app.get("/rbxg/{path:path}")
async def handle_video_stream(path: str, request: Request):
    """Proxy ``path`` from replicate.delivery, forwarding any Range header.

    The upstream status code and end-to-end headers are passed through so
    that partial-content (206) responses keep working for media seeking.

    Raises:
        HTTPException: 502 when the upstream host cannot be reached.
    """
    original_url = f'https://replicate.delivery/pbxt/{path}'

    # Ask for an unencoded body: `iter_content` transparently decompresses,
    # which would make upstream Content-Length/Content-Range values wrong.
    upstream_headers = {'Accept-Encoding': 'identity'}
    range_header = request.headers.get('Range')
    if range_header:
        upstream_headers['Range'] = range_header

    try:
        # `requests` is blocking; run it off the event loop.
        upstream = await run_in_threadpool(
            requests.get,
            original_url,
            headers=upstream_headers,
            stream=True,
            timeout=_REQUEST_TIMEOUT,
        )
    except requests.RequestException as exc:
        print(f'Error fetching upstream file: {exc}')
        raise HTTPException(status_code=502, detail="Failed to fetch upstream file") from exc

    def generate():
        # Always release the upstream connection, even if the client leaves
        # mid-stream or the transfer errors out.
        try:
            yield from upstream.iter_content(chunk_size=8192)
        finally:
            upstream.close()

    # Headers we set ourselves are skipped case-insensitively to avoid
    # emitting duplicates under a differently-cased name.
    skipped = set(_HOP_BY_HOP_HEADERS) | {'access-control-allow-origin', 'content-disposition'}
    if 'Content-Encoding' in upstream.headers:
        # Upstream compressed anyway; the body we relay is decoded, so the
        # encoding and length headers no longer describe it.
        skipped |= {'content-encoding', 'content-length'}

    # Content-Range (for 206 responses) is carried over by this copy.
    headers = {
        name: value
        for name, value in upstream.headers.items()
        if name.lower() not in skipped
    }
    headers['Access-Control-Allow-Origin'] = '*'
    headers['Content-Disposition'] = 'inline'

    return StreamingResponse(generate(), status_code=upstream.status_code, headers=headers)


@app.get("/embed")
async def embed_video(url: str, thumbnail: str):
    """Return the embed page for a video.

    NOTE(review): the template below is empty in this copy of the file and
    neither ``url`` nor ``thumbnail`` is interpolated -- confirm against the
    intended markup. If they are interpolated, escape them first since both
    come straight from the query string.
    """
    html = f''' '''
    return HTMLResponse(content=html)


async def get_cookies() -> Dict[str, str]:
    """Fetch the replicate.com page and return the cookies it sets.

    Returns:
        The response cookies as a plain dict, or ``{}`` if the request fails.
    """
    try:
        response = await run_in_threadpool(
            requests.get,
            _REPLICATE_PAGE_URL,
            headers={'User-Agent': _USER_AGENT},
            timeout=_REQUEST_TIMEOUT,
        )
        return dict(response.cookies)
    except Exception as e:
        print(f'Error fetching the page: {e}')
        return {}


async def initiate_upload(cookies: Dict[str, str], filename: str, content_type: str) -> Dict:
    """Ask replicate.com's upload API for an upload slot.

    Args:
        cookies: Session cookies; ``csrftoken`` is echoed as ``X-CSRFToken``.
        filename: Name of the file being uploaded.
        content_type: MIME type of the file.

    Returns:
        The decoded JSON response body.

    Raises:
        Exception: Any request or JSON-decoding error is logged and re-raised.
    """
    # Escape the filename as a single path segment so characters such as
    # '/', '?' or '#' cannot change the URL's structure; the content type is
    # passed via `params` so that it is query-encoded too.
    url = 'https://replicate.com/api/upload/' + quote(filename, safe='')
    try:
        response = await run_in_threadpool(
            requests.post,
            url,
            params={'content_type': content_type},
            cookies=cookies,
            headers={
                'X-CSRFToken': cookies.get('csrftoken'),
                'User-Agent': _USER_AGENT,
                'Referer': _REPLICATE_PAGE_URL,
                'Origin': 'https://replicate.com',
                'Accept': '*/*',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'identity',
                'Sec-Fetch-Dest': 'empty',
                'Sec-Fetch-Mode': 'cors',
                'Sec-Fetch-Site': 'same-origin',
                'Sec-GPC': '1',
                'Priority': 'u=1, i',
            },
            timeout=_REQUEST_TIMEOUT,
        )
        return response.json()
    except Exception as e:
        print(f'Error initiating upload: {e}')
        raise


async def upload_file(upload_url: str, file_content: bytes, content_type: str) -> bool:
    """PUT ``file_content`` to ``upload_url``.

    Returns:
        True only when the server answers 200; False on any other status or
        on a request error (which is logged, not raised).
    """
    try:
        response = await run_in_threadpool(
            requests.put,
            upload_url,
            data=file_content,
            headers={'Content-Type': content_type},
            timeout=_UPLOAD_TIMEOUT,
        )
        return response.status_code == 200
    except Exception as e:
        print(f'Error uploading file: {e}')
        return False


async def retry_upload(upload_url: str, file_content: bytes, content_type: str, max_retries: int = 5, delay: int = 1) -> bool:
    """Call ``upload_file`` up to ``max_retries`` times with backoff.

    Args:
        upload_url: Destination passed through to ``upload_file``.
        file_content: Payload passed through to ``upload_file``.
        content_type: MIME type passed through to ``upload_file``.
        max_retries: Maximum number of attempts.
        delay: Initial pause in seconds between attempts; doubled after each
            failure and capped at 60.

    Returns:
        True as soon as one attempt succeeds, otherwise False.
    """
    for attempt in range(1, max_retries + 1):
        try:
            if await upload_file(upload_url, file_content, content_type):
                return True
            print(f"Upload attempt {attempt} failed. Retrying...")
        except Exception as e:
            print(f"Error during upload attempt {attempt}: {e}")

        # No point sleeping once the attempts are exhausted.
        if attempt < max_retries:
            await asyncio.sleep(delay)
            delay = min(delay * 2, 60)  # Exponential backoff, capped at 60 seconds

    return False