vid-test / app.py
sdafd's picture
Update app.py
87cb710 verified
import os
import subprocess
import shlex
import tempfile
import uuid
from typing import List
import shutil # Import shutil for rmtree
import logging
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from starlette.background import BackgroundTask # Import BackgroundTask
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Define the core ffmpeg function (remains the same) ---
def run_ffmpeg_concatenation(input_files: List[str], output_file: str, ffmpeg_executable: str = "ffmpeg"):
"""
Runs the FFmpeg concatenation process.
Returns (success: bool, message: str, stderr: str)
"""
if not input_files:
return False, "Error: Cannot concatenate, file list is empty.", ""
logger.info(f"Starting re-encode concatenation of {len(input_files)} videos into {output_file}...")
logger.info("This will re-encode the video and may take some time.")
input_args = []
for video_file in input_files:
input_args.extend(['-i', video_file])
filter_parts = []
for i in range(len(input_files)):
filter_parts.append(f"[{i}:v]")
filter_parts.append(f"[{i}:a]")
filter_string = "".join(filter_parts) + f"concat=n={len(input_files)}:v=1:a=1[outv][outa]"
command = [
ffmpeg_executable,
*input_args,
'-filter_complex', filter_string,
'-map', '[outv]',
'-map', '[outa]',
'-vsync', 'vfr',
'-movflags', '+faststart',
'-y', # Overwrite output without asking
output_file
]
logger.info("\nRunning FFmpeg command:")
try:
cmd_str = shlex.join(command)
logger.info(cmd_str)
except AttributeError: # shlex.join is Python 3.8+
cmd_str = ' '.join(f'"{arg}"' if ' ' in arg else arg for arg in command)
logger.info(cmd_str)
logger.info("\n--- FFmpeg Execution Start ---")
stderr_output = ""
try:
process = subprocess.run(
command,
check=True, # Raises CalledProcessError on non-zero exit
capture_output=True,
text=True,
encoding='utf-8',
errors='replace' # Handle potential encoding errors in ffmpeg output
)
stderr_output = process.stderr
logger.info("--- FFmpeg Execution End ---")
logger.info("\nSTDERR (Progress/Info):\n" + stderr_output)
msg = f"Successfully concatenated videos into {os.path.basename(output_file)}"
logger.info(msg)
return True, msg, stderr_output # Return success, message, and stderr
except subprocess.CalledProcessError as e:
stderr_output = e.stderr + "\n" + e.stdout # Combine stderr/stdout on error
logger.error("--- FFmpeg Execution End ---")
logger.error(f"\nError during FFmpeg execution (return code {e.returncode}):")
logger.error("\nSTDERR/STDOUT:\n" + stderr_output)
return False, f"FFmpeg failed with return code {e.returncode}.", stderr_output
except FileNotFoundError:
err_msg = f"Error: '{ffmpeg_executable}' command not found on the server."
logger.error(err_msg)
return False, err_msg, ""
except Exception as e:
err_msg = f"An unexpected server error occurred during ffmpeg processing: {e}"
logger.exception(err_msg) # Log full traceback
return False, err_msg, stderr_output # Include any captured stderr
# --- Cleanup Function ---
def cleanup_temp_dir(temp_dir: str):
"""Removes a temporary directory."""
try:
logger.info(f"Cleaning up temporary directory: {temp_dir}")
shutil.rmtree(temp_dir)
logger.info(f"Successfully cleaned up temporary directory: {temp_dir}")
except Exception as e:
logger.error(f"Error cleaning up temporary directory {temp_dir}: {e}")
# --- FastAPI App Definition ---
app = FastAPI()
@app.post("/concatenate/")
async def concatenate_videos_api(
files: List[UploadFile] = File(..., description="List of video files to concatenate"),
output_filename: str = Form("concatenated_video.mp4", description="Desired output filename (e.g., final.mp4)")
):
"""
API endpoint to concatenate uploaded video files using FFmpeg re-encoding.
Cleans up temporary files after response is sent.
"""
if not files:
raise HTTPException(status_code=400, detail="No files were uploaded.")
if not output_filename.lower().endswith(('.mp4', '.mov', '.avi', '.mkv')): # Basic validation
raise HTTPException(status_code=400, detail="Output filename must have a common video extension (mp4, mov, avi, mkv).")
logger.info(f"Received {len(files)} files for concatenation. Output name: {output_filename}")
# Create a temporary directory manually
temp_dir = tempfile.mkdtemp()
logger.info(f"Created temporary directory: {temp_dir}")
input_file_paths = []
original_filenames = []
try:
# Save uploaded files to the temporary directory
for uploaded_file in files:
original_filenames.append(uploaded_file.filename)
_, ext = os.path.splitext(uploaded_file.filename)
# Ensure unique name within the temp dir
temp_input_path = os.path.join(temp_dir, f"{uuid.uuid4()}{ext}")
logger.info(f"Saving uploaded file '{uploaded_file.filename}' to '{temp_input_path}'")
try:
with open(temp_input_path, "wb") as buffer:
buffer.write(await uploaded_file.read())
input_file_paths.append(temp_input_path)
finally:
await uploaded_file.close() # Ensure file handle is closed
logger.info(f"Saved files: {original_filenames}")
# Define the output path within the temporary directory
temp_output_path = os.path.join(temp_dir, output_filename)
# Run the FFmpeg concatenation logic
success, message, ffmpeg_stderr = run_ffmpeg_concatenation(input_file_paths, temp_output_path)
if success:
logger.info(f"Concatenation successful. Preparing file response for: {temp_output_path}")
# Return the concatenated file with a background task for cleanup
return FileResponse(
path=temp_output_path,
filename=output_filename, # Send back with the desired name
media_type='video/mp4', # Adjust if needed
background=BackgroundTask(cleanup_temp_dir, temp_dir) # Cleanup AFTER sending
)
else:
logger.error(f"Concatenation failed: {message}")
# Explicitly clean up now since we are not returning a FileResponse
cleanup_temp_dir(temp_dir)
# Return a JSON error response
return JSONResponse(
status_code=500,
content={
"detail": f"Video concatenation failed: {message}",
"ffmpeg_stderr": ffmpeg_stderr,
"input_files": original_filenames
}
)
except Exception as e:
logger.exception("An unexpected error occurred in the API endpoint.")
# Explicitly clean up in case of other errors before returning FileResponse
cleanup_temp_dir(temp_dir)
raise HTTPException(status_code=500, detail=f"Internal server error: {e}")
@app.get("/")
async def read_root():
return {"message": "Video Concatenation API is running. Use the /concatenate/ endpoint (POST) to process videos."}