VoxSum / src /server /services /file_service.py
Luigi's picture
re-implement in FastAPI
ba4a241
from __future__ import annotations
import shutil
import uuid
from pathlib import Path
from typing import Tuple
from fastapi import UploadFile
from ..core.config import get_settings
settings = get_settings()
def cleanup_old_audio_files(max_files: int | None = None) -> None:
"""Remove old audio files from the static directory to save space."""
max_files = max_files or settings.max_audio_files
audio_dir = settings.audio_dir
audio_dir.mkdir(parents=True, exist_ok=True)
files = sorted(audio_dir.glob("*") , key=lambda f: f.stat().st_mtime if f.exists() else 0)
if len(files) <= max_files:
return
for old_file in files[:-max_files]:
try:
old_file.unlink()
except OSError:
continue
def save_upload_file(upload: UploadFile) -> Path:
"""Persist an UploadFile to the temporary directory and return its path."""
tmp_dir = settings.tmp_dir
tmp_dir.mkdir(parents=True, exist_ok=True)
suffix = Path(upload.filename or "audio").suffix or ".mp3"
temp_path = tmp_dir / f"upload_{uuid.uuid4().hex}{suffix}"
with temp_path.open("wb") as buffer:
shutil.copyfileobj(upload.file, buffer)
return temp_path
def store_audio_file(audio_path: Path, prefix: str | None = None) -> Tuple[Path, str]:
"""Copy an audio file to the public static folder and return the new path and URL."""
cleanup_old_audio_files()
prefix = prefix or "audio"
suffix = audio_path.suffix or ".mp3"
dest_filename = f"{prefix}_{uuid.uuid4().hex}{suffix}"
dest_path = settings.audio_dir / dest_filename
shutil.copy2(audio_path, dest_path)
url = f"/media/{dest_filename}"
return dest_path, url