# backend/app/database.py
"""Async MongoDB persistence helpers for users and uploaded files.

The module opens one Motor client at import time, using the ``MONGODB_URI``
and ``MONGODB_DB`` environment variables (with local defaults), and exposes
coroutine helpers working on the ``users`` and ``files`` collections.
"""

import datetime
import os
from typing import Any, List, Optional

from bson import Binary
from motor.motor_asyncio import AsyncIOMotorClient

from .models import User, FileUpload

# Get MongoDB connection string from environment variable
MONGO_URI = os.getenv("MONGODB_URI", "mongodb://localhost:27017")
DB_NAME = os.getenv("MONGODB_DB", "aithon")

client = AsyncIOMotorClient(MONGO_URI)
db = client[DB_NAME]

# Collections
users_collection = db.users
files_collection = db.files


def _utc_now() -> datetime.datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.datetime.now(datetime.UTC)


def _user_from_doc(user_doc: dict) -> User:
    """Build a ``User`` from the username/email/password of a stored document."""
    return User(
        username=user_doc["username"],
        email=user_doc["email"],
        password=user_doc["password"],
    )


async def get_user_by_username(username: str) -> Optional[User]:
    """
    Retrieve a user by username.

    Returns the matching ``User``, or None when no document has that username.
    """
    user_doc = await users_collection.find_one({"username": username})
    if user_doc:
        return _user_from_doc(user_doc)
    return None


async def get_user_by_email(email: str) -> Optional[User]:
    """
    Retrieve a user by email.

    Returns the matching ``User``, or None when no document has that email.
    """
    user_doc = await users_collection.find_one({"email": email})
    if user_doc:
        return _user_from_doc(user_doc)
    return None


async def create_user(user: User) -> bool:
    """
    Create a new user.

    Returns True if the user was inserted, False if the username or email is
    already taken or if any error occurred (the error is printed).
    """
    try:
        # Check if username or email already exists
        if await get_user_by_username(user.username) or await get_user_by_email(user.email):
            return False

        user_doc = {
            "username": user.username,
            "email": user.email,
            "password": user.password,
            "created_at": _utc_now(),
        }
        await users_collection.insert_one(user_doc)
        return True
    except Exception as e:
        print(f"Error creating user: {e}")
        return False


async def save_file(username: str, records: Any, filename: str) -> bool:
    """
    Save a file to the database, inserting it or updating the existing entry.

    The document is keyed by ``(username, filename)``. ``records`` is stored
    as the ``content`` field; ``file_type`` is the text after the last dot in
    ``filename``, or ``'unknown'`` when the name contains no dot.

    Returns True when a document was matched or inserted, False on error
    (the error is printed).
    """
    try:
        current_time = _utc_now()
        file_type = filename.split('.')[-1] if '.' in filename else 'unknown'

        # Update if exists, insert if not
        result = await files_collection.update_one(
            {"username": username, "filename": filename},
            {
                "$set": {
                    "username": username,
                    "filename": filename,
                    "content": records,
                    "updated_at": current_time,
                    "file_type": file_type,
                },
                # Only stamp created_at on first insert; keeping it in $set
                # would overwrite the creation time on every re-save.
                "$setOnInsert": {"created_at": current_time},
            },
            upsert=True,
        )
        # matched_count (not modified_count) so that re-saving identical data
        # still counts as success; upserted_id is set only when a new
        # document was inserted.
        return bool(result.matched_count) or result.upserted_id is not None
    except Exception as e:
        print(f"Error saving file: {e}")
        return False


async def get_user_files(username: str) -> List[FileUpload]:
    """
    Retrieve all files belonging to a user.

    Returns a list of ``FileUpload`` objects (empty when the user has no files
    or when an error occurred; the error is printed).
    """
    try:
        cursor = files_collection.find({"username": username})
        files = []
        async for doc in cursor:
            # NOTE(review): content is passed through as stored, whereas
            # get_file_by_name converts it to str -- confirm which form
            # FileUpload.content is meant to hold.
            files.append(
                FileUpload(
                    filename=doc["filename"],
                    content=doc["content"],
                    created_at=doc["created_at"],
                    updated_at=doc["updated_at"],
                )
            )
        return files
    except Exception as e:
        print(f"Error retrieving files: {e}")
        return []


async def delete_file(username: str, filename: str) -> bool:
    """
    Delete a file from the database.

    Returns True if a document was deleted, False if none matched or an error
    occurred (the error is printed).
    """
    try:
        result = await files_collection.delete_one({
            "username": username,
            "filename": filename,
        })
        return bool(result.deleted_count)
    except Exception as e:
        print(f"Error deleting file: {e}")
        return False


async def get_file_by_name(username: str, filename: str) -> Optional[FileUpload]:
    """
    Retrieve a specific file by username and filename.

    Binary content is decoded to text; any other content is converted with
    ``str()``. Returns None when no document matches or an error occurred
    (the error is printed).
    """
    try:
        doc = await files_collection.find_one({
            "username": username,
            "filename": filename,
        })
        if doc:
            content = doc["content"]
            # Plain bytes are checked too: Binary is a bytes subclass, but the
            # driver may hand back bare bytes, which str() would render as
            # "b'...'" instead of the decoded text.
            if isinstance(content, (bytes, Binary)):
                content = content.decode()
            else:
                content = str(content)
            return FileUpload(
                filename=doc["filename"],
                content=content,
                created_at=doc["created_at"],
                updated_at=doc["updated_at"],
            )
        return None
    except Exception as e:
        print(f"Error retrieving file: {e}")
        return None


async def update_user(username: str, update_data: dict) -> bool:
    """
    Update user information.

    Applies ``update_data`` with ``$set`` to the user whose username matches
    and stamps ``updated_at`` with the current UTC time.

    Returns True if a document was modified, False if none matched or an error
    occurred (the error is printed).
    """
    try:
        result = await users_collection.update_one(
            {"username": username},
            {"$set": {
                **update_data,
                # `datetime` is the module here, so the former
                # `datetime.utcnow()` raised AttributeError on every call.
                "updated_at": _utc_now(),
            }},
        )
        return bool(result.modified_count)
    except Exception as e:
        print(f"Error updating user: {e}")
        return False


# Index creation function - call this during application startup
async def create_indexes():
    """
    Create necessary indexes for the collections.

    Creates unique indexes on users' ``username`` and ``email``, a unique
    compound index on files' ``(username, filename)``, and plain indexes on
    files' ``created_at`` and ``updated_at``.

    Returns True on success, False if any index creation raised (the error is
    printed).
    """
    try:
        # Users indexes
        await users_collection.create_index("username", unique=True)
        await users_collection.create_index("email", unique=True)

        # Files indexes
        await files_collection.create_index([("username", 1), ("filename", 1)], unique=True)
        await files_collection.create_index("created_at")
        await files_collection.create_index("updated_at")

        return True
    except Exception as e:
        print(f"Error creating indexes: {e}")
        return False


# Optional: Add these to your requirements.txt
# motor==3.3.1
# pymongo==4.5.0