File size: 4,238 Bytes
d32c69c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import logging
import os
from typing import Optional

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import APIKeyHeader

from src.db.init_db import get_session
from src.db.schemas.models import User as DBUser
from src.schemas.user_schemas import User
from src.utils.logger import Logger

logger = Logger("user_manager", see_time=True, console_log=False)

# Define API key header for authentication
API_KEY_NAME = "X-API-Key"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)


async def get_current_user(
    request: Request,
    api_key: Optional[str] = Depends(api_key_header)
) -> Optional[User]:
    """
    Dependency to get the current authenticated user.
    Returns None if no user is authenticated.
    """
    # If no API key is provided, return None (anonymous user)
    if not api_key:
        # Check for API key in query parameters (fallback)
        api_key = request.query_params.get("api_key")
        if not api_key:
            return None
    
    try:
        # In a real application, you'd validate the API key against stored user keys
        # For this example, we'll use a simple lookup using user id
        session = get_session()
        
        try:
            # Simplified example: assume API key is the user_id for demonstration
            # In a real app, you'd do a secure lookup
            try:
                # Check if api_key is actually a string before converting to int
                if isinstance(api_key, str):
                    user_id = int(api_key)
                    db_user = session.query(DBUser).filter(DBUser.user_id == user_id).first()
                else:
                    # Handle the case where api_key is not a string (like Depends object)
                    logger.log_message("API key is not a string", level=logging.ERROR)
                    return None
            except ValueError:
                # If api_key isn't a number, maybe check by username or something else
                logger.log_message(f"API key is not a number: {api_key}", level=logging.ERROR)
                db_user = session.query(DBUser).filter(DBUser.username == api_key).first()
            
            if not db_user:
                logger.log_message("User not found", level=logging.ERROR)
                return None
                
            return User(
                user_id=db_user.user_id,
                username=db_user.username,
                email=db_user.email
            )
            
        finally:
            session.close()
            
    except Exception as e:
        logger.log_message(f"Error authenticating user: {str(e)}", level=logging.ERROR)
        return None

# Function to create a new user
def create_user(username: str, email: str) -> User:
    """Create a new user in the database"""
    session = get_session()
    try:
        # Check if user with this email already exists
        existing_user = session.query(DBUser).filter(DBUser.email == email).first()
        if existing_user:
            return User(
                user_id=existing_user.user_id,
                username=existing_user.username,
                email=existing_user.email
            )
            
        # Create new user
        new_user = DBUser(
            username=username,
            email=email
        )
        session.add(new_user)
        session.commit()
        session.refresh(new_user)
        
        return User(
            user_id=new_user.user_id,
            username=new_user.username,
            email=new_user.email
        )
    
    except Exception as e:
        session.rollback()
        logger.log_message(f"Error creating user: {str(e)}", logging.ERROR)
        raise
    
    finally:
        session.close() 

def get_user_by_email(email: str) -> Optional[User]:
    """Get a user by email"""
    session = get_session()
    try:
        user = session.query(DBUser).filter(DBUser.email == email).first()
        return User(
            user_id=user.user_id,
            username=user.username,
            email=user.email
        )
    except Exception as e:
        logger.log_message(f"Error getting user by email: {str(e)}", logging.ERROR)
        return None