|
|
from __future__ import annotations |
|
|
|
|
|
import json |
|
|
from pathlib import Path |
|
|
|
|
|
from fastapi import APIRouter, File, Form, HTTPException, UploadFile |
|
|
from fastapi.responses import StreamingResponse |
|
|
|
|
|
from ..models.export import SummaryExportRequest, TranscriptExportRequest |
|
|
from ..models.summarization import SummaryRequest, SpeakerNameDetectionRequest |
|
|
from ..models.transcription import TranscriptionRequest |
|
|
from ..core.config import get_settings |
|
|
from ..services import config_service, export_service, podcast_service |
|
|
from ..services.asr_service import iter_transcription_events |
|
|
from ..services.file_service import save_upload_file, store_audio_file |
|
|
from ..services.summarization_service import iter_summary_events |
|
|
|
|
|
router = APIRouter(prefix="/api") |
|
|
|
|
|
|
|
|
@router.get("/config/models") |
|
|
def fetch_model_catalog(): |
|
|
return config_service.get_model_catalog() |
|
|
|
|
|
|
|
|
@router.post("/transcribe") |
|
|
def transcribe_audio( |
|
|
audio: UploadFile | None = File(default=None), |
|
|
options: str = Form("{}"), |
|
|
source: str | None = Form(default=None), |
|
|
): |
|
|
payload = TranscriptionRequest(**json.loads(options or "{}")) |
|
|
|
|
|
cleanup_temp = False |
|
|
if audio is not None: |
|
|
temp_path = save_upload_file(audio) |
|
|
_, audio_url = store_audio_file(temp_path) |
|
|
cleanup_temp = True |
|
|
elif source: |
|
|
filename = Path(source).name |
|
|
candidate_path = get_settings().audio_dir / filename |
|
|
if not candidate_path.exists(): |
|
|
raise HTTPException(status_code=404, detail="Audio source not found") |
|
|
temp_path = candidate_path |
|
|
audio_url = source |
|
|
else: |
|
|
raise HTTPException(status_code=400, detail="Either audio upload or source is required") |
|
|
|
|
|
def event_stream(): |
|
|
try: |
|
|
for event in iter_transcription_events(temp_path, audio_url, payload): |
|
|
yield json.dumps(event, ensure_ascii=False) + "\n" |
|
|
finally: |
|
|
if cleanup_temp: |
|
|
temp_path.unlink(missing_ok=True) |
|
|
|
|
|
return StreamingResponse(event_stream(), media_type="application/x-ndjson") |
|
|
|
|
|
|
|
|
@router.post("/summarize") |
|
|
def summarize_text(request: SummaryRequest): |
|
|
def event_stream(): |
|
|
for event in iter_summary_events(request): |
|
|
yield json.dumps(event, ensure_ascii=False) + "\n" |
|
|
|
|
|
return StreamingResponse(event_stream(), media_type="application/x-ndjson") |
|
|
|
|
|
|
|
|
@router.get("/podcast/search") |
|
|
def search_podcast(query: str): |
|
|
return podcast_service.search_series(query) |
|
|
|
|
|
|
|
|
@router.get("/podcast/episodes") |
|
|
def get_podcast_episodes(feed_url: str): |
|
|
return podcast_service.list_episodes(feed_url) |
|
|
|
|
|
|
|
|
@router.post("/podcast/download") |
|
|
def download_episode(payload: dict): |
|
|
audio_url = payload.get("audioUrl") or payload.get("audio_url") |
|
|
title = payload.get("title", "Episode") |
|
|
if not audio_url: |
|
|
raise HTTPException(status_code=400, detail="audioUrl is required") |
|
|
return podcast_service.download_episode(audio_url, title) |
|
|
|
|
|
|
|
|
@router.post("/youtube/fetch") |
|
|
def fetch_youtube_audio(payload: dict): |
|
|
url = payload.get("url") or payload.get("youtubeUrl") |
|
|
if not url: |
|
|
raise HTTPException(status_code=400, detail="url is required") |
|
|
return podcast_service.fetch_youtube_audio(url) |
|
|
|
|
|
|
|
|
@router.post("/export/transcript") |
|
|
def export_transcript(payload: TranscriptExportRequest): |
|
|
content, filename, mime_type = export_service.generate_transcript_export(payload) |
|
|
|
|
|
import urllib.parse |
|
|
encoded_filename = urllib.parse.quote(filename) |
|
|
content_disposition = f"attachment; filename*=UTF-8''{encoded_filename}" |
|
|
return StreamingResponse( |
|
|
iter([content.encode("utf-8")]), |
|
|
media_type=mime_type, |
|
|
headers={"Content-Disposition": content_disposition}, |
|
|
) |
|
|
|
|
|
|
|
|
@router.post("/export/summary") |
|
|
def export_summary(payload: SummaryExportRequest): |
|
|
content, filename, mime_type = export_service.generate_summary_export(payload) |
|
|
|
|
|
import urllib.parse |
|
|
encoded_filename = urllib.parse.quote(filename) |
|
|
content_disposition = f"attachment; filename*=UTF-8''{encoded_filename}" |
|
|
return StreamingResponse( |
|
|
iter([content.encode("utf-8")]), |
|
|
media_type=mime_type, |
|
|
headers={"Content-Disposition": content_disposition}, |
|
|
) |
|
|
|
|
|
|
|
|
@router.post("/detect-speaker-names") |
|
|
def detect_speaker_names(request: SpeakerNameDetectionRequest): |
|
|
from src.summarization import detect_speaker_names as detect_names |
|
|
result = detect_names(request.utterances, request.llm_model) |
|
|
return result |
|
|
|