import io
import json
import logging
import asyncio

import fitz  # PyMuPDF, for PDF handling
import speech_recognition as sr
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import JSONResponse
from starlette.middleware.cors import CORSMiddleware
from transformers import pipeline
from PIL import Image
from io import BytesIO
from pdf2image import convert_from_bytes
from pydub import AudioSegment
# Model pipelines, loaded once at startup (the first run downloads the weights)
nlp_qa = pipeline("document-question-answering", model="jinhybr/OCR-DocVQA-Donut")
nlp_qa_v2 = pipeline("document-question-answering", model="faisalraza/layoutlm-invoices")
nlp_qa_v3 = pipeline("question-answering", model="deepset/roberta-base-squad2")
nlp_classification = pipeline("text-classification", model="distilbert/distilbert-base-uncased-finetuned-sst-2-english")
nlp_classification_v2 = pipeline("text-classification", model="cardiffnlp/twitter-roberta-base-sentiment-latest")
nlp_speech_to_text = pipeline("automatic-speech-recognition", model="facebook/wav2vec2-base-960h")  # loaded but unused below; transcription goes through SpeechRecognition
nlp_sequence_classification = pipeline("zero-shot-classification", model="valhalla/distilbart-mnli-12-1")
description = """
## Image-based Document QA
This API performs document question answering using Donut- and LayoutLM-based models.
### Endpoints:
- **POST /uploadfile/:** Upload an image file to extract text and answer provided questions.
- **POST /pdfQA/:** Provide a PDF file to extract text and answer provided questions.
"""
app = FastAPI(docs_url="/", description=description)
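# Example request against /uploadfile/ (illustrative; host, port, and file name
# are assumptions about the deployment, not part of the original source):
#   curl -X POST http://localhost:8000/uploadfile/ \
#        -F "file=@invoice.png" \
#        -F "questions=What is the invoice number?, What is the total amount?"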
# Image QA backed by the Donut DocVQA model. The route path comes from the
# API description above.
@app.post("/uploadfile/")
async def perform_document_qa(
    file: UploadFile = File(...),
    questions: str = Form(...),
):
    try:
        # Read the uploaded file as bytes
        contents = await file.read()
        # Open the image using PIL
        image = Image.open(BytesIO(contents))
        # Answer each comma-separated question against the document image
        answers_dict = {}
        for question in questions.split(','):
            cleaned_question = question.strip().strip("[]")
            result = nlp_qa(image, cleaned_question)
            # The pipeline returns a list of candidates; take the top answer
            answers_dict[cleaned_question] = result[0]['answer']
        return answers_dict
    except Exception as e:
        return JSONResponse(content=f"Error processing file: {str(e)}", status_code=500)
# A second image-QA endpoint backed by the LayoutLM invoices model. Renamed so
# it no longer shadows perform_document_qa; the route path is an assumption,
# since the original source carried no route decorators.
@app.post("/uploadfile-v2/")
async def perform_document_qa_v2(
    file: UploadFile = File(...),
    questions: str = Form(...),
):
    try:
        # Read the uploaded file as bytes
        contents = await file.read()
        # Open the image using PIL
        image = Image.open(BytesIO(contents))
        # Answer each comma-separated question against the document image
        answers_dict = {}
        for question in questions.split(','):
            cleaned_question = question.strip().strip("[]")
            result = nlp_qa_v2(image, cleaned_question)
            answers_dict[cleaned_question] = result[0]['answer']
        return answers_dict
    except Exception as e:
        return JSONResponse(content=f"Error processing file: {str(e)}", status_code=500)
# Extractive QA over plain text rather than an image. Renamed for the same
# reason; the route path is an assumption.
@app.post("/text-qa/")
async def perform_text_qa(
    context: str = Form(...),
    question: str = Form(...),
):
    try:
        QA_input = {
            'question': question,
            'context': context
        }
        res = nlp_qa_v3(QA_input)
        return res['answer']
    except Exception as e:
        return JSONResponse(content=f"Error processing request: {str(e)}", status_code=500)
# Binary sentiment classification (POSITIVE/NEGATIVE); route path assumed.
@app.post("/classify/")
async def classify_text(text: str = Form(...)):
    try:
        # Perform text classification using the pipeline
        result = nlp_classification(text)
        # Return the classification result
        return result
    except Exception as e:
        return JSONResponse(content=f"Error classifying text: {str(e)}", status_code=500)
# Three-way sentiment classification (positive/neutral/negative); route path assumed.
@app.post("/classify-sentiment/")
async def test_classify_text(text: str = Form(...)):
    try:
        # Perform text classification using the model that returns positive, neutral, or negative
        result = nlp_classification_v2(text)
        # Log the raw label for debugging purposes (can be removed later)
        raw_label = result[0]['label']
        logging.debug(f"Raw label from model: {raw_label}")
        # Map the model labels to a human-readable format
        label_map = {
            "negative": "Negative",
            "neutral": "Neutral",
            "positive": "Positive"
        }
        # Get the readable label from the map
        formatted_label = label_map.get(raw_label, "Unknown")
        return {"label": formatted_label, "score": result[0]['score']}
    except Exception as e:
        return JSONResponse(content=f"Error classifying text: {str(e)}", status_code=500)
# Transcribe an audio clip and answer questions about its contents; route path assumed.
@app.post("/transcribe-and-answer/")
async def transcribe_and_answer(
    file: UploadFile = File(...),
    questions: str = Form(...)
):
    try:
        # Ensure a supported file format
        if file.content_type not in ["audio/wav", "audio/mpeg", "audio/mp3", "audio/webm"]:
            raise HTTPException(status_code=400, detail="Unsupported audio format. Please upload a WAV, MP3, or WebM file.")
        logging.info(f"Received file type: {file.content_type}")
        logging.info(f"Received questions: {questions}")
        # Convert the uploaded file to WAV if needed
        audio_data = await file.read()
        audio_file = io.BytesIO(audio_data)
        if file.content_type in ["audio/mpeg", "audio/mp3"]:
            audio = AudioSegment.from_file(audio_file, format="mp3")
            audio_wav = io.BytesIO()
            audio.export(audio_wav, format="wav")
            audio_wav.seek(0)
        elif file.content_type == "audio/webm":
            audio = AudioSegment.from_file(audio_file, format="webm")
            audio_wav = io.BytesIO()
            audio.export(audio_wav, format="wav")
            audio_wav.seek(0)
        else:
            audio_wav = audio_file
        # Transcription via the Google Web Speech API
        recognizer = sr.Recognizer()
        with sr.AudioFile(audio_wav) as source:
            audio = recognizer.record(source)
            transcription_text = recognizer.recognize_google(audio)
        # Parse the questions JSON
        try:
            questions_dict = json.loads(questions)
        except json.JSONDecodeError:
            raise HTTPException(status_code=400, detail="Invalid JSON format for questions")
        # Answer each question against the transcription
        answers_dict = {}
        for key, question in questions_dict.items():
            QA_input = {
                'question': question,
                'context': transcription_text
            }
            try:
                result = nlp_qa_v3(QA_input)
                answers_dict[key] = result['answer']
            except Exception as e:
                logging.error(f"Error in question answering model: {e}")
                answers_dict[key] = "Error in answering this question."
        # Return transcription + answers
        return {
            "transcription": transcription_text,
            "answers": answers_dict
        }
    except HTTPException:
        # Re-raise client errors (400s) instead of masking them as 500s
        raise
    except Exception as e:
        logging.error(f"General error: {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
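# Example "questions" form value for the endpoint above (illustrative):
#   {"caller": "What is the caller's name?", "reason": "Why are they calling?"}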
# Transcription-only endpoint, useful for debugging the audio path; route path assumed.
@app.post("/test-transcription/")
async def test_transcription(file: UploadFile = File(...)):
    try:
        # Check if the file format is supported
        if file.content_type not in ["audio/wav", "audio/mpeg", "audio/mp3"]:
            raise HTTPException(status_code=400, detail="Unsupported audio format. Please upload a WAV or MP3 file.")
        # Convert MP3 to WAV if necessary for compatibility with SpeechRecognition
        audio_data = await file.read()
        audio_file = io.BytesIO(audio_data)
        if file.content_type in ["audio/mpeg", "audio/mp3"]:
            audio = AudioSegment.from_file(audio_file, format="mp3")
            audio_wav = io.BytesIO()
            audio.export(audio_wav, format="wav")
            audio_wav.seek(0)
        else:
            audio_wav = audio_file
        # Transcribe audio using speech_recognition
        recognizer = sr.Recognizer()
        with sr.AudioFile(audio_wav) as source:
            audio = recognizer.record(source)
            transcription = recognizer.recognize_google(audio)
        # Return the transcription
        return {"transcription": transcription}
    except HTTPException:
        # Re-raise client errors instead of masking them as 500s
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during transcription: {str(e)}")
# Predefined classifications for zero-shot labeling
labels = [
    "All Pricing copy quote requested",
    "Change to quote",
    "Change to quote & Status Check",
    "Change to quote (Items missed?)",
    "Confirmation",
    "Copy quote requested",
    "Cost copy quote requested",
    "MSRP copy quote requested",
    "MSRP & All Pricing copy quote requested",
    "MSRP & Cost copy quote requested",
    "No narrative in email",
    "Notes not clear",
    "Retail copy quote requested",
    "Status Check (possibly)"
]
# Zero-shot classification against the predefined labels above; route path assumed.
@app.post("/fast-classify/")
async def fast_classify_text(statement: str = Form(...)):
    try:
        # The pipeline call is synchronous, so run it in a worker thread;
        # asyncio.wait_for needs an awaitable to enforce the timeout
        result = await asyncio.wait_for(
            asyncio.to_thread(nlp_sequence_classification, statement, labels, multi_label=False),
            timeout=5  # timeout in seconds
        )
        # Extract the best label and score
        best_label = result["labels"][0]
        best_score = result["scores"][0]
        return {"classification": best_label, "confidence": best_score}
    except asyncio.TimeoutError:
        return JSONResponse(content="Classification timed out. Try a shorter input or increase the timeout.", status_code=504)
    except Exception as e:
        return JSONResponse(content=f"Error in classification: {str(e)}", status_code=500)
# Set up CORS middleware
origins = ["*"]  # or specify your list of allowed origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
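# Minimal local entrypoint (a sketch: Hugging Face Spaces typically launches the
# app itself, so this is only needed when running the file directly).
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)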