ConversAI / app.py
ishworrsubedii's picture
Update app.py
2e0e1b9 verified
raw
history blame
No virus
22.9 kB
import io
import tempfile
import jwt
import base64
import json
from click import option
from jwt import ExpiredSignatureError, InvalidTokenError
from starlette import status
from functions import *
import pandas as pd
from fastapi import FastAPI, File, UploadFile, HTTPException,Request
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware
from src.api.speech_api import speech_translator_router
from functions import client as supabase
from urllib.parse import urlparse
import nltk
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from dateutil.parser import isoparse
nltk.download('punkt_tab')
app = FastAPI(title="ConversAI", root_path="/api/v1")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(speech_translator_router, prefix="/speech")
@app.post("/signup")
async def sign_up(email, username, password):
res, _ = supabase.auth.sign_up(
{"email": email, "password": password, "role": "user"}
)
user_id = res[1].id
r_ = createUser(user_id=user_id, username=username, email=email)
if r_.get('code') == 409:
return r_
elif r_.get('code') == 200:
response = {
"status": "success",
"code": 200,
"message": "Please check you email address for email verification",
}
else:
response = {
"status": "failed",
"code": 400,
"message": "Failed to sign up please try again later",
}
return response
@app.post("/session-check")
async def check_session(user_id: str):
res = supabase.auth.get_session()
if res == None:
try:
supabase.table("Stores").delete().eq(
"StoreID", user_id
).execute()
resp = supabase.auth.sign_out()
response = {"message": "success", "code": 200, "Session": res}
return response
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
return res
@app.post("/get-user")
async def get_user(access_token):
res = supabase.auth.get_user(jwt=access_token)
return res
@app.post("/referesh-token")
async def refresh_token(refresh_token):
res = supabase.auth.refresh_token(refresh_token)
return res
@app.post("/login")
async def sign_in(email, password):
try:
res = supabase.auth.sign_in_with_password(
{"email": email, "password": password}
)
user_id = res.user.id
access_token = res.session.access_token
refresh_token = res.session.refresh_token
store_session_check = supabase.table("Stores").select("*").filter("StoreID", "eq", user_id).execute()
store_id = None
if store_session_check and store_session_check.data:
store_id = store_session_check.data[0].get("StoreID")
userData = supabase.table("ConversAI_UserInfo").select("*").filter("user_id", "eq", user_id).execute().data
username = userData[0]["username"]
if not store_id:
response = (
supabase.table("Stores").insert(
{
"AccessToken": access_token,
"StoreID": user_id,
"RefreshToken": refresh_token,
"email": email
}
).execute()
)
message = {
"message": "Success",
"code": status.HTTP_200_OK,
"username": username,
"user_id": user_id,
"access_token": access_token,
"refresh_token": refresh_token,
}
return message
elif store_id == user_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="You are already signed in. Please sign out first to sign in again."
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Failed to sign in. Please check your credentials."
)
except HTTPException as http_exc:
raise http_exc
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"An unexpected error occurred during sign-in: {str(e)}"
)
@app.post("/login_with_token")
async def login_with_token(access_token: str, refresh_token: str):
try:
decoded_token = jwt.decode(access_token, options={"verify_signature": False})
user_id_oauth = decoded_token.get("sub")
try:
user_id = supabase.table("ConversAI_UserInfo").select("*").filter("user_id", "eq", user_id_oauth).execute()
user_id = supabase.table("ConversAI_UserInfo").select("*").filter("email", "eq", user_id_oauth).execute()
user_name = user_id.data[0]["username"]
except:
user_name = ''
json = {
"code": status.HTTP_200_OK,
"user_id": decoded_token.get("sub"),
"email": decoded_token.get("email"),
"access_token": access_token,
"refresh_token": refresh_token,
"issued_at": decoded_token.get("iat"),
"expires_at": decoded_token.get("exp"),
"username": user_name
}
return json
except (ExpiredSignatureError, InvalidTokenError) as e:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e))
@app.post("/user_name")
async def user_name_(username: str, user_id: str, email: str):
r_ = createUser(user_id=user_id, username=username, email=email)
return r_
@app.post("/set-session-data")
async def set_session_data(access_token, refresh_token, user_id):
res = supabase.auth.set_session(access_token, refresh_token)
store_session_check = supabase.table("Stores").select("*").filter("StoreID", "eq", user_id).execute()
store_id = None
if store_session_check and store_session_check.data:
store_id = store_session_check.data[0].get("StoreID")
if not store_id:
response = (
supabase.table("Stores").insert(
{
"AccessToken": access_token,
"StoreID": user_id,
"RefreshToken": refresh_token,
}
).execute()
)
res = {
"message": "success",
"code": 200,
"session_data": res,
}
return res
@app.post("/logout")
async def sign_out(user_id):
try:
supabase.table("Stores").delete().eq(
"StoreID", user_id
).execute()
res = supabase.auth.sign_out()
response = {"message": "success"}
return response
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/oauth")
async def oauth():
res = supabase.auth.sign_in_with_oauth(
{"provider": "google", "options": {"redirect_to": "https://convers-ai-lac.vercel.app/"}})
return res
@app.post("/newChatbot")
async def newChatbot(chatbotName: str, username: str):
currentBotCount = len(listTables(username=username)["output"])
limit = supabase.table("ConversAI_UserConfig").select("chatbotLimit").eq("user_id", username).execute().data[0][
"chatbotLimit"]
if currentBotCount >= int(limit):
return {
"output": "CHATBOT LIMIT EXCEEDED"
}
supabase.table("ConversAI_ChatbotInfo").insert({"user_id": username, "chatbotname": chatbotName}).execute()
chatbotName = f"convai${username}${chatbotName}"
return createTable(tablename=chatbotName)
@app.post("/loadPDF")
async def loadPDF(vectorstore: str, pdf: UploadFile = File(...)):
username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
source = pdf.filename
pdf = await pdf.read()
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
temp_file.write(pdf)
temp_file_path = temp_file.name
text = extractTextFromPdf(temp_file_path)
os.remove(temp_file_path)
dct = {
"output": text,
"source": source
}
dct = json.dumps(dct, indent = 1).encode("utf-8")
fileName = createDataSourceName(sourceName = source)
response = supabase.storage.from_("ConversAI").upload(file=dct, path=f"{fileName}_data.json")
response = (
supabase.table("ConversAI_ChatbotDataSources")
.insert({"username": username,
"chatbotName": chatbotName,
"dataSourceName": fileName,
"sourceEndpoint": "\loadPDF",
"sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"], f"{fileName}_data.json")})
.execute()
)
return {
"output": "SUCCESS"
}
@app.post("/loadImagePDF")
async def loadImagePDF(vectorstore: str, pdf: UploadFile = File(...)):
username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
source = pdf.filename
pdf = await pdf.read()
text = getTextFromImagePDF(pdfBytes=pdf)
dct = {
"output": text,
"source": source
}
dct = json.dumps(dct, indent = 1).encode("utf-8")
fileName = createDataSourceName(sourceName = source)
response = supabase.storage.from_("ConversAI").upload(file=dct, path=f"{fileName}_data.json")
response = (
supabase.table("ConversAI_ChatbotDataSources")
.insert({"username": username,
"chatbotName": chatbotName,
"dataSourceName": fileName,
"sourceEndpoint": "\loadImagePDF",
"sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"], f"{fileName}_data.json")})
.execute()
)
return {
"output": "SUCCESS"
}
class AddText(BaseModel):
vectorstore: str
text: str
@app.post("/loadText")
async def loadText(addTextConfig: AddText):
vectorstore, text = addTextConfig.vectorstore, addTextConfig.text
username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
dct = {
"output": text,
"source": "Text"
}
dct = json.dumps(dct, indent = 1).encode("utf-8")
fileName = createDataSourceName(sourceName = "Text")
response = supabase.storage.from_("ConversAI").upload(file=dct, path=f"{fileName}_data.json")
response = (
supabase.table("ConversAI_ChatbotDataSources")
.insert({"username": username,
"chatbotName": chatbotName,
"dataSourceName": fileName,
"sourceEndpoint": "\loadText",
"sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"], f"{fileName}_data.json")})
.execute()
)
return {
"output": "SUCCESS"
}
@app.post("/addText")
async def addText(addTextConfig: AddText):
vectorstore, text, source = addTextConfig.vectorstore, addTextConfig.text, addTextConfig.source
text = base64.b64decode(text.encode("utf-8")).decode("utf-8")
username, chatbotname = vectorstore.split("$")[1], vectorstore.split("$")[2]
df = pd.DataFrame(supabase.table("ConversAI_ChatbotInfo").select("*").execute().data)
currentCount = df[(df["user_id"] == username) & (df["chatbotname"] == chatbotname)]["charactercount"].iloc[0]
newCount = currentCount + len(text)
limit = supabase.table("ConversAI_UserConfig").select("tokenLimit").eq("user_id", username).execute().data[0][
"tokenLimit"]
if newCount < int(limit):
supabase.table("ConversAI_ChatbotInfo").update({"charactercount": str(newCount)}).eq("user_id", username).eq(
"chatbotname", chatbotname).execute()
output = addDocuments(text=text, source=source, vectorstore=vectorstore)
return output
else:
return {
"output": "WEBSITE EXCEEDING LIMITS, PLEASE TRY WITH A SMALLER DOCUMENT."
}
class AddQAPair(BaseModel):
vectorstore: str
question: str
answer: str
@app.post("/addQAPair")
async def addQAPairData(addQaPair: AddQAPair):
username, chatbotname = addQaPair.vectorstore.split("$")[1], addQaPair.vectorstore.split("$")[2]
df = pd.DataFrame(supabase.table("ConversAI_ChatbotInfo").select("*").execute().data)
currentCount = df[(df["user_id"] == username) & (df["chatbotname"] == chatbotname)]["charactercount"].iloc[0]
qa = f"QUESTION: {addQaPair.question}\tANSWER: {addQaPair.answer}"
newCount = currentCount + len(qa)
limit = supabase.table("ConversAI_UserConfig").select("tokenLimit").eq("user_id", username).execute().data[0][
"tokenLimit"]
if newCount < int(limit):
supabase.table("ConversAI_ChatbotInfo").update({"charactercount": str(newCount)}).eq("user_id", username).eq(
"chatbotname", chatbotname).execute()
return addDocuments(text=qa, source="Q&A Pairs", vectorstore=addQaPair.vectorstore)
else:
return {
"output": "WEBSITE EXCEEDING LIMITS, PLEASE TRY WITH A SMALLER DOCUMENT."
}
class LoadWebsite(BaseModel):
vectorstore: str
urls: list[str]
source: str
@app.post("/loadWebURLs")
async def loadWebURLs(loadWebsite: LoadWebsite):
vectorstore, urls, source = loadWebsite.vectorstore, loadWebsite.urls, loadWebsite.source
username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
text = extractTextFromUrlList(urls=urls)
dct = {
"output": text,
"source": source
}
dct = json.dumps(dct, indent = 1).encode("utf-8")
fileName = createDataSourceName(sourceName = source)
response = supabase.storage.from_("ConversAI").upload(file=dct, path=f"{fileName}_data.json")
response = (
supabase.table("ConversAI_ChatbotDataSources")
.insert({"username": username,
"chatbotName": chatbotName,
"dataSourceName": fileName,
"sourceEndpoint": "\loadWebURLs",
"sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"], f"{fileName}_data.json")})
.execute()
)
return {
"output": "SUCCESS"
}
@app.post("/answerQuery")
async def answerQuestion(query: str, vectorstore: str, llmModel: str = "llama3-70b-8192"):
username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
output = answerQuery(query=query, vectorstore=vectorstore, llmModel=llmModel)
response = (
supabase.table("ConversAI_ChatHistory")
.insert({"username": username, "chatbotName": chatbotName, "llmModel": llmModel, "question": query,
"response": output["output"]})
.execute()
)
return output
@app.post("/deleteChatbot")
async def delete(chatbotName: str):
username, chatbotName = chatbotName.split("$")[1], chatbotName.split("$")[2]
supabase.table('ConversAI_ChatbotInfo').delete().eq('user_id', username).eq('chatbotname', chatbotName).execute()
return deleteTable(tableName=chatbotName)
@app.post("/listChatbots")
async def delete(username: str):
return listTables(username=username)
@app.post("/getLinks")
async def crawlUrl(baseUrl: str):
return {
"urls": getLinks(url=baseUrl, timeout=30),
"source": urlparse(baseUrl).netloc
}
@app.post("/getCurrentCount")
async def getCount(vectorstore: str):
username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
df = pd.DataFrame(supabase.table("ConversAI_ChatbotInfo").select("*").execute().data)
return {
"currentCount": df[(df['user_id'] == username) & (df['chatbotname'] == chatbotName)]['charactercount'].iloc[0]
}
class YtTranscript(BaseModel):
vectorstore: str
urls: list[str]
@app.post("/getYoutubeTranscript")
async def getYoutubeTranscript(ytTranscript: YtTranscript):
vectorstore, urls = ytTranscript.vectorstore, ytTranscript.urls
username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
text = getTranscript(urls=urls)
dct = {
"output": text,
"source": "www.youtube.com"
}
dct = json.dumps(dct, indent = 1).encode("utf-8")
fileName = createDataSourceName(sourceName = "youtube")
response = supabase.storage.from_("ConversAI").upload(file=dct, path=f"{fileName}_data.json")
response = (
supabase.table("ConversAI_ChatbotDataSources")
.insert({"username": username,
"chatbotName": chatbotName,
"dataSourceName": fileName,
"sourceEndpoint": "\getYoutubeTranscript",
"sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"], f"{fileName}_data.json")})
.execute()
)
return {
"output": "SUCCESS"
}
@app.post("/analyzeData")
async def analyzeAndAnswer(query: str, file: UploadFile = File(...)):
extension = file.filename.split(".")[-1]
try:
if extension in ["xls", "xlsx", "xlsm", "xlsb"]:
df = pd.read_excel(io.BytesIO(await file.read()))
response = analyzeData(query=query, dataframe=df)
elif extension == "csv":
df = pd.read_csv(io.BytesIO(await file.read()))
response = analyzeData(query=query, dataframe=df)
else:
response = "INVALID FILE TYPE"
return {
"output": response
}
except:
return {
"output": "UNABLE TO ANSWER QUERY"
}
@app.post("/getChatHistory")
async def chatHistory(vectorstore: str):
username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
response = supabase.table("ConversAI_ChatHistory").select("timestamp", "question", "response").eq("username",
username).eq(
"chatbotName", chatbotName).execute().data
return response
# @app.post("/trainChatbot")
# async def chatHistory(vectorstore: str):
# username, chatbotName = vectorstore.split("$")[1], vectorstore.split("$")[2]
# return response
def get_ip_info(ip: str):
try:
response = requests.get(f"https://ipinfo.io/{ip}/json")
data = response.json()
return data.get("city", "Unknown")
except Exception as e:
return "Unknown"
@app.post("/daily_chat_count")
async def daily_chat_count(days: int = 7):
response = supabase.table("ConversAI_ChatHistory").select("*").execute().data
seven_days_ago = datetime.now().astimezone() - timedelta(days=days)
dates = [
isoparse(i["timestamp"]).date()
for i in response
if isoparse(i["timestamp"]) >= seven_days_ago
]
date_count = Counter(dates)
data = [{"date": date.isoformat(), "count": count} for date, count in date_count.items()]
return {"data": data}
@app.post("/daily_active_end_user")
async def daily_active_end_user(days: int = 7):
response = supabase.table("ConversAI_ChatHistory").select("*").execute().data
seven_days_ago = datetime.now().astimezone() - timedelta(days=days)
ip_by_date = defaultdict(set)
for i in response:
timestamp = isoparse(i["timestamp"])
ip_address = i["IpAddress"]
if timestamp >= seven_days_ago:
date = timestamp.date()
ip_by_date[date].add(ip_address)
data = [{"date": date.isoformat(), "terminal": len(ips)} for date, ips in ip_by_date.items() if len(ips) > 1]
return {"data": data}
@app.post("/average_session_interaction")
async def average_session_interaction(days: int = 7):
response = supabase.table("ConversAI_ChatHistory").select("*").execute().data
seven_days_ago = datetime.now().astimezone() - timedelta(days=days)
total_messages_by_date = defaultdict(int)
unique_ips_by_date = defaultdict(set)
for i in response:
timestamp = isoparse(i["timestamp"])
ip_address = i["IpAddress"]
if timestamp >= seven_days_ago:
date = timestamp.date()
total_messages_by_date[date] += 1
unique_ips_by_date[date].add(ip_address)
data = []
for date in sorted(total_messages_by_date.keys()):
total_messages = total_messages_by_date[date]
unique_ips = len(unique_ips_by_date[date])
average_interactions = total_messages / unique_ips if unique_ips > 0 else 0
data.append({"date": date.isoformat(), "interactions": average_interactions})
return {"data": data}
@app.post("/token_usages")
async def token_usages(days: int = 7):
response = supabase.table("ConversAI_ChatHistory").select("*").execute().data
seven_days_ago = datetime.now().astimezone() - timedelta(days=days)
token_usage_by_date = defaultdict(int)
for i in response:
timestamp = isoparse(i["timestamp"])
if timestamp >= seven_days_ago:
date = timestamp.date()
response_token_count = i.get("ResponseTokenCount")
if response_token_count is not None:
token_usage_by_date[date] += response_token_count
data = [{"date": date.isoformat(), "total_tokens": total_tokens} for date, total_tokens in
token_usage_by_date.items()]
return {"data": data}
@app.post("/add_feedback")
async def add_feedback(request: Request, feedback: str, user_id: str):
client_ip = request.client.host
city = get_ip_info(client_ip)
response = supabase.table("ConversAI_Feedback").insert(
{"feedback": feedback, "user_id": user_id, "city": city, "ip": client_ip}).execute()
return {"message": "success"}
@app.post("/user_satisfaction_rate")
async def user_satisfaction_rate():
response = supabase.table("ConversAI_Feedback").select("*").execute().data
seven_days_ago = datetime.now().astimezone() - timedelta(days=7)
feedback_counts = defaultdict(lambda: {"like": 0, "dislike": 0})
for i in response:
timestamp = isoparse(i["timestamp"])
if timestamp >= seven_days_ago:
date = timestamp.date()
feedback = i.get("feedback")
if feedback == "like":
feedback_counts[date]["like"] += 1
elif feedback == "dislike":
feedback_counts[date]["dislike"] += 1
data = []
for date in sorted(feedback_counts.keys()):
like_count = feedback_counts[date]["like"]
dislike_count = feedback_counts[date]["dislike"]
total_feedback = like_count + dislike_count
satisfaction_rate = (like_count / total_feedback * 100) if total_feedback > 0 else 0
data.append({"date": date.isoformat(), "rate": satisfaction_rate})
return {"data": data}