File size: 6,874 Bytes
bbf8ddd 585b429 1fa328e ad3e638 1fa328e bf76c5a ad3e638 bf76c5a ad3e638 bf76c5a 86b2b70 1fa328e fddc03f 0b4cbcf d61380b 1fa328e bf76c5a fddc03f 1fa328e bf76c5a 1fa328e d61380b bf76c5a fddc03f bf76c5a fddc03f 1fa328e d61380b 1fa328e ada2aa5 1fa328e ad3e638 ada2aa5 ad3e638 0b4cbcf 1fa328e 0b4cbcf 1fa328e 86b2b70 bf76c5a 1fa328e ada2aa5 bf76c5a d1cbdd7 ada2aa5 0b4cbcf 1fa328e ada2aa5 1fa328e 0b4cbcf ada2aa5 0b4cbcf bf76c5a 1fa328e 0b4cbcf 3ab9131 ada2aa5 1fa328e 3ab9131 1fa328e bf76c5a 1fa328e 0b4cbcf bf76c5a 3ab9131 ada2aa5 1fa328e ada2aa5 1fa328e 86b2b70 ada2aa5 86b2b70 ada2aa5 bf76c5a 3ab9131 ada2aa5 1fa328e ada2aa5 1fa328e 0b4cbcf 1fa328e bf76c5a 1fa328e ada2aa5 1fa328e ada2aa5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
#modules/database/sql_db.py
from .database_init import get_container
from datetime import datetime, timezone
import logging
import bcrypt
import uuid
logger = logging.getLogger(__name__)
def get_user(username, role=None):
container = get_container("users")
try:
# Consulta b谩sica por ID que es el partition key
items = list(container.query_items(
query=f"SELECT * FROM c WHERE c.id = @username",
parameters=[{"name": "@username", "value": username}],
partition_key=username
))
# Si se especifica rol, filtrar por rol
if role and items:
items = [item for item in items if item['role'] == role]
return items[0] if items else None
except Exception as e:
logger.error(f"Error al obtener usuario {username}: {str(e)}")
return None
def get_admin_user(username):
return get_user(username, role='Administrador')
def get_student_user(username):
return get_user(username, role='Estudiante')
def get_teacher_user(username):
return get_user(username, role='Profesor')
def create_user(username, password, role, additional_info=None):
"""Crea un nuevo usuario"""
container = get_container("users")
if not container:
logger.error("No se pudo obtener el contenedor de usuarios")
return False
try:
user_data = {
'id': username, # Este campo es el partition key
'password': password,
'role': role,
'timestamp': datetime.now(timezone.utc).isoformat(),
'additional_info': additional_info or {}
}
# Usar el id como partition key
container.create_item(
body=user_data,
partition_key=username
)
logger.info(f"Usuario {role} creado: {username}")
return True
except Exception as e:
logger.error(f"Error al crear usuario {role}: {str(e)}")
return False
def create_student_user(username, password, additional_info=None):
return create_user(username, password, 'Estudiante', additional_info)
def create_teacher_user(username, password, additional_info=None):
return create_user(username, password, 'Profesor', additional_info)
def create_admin_user(username, password, additional_info=None):
return create_user(username, password, 'Administrador', additional_info)
def record_login(username):
"""Registra el inicio de sesi贸n de un usuario"""
try:
container = get_container("users_sessions")
if not container:
logger.error("No se pudo obtener el contenedor users_sessions")
return None
session_id = str(uuid.uuid4())
session_doc = {
"id": session_id,
"type": "session",
"username": username,
"loginTime": datetime.now(timezone.utc).isoformat(),
"additional_info": {}
}
result = container.create_item(
body=session_doc,
enable_cross_partition_query=True
)
logger.info(f"Sesi贸n {session_id} registrada para {username}")
return session_id
except Exception as e:
logger.error(f"Error registrando login: {str(e)}")
return None
def record_logout(username, session_id):
"""Registra el cierre de sesi贸n y calcula la duraci贸n"""
try:
container = get_container("users_sessions")
if not container:
logger.error("No se pudo obtener el contenedor users_sessions")
return False
items = list(container.query_items(
query="SELECT * FROM c WHERE c.id = @id AND c.username = @username",
parameters=[
{"name": "@id", "value": session_id},
{"name": "@username", "value": username}
],
enable_cross_partition_query=True
))
if not items:
logger.warning(f"Sesi贸n no encontrada: {session_id}")
return False
session = items[0]
login_time = datetime.fromisoformat(session['loginTime'].rstrip('Z'))
logout_time = datetime.now(timezone.utc)
duration = int((logout_time - login_time).total_seconds())
session.update({
"logoutTime": logout_time.isoformat(),
"sessionDuration": duration
})
container.upsert_item(
body=session,
enable_cross_partition_query=True
)
logger.info(f"Sesi贸n {session_id} cerrada para {username}, duraci贸n: {duration}s")
return True
except Exception as e:
logger.error(f"Error registrando logout: {str(e)}")
return False
def get_recent_sessions(limit=10):
"""Obtiene las sesiones m谩s recientes"""
try:
container = get_container("users_sessions")
if not container:
logger.error("No se pudo obtener el contenedor users_sessions")
return []
sessions = list(container.query_items(
query="""
SELECT c.username, c.loginTime, c.logoutTime, c.sessionDuration
FROM c
WHERE c.type = 'session'
ORDER BY c.loginTime DESC
OFFSET 0 LIMIT @limit
""",
parameters=[{"name": "@limit", "value": limit}],
enable_cross_partition_query=True
))
clean_sessions = []
for session in sessions:
try:
clean_sessions.append({
"username": session["username"],
"loginTime": session["loginTime"],
"logoutTime": session.get("logoutTime", "Activo"),
"sessionDuration": session.get("sessionDuration", 0)
})
except KeyError as e:
logger.warning(f"Sesi贸n con datos incompletos: {e}")
continue
return clean_sessions
except Exception as e:
logger.error(f"Error obteniendo sesiones recientes: {str(e)}")
return []
def get_user_total_time(username):
"""Obtiene el tiempo total que un usuario ha pasado en la plataforma"""
try:
container = get_container("users_sessions")
if not container:
return None
result = list(container.query_items(
query="""
SELECT VALUE SUM(c.sessionDuration)
FROM c
WHERE c.type = 'session'
AND c.username = @username
AND IS_DEFINED(c.sessionDuration)
""",
parameters=[{"name": "@username", "value": username}],
enable_cross_partition_query=True
))
return result[0] if result and result[0] is not None else 0
except Exception as e:
logger.error(f"Error obteniendo tiempo total: {str(e)}")
return 0
|