File size: 4,179 Bytes
95e89cb 8c3ba2c a742668 8c3ba2c a742668 8c3ba2c a742668 8c3ba2c a742668 b360814 a742668 b360814 a742668 8c3ba2c a742668 8c3ba2c a742668 8c3ba2c a742668 8c3ba2c a742668 1c6cfb4 a742668 8c3ba2c a742668 1c6cfb4 a742668 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
### auth.py
import os
from azure.cosmos import CosmosClient, exceptions
import bcrypt
import base64
def clean_and_validate_key(key):
    """Normalize a base64-encoded key and check that it decodes.

    Surrounding whitespace is stripped and ``=`` padding is appended until
    the length is a multiple of four. The padded text is then run through
    ``base64.b64decode`` purely as a validity check; the decoded bytes are
    discarded.

    Args:
        key: The key text, e.g. as read from an environment variable.

    Returns:
        The stripped, padded key (still base64 text, not decoded bytes).

    Raises:
        ValueError: If the padded key cannot be base64-decoded.
    """
    key = key.strip()
    # Number of '=' characters needed to reach a multiple of four.
    missing = -len(key) % 4
    if missing:
        key += '=' * missing
    try:
        base64.b64decode(key)
    except ValueError as err:
        # binascii.Error (malformed base64) is a ValueError subclass, so this
        # covers decode failures without swallowing KeyboardInterrupt or
        # SystemExit the way a bare ``except:`` would.
        raise ValueError("La clave proporcionada no es válida") from err
    return key
# Azure Cosmos DB configuration: both settings are read from the environment.
endpoint = os.getenv("COSMOS_ENDPOINT")
key = os.getenv("COSMOS_KEY")
if not (endpoint and key):
    raise ValueError("Las variables de entorno COSMOS_ENDPOINT y COSMOS_KEY deben estar configuradas")

# Strip/pad the key and confirm it is decodable base64 before using it.
key = clean_and_validate_key(key)

try:
    client = CosmosClient(endpoint, key)
    database = client.get_database_client("user_database")
    container = database.get_container_client("users")
    # Connection test: list the databases and report how many were found.
    database_list = list(client.list_databases())
    print(f"Conexión exitosa. Bases de datos encontradas: {len(database_list)}")
except Exception as exc:
    # Report the failure, then let the original exception propagate so the
    # module import fails.
    print(f"Error al conectar con Cosmos DB: {exc!s}")
    raise
def hash_password(password):
    """Return a bcrypt hash of ``password`` as text, ready for storage.

    The password is UTF-8 encoded, hashed with a newly generated salt, and
    the resulting bytes are decoded back to ``str``.
    """
    password_bytes = password.encode('utf-8')
    salt = bcrypt.gensalt()
    hashed = bcrypt.hashpw(password_bytes, salt)
    return hashed.decode('utf-8')
def verify_password(stored_password, provided_password):
    """Check a user-supplied password against a stored bcrypt hash.

    Both arguments are text; each is UTF-8 encoded before being handed to
    ``bcrypt.checkpw``, whose result is returned unchanged.
    """
    candidate = provided_password.encode('utf-8')
    reference = stored_password.encode('utf-8')
    return bcrypt.checkpw(candidate, reference)
def register_user(username, password, role, additional_info=None):
    """Register a new user.

    Args:
        username: Used as the document ``id`` and as its ``partitionKey``
            field.
        password: Plain-text password; only the result of ``hash_password``
            is stored.
        role: Stored unchanged under ``role``.
        additional_info: Optional dict of extra fields; an empty dict is
            stored when it is omitted or falsy.

    Returns:
        True if the user document was created. False if a document with
        that id already exists or Cosmos DB raised an HTTP response error.
    """
    try:
        # Bind the username as a query parameter instead of interpolating it
        # into the SQL text, so quotes in the name cannot alter the query.
        existing_user = list(container.query_items(
            query="SELECT * FROM c WHERE c.id = @username",
            parameters=[{"name": "@username", "value": username}],
            enable_cross_partition_query=True,
        ))
        if existing_user:
            return False  # User already exists
        new_user = {
            'id': username,
            'password': hash_password(password),
            'role': role,
            'additional_info': additional_info or {},
            # The partition key value travels inside the document body
            # rather than as a separate argument to create_item.
            'partitionKey': username,
        }
        container.create_item(body=new_user)
        return True
    except exceptions.CosmosHttpResponseError as e:
        print(f"Error al registrar usuario: {str(e)}")
        return False
def authenticate_user(username, password):
    """Authenticate a user.

    Args:
        username: Id of the user document; also passed as the partition key.
        password: Plain-text password to check against the stored hash.

    Returns:
        True only when a matching user document is found and
        ``verify_password`` accepts the password. False otherwise, including
        when Cosmos DB raises an HTTP response error.
    """
    try:
        # Parameterized query: the username is bound as a value and never
        # spliced into the SQL text.
        results = list(container.query_items(
            query="SELECT * FROM c WHERE c.id = @username",
            parameters=[{"name": "@username", "value": username}],
            partition_key=username,
        ))
    except exceptions.CosmosHttpResponseError:
        # Fail closed: a backend error is treated as a failed login.
        return False
    if not results:
        return False
    stored_user = results[0]
    if verify_password(stored_user['password'], password):
        return True
    return False
def get_user_role(username):
    """Get the role of a user.

    Args:
        username: Id of the user document; also passed as the partition key.

    Returns:
        The stored ``role`` value, or None when no matching document is
        found, the document has no ``role`` field, or Cosmos DB raises an
        HTTP response error.
    """
    try:
        # Parameterized query: the username is bound as a value and never
        # spliced into the SQL text.
        results = list(container.query_items(
            query="SELECT c.role FROM c WHERE c.id = @username",
            parameters=[{"name": "@username", "value": username}],
            partition_key=username,
        ))
    except exceptions.CosmosHttpResponseError:
        return None
    if not results:
        return None
    # .get() so a projected row without "role" yields None, not a KeyError.
    return results[0].get('role')
def update_user_info(username, new_info):
    """Update user information.

    Merges ``new_info`` into the user's ``additional_info`` dict and writes
    the document back with ``upsert_item``.

    Args:
        username: Id of the user document; also passed as the partition key.
        new_info: Mapping of fields to merge into ``additional_info``.

    Returns:
        True if the user was found and the document was upserted. False if
        no matching document exists or Cosmos DB raised an HTTP response
        error.
    """
    try:
        # Parameterized query: the username is bound as a value and never
        # spliced into the SQL text.
        results = list(container.query_items(
            query="SELECT * FROM c WHERE c.id = @username",
            parameters=[{"name": "@username", "value": username}],
            partition_key=username,
        ))
        if results:
            user = results[0]
            # setdefault tolerates documents without the field.
            user.setdefault('additional_info', {}).update(new_info)
            container.upsert_item(user, partition_key=username)
            return True
    except exceptions.CosmosHttpResponseError:
        # Best effort: a backend error is reported to the caller as False.
        pass
    return False
def delete_user(username):
    """Delete a user.

    Args:
        username: Id of the user document; also passed as the partition key.

    Returns:
        True if a matching document was found and ``delete_item`` completed.
        False if no matching document exists or Cosmos DB raised an HTTP
        response error.
    """
    try:
        # Parameterized query: the username is bound as a value and never
        # spliced into the SQL text.
        results = list(container.query_items(
            query="SELECT * FROM c WHERE c.id = @username",
            parameters=[{"name": "@username", "value": username}],
            partition_key=username,
        ))
        if results:
            user = results[0]
            container.delete_item(item=user['id'], partition_key=username)
            return True
    except exceptions.CosmosHttpResponseError:
        # Best effort: a backend error is reported to the caller as False.
        pass
    return False