# WorkspaceAI / app.py
# Author: ADKU — "Update the inverted completion status bug" (commit d7f4d04, verified)
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import os
import google.generativeai as genai
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from pymongo import MongoClient
import threading
import time
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# CORS Middleware Configuration
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allow all origins (Frontend to Backend Communication)
allow_methods=["*"], # Allow all HTTP methods (GET, POST, etc.)
allow_headers=["*"], # Allow all headers
)
# ✅ Configure Gemini API
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
# ✅ Sentence Transformer
model = SentenceTransformer("all-MiniLM-L6-v2")
# ✅ MongoDB
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client["AiWork"]
class QueryRequest(BaseModel):
email: str
query: str
# FAISS Index (Per User)
user_indexes = {} # Stores FAISS index per user {email: FAISS index}
user_sentence_mapping = {} # Maps user emails to (id, sentence) pairs
def fetch_latest_data():
return {
"users": list(db.users.find()),
"teams": list(db.teams.find()),
"projects": list(db.projects.find()),
"modules": list(db.modules.find()),
"documents": list(db.documents.find()),
"schedules": list(db.schedules.find())
}
def generate_sentences(data):
    """Convert raw collection data into categorized natural-language sentences per user.

    Args:
        data: Dict with keys "users", "teams", "projects", "modules",
            "documents", "schedules"; each value is a list of Mongo-style dicts.

    Returns:
        Dict mapping each user's email to a dict of category name ->
        list of sentences. A user with no associated data at all gets a
        single "General" entry instead of the empty categories' content.
    """
    users, teams, projects, modules, documents, schedules = (
        data["users"], data["teams"], data["projects"], data["modules"], data["documents"], data["schedules"]
    )
    user_sentences = {}  # Store categorized sentences per user
    for user in users:
        username = user.get("username", "Unknown User")
        email = user.get("email", "Unknown Email")
        if email not in user_sentences:
            user_sentences[email] = {
                "Teams": [],
                "Projects": [],
                "Modules & Tasks": [],
                "Documents": [],
                "Schedules": []
            }
        # User team ownership and membership
        owned_teams = [team for team in teams if team.get("owner", {}).get("email") == email]
        if owned_teams:
            team_names = ", ".join(f'"{team["teamName"]}"' for team in owned_teams)
            user_sentences[email]["Teams"].append(f"User {username} owns the teams: {team_names}.")
        # .get() guards against member entries that lack an "email" field
        member_teams = [team for team in teams if any(m.get("email") == email for m in team.get("members", []))]
        if member_teams:
            team_names = ", ".join(f'"{team["teamName"]}"' for team in member_teams)
            user_sentences[email]["Teams"].append(f"User {username} is a member of the teams: {team_names}.")
        # Find projects in teams they own or are part of
        relevant_teams = owned_teams + member_teams
        team_ids = [str(team["_id"]) for team in relevant_teams]
        user_projects = [p for p in projects if str(p.get("owner", {}).get("teamId")) in team_ids]
        if user_projects:
            for project in user_projects:
                proj_name = project["projName"]
                team_creator = next((t["teamName"] for t in teams if str(t["_id"]) == str(project.get("owner", {}).get("teamId"))), "Unknown Team")
                user_sentences[email]["Projects"].append(f"User {username} is involved in project {proj_name}, created by team {team_creator}.")
                # Find modules under this project
                proj_modules = [m for m in modules if str(m.get("projId")) == str(project["_id"])]
                if proj_modules:
                    for module in proj_modules:
                        module_name = module["moduleName"]
                        user_sentences[email]["Modules & Tasks"].append(f"In project {proj_name}, module {module_name} exists.")
                        # Find tasks in this module assigned to the user
                        # (.get() guards against assignee entries without "email")
                        assigned_tasks = [
                            task for task in module.get("tasks", [])
                            if any(a.get("email") == email for a in task.get("assignedTo", []))
                        ]
                        if assigned_tasks:
                            # status True renders as "Inactive", False as "Active" —
                            # kept as-is per the verified inverted-status fix.
                            task_details = ", ".join(
                                f'"{t["taskName"]}" (Status: {"Inactive" if t.get("status", False) else "Active"})'
                                for t in assigned_tasks
                            )
                            user_sentences[email]["Modules & Tasks"].append(f"Tasks assigned to {username} in {module_name}: {task_details}.")
                # Find documents in this project
                proj_docs = [d for d in documents if str(d.get("owner", {}).get("projId")) == str(project["_id"])]
                if proj_docs:
                    doc_names = ", ".join(f'"{d["title"]}"' for d in proj_docs)
                    user_sentences[email]["Documents"].append(f"Documents related to project {proj_name}: {doc_names}.")
        # Find meeting schedules related to their teams
        user_schedules = [s for s in schedules if str(s.get("teamId")) in team_ids]
        if user_schedules:
            for schedule in user_schedules:
                related_team = next((t["teamName"] for t in teams if str(t["_id"]) == str(schedule.get("teamId"))), "Unknown Team")
                related_project = next((p["projName"] for p in projects if str(p["_id"]) == str(schedule.get("projId"))), "Unknown Project")
                schedule_detail = f'{schedule["moto"]} scheduled on {schedule["date"]} at {schedule["time"]} for team {related_team} in project {related_project}.'
                user_sentences[email]["Schedules"].append(schedule_detail)
        # BUG FIX: this emptiness check previously ran immediately after the
        # empty category dict was created, so `any(...)` was always False and
        # every user unconditionally received the "no assigned data" sentence.
        # It must run only after all categories have been populated.
        if not any(user_sentences[email].values()):
            user_sentences[email]["General"] = [f"User {username} is registered but has no assigned data."]
    return user_sentences
def fetch_initial_data():
    """Build a FAISS index and sentence mapping for every user at startup.

    Populates the module-level `user_indexes` and `user_sentence_mapping`
    dicts; users with no sentences are skipped (not indexed).
    """
    # CONSISTENCY FIX: reuse fetch_latest_data() instead of duplicating the
    # six collection queries inline.
    data = fetch_latest_data()
    user_sentences = generate_sentences(data)
    # Hoisted out of the loop — the embedding dimension never changes.
    embedding_dim = model.get_sentence_embedding_dimension()
    user_count = 0  # Track users added to FAISS
    for email, categories in user_sentences.items():
        sentences = sum(categories.values(), [])  # Flatten categorized sentences
        print(f"User: {email}, Sentences Count: {len(sentences)}")  # Debugging Output
        if not sentences:
            continue
        user_count += 1
        index = faiss.IndexFlatL2(embedding_dim)
        index.add(model.encode(sentences, convert_to_numpy=True))
        user_indexes[email] = index
        user_sentence_mapping[email] = list(enumerate(sentences))
    print(f"Total Users Indexed in FAISS: {user_count} / {len(data['users'])}")
def update_user_embeddings(email):
    """Regenerate structured sentences for one user and rebuild their FAISS index.

    Args:
        email: Address identifying the user whose data changed.
    """
    snapshot = {
        "users": list(db.users.find({"email": email})),
        "teams": list(db.teams.find()),
        "projects": list(db.projects.find()),
        "modules": list(db.modules.find()),
        "documents": list(db.documents.find()),
        "schedules": list(db.schedules.find())
    }
    per_user = generate_sentences(snapshot)
    # Guard clauses: nothing to do if the user produced no sentences.
    if email not in per_user:
        return
    flat = sum(per_user[email].values(), [])  # Flatten structured sentences
    if not flat:
        return
    vectors = model.encode(flat, convert_to_numpy=True)
    # Rebuild the index from scratch so removed sentences disappear too.
    fresh_index = faiss.IndexFlatL2(model.get_sentence_embedding_dimension())
    fresh_index.add(vectors)
    user_indexes[email] = fresh_index
    user_sentence_mapping[email] = list(enumerate(flat))
    print(f"Updated embeddings for {email}. Total sentences: {len(flat)}")
def watch_changes():
    """Monitor MongoDB change streams and refresh affected users' embeddings.

    Runs forever (intended for a daemon thread). Each change event is mapped
    to the set of user emails whose generated sentences may have changed,
    then update_user_embeddings() rebuilds their FAISS indexes. On stream
    errors, waits 5 seconds and reconnects.
    """
    print("Watching MongoDB for changes...")
    while True:
        try:
            with db.watch() as stream:  # Watch the entire database
                for change in stream:
                    print("Detected Change:", change)  # Debugging print
                    operation = change["operationType"]
                    collection_name = change["ns"]["coll"]  # Collection that changed
                    doc_id = change["documentKey"]["_id"]
                    emails = set()  # Affected user emails
                    # NOTE(review): for "delete" operations the find_one() calls
                    # below return None, so no email is resolved and any stale
                    # index survives; fixing that needs change-stream pre-images.
                    if collection_name == "users":
                        full_doc = change.get("fullDocument", {})
                        if full_doc and "email" in full_doc:
                            emails.add(full_doc["email"])
                    elif collection_name == "teams":
                        team_doc = db.teams.find_one({"_id": doc_id})
                        if team_doc and "owner" in team_doc:
                            emails.add(team_doc["owner"].get("email"))
                    elif collection_name == "projects":
                        project_doc = db.projects.find_one({"_id": doc_id})
                        if project_doc and "owner" in project_doc:
                            emails.add(project_doc["owner"].get("email"))
                    elif collection_name == "modules":
                        module_doc = db.modules.find_one({"_id": doc_id})
                        if module_doc:
                            # Original module-level scan kept for safety.
                            for user in module_doc.get("assignedTo", []):
                                if "email" in user:
                                    emails.add(user["email"])
                            # BUG FIX: per the schema consumed by
                            # generate_sentences, assignees live on each task
                            # (tasks[i].assignedTo); the old code only looked
                            # at the module level and missed them.
                            for task in module_doc.get("tasks", []):
                                for assignee in task.get("assignedTo", []):
                                    if "email" in assignee:
                                        emails.add(assignee["email"])
                    elif collection_name == "documents":
                        doc = db.documents.find_one({"_id": doc_id})
                        if doc and "owner" in doc:
                            emails.add(doc["owner"].get("email"))
                    elif collection_name == "schedules":
                        schedule_doc = db.schedules.find_one({"_id": doc_id})
                        if schedule_doc:
                            team_id = schedule_doc.get("teamId")
                            team_doc = db.teams.find_one({"_id": team_id})
                            if team_doc and "owner" in team_doc:
                                emails.add(team_doc["owner"].get("email"))
                    # .get("email") above may have yielded None — drop it.
                    emails.discard(None)
                    if emails:
                        for email in emails:
                            print(f"Detected {operation} for user: {email}")
                            if operation in ["insert", "update", "delete"]:
                                update_user_embeddings(email)
                                print(f"Updated user {email} due to {operation} operation.")
                    else:
                        print(f"Change detected in {collection_name}, but no associated email found.")
        except Exception as e:
            print(f"Error in watch_changes(): {e}")
            print("Reconnecting to MongoDB Change Stream in 5 seconds...")
            time.sleep(5)  # Prevent infinite error loops
def get_relevant_sentences(email, query):
    """Retrieve the user's stored sentences most similar to the query via FAISS.

    Args:
        email: User whose per-user index should be searched.
        query: Free-text question to embed and match.

    Returns:
        List of matching sentence strings, or a single-element list with an
        explanatory message when nothing is available/relevant.
    """
    if email not in user_indexes:
        return ["User not found or no data available."]
    stored = user_sentence_mapping.get(email, [])  # [(id, sentence), ...]
    if not stored:
        return ["No stored sentences for this user."]
    faiss_index = user_indexes[email]
    # Embed the query and search the top-k nearest neighbors.
    query_vec = model.encode([query], convert_to_numpy=True)
    top_k = min(100, len(stored))  # cannot retrieve more than we stored
    distances, positions = faiss_index.search(query_vec, top_k)
    threshold = 1.5  # L2-distance cutoff separating "relevant" from noise
    matches = []
    for dist, pos in zip(distances[0], positions[0]):
        if dist < threshold:
            matches.append(stored[pos][1])
    if matches:
        return matches
    return ["No relevant information found."]
def generate_response(email, query):
    """Generate a natural language response using Gemini based on FAISS search results.

    Args:
        email: User whose indexed data provides the context.
        query: The user's question.

    Returns:
        Gemini's answer text, or a fallback message when no text is produced.
    """
    relevant_sentences = get_relevant_sentences(email, query)
    # Construct prompt using relevant sentences
    prompt = f"Query: {query}\nContext:\n" + "\n".join(relevant_sentences) + "\nAnswer in a natural way."
    # BUG FIX: the local name "model" shadowed the module-level
    # SentenceTransformer; use a distinct name for the Gemini client.
    gemini_model = genai.GenerativeModel("gemini-1.5-flash")
    response = gemini_model.generate_content(prompt)
    # ROBUSTNESS: response.text raises when the response has no candidates
    # (e.g. safety-blocked), so access it defensively.
    try:
        text = response.text
    except Exception:
        text = None
    return text if text else "I'm unable to find relevant information."
@app.post("/chat")
async def chat(request: QueryRequest):
    """Answer a user's question from their indexed workspace data.

    Any failure is surfaced to the client as an HTTP 500 with the error text.
    """
    try:
        answer = generate_response(request.email, request.query)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"response": answer}
@app.get("/")
def home():
    """Health-check route confirming the backend is running."""
    status_payload = {"message": "AI Workspace Backend Running"}
    return status_payload
if __name__ == "__main__":
    # Fetch initial data for FAISS indexing (blocks until all users indexed)
    fetch_initial_data()
    # Start watching for real-time changes in a separate thread
    # (daemon=True so the watcher does not prevent interpreter shutdown)
    threading.Thread(target=watch_changes, daemon=True).start()
    print(f"Active Threads: {threading.active_count()}")  # Debugging thread count
    # Serve the FastAPI app on all interfaces, port 7860
    uvicorn.run(app, host="0.0.0.0", port=7860)