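"""FastAPI app that summarizes an uploaded document (PDF, CSV, XLSX, DOCX, PPTX)
or MP3 with Google Gemini, then answers follow-up questions about the upload."""
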
from fastapi import FastAPI, File, UploadFile, Form, Request, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from typing import List, Optional
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
from langchain_community.document_loaders import (
    PyPDFLoader,
    UnstructuredCSVLoader,
    UnstructuredExcelLoader,
    Docx2txtLoader,
    UnstructuredPowerPointLoader,
)
from langchain.chains import StuffDocumentsChain
from langchain.chains.llm import LLMChain
from langchain.prompts import PromptTemplate
from langchain.vectorstores import FAISS
from langchain.text_splitter import CharacterTextSplitter
import json
import os
import re
import google.generativeai as genai
import nest_asyncio
import nltk

app = FastAPI()
templates = Jinja2Templates(directory="templates")

# Allow nested event loops when running in a development environment
if os.getenv("FASTAPI_ENV") == "development":
    nest_asyncio.apply()

# The unstructured document loaders rely on this NLTK tagger at runtime
nltk.download('averaged_perceptron_tagger_eng')

# Global state shared across requests
uploaded_file_path = None
document_analyzed = False
summary = None
question_responses = []
api = None
llm = None

# Disable blocking across all Gemini harm categories
safety_settings = [
    {"category": "HARM_CATEGORY_DANGEROUS", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
]

def format_text(text: str) -> str:
    """Convert Gemini's markdown-style output into simple HTML."""
    text = re.sub(r'\*\*(.*?)\*\*', r'<b>\1</b>', text)  # **bold** -> <b>bold</b>
    text = text.replace('*', '<br>')  # remaining single asterisks become line breaks
    return text
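# e.g. format_text("**Summary** * first point") -> "<b>Summary</b> <br> first point"
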
# Route for the main page
@app.get("/", response_class=HTMLResponse)
async def read_main(request: Request):
    return templates.TemplateResponse("analyze.html", {
        "request": request,
        "summary": summary,
        "show_conversation": document_analyzed,
        "question_responses": question_responses
    })

# Route for analyzing documents (path assumed; adjust to match the form action in analyze.html)
@app.post("/analyze", response_class=HTMLResponse)
async def analyze_document(
    request: Request,
    api_key: str = Form(...),
    iam: str = Form(...),
    context: str = Form(...),
    output: str = Form(...),
    summary_length: str = Form(...),
    file: UploadFile = File(...)
):
    global uploaded_file_path, document_analyzed, summary, question_responses, api, llm
    loader = None
    try:
        # Initialize or update the API key and models
        api = api_key
        genai.configure(api_key=api)
        llm = ChatGoogleGenerativeAI(model="gemini-1.5-flash-latest", google_api_key=api)

        # Save the uploaded file to disk, keeping its original extension
        uploaded_file_path = "uploaded_file" + os.path.splitext(file.filename)[1]
        with open(uploaded_file_path, "wb") as f:
            f.write(file.file.read())

        # Determine the file type and load accordingly
        file_extension = os.path.splitext(uploaded_file_path)[1].lower()
        print(f"File extension: {file_extension}")  # Debugging statement

if file_extension == ".pdf": | |
loader = PyPDFLoader(uploaded_file_path) | |
elif file_extension == ".csv": | |
loader = UnstructuredCSVLoader(uploaded_file_path, mode="elements", encoding="utf8") | |
elif file_extension == ".xlsx": | |
loader = UnstructuredExcelLoader(uploaded_file_path, mode="elements") | |
elif file_extension == ".docx": | |
loader = Docx2txtLoader(uploaded_file_path) | |
elif file_extension == ".pptx": | |
loader = UnstructuredPowerPointLoader(uploaded_file_path) | |
elif file_extension == ".mp3": | |
# Process audio files differently | |
audio_file = genai.upload_file(path=uploaded_file_path) | |
model = genai.GenerativeModel(model_name="gemini-1.5-flash") | |
prompt = f"I am an {iam}. This file is about {context}. Answer the question based on this file: {output}. Write a {summary_length} concise summary." | |
response = model.generate_content([prompt, audio_file], safety_settings=safety_settings) | |
summary = format_text(response.text) | |
document_analyzed = True | |
outputs = {"summary": summary} | |
with open("output_summary.json", "w") as outfile: | |
json.dump(outputs, outfile) | |
return templates.TemplateResponse("analyze.html", { | |
"request": request, | |
"summary": summary, | |
"show_conversation": document_analyzed, | |
"question_responses": question_responses | |
}) | |
        # If no loader was set, the file type is unsupported
        if loader is None:
            raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_extension}")

        docs = loader.load()

        # "Stuff" all loaded pages into a single prompt and summarize them at once
        prompt_template = PromptTemplate.from_template(
            f"I am an {iam}. This file is about {context}. Answer the question based on this file: {output}. "
            f"Write a {summary_length} concise summary of the following text: {{text}}"
        )
        llm_chain = LLMChain(llm=llm, prompt=prompt_template)
        stuff_chain = StuffDocumentsChain(llm_chain=llm_chain, document_variable_name="text")
        response = stuff_chain.invoke(docs)
        summary = format_text(response["output_text"])
        document_analyzed = True

        outputs = {"summary": summary}
        with open("output.json", "w") as outfile:
            json.dump(outputs, outfile)

        return templates.TemplateResponse("analyze.html", {
            "request": request,
            "summary": summary,
            "show_conversation": document_analyzed,
            "question_responses": question_responses
        })
    except HTTPException:
        raise  # let the 400 for unsupported types through instead of wrapping it in a 500
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {e}")

# Route for asking questions (path assumed; adjust to match the form action in analyze.html)
@app.post("/ask", response_class=HTMLResponse)
async def ask_question(request: Request, question: str = Form(...)):
    global uploaded_file_path, question_responses, llm, api
    loader = None
    if uploaded_file_path:
        # Determine the file type and load accordingly
        file_extension = os.path.splitext(uploaded_file_path)[1].lower()
        if file_extension == ".pdf":
            loader = PyPDFLoader(uploaded_file_path)
        elif file_extension == ".csv":
            loader = UnstructuredCSVLoader(uploaded_file_path, mode="elements")
        elif file_extension == ".xlsx":
            loader = UnstructuredExcelLoader(uploaded_file_path, mode="elements")
        elif file_extension == ".docx":
            loader = Docx2txtLoader(uploaded_file_path)
        elif file_extension == ".pptx":
            loader = UnstructuredPowerPointLoader(uploaded_file_path)
elif file_extension == ".mp3": | |
audio_file = genai.upload_file(path=uploaded_file_path) | |
model = genai.GenerativeModel(model_name="gemini-1.5-flash") | |
latest_conversation = request.cookies.get("latest_question_response", "") | |
prompt = "Answer the question based on the speech: " + question + (f" Latest conversation: {latest_conversation}" if latest_conversation else "") | |
response = model.generate_content([prompt, audio_file], safety_settings=safety_settings) | |
current_response = response.text | |
current_question = f"You asked: {question}" | |
# Save the latest question and response to the session | |
question_responses.append((current_question, current_response)) | |
# Perform vector embedding and search | |
text = current_response # Use the summary generated from the MP3 content | |
os.environ["GOOGLE_API_KEY"] = api | |
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001") | |
summary_embedding = embeddings.embed_query(text) | |
document_search = FAISS.from_texts([text], embeddings) | |
if document_search: | |
query_embedding = embeddings.embed_query(question) | |
results = document_search.similarity_search_by_vector(query_embedding, k=1) | |
if results: | |
current_response = results[0].page_content | |
else: | |
current_response = "No matching document found in the database." | |
else: | |
current_response = "Vector database not initialized." | |
# Append the question and response from FAISS search | |
question_responses.append((current_question, current_response)) | |
# Save all results including FAISS response to output.json | |
save_to_json(summary, question_responses) | |
# Save the latest question and response to the session | |
response = templates.TemplateResponse("analyze.html", {"request": request, "summary": summary, "show_conversation": document_analyzed, "question_responses": question_responses}) | |
response.set_cookie(key="latest_question_response", value=current_response) | |
return response | |
        # If no loader was set, the file type is unsupported
        if loader is None:
            raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_extension}")

        docs = loader.load()
        text = "\n".join([doc.page_content for doc in docs])
        os.environ["GOOGLE_API_KEY"] = api

        # Split the text into overlapping chunks
        text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        chunks = text_splitter.split_text(text)

        # Build the question prompt, carrying over the previous turn from the cookie
        latest_conversation = request.cookies.get("latest_question_response", "")
        template1 = question + """ answer the question based on the following:
"{text}"
:""" + (f" Answer the Question with no more than 3 sentences. Latest conversation: {latest_conversation}" if latest_conversation else "")

        # Keep {text} as a template variable; the chain fills it in per chunk
        prompt1 = PromptTemplate.from_template(template1)
        llm_chain1 = LLMChain(llm=llm, prompt=prompt1)

        # Answer the question against each chunk and concatenate the partial answers
        current_response = ""
        for chunk in chunks:
            response1 = llm_chain1.invoke({"text": chunk})
            current_response += response1["text"] + "\n"

        # Embed the combined answer and search it with FAISS
        embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
        document_search = FAISS.from_texts([current_response], embeddings)

        # Perform a search on the FAISS vector store if it was initialized
        if document_search:
            query_embedding = embeddings.embed_query(question)
            results = document_search.similarity_search_by_vector(query_embedding, k=1)
            if results:
                current_response = format_text(results[0].page_content)
            else:
                current_response = "No matching document found in the database."
        else:
            current_response = "Vector database not initialized."

        # Append the question and the response retrieved via FAISS
        current_question = f"You asked: {question}"
        question_responses.append((current_question, current_response))

        # Persist all results to output_summary.json
        save_to_json(summary, question_responses)

        # Remember the latest question/response in a cookie for follow-up turns
        response = templates.TemplateResponse("analyze.html", {
            "request": request,
            "summary": summary,
            "show_conversation": document_analyzed,
            "question_responses": question_responses
        })
        response.set_cookie(key="latest_question_response", value=current_response)
        return response
    else:
        raise HTTPException(status_code=400, detail="No file has been uploaded yet.")

def save_to_json(summary, question_responses):
    outputs = {
        "summary": summary,
        "question_responses": question_responses
    }
    with open("output_summary.json", "w") as outfile:
        json.dump(outputs, outfile)
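# Illustrative shape of output_summary.json (tuples serialize as JSON arrays):
#   {"summary": "<b>...</b>", "question_responses": [["You asked: ...", "..."]]}
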
if __name__ == "__main__": | |
import uvicorn | |
uvicorn.run(app, host="127.0.0.1", port=8000) | |
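
# Example usage sketch (assumes this file is app.py and the route paths above):
#   uvicorn app:app --host 127.0.0.1 --port 8000
#   curl -X POST http://127.0.0.1:8000/analyze \
#        -F api_key=YOUR_GOOGLE_API_KEY -F iam=analyst -F context="quarterly report" \
#        -F output="key findings" -F summary_length=short -F file=@report.pdf
#   curl -X POST http://127.0.0.1:8000/ask -F question="What are the main risks?"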