from fastapi import FastAPI, File, UploadFile, Form, Request, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from typing import List, Optional
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.document_loaders import PyPDFLoader, UnstructuredCSVLoader, UnstructuredExcelLoader, Docx2txtLoader, UnstructuredPowerPointLoader
from langchain.chains import StuffDocumentsChain
from langchain.chains.llm import LLMChain
from langchain.prompts import PromptTemplate
from langchain.vectorstores import FAISS
from langchain_google_genai import GoogleGenerativeAIEmbeddings
import json
import os
import google.generativeai as genai
import re
import nest_asyncio
import nltk
from langchain.text_splitter import CharacterTextSplitter
app = FastAPI()
templates = Jinja2Templates(directory="templates")
if os.getenv("FASTAPI_ENV") == "development":
nest_asyncio.apply()
nltk.download('averaged_perceptron_tagger_eng')
from nltk.tokenize import word_tokenize
# Initialize your model and other variables
uploaded_file_path = None
document_analyzed = False
summary = None
question_responses = []
api = None
llm = None
safety_settings = [
{"category": "HARM_CATEGORY_DANGEROUS", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
]
def format_text(text: str) -> str:
text = re.sub(r'\*\*(.*?)\*\*', r'\1', text)
text = text.replace('*', '
')
return text
# Route for main page
@app.get("/", response_class=HTMLResponse)
async def read_main(request: Request):
return templates.TemplateResponse("analyze.html", {
"request": request,
"summary": summary,
"show_conversation": document_analyzed,
"question_responses": question_responses
})
# Route for analyzing documents
@app.post("/", response_class=HTMLResponse)
async def analyze_document(
request: Request,
api_key: str = Form(...),
iam: str = Form(...),
context: str = Form(...),
output: str = Form(...),
summary_length: str = Form(...),
file: UploadFile = File(...)
):
global uploaded_file_path, document_analyzed, summary, question_responses, api, llm
loader = None
try:
# Initialize or update API key and models
api = api_key
genai.configure(api_key=api)
llm = ChatGoogleGenerativeAI(model="gemini-1.5-flash-latest", google_api_key=api)
# Save the uploaded file
uploaded_file_path = "uploaded_file" + os.path.splitext(file.filename)[1]
with open(uploaded_file_path, "wb") as f:
f.write(file.file.read())
# Determine the file type and load accordingly
file_extension = os.path.splitext(uploaded_file_path)[1].lower()
print(f"File extension: {file_extension}") # Debugging statement
if file_extension == ".pdf":
loader = PyPDFLoader(uploaded_file_path)
elif file_extension == ".csv":
loader = UnstructuredCSVLoader(uploaded_file_path, mode="elements", encoding="utf8")
elif file_extension == ".xlsx":
loader = UnstructuredExcelLoader(uploaded_file_path, mode="elements")
elif file_extension == ".docx":
loader = Docx2txtLoader(uploaded_file_path)
elif file_extension == ".pptx":
loader = UnstructuredPowerPointLoader(uploaded_file_path)
elif file_extension == ".mp3":
# Process audio files differently
audio_file = genai.upload_file(path=uploaded_file_path)
model = genai.GenerativeModel(model_name="gemini-1.5-flash")
prompt = f"I am an {iam}. This file is about {context}. Answer the question based on this file: {output}. Write a {summary_length} concise summary."
response = model.generate_content([prompt, audio_file], safety_settings=safety_settings)
summary = format_text(response.text)
document_analyzed = True
outputs = {"summary": summary}
with open("output_summary.json", "w") as outfile:
json.dump(outputs, outfile)
return templates.TemplateResponse("analyze.html", {
"request": request,
"summary": summary,
"show_conversation": document_analyzed,
"question_responses": question_responses
})
# If no loader is set, raise an exception
if loader is None:
raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_extension}")
docs = loader.load()
prompt_template = PromptTemplate.from_template(
f"I am an {iam}. This file is about {context}. Answer the question based on this file: {output}. Write a {summary_length} concise summary of the following text: {{text}}"
)
llm_chain = LLMChain(llm=llm, prompt=prompt_template)
stuff_chain = StuffDocumentsChain(llm_chain=llm_chain, document_variable_name="text")
response = stuff_chain.invoke(docs)
summary = format_text(response["output_text"])
document_analyzed = True
outputs = {"summary": summary}
with open("output.json", "w") as outfile:
json.dump(outputs, outfile)
return templates.TemplateResponse("analyze.html", {
"request": request,
"summary": summary,
"show_conversation": document_analyzed,
"question_responses": question_responses
})
except Exception as e:
raise HTTPException(status_code=500, detail=f"An error occurred: {e}")
# Route for asking questions
from langchain.text_splitter import CharacterTextSplitter # Ensure this is imported
@app.post("/ask", response_class=HTMLResponse)
async def ask_question(request: Request, question: str = Form(...)):
global uploaded_file_path, question_responses, llm, api
loader = None
if uploaded_file_path:
# Determine the file type and load accordingly
file_extension = os.path.splitext(uploaded_file_path)[1].lower()
if file_extension == ".pdf":
loader = PyPDFLoader(uploaded_file_path)
elif file_extension == ".csv":
loader = UnstructuredCSVLoader(uploaded_file_path, mode="elements")
elif file_extension == ".xlsx":
loader = UnstructuredExcelLoader(uploaded_file_path, mode="elements")
elif file_extension == ".docx":
loader = Docx2txtLoader(uploaded_file_path)
elif file_extension == ".pptx":
loader = UnstructuredPowerPointLoader(uploaded_file_path)
elif file_extension == ".mp3":
audio_file = genai.upload_file(path=uploaded_file_path)
model = genai.GenerativeModel(model_name="gemini-1.5-flash")
latest_conversation = request.cookies.get("latest_question_response", "")
prompt = "Answer the question based on the speech: " + question + (f" Latest conversation: {latest_conversation}" if latest_conversation else "")
response = model.generate_content([prompt, audio_file], safety_settings=safety_settings)
current_response = response.text
current_question = f"You asked: {question}"
# Save the latest question and response to the session
question_responses.append((current_question, current_response))
# Perform vector embedding and search
text = current_response # Use the summary generated from the MP3 content
os.environ["GOOGLE_API_KEY"] = api
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
summary_embedding = embeddings.embed_query(text)
document_search = FAISS.from_texts([text], embeddings)
if document_search:
query_embedding = embeddings.embed_query(question)
results = document_search.similarity_search_by_vector(query_embedding, k=1)
if results:
current_response = results[0].page_content
else:
current_response = "No matching document found in the database."
else:
current_response = "Vector database not initialized."
# Append the question and response from FAISS search
question_responses.append((current_question, current_response))
# Save all results including FAISS response to output.json
save_to_json(summary, question_responses)
# Save the latest question and response to the session
response = templates.TemplateResponse("analyze.html", {"request": request, "summary": summary, "show_conversation": document_analyzed, "question_responses": question_responses})
response.set_cookie(key="latest_question_response", value=current_response)
return response
# If no loader is set, raise an exception
if loader is None:
raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_extension}")
docs = loader.load()
text = "\n".join([doc.page_content for doc in docs])
os.environ["GOOGLE_API_KEY"] = api
# Split the text into chunks
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = text_splitter.split_text(text)
# Define the Summarize Chain for the question
latest_conversation = request.cookies.get("latest_question_response", "")
template1 = question + """ answer the question based on the following:
"{text}"
:""" + (f" Answer the Question with no more than 3 sentences. Latest conversation: {latest_conversation}" if latest_conversation else "")
current_response = ""
for chunk in chunks:
prompt1 = PromptTemplate.from_template(template1.format(text=chunk))
# Initialize the LLMChain with the prompt
llm_chain1 = LLMChain(llm=llm, prompt=prompt1)
response1 = llm_chain1.invoke({"text": chunk})
current_response += response1["text"] + "\n"
# Generate embeddings for the combined responses
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
summary_embedding = embeddings.embed_query(current_response)
document_search = FAISS.from_texts([current_response], embeddings)
# Perform a search on the FAISS vector database if it's initialized
if document_search:
query_embedding = embeddings.embed_query(question)
results = document_search.similarity_search_by_vector(query_embedding, k=1)
if results:
current_response = format_text(results[0].page_content)
else:
current_response = "No matching document found in the database."
else:
current_response = "Vector database not initialized."
# Append the question and response from FAISS search
current_question = f"You asked: {question}"
question_responses.append((current_question, current_response))
# Save all results to output.json
save_to_json(summary, question_responses)
# Save the latest question and response to the session
response = templates.TemplateResponse("analyze.html", {"request": request, "summary": summary, "show_conversation": document_analyzed, "question_responses": question_responses})
response.set_cookie(key="latest_question_response", value=current_response)
return response
else:
raise HTTPException(status_code=400, detail="No file has been uploaded yet.")
def save_to_json(summary, question_responses):
outputs = {
"summary": summary,
"question_responses": question_responses
}
with open("output_summary.json", "w") as outfile:
json.dump(outputs, outfile)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)