import os, uuid import tempfile import requests from mimetypes import guess_extension from PIL import Image from io import BytesIO import subprocess from fastapi import ( FastAPI, UploadFile, File, HTTPException, Response, Request, BackgroundTasks, ) from typing import List, Optional import asyncio, aiofiles from fastapi.responses import StreamingResponse, FileResponse app = FastAPI() def download_image(image_url: str): # Create a temporary directory temp_dir = tempfile.mkdtemp() # Get the image content response = requests.get(image_url) response.raise_for_status() # Detect the image type image = Image.open(BytesIO(response.content)) image_format = image.format.lower() image_extension = guess_extension(f"image/{image_format}") if image_extension is None: raise ValueError("Cannot detect image file type.") # Define the image file path image_path = os.path.join(temp_dir, f"image{image_extension}") # Save the image file with open(image_path, "wb") as image_file: image_file.write(response.content) # Return the image path and dimensions return image_path, image.size def make_effect( image_link: str, filename: str, frame_rate: int, duration: int, quality: int, ssaa: float, raw: bool, ): # Download the image and get its dimensions image_path, (width, height) = download_image(image_url=image_link) print(f"Image path: {image_path}, Width: {width}, Height: {height}", "#" * 100) # Define the output video file path destination = os.path.join("/tmp/Video", filename) # Create the destination directory if it doesn't exist os.makedirs(os.path.dirname(destination), exist_ok=True) # Build the depthflow command command = [ "depthflow", "input", "-i", image_path, "main", "-f", str(frame_rate), "-t", str(duration), "--width", str(width), "--height", str(height), "--quality", str(quality), "--ssaa", str(ssaa), "--benchmark", ] if raw: command.append("--raw") command.extend(["--output", destination]) # Execute the depthflow command subprocess.run(command, check=True) return destination @app.post("/generate_video") async def generate_video( background_task: BackgroundTasks, image_link: str = None, frame_rate: int = 30, duration: int = 3, quality: int = 10, ssaa: float = 0.75, raw: bool = True, ): filename = f"{str(uuid.uuid4())}.mp4" try: background_task.add_task( make_effect, image_link, filename, frame_rate, duration, quality, ssaa, raw ) return {"output_file": filename} except Exception as e: raise HTTPException(status_code=400, detail=str(e)) @app.get("/download/{filename}") async def download_video(filename: str, request: Request): video_directory = "/tmp/Video" video_path = os.path.join(video_directory, filename) if not os.path.isfile(video_path): raise HTTPException(status_code=404, detail="Video not found") range_header = request.headers.get("Range", None) video_size = os.path.getsize(video_path) if range_header: start, end = range_header.strip().split("=")[1].split("-") start = int(start) end = video_size if end == "" else int(end) headers = { "Content-Range": f"bytes {start}-{end}/{video_size}", "Accept-Ranges": "bytes", } content = read_file_range(video_path, start, end) return StreamingResponse(content, media_type="video/mp4", headers=headers) return FileResponse(video_path, media_type="video/mp4") async def read_file_range(path, start, end): async with aiofiles.open(path, "rb") as file: await file.seek(start) while True: data = await file.read(1024 * 1024) # read in chunks of 1MB if not data or await file.tell() > end: break yield data