Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel | |
| import os | |
| import google.generativeai as genai | |
| import faiss | |
| import numpy as np | |
| from sentence_transformers import SentenceTransformer | |
| from pymongo import MongoClient | |
| import threading | |
| import time | |
| import uvicorn | |
| from fastapi.middleware.cors import CORSMiddleware | |
# FastAPI application instance.
app = FastAPI()

# CORS Middleware Configuration
# NOTE(review): wildcard origins/methods/headers is fine for development,
# but should be narrowed to the real frontend origin before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins (Frontend to Backend Communication)
    allow_methods=["*"],  # Allow all HTTP methods (GET, POST, etc.)
    allow_headers=["*"],  # Allow all headers
)

# Configure Gemini API from the GEMINI_API_KEY environment variable.
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))

# Sentence embedding model used to build every FAISS vector below.
model = SentenceTransformer("all-MiniLM-L6-v2")

# MongoDB connection; the URI comes from the MONGO_URI environment variable.
MONGO_URI = os.getenv("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client["AiWork"]
class QueryRequest(BaseModel):
    """Request body for the chat endpoint."""

    # Identifies which user's FAISS index to query.
    email: str
    # Natural-language question from the frontend.
    query: str
# FAISS Index (Per User)
user_indexes = {}  # Stores FAISS index per user {email: FAISS index}
user_sentence_mapping = {}  # Maps user emails to lists of (id, sentence) pairs
def fetch_latest_data():
    """Pull a full snapshot of every collection used for sentence generation.

    Returns a dict keyed by collection name, each value a list of raw
    MongoDB documents.
    """
    collection_names = ("users", "teams", "projects", "modules", "documents", "schedules")
    return {name: list(db[name].find()) for name in collection_names}
def generate_sentences(data):
    """Build categorized, human-readable sentences describing each user's data.

    Parameters:
        data: dict with keys "users", "teams", "projects", "modules",
              "documents", "schedules"; each value is a list of MongoDB
              documents (plain dicts — "_id" values are compared via str()).

    Returns:
        dict mapping user email -> {category: [sentence, ...]} with the
        categories "Teams", "Projects", "Modules & Tasks", "Documents",
        "Schedules", plus a "General" category only for users that have
        no data in any other category.
    """
    users = data["users"]
    teams = data["teams"]
    projects = data["projects"]
    modules = data["modules"]
    documents = data["documents"]
    schedules = data["schedules"]

    user_sentences = {}  # Store categorized sentences per user

    for user in users:
        username = user.get("username", "Unknown User")
        email = user.get("email", "Unknown Email")
        categories = user_sentences.setdefault(email, {
            "Teams": [],
            "Projects": [],
            "Modules & Tasks": [],
            "Documents": [],
            "Schedules": [],
        })

        # User team ownership and membership.
        owned_teams = [t for t in teams if t.get("owner", {}).get("email") == email]
        if owned_teams:
            team_names = ", ".join(f'"{t["teamName"]}"' for t in owned_teams)
            categories["Teams"].append(f"User {username} owns the teams: {team_names}.")
        # .get("email") instead of ["email"]: a member entry without an email
        # must not crash sentence generation for everyone.
        member_teams = [
            t for t in teams
            if any(m.get("email") == email for m in t.get("members", []))
        ]
        if member_teams:
            team_names = ", ".join(f'"{t["teamName"]}"' for t in member_teams)
            categories["Teams"].append(f"User {username} is a member of the teams: {team_names}.")

        # Projects owned by any team the user owns or belongs to.
        relevant_teams = owned_teams + member_teams
        team_ids = [str(t["_id"]) for t in relevant_teams]
        user_projects = [p for p in projects if str(p.get("owner", {}).get("teamId")) in team_ids]
        for project in user_projects:
            proj_name = project["projName"]
            team_creator = next(
                (t["teamName"] for t in teams
                 if str(t["_id"]) == str(project.get("owner", {}).get("teamId"))),
                "Unknown Team",
            )
            categories["Projects"].append(
                f"User {username} is involved in project {proj_name}, created by team {team_creator}."
            )

            # Modules under this project, plus tasks assigned to the user.
            proj_modules = [m for m in modules if str(m.get("projId")) == str(project["_id"])]
            for module in proj_modules:
                module_name = module["moduleName"]
                categories["Modules & Tasks"].append(
                    f"In project {proj_name}, module {module_name} exists."
                )
                assigned_tasks = [
                    task for task in module.get("tasks", [])
                    if any(a.get("email") == email for a in task.get("assignedTo", []))
                ]
                if assigned_tasks:
                    # NOTE(review): truthy status maps to "Inactive" — looks like
                    # status=True means "completed"; confirm against the frontend.
                    task_details = ", ".join(
                        f'"{t["taskName"]}" (Status: {"Inactive" if t.get("status", False) else "Active"})'
                        for t in assigned_tasks
                    )
                    categories["Modules & Tasks"].append(
                        f"Tasks assigned to {username} in {module_name}: {task_details}."
                    )

            # Documents attached to this project.
            proj_docs = [d for d in documents if str(d.get("owner", {}).get("projId")) == str(project["_id"])]
            if proj_docs:
                doc_names = ", ".join(f'"{d["title"]}"' for d in proj_docs)
                categories["Documents"].append(
                    f"Documents related to project {proj_name}: {doc_names}."
                )

        # Meeting schedules for any of the user's teams.
        user_schedules = [s for s in schedules if str(s.get("teamId")) in team_ids]
        for schedule in user_schedules:
            related_team = next(
                (t["teamName"] for t in teams if str(t["_id"]) == str(schedule.get("teamId"))),
                "Unknown Team",
            )
            related_project = next(
                (p["projName"] for p in projects if str(p["_id"]) == str(schedule.get("projId"))),
                "Unknown Project",
            )
            categories["Schedules"].append(
                f'{schedule["moto"]} scheduled on {schedule["date"]} at {schedule["time"]} '
                f"for team {related_team} in project {related_project}."
            )

        # BUG FIX: this check originally ran right after the empty category dict
        # was created, so it was ALWAYS true and every user — even those with
        # teams, projects, and tasks — got the "no assigned data" sentence.
        # It must run after the categories have been populated.
        if not any(categories.values()):
            categories["General"] = [
                f"User {username} is registered but has no assigned data."
            ]

    return user_sentences
def fetch_initial_data():
    """Build a FAISS index per user from a full database snapshot.

    Populates the module-level ``user_indexes`` and ``user_sentence_mapping``
    dicts. Users that end up with zero sentences are skipped (no index
    entry is created for them).
    """
    # Reuse the shared snapshot helper instead of duplicating the six
    # collection queries inline.
    data = fetch_latest_data()
    user_sentences = generate_sentences(data)

    # The embedding dimension is a property of the model, not the user —
    # hoisted out of the per-user loop.
    embedding_dim = model.get_sentence_embedding_dimension()

    user_count = 0  # Track users added to FAISS
    for email, categories in user_sentences.items():
        sentences = sum(categories.values(), [])  # Flatten categorized sentences
        print(f"User: {email}, Sentences Count: {len(sentences)}")  # Debugging Output
        if not sentences:
            continue
        user_count += 1
        index = faiss.IndexFlatL2(embedding_dim)
        index.add(model.encode(sentences, convert_to_numpy=True))
        user_indexes[email] = index
        user_sentence_mapping[email] = list(enumerate(sentences))
    print(f"Total Users Indexed in FAISS: {user_count} / {len(data['users'])}")
def update_user_embeddings(email):
    """Rebuild the FAISS index and sentence mapping for a single user.

    Fetches only that user's document plus full copies of the related
    collections, regenerates the structured sentences, and replaces the
    user's index from scratch. No-ops if the user yields no sentences.
    """
    snapshot = {
        "users": list(db.users.find({"email": email})),
        "teams": list(db.teams.find()),
        "projects": list(db.projects.find()),
        "modules": list(db.modules.find()),
        "documents": list(db.documents.find()),
        "schedules": list(db.schedules.find()),
    }
    per_user = generate_sentences(snapshot)

    # Guard clauses instead of nested ifs: bail out early when there is
    # nothing to index for this user.
    if email not in per_user:
        return
    flattened = sum(per_user[email].values(), [])
    if not flattened:
        return

    vectors = model.encode(flattened, convert_to_numpy=True)
    fresh_index = faiss.IndexFlatL2(model.get_sentence_embedding_dimension())
    fresh_index.add(vectors)

    # Swap in the rebuilt index and mapping atomically per key.
    user_indexes[email] = fresh_index
    user_sentence_mapping[email] = list(enumerate(flattened))
    print(f"Updated embeddings for {email}. Total sentences: {len(flattened)}")
def watch_changes():
    """Monitor MongoDB for changes, identify affected users, and update embeddings dynamically.

    Runs forever (intended for a daemon thread): opens a database-wide change
    stream, maps each change to the affected user email(s), and rebuilds that
    user's FAISS index via update_user_embeddings().
    """
    print("Watching MongoDB for changes...")
    while True:
        try:
            with db.watch() as stream:  # Watch the entire database
                for change in stream:
                    print("Detected Change:", change)  # Debugging print
                    operation = change["operationType"]
                    collection_name = change["ns"]["coll"]  # Get the collection that changed
                    doc_id = change["documentKey"]["_id"]
                    emails = set()  # Store affected user emails
                    # Fetch user email based on the collection that was updated.
                    # NOTE(review): for "delete" operations the document is already
                    # gone, so the find_one() lookups below return None and no user
                    # gets refreshed — confirm whether deletes need pre-image handling.
                    if collection_name == "users":
                        full_doc = change.get("fullDocument", {})
                        if full_doc and "email" in full_doc:
                            emails.add(full_doc["email"])
                    elif collection_name == "teams":
                        team_doc = db.teams.find_one({"_id": doc_id})
                        if team_doc and "owner" in team_doc:
                            # .get("email") may yield None; update_user_embeddings
                            # would then be called with None — assumes owner docs
                            # always carry an email (TODO confirm).
                            emails.add(team_doc["owner"].get("email"))
                    elif collection_name == "projects":
                        project_doc = db.projects.find_one({"_id": doc_id})
                        if project_doc and "owner" in project_doc:
                            emails.add(project_doc["owner"].get("email"))
                    elif collection_name == "modules":
                        module_doc = db.modules.find_one({"_id": doc_id})
                        if module_doc:
                            # Fetch users assigned to the module.
                            # NOTE(review): reads a top-level "assignedTo" list, but
                            # generate_sentences reads assignedTo per *task* — confirm
                            # the module schema actually has both.
                            for user in module_doc.get("assignedTo", []):
                                if "email" in user:
                                    emails.add(user["email"])
                    elif collection_name == "documents":
                        doc = db.documents.find_one({"_id": doc_id})
                        if doc and "owner" in doc:
                            emails.add(doc["owner"].get("email"))
                    elif collection_name == "schedules":
                        # Schedules map to a user via their team's owner.
                        schedule_doc = db.schedules.find_one({"_id": doc_id})
                        if schedule_doc:
                            team_id = schedule_doc.get("teamId")
                            team_doc = db.teams.find_one({"_id": team_id})
                            if team_doc and "owner" in team_doc:
                                emails.add(team_doc["owner"].get("email"))
                    if emails:
                        for email in emails:
                            print(f"Detected {operation} for user: {email}")
                            if operation in ["insert", "update", "delete"]:
                                update_user_embeddings(email)
                                print(f"Updated user {email} due to {operation} operation.")
                    else:
                        print(f"Change detected in {collection_name}, but no associated email found.")
        except Exception as e:
            # Change streams require a replica set; any failure (network blip,
            # stream invalidation, unsupported topology) lands here and we
            # retry after a pause instead of crashing the thread.
            print(f"Error in watch_changes(): {e}")
            print("Reconnecting to MongoDB Change Stream in 5 seconds...")
            time.sleep(5)  # Prevent infinite error loops
def get_relevant_sentences(email, query):
    """Return up to 100 of the user's stored sentences nearest to *query*.

    Falls back to a single explanatory message when the user has no index,
    no stored sentences, or no hit within the distance threshold.
    """
    if email not in user_indexes:
        return ["User not found or no data available."]

    sentence_data = user_sentence_mapping.get(email, [])
    if not sentence_data:
        return ["No stored sentences for this user."]

    index = user_indexes[email]  # per-user FAISS index

    # Encode the query and search the top-k nearest neighbours,
    # capping k at the number of stored sentences.
    query_vec = model.encode([query], convert_to_numpy=True)
    top_k = min(100, len(sentence_data))
    distances, positions = index.search(query_vec, top_k)

    # L2-distance cutoff: anything farther than this is treated as noise.
    max_distance = 1.5
    hits = []
    for dist, pos in zip(distances[0], positions[0]):
        if dist < max_distance:
            hits.append(sentence_data[pos][1])

    if hits:
        return hits
    return ["No relevant information found."]
def generate_response(email, query):
    """Generate a natural-language answer for *query*, grounded in the
    user's FAISS-retrieved context sentences, via the Gemini API.

    Returns the model's text, or a fallback message when the response
    carries no text.
    """
    relevant_sentences = get_relevant_sentences(email, query)
    # Construct prompt using relevant sentences.
    prompt = f"Query: {query}\nContext:\n" + "\n".join(relevant_sentences) + "\nAnswer in a natural way."
    # Renamed from `model`: the original local shadowed the module-level
    # SentenceTransformer `model`, which was confusing and error-prone.
    gemini_model = genai.GenerativeModel("gemini-1.5-flash")
    response = gemini_model.generate_content(prompt)
    # NOTE(review): response.text can raise if the candidate was blocked by
    # safety filters — consider guarding with response.candidates if that
    # shows up in production.
    return response.text if response.text else "I'm unable to find relevant information."
async def chat(request: QueryRequest):
    """Chat handler: answer ``request.query`` for ``request.email``.

    NOTE(review): no ``@app.post(...)`` decorator is visible on this handler —
    it may have been lost in extraction; confirm the route is registered.

    Raises:
        HTTPException: 500 with the underlying error message on any failure.
    """
    try:
        answer = generate_response(request.email, request.query)
        return {"response": answer}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
def home():
    """Landing / health-check handler.

    NOTE(review): no ``@app.get(...)`` decorator is visible here — confirm
    the route is registered or was lost in extraction.
    """
    return {"message": "AI Workspace Backend Running"}
if __name__ == "__main__":
    # Fetch initial data for FAISS indexing (blocks until all users indexed).
    fetch_initial_data()
    # Start watching for real-time changes in a separate thread
    # (daemon=True so the watcher dies with the main process).
    threading.Thread(target=watch_changes, daemon=True).start()
    print(f"Active Threads: {threading.active_count()}")  # Debugging thread count
    uvicorn.run(app, host="0.0.0.0", port=7860)