|
|
|
from fastapi import Request, Depends, HTTPException |
|
from fastapi.responses import JSONResponse |
|
from motor.motor_asyncio import AsyncIOMotorClient |
|
import datetime |
|
from typing import Optional, List |
|
from .models import User, FileUpload, Opportunity |
|
from bson import Binary, ObjectId |
|
import os |
|
import json |
|
|
|
|
|
# Connection settings come from the environment; when a variable is unset the
# localhost URI and the "aithon" database name are used instead.
MONGO_URI = os.environ.get("MONGODB_URI", "mongodb://localhost:27017")
DB_NAME = os.environ.get("MONGODB_DB", "aithon")

# Module-level client and database handle, created once at import time.
client = AsyncIOMotorClient(MONGO_URI)
db = client[DB_NAME]

# Collection handles used by the data-access functions in this module.
users_collection = db["users"]
files_collection = db["files"]
opportunities_collection = db["opportunities"]
|
|
|
async def get_user_by_username(username: str) -> Optional[User]:
    """
    Look up a single user document by its ``username`` field.

    Returns a ``User`` built from the stored username, email and password,
    or ``None`` when no document matches.
    """
    record = await users_collection.find_one({"username": username})
    if not record:
        return None
    return User(
        username=record["username"],
        email=record["email"],
        password=record["password"],
    )
|
|
|
async def get_user_by_email(email: str) -> Optional[User]:
    """
    Look up a single user document by its ``email`` field.

    Returns a ``User`` built from the stored username, email and password,
    or ``None`` when no document matches.
    """
    record = await users_collection.find_one({"email": email})
    if not record:
        return None
    return User(
        username=record["username"],
        email=record["email"],
        password=record["password"],
    )
|
|
|
async def create_user(user: User) -> bool:
    """
    Insert a new user unless the username or email is already taken.

    Returns True when the document was inserted. Returns False when a user
    with the same username or email already exists, and also when any
    exception is raised along the way (the error is printed, not re-raised).
    """
    try:
        # Check the username first; only query by email when that misses.
        existing = await get_user_by_username(user.username)
        if not existing:
            existing = await get_user_by_email(user.email)
        if existing:
            return False

        new_document = {
            "username": user.username,
            "email": user.email,
            "password": user.password,
            "created_at": datetime.datetime.now(datetime.UTC),
        }
        await users_collection.insert_one(new_document)
        return True
    except Exception as e:
        print(f"Error creating user: {e}")
        return False
|
|
|
async def save_file(username: str, records: List[dict], filename: str) -> bool:
    """
    Upsert an uploaded file and create one opportunity per record.

    The file document is keyed by (username, filename). Its content, file
    type and ``updated_at`` are replaced on every call, while ``created_at``
    is written only when the upsert inserts a new document.

    Args:
        username: Owner of the file; also stamped on each opportunity.
        records: Rows of the upload. Each row is indexed by its column
            header (for example ``"Opportunity ID"``) and the whole sequence
            is stored as the document's ``content``.
        filename: Name of the file; the text after the last ``.`` is stored
            as ``file_type`` (``'unknown'`` when there is no dot).

    Returns:
        True when the file document was inserted or modified and every
        opportunity was created; False when any exception was raised (the
        error is printed, not re-raised).
    """
    try:
        current_time = datetime.datetime.now(datetime.UTC)
        file_type = filename.split('.')[-1] if '.' in filename else 'unknown'

        result = await files_collection.update_one(
            {"username": username, "filename": filename},
            {
                "$set": {
                    "username": username,
                    "filename": filename,
                    "content": records,
                    "updated_at": current_time,
                    "file_type": file_type,
                },
                # Stamp created_at only on insert so that re-uploading an
                # existing file does not overwrite its creation time.
                "$setOnInsert": {"created_at": current_time},
            },
            upsert=True,
        )

        # `records` is the same plain sequence that was just stored as the
        # document content, so it is walked with a regular loop. `async for`
        # raises TypeError on objects without __aiter__ (such as a list),
        # which previously made this function fail after the upsert.
        # NOTE(review): every call inserts fresh opportunities, so uploading
        # the same file twice duplicates them - confirm whether that is wanted.
        for content in records:
            opportunity = _record_to_opportunity(content, username, current_time)
            await create_opportunity(opportunity)

        return bool(result.modified_count or result.upserted_id)
    except Exception as e:
        print(f"Error saving file: {e}")
        return False


def _record_to_opportunity(
    record: dict, username: str, timestamp: datetime.datetime
) -> Opportunity:
    """Map one uploaded row, keyed by its column headers, to an Opportunity."""
    return Opportunity(
        opportunityId=record["Opportunity ID"],
        opportunityName=record["Opportunity Name"],
        opportunityState=record["Opportunity Stage"],
        opportunityValue=record["Opportunity Value"],
        customerName=record["Customer Name"],
        customerContact=record["Customer Contact"],
        customerContactRole=record["Customer Contact Role"],
        nextSteps=record["Next Steps"],
        opportunityDescription=record["Opportunity Description"],
        activity=record["Activity"],
        closeDate=record["Close Date"],
        created_at=timestamp,
        updated_at=timestamp,
        username=username,
    )
|
|
|
async def get_user_files(username: str) -> List[FileUpload]:
    """
    Fetch every file document owned by ``username``.

    Returns a list of ``FileUpload`` objects built from each document's
    filename, content and timestamps. If anything raises, the error is
    printed and an empty list is returned.
    """
    try:
        return [
            FileUpload(
                filename=entry["filename"],
                content=entry["content"],
                created_at=entry["created_at"],
                updated_at=entry["updated_at"],
            )
            async for entry in files_collection.find({"username": username})
        ]
    except Exception as e:
        print(f"Error retrieving files: {e}")
        return []
|
|
|
async def delete_file(username: str, filename: str) -> bool:
    """
    Remove the file document matching ``username`` and ``filename``.

    Returns True when a document was deleted, False when nothing matched or
    when an exception was raised (the error is printed, not re-raised).
    """
    try:
        selector = {"username": username, "filename": filename}
        outcome = await files_collection.delete_one(selector)
        return bool(outcome.deleted_count)
    except Exception as e:
        print(f"Error deleting file: {e}")
        return False
|
|
|
async def get_file_by_name(username: str, filename: str) -> Optional[FileUpload]:
    """
    Fetch one file document by owner and filename.

    The stored content is returned as text: ``Binary`` values are decoded,
    anything else is passed through ``str()``. Returns ``None`` when no
    document matches or when an exception was raised (the error is printed).
    """
    try:
        selector = {"username": username, "filename": filename}
        found = await files_collection.find_one(selector)
        if not found:
            return None

        stored_name = found["filename"]
        raw_content = found["content"]
        if isinstance(raw_content, Binary):
            text_content = raw_content.decode()
        else:
            text_content = str(raw_content)

        return FileUpload(
            filename=stored_name,
            content=text_content,
            created_at=found["created_at"],
            updated_at=found["updated_at"],
        )
    except Exception as e:
        print(f"Error retrieving file: {e}")
        return None
|
|
|
async def update_user(username: str, update_data: dict) -> bool:
    """
    Apply a partial update to the user with the given username.

    Args:
        username: Username of the document to update.
        update_data: Field/value pairs merged into the document via ``$set``.
            ``updated_at`` is always overwritten with the current UTC time.

    Returns:
        True when a document was modified; False when no document was
        modified or when an exception was raised (the error is printed,
        not re-raised).
    """
    try:
        result = await users_collection.update_one(
            {"username": username},
            {
                "$set": {
                    **update_data,
                    # `datetime` is imported as a module in this file, so the
                    # class is reached as datetime.datetime. The previous
                    # `datetime.utcnow()` raised AttributeError on every call.
                    "updated_at": datetime.datetime.now(datetime.UTC),
                }
            },
        )
        return bool(result.modified_count)
    except Exception as e:
        print(f"Error updating user: {e}")
        return False
|
|
|
|
|
async def get_opportunities(username: str, skip: int = 0, limit: int = 100) -> List[Opportunity]:
    """
    Fetch one page of the opportunities owned by ``username``.

    ``skip`` and ``limit`` are applied to the query cursor, and each returned
    document is expanded into an ``Opportunity``.
    """
    page_cursor = opportunities_collection.find({"username": username})
    page_cursor = page_cursor.skip(skip).limit(limit)
    documents = await page_cursor.to_list(length=None)

    page = []
    for document in documents:
        page.append(Opportunity(**document))
    return page
|
|
|
|
|
async def get_opportunity_count(username: str) -> int:
    """
    Count the opportunity documents whose ``username`` matches.
    """
    owner_filter = {"username": username}
    total = await opportunities_collection.count_documents(owner_filter)
    return total
|
|
|
|
|
async def create_opportunity(opportunity: Opportunity) -> bool:
    """
    Insert one opportunity into the opportunities collection.

    The opportunity is printed, dumped to a dict with ``model_dump()`` and
    inserted. Always returns True; exceptions are not caught here and
    propagate to the caller.
    """
    print("opportunity********", opportunity)
    payload = opportunity.model_dump()
    await opportunities_collection.insert_one(payload)
    return True
|
|
|
|
|
async def create_indexes():
    """
    Create the indexes used by the users, files and opportunities collections.

    Returns True when every index call succeeded, False when any of them
    raised (the error is printed, not re-raised).
    """
    try:
        # Users: username and email must each be unique.
        for unique_field in ("username", "email"):
            await users_collection.create_index(unique_field, unique=True)

        # Files: one document per (username, filename) pair, plus timestamps.
        owner_and_name = [("username", 1), ("filename", 1)]
        await files_collection.create_index(owner_and_name, unique=True)
        for timestamp_field in ("created_at", "updated_at"):
            await files_collection.create_index(timestamp_field)

        # Opportunities: owner lookup and timestamps.
        for field in ("username", "created_at", "updated_at"):
            await opportunities_collection.create_index(field)
    except Exception as e:
        print(f"Error creating indexes: {e}")
        return False
    return True
|
|
|
|
|
|
|
|
|
|