AIdeaText commited on
Commit
a742668
1 Parent(s): 46ac0c4

Create auth.py

Browse files
Files changed (1) hide show
  1. modules/auth.py +104 -0
modules/auth.py ADDED
@@ -0,0 +1,104 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from azure.cosmos import CosmosClient, exceptions
2
+ import bcrypt
3
+ import os
4
+
5
+ # Azure Cosmos DB configuration
6
+ endpoint = os.environ.get("COSMOS_ENDPOINT")
7
+ key = os.environ.get("COSMOS_KEY")
8
+ database_name = "user_database"
9
+ container_name = "users"
10
+
11
+ # Initialize the Cosmos client
12
+ client = CosmosClient(endpoint, key)
13
+ database = client.get_database_client(database_name)
14
+ container = database.get_container_client(container_name)
15
+
16
+ def hash_password(password):
17
+ """Hash a password for storing."""
18
+ return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
19
+
20
+ def verify_password(stored_password, provided_password):
21
+ """Verify a stored password against one provided by user"""
22
+ return bcrypt.checkpw(provided_password.encode('utf-8'), stored_password.encode('utf-8'))
23
+
24
+ def register_user(username, password, role, additional_info=None):
25
+ """Register a new user."""
26
+ try:
27
+ # Check if user already exists
28
+ query = f"SELECT * FROM c WHERE c.id = '{username}'"
29
+ existing_user = list(container.query_items(query=query, enable_cross_partition_query=True))
30
+
31
+ if existing_user:
32
+ return False # User already exists
33
+
34
+ # Create new user document
35
+ new_user = {
36
+ 'id': username,
37
+ 'password': hash_password(password),
38
+ 'role': role,
39
+ 'additional_info': additional_info or {}
40
+ }
41
+
42
+ container.create_item(body=new_user)
43
+ return True
44
+ except exceptions.CosmosHttpResponseError:
45
+ return False
46
+
47
+ def authenticate_user(username, password):
48
+ """Authenticate a user."""
49
+ try:
50
+ query = f"SELECT * FROM c WHERE c.id = '{username}'"
51
+ results = list(container.query_items(query=query, enable_cross_partition_query=True))
52
+
53
+ if results:
54
+ stored_user = results[0]
55
+ if verify_password(stored_user['password'], password):
56
+ return True
57
+ except exceptions.CosmosHttpResponseError:
58
+ pass
59
+
60
+ return False
61
+
62
+ def get_user_role(username):
63
+ """Get the role of a user."""
64
+ try:
65
+ query = f"SELECT c.role FROM c WHERE c.id = '{username}'"
66
+ results = list(container.query_items(query=query, enable_cross_partition_query=True))
67
+
68
+ if results:
69
+ return results[0]['role']
70
+ except exceptions.CosmosHttpResponseError:
71
+ pass
72
+
73
+ return None
74
+
75
+ def update_user_info(username, new_info):
76
+ """Update user information."""
77
+ try:
78
+ query = f"SELECT * FROM c WHERE c.id = '{username}'"
79
+ results = list(container.query_items(query=query, enable_cross_partition_query=True))
80
+
81
+ if results:
82
+ user = results[0]
83
+ user['additional_info'].update(new_info)
84
+ container.upsert_item(user)
85
+ return True
86
+ except exceptions.CosmosHttpResponseError:
87
+ pass
88
+
89
+ return False
90
+
91
+ def delete_user(username):
92
+ """Delete a user."""
93
+ try:
94
+ query = f"SELECT * FROM c WHERE c.id = '{username}'"
95
+ results = list(container.query_items(query=query, enable_cross_partition_query=True))
96
+
97
+ if results:
98
+ user = results[0]
99
+ container.delete_item(item=user, partition_key=username)
100
+ return True
101
+ except exceptions.CosmosHttpResponseError:
102
+ pass
103
+
104
+ return False