awalit / app.py
devcom33
debugging
d479573
import logging
import traceback
import os
import json
import re
import time
from fastapi import FastAPI, UploadFile, File, HTTPException, Form
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import config
from models import load_whisper, load_summarizer, load_spacy
from services import process_transcription, process_summary, create_enhanced_summary_prompt, format_summary_to_markdown
from utils import webm_to_wav, get_language_name
import google.generativeai as genai
from google.api_core import exceptions as api_core_exceptions
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# The FastAPI application; every route below is registered on this instance.
# title/description/version are metadata handed to the FastAPI constructor.
app = FastAPI(
    title="Transcription and Summarization API",
    description="API using Faster-Whisper, spaCy, and Hugging Face Transformers",
    version="1.0.0",
)
# The Gemini key is read from the environment. The client is configured only
# when a key is present; a missing key is logged and startup continues.
api_key = os.environ.get("GEMINI_API_KEY")
if api_key:
    genai.configure(api_key=api_key)
else:
    logger.critical("GEMINI_API_KEY environment variable not set.")
# Load every model once at import time. The route handlers below read these
# module-level names (whisper_model, summarizer_pipeline, nlp_spacy).
# NOTE(review): the later startup checks treat a falsy value as "failed to
# load", so the loaders presumably return None on failure -- confirm in models.
logger.info("Application starting up - loading models...")
whisper_model = load_whisper(config)
summarizer_pipeline = load_summarizer(config)
nlp_spacy = load_spacy(config)
logger.info("Model loading complete.")
# Origins allowed to call this API cross-origin.
origins = ["http://localhost:8080"]

# CORS: only the origins above are allowed, with credentials, and with any
# HTTP method and any request header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Startup report for models that did not load. These checks use truthiness,
# so any falsy loader result is reported. Nothing is raised here: the app
# still starts and the affected routes answer for themselves.
if not whisper_model:
    logger.critical(
        "Whisper model failed to load. Transcription endpoint will be unavailable."
    )
if not summarizer_pipeline:
    logger.critical(
        "Summarizer pipeline failed to load. Summarization endpoint will be unavailable."
    )
# spaCy is only logged at warning level, unlike the two models above.
if not nlp_spacy:
    logger.warning(
        "SpaCy model failed to load. Summarization will proceed without spaCy preprocessing."
    )
# Request body accepted by the /summarize, /smart-summary and
# /enhanced-summary endpoints.
class TranscriptInput(BaseModel):
    # Text to summarize; the endpoints reject it with a 400 when empty.
    transcript: str
    # Language code of the transcript; /enhanced-summary passes it to
    # get_language_name and format_summary_to_markdown.
    language: str
@app.get("/health")
def health():
    # Liveness probe: always reports "ok" plus one boolean per model telling
    # whether its module-level handle is something other than None.
    handles = {
        "whisper_loaded": whisper_model,
        "summarizer_loaded": summarizer_pipeline,
        "spacy_loaded": nlp_spacy,
    }
    report = {"status": "ok"}
    for flag, handle in handles.items():
        report[flag] = handle is not None
    return report
@app.post("/transcribe")
async def transcription(
    audio_file: UploadFile = File(...),
    enable_diarization: bool = Form(False),
):
    """Transcribe an uploaded WebM or WAV recording.

    Args:
        audio_file: The uploaded audio. WebM uploads are converted to WAV
            through ``webm_to_wav`` before transcription.
        enable_diarization: Forwarded to ``process_transcription``. When true
            and diarized segments are returned, the response also carries
            ``speakers`` and ``segments``.

    Returns:
        A dict with ``transcript``, ``language`` and ``duration``, plus
        ``speakers`` (in order of first appearance) and ``segments`` when
        diarization was requested and produced segments.

    Raises:
        HTTPException: 503 when the Whisper model is not loaded; 400 for an
            empty upload, an unsupported format, or a ``ValueError`` raised
            during processing; 500 for any other failure.
    """
    if whisper_model is None:
        raise HTTPException(status_code=503, detail="Transcription service unavailable.")

    try:
        start_time = time.time()
        content_type = audio_file.content_type
        # The filename is optional in a multipart upload; normalise it so the
        # extension check below cannot fail on None.
        filename = audio_file.filename or ""
        content = await audio_file.read()
        logger.warning(f"Received file: {audio_file.filename}, content_type: {content_type}")

        if not content:
            raise HTTPException(status_code=400, detail="Uploaded audio file is empty.")

        if content_type in ("audio/webm", "video/webm"):
            wav_path = webm_to_wav(content)
            try:
                with open(wav_path, "rb") as f:
                    wav_bytes = f.read()
            finally:
                # Remove the temporary WAV even when reading it fails.
                os.remove(wav_path)
        elif (
            content_type in ("audio/wav", "audio/x-wav", "audio/vnd.wave", "application/octet-stream")
            or filename.lower().endswith(".wav")
        ):
            logger.warning("[+] wav processing")
            wav_bytes = content
        else:
            raise HTTPException(status_code=400, detail="Unsupported audio format. Use .webm or .wav")

        # NOTE(review): this call is synchronous inside an async handler and
        # so occupies the event loop while it runs. It is left as is because
        # the thread-safety of the model is not visible from this file.
        transcript, info, diarized_segments = process_transcription(
            wav_bytes,
            whisper_model,
            enable_diarization=enable_diarization,
        )
        processing_time = time.time() - start_time
        logger.info(f"Transcription successful. Language: {info.language}, Time: {processing_time:.2f}s")

        response = {
            "transcript": transcript,
            "language": info.language,
            "duration": info.duration,
        }
        if enable_diarization and diarized_segments:
            # Unique speaker labels, kept in order of first appearance.
            speakers = []
            for segment in diarized_segments:
                if segment["speaker"] not in speakers:
                    speakers.append(segment["speaker"])
            response["speakers"] = speakers
            response["segments"] = diarized_segments
        return response
    except HTTPException:
        # Deliberate HTTP errors raised above pass through untouched.
        raise
    except ValueError as ve:
        logger.error(f"Value error during transcription processing: {ve}")
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        logger.error(f"Unhandled error during transcription: {e}\n{traceback.format_exc()}")
        raise HTTPException(status_code=500, detail="Internal server error during transcription.")
@app.post("/summarize")
async def summarize(input: TranscriptInput):
    """Produce a concise summary of a transcript with Gemini.

    Args:
        input: Request body; only ``transcript`` is used by this endpoint.

    Returns:
        ``{"summary": <text returned by the model>}``.

    Raises:
        HTTPException: 400 when the transcript is empty or the prompt is
            blocked, 429 when the Gemini quota is exhausted, 500 otherwise.
    """
    # Imported locally so this handler does not depend on a file-level import.
    from fastapi.concurrency import run_in_threadpool

    if not input.transcript.strip():
        raise HTTPException(status_code=400, detail="Transcript cannot be empty.")

    prompt = (
        "\nSummarize the following text concisely:\n"
        "Transcript:\n"
        '"""\n'
        f"{input.transcript}\n"
        '"""\n'
    )

    try:
        model = genai.GenerativeModel('gemini-1.5-flash')
        # generate_content is a blocking network call; run it in the worker
        # thread pool so the event loop keeps serving other requests.
        response = await run_in_threadpool(model.generate_content, prompt)
        summary_text = response.text
        logger.info(f"Gemini /summarize response text: '{summary_text}'")
        return {"summary": summary_text}
    except api_core_exceptions.ResourceExhausted as e:
        logger.error(f"Gemini API rate limit exceeded: {e}")
        raise HTTPException(status_code=429, detail="API rate limit exceeded. Please wait and try again.")
    except genai.types.BlockedPromptException as e:
        # The SDK exports BlockedPromptException. Naming a missing attribute
        # here would itself raise AttributeError while matching the handler.
        logger.error(f"The prompt was blocked: {e}")
        raise HTTPException(status_code=400, detail="The request was blocked by the content safety filter.")
    except Exception as e:
        logger.error(f"An unexpected error occurred during basic summarization: {e}", exc_info=True)
        # The full error is logged above; the client gets a generic message
        # so internal details are not exposed.
        raise HTTPException(status_code=500, detail="Internal server error during summarization.")
@app.post("/smart-summary")
def smart_summarize(input: TranscriptInput):
    """Summarize a transcript with the locally loaded summarizer pipeline.

    Args:
        input: Request body; only ``transcript`` is used by this endpoint.

    Returns:
        ``{"summary": <value returned by process_summary>}``.

    Raises:
        HTTPException: 503 when the summarizer pipeline is not loaded, 400
            when the transcript is empty or ``process_summary`` raises
            ``ValueError``, 500 for any other failure.
    """
    if summarizer_pipeline is None:
        raise HTTPException(status_code=503, detail="Summarization service unavailable.")
    # Whitespace-only text counts as empty, as in the other summary endpoints.
    if not input.transcript.strip():
        raise HTTPException(status_code=400, detail="Transcript cannot be empty.")

    try:
        summary = process_summary(input.transcript, summarizer_pipeline, nlp_spacy, config)
        return {"summary": summary}
    except ValueError as ve:
        logger.error(f"Value error during summary processing: {ve}")
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        logger.error(f"Unhandled error during summarization: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error during summarization.")
@app.post("/enhanced-summary")
async def enhanced_summary(input: TranscriptInput):
    """Produce a structured summary with Gemini and return it as Markdown.

    The model is asked for a JSON response, which is parsed and then rendered
    through ``format_summary_to_markdown``.

    Args:
        input: Request body. ``transcript`` is the text to summarize and
            ``language`` is the language code passed to ``get_language_name``
            and ``format_summary_to_markdown``.

    Returns:
        ``{"summary": <Markdown string>}``.

    Raises:
        HTTPException: 400 when the transcript is empty or the prompt is
            blocked, 429 when the Gemini quota is exhausted, 500 when the
            model response is not valid JSON or on any other failure.
    """
    # Imported locally so this handler does not depend on a file-level import.
    from fastapi.concurrency import run_in_threadpool

    if not input.transcript.strip():
        raise HTTPException(status_code=400, detail="Transcript cannot be empty.")

    try:
        code = input.language
        logger.info(f"Detected language code: {code}")
        language_name = get_language_name(code)
        logger.info(f"Detected language name: {language_name}")

        prompt = create_enhanced_summary_prompt(input.transcript, language_name)
        model = genai.GenerativeModel('gemini-1.5-flash')
        # generate_content is a blocking network call; run it in the worker
        # thread pool so the event loop keeps serving other requests.
        response = await run_in_threadpool(
            model.generate_content,
            contents=prompt,
            generation_config=genai.GenerationConfig(response_mime_type="application/json"),
        )
        raw_text = response.text

        try:
            # Strip a Markdown json code fence in case the model added one.
            cleaned_text = re.sub(r"```json\s*(.*)\s*```", r"\1", raw_text, flags=re.DOTALL)
            summary_json = json.loads(cleaned_text)
            logger.info(f"Received JSON from Gemini: {summary_json}")
        except (json.JSONDecodeError, TypeError) as e:
            logger.error(f"Failed to parse LLM response as JSON: {e}\nResponse text: {raw_text}")
            raise HTTPException(
                status_code=500,
                detail="Failed to generate a structured summary due to an invalid model response.",
            ) from e

        formatted_markdown = format_summary_to_markdown(summary_json, code)
        logger.info(f"Formatted Markdown: {formatted_markdown}")
        return {"summary": formatted_markdown}
    except HTTPException:
        # Without this clause the generic handler below would catch the parse
        # failure raised above and replace its specific detail message.
        raise
    except api_core_exceptions.ResourceExhausted as e:
        logger.error(f"Gemini API rate limit exceeded: {e}")
        raise HTTPException(status_code=429, detail="API rate limit exceeded. Please wait and try again.")
    except genai.types.BlockedPromptException as e:
        # The SDK exports BlockedPromptException. Naming a missing attribute
        # here would itself raise AttributeError while matching the handler.
        logger.error(f"The prompt was blocked: {e}")
        raise HTTPException(status_code=400, detail="The request was blocked by the content safety filter.")
    except Exception as e:
        logger.error(f"An unexpected error occurred during summarization: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="An internal server error occurred during summarization.")