Soham Chandratre commited on
Commit
f1495be
1 Parent(s): 204811a

Add application file

Browse files
.gitignore ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ .venv
2
+ .env
Dockerfile ADDED
@@ -0,0 +1,14 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # read the doc: https://huggingface.co/docs/hub/spaces-sdks-docker
2
+ # you will also find guides on how best to write your Dockerfile
3
+
4
+ FROM python:3.9
5
+
6
+ WORKDIR /code
7
+
8
+ COPY ./requirements.txt /code/requirements.txt
9
+
10
+ RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
11
+
12
+ COPY . .
13
+
14
+ CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7860"]
config/__pycache__/database.cpython-311.pyc ADDED
Binary file (730 Bytes). View file
 
config/database.py ADDED
@@ -0,0 +1,15 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import pymongo
2
+ import gridfs
3
+
4
+
5
+ # Connect to MongoDB with the custom SSL context
6
+ client = pymongo.MongoClient("mongodb+srv://soham:Soham2001@cluster0.pgjgiww.mongodb.net/?retryWrites=true&w=majority", ssl=True)
7
+
8
+ db = client.database
9
+
10
+ # Define collections
11
+ admin_collection = db.admin_users
12
+ user_collection = db.normal_users
13
+ notification_collection = db.notifications
14
+ pothole_image_collection = db.pothole_images
15
+ fs = gridfs.GridFS(db) # Initialize GridFS bucket
main.py ADDED
@@ -0,0 +1,14 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI
2
+ from routes.route import router
3
+ from fastapi.middleware.cors import CORSMiddleware
4
+ app = FastAPI()
5
+
6
+ app.add_middleware(
7
+ CORSMiddleware,
8
+ allow_origins=["*"], # Allow requests from any origin
9
+ allow_credentials=True,
10
+ allow_methods=["GET", "POST", "PUT", "DELETE"],
11
+ allow_headers=["Authorization", "Content-Type"],
12
+ )
13
+
14
+ app.include_router(router)
model/__pycache__/pothole_model.cpython-311.pyc ADDED
Binary file (1.33 kB). View file
 
model/pothole_model.py ADDED
@@ -0,0 +1,67 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # from ultralyticsplus import YOLO, render_result
2
+ # from PIL import Image
3
+ # import numpy as np
4
+
5
+ # def load_model(image):
6
+ # # image_bytes = image.content
7
+ # model = YOLO('keremberke/yolov8n-pothole-segmentation')
8
+ # model.overrides['conf'] = 0.25
9
+ # model.overrides['iou'] = 0.45
10
+ # model.overrides['agnostic_nms'] = False
11
+ # model.overrides['max_det'] = 1000
12
+
13
+ # # Load image using PIL
14
+ # image = Image.open((image))
15
+ # image_array = np.array(image)
16
+ # # pil_image = pil_image.convert("RGB") # Ensure image is in RGB format
17
+
18
+ # # Convert PIL image to bytes
19
+ # # with io.BytesIO() as output:
20
+ # # pil_image.save(output, format='JPEG')
21
+ # # image_bytes = output.getvalue()
22
+
23
+ # results = model.predict(image_array)
24
+ # for result in results:
25
+ # boxes = result.boxes.xyxy
26
+ # conf = result.boxes.conf
27
+ # cls = result.boxes.cls
28
+ # obj_info = []
29
+ # for i, bbox in enumerate(boxes):
30
+ # label = result.names[int(cls[i])]
31
+ # obj_info.append({
32
+ # "Object": i+1,
33
+ # "Label": label,
34
+ # "Confidence": conf[i],
35
+ # "Bounding Box": bbox
36
+ # })
37
+ # render = render_result(model=model, image=image, result=results[0])
38
+ # if label:
39
+ # print(label)
40
+ # render.show()
41
+ # return label
42
+
43
+
44
+ from PIL import Image
45
+ from io import BytesIO
46
+ from transformers import AutoImageProcessor, AutoModelForImageClassification
47
+
48
+ # Load model
49
+ processor = AutoImageProcessor.from_pretrained("taroii/pothole-detection-model")
50
+ model = AutoModelForImageClassification.from_pretrained("taroii/pothole-detection-model")
51
+
52
+ # Function to predict if an image contains a pothole
53
+ def predict_pothole(image_url):
54
+ image = Image.open(BytesIO(image_url))
55
+ inputs = processor(images=image, return_tensors="pt")
56
+
57
+ # Perform inference
58
+ outputs = model(**inputs)
59
+ logits = outputs.logits
60
+ probabilities = logits.softmax(dim=1)
61
+
62
+ # Get predicted class (0: No pothole, 1: Pothole)
63
+ predicted_class = probabilities.argmax().item()
64
+ confidence = probabilities[0, predicted_class].item()
65
+
66
+ return predicted_class
67
+
pothole-detection-c31a0-firebase-adminsdk-xirxs-4ad5f554ca.json ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ {
2
+ "type": "service_account",
3
+ "project_id": "pothole-detection-c31a0",
4
+ "private_key_id": "4ad5f554ca85b7542a5565af3b7edf63ee2367ed",
5
+ "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCSevQfEy+Rx5zn\nZ6XuYj+6/QjTroD2qL2T2p5ctytKGUqh4mSNHPcZqqKPjWesxo7ThNXqFFzKWFYe\nW4QCctIObe72T6zx/03jL2yhjGg9QhwefCJQ53CRmlnvg7amWEZL7ruNu2ede/ZQ\nsgngBhiqavN6yOYTVQKrxFO2j2NKHEHkBUSUDJtkOVzz2nXJtq4LCkKD8mgqup31\nB8ecEV1x32vGtaq/a1Y9HtMlVG+0Dmj1lL0pwHad8YPgjxaCzWvbhCWD2zF4R6FK\ns8mNIlRb8o42gAC+oLjX2gcktm0hDq+8t+z9nAbHdRFtYXHEFhan+Vw5xt7jvnLT\nqqndGybzAgMBAAECggEAJv+uTDG2+gibuS5qyqeE0Tt6JKWytzkg1CMiA4xO7bkD\nGkukur9J+J7qDe83eZQZxa35qAtI0ySmtQlngaFhVK8nLpPy9zEiYwim0vIHdLvL\nscAaANFFsrZWGINIV0xsVt4WODt7cD3nS//Cyk7FNWYpMfkX7HC3N9Ua9qGBbWLm\nbtsNMR39Wa9e5X5MWCSmIopiSJQq0AEVAaFbjMhcjwpbawket9joWkpz6zv3hSV+\nD9I7brQQVZNv504JfCNHdr4Ztth5Au5pMX3AQnGjGQ+SEv3XFE1QnMuAmmmPfZL9\n8bBygd5QpEJAlYYuOwZjDytpixvDoHf57nmiceALbQKBgQDHECqGax9RizwT7vN4\nRuWVxQ3ohuqoj8q7Z7L9coqsusWU6Jq74eefmW981oyoBPEniEMrM+3CuI2s3DQT\n2YnEtt8MQiwDh9v3lAy09ooZu2uHVhXF0cLW80d/peQu+OKRuDCsQIShOZOTBCLr\nZBXJjpjHC+w/QoN3mpfGf7pBFwKBgQC8YI6Hm5sgjSmdXnhw57jJd/jcupjpjnt2\n6OCpAIM3JYWRNw1X4tppx02tzMpWJSI2KW97bOYmSiTtgVhSOxj6h5rbwN71yNC4\nWZkQsof6vGxdxsb/pFiIOZBrBM8Ac/HgosHkmk6qPwszVsxHk7AiLRT0PpFktkSm\nx8jvtf0ahQKBgQC3ir1bASx4WGQFkR4mOWB4jp+7VaLJ1wM7dmr/63vXtcJ2AV6i\ne7HAY437UiuQxDXCmI4uKIxCLT5cMxFFeB/iJCdWuzCv9LWp+nUY6fT9suR7fbFH\nm5gF73xuQvw5Hzxw1Z7zQ+3GZjsepAK6fablGDjf1qt2zJJBVmY5HQ4T8wKBgQCW\nZfDlzBKadpnXDunSGu+pyqNid4hGUH+6fEuCIuqgSNDPE8kPdgszkPAv5uVUyej6\nnTJotJU71M6O4UoGvTBANawp718TlPUvejl+30s5oN5UMLQIvsoAWRU4nGo4zlk7\nJDbxsVFFE3h50L7gFsX1Q1ELgDjM08kT70Y6PG6LkQKBgQCGVZliwU+D90nWdY5N\ndSpVuE4V5jSUZ2W9jT1nX82Oy0leB4wkGosyUZOnMa8WNF24IJYPRP3U5GPsgAoH\npSwKGZAsuc7HWQ+q9bJ3b6hpWh0umaROhYOl5xm5KQLDII+mUk54SS2GGfWL8EEC\ncf5yQfuvn0pwmGymirQkqguS6g==\n-----END PRIVATE KEY-----\n",
6
+ "client_email": "firebase-adminsdk-xirxs@pothole-detection-c31a0.iam.gserviceaccount.com",
7
+ "client_id": "109748377203744803931",
8
+ "auth_uri": "https://accounts.google.com/o/oauth2/auth",
9
+ "token_uri": "https://oauth2.googleapis.com/token",
10
+ "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
11
+ "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/firebase-adminsdk-xirxs%40pothole-detection-c31a0.iam.gserviceaccount.com",
12
+ "universe_domain": "googleapis.com"
13
+ }
requirement.txt ADDED
@@ -0,0 +1,12 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ fastapi
2
+ uvicorn
3
+ pydantic
4
+ pymongo[srv]
5
+ passlib[bcrypt]
6
+ python-jose[cryptography]
7
+ python-multipart
8
+ certifi
9
+ firebase-admin
10
+ transformers
11
+ pillow
12
+ torch
routes/__pycache__/route.cpython-311.pyc ADDED
Binary file (14 kB). View file
 
routes/route.py ADDED
@@ -0,0 +1,273 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, HTTPException,Depends,File, UploadFile
2
+ from fastapi.responses import JSONResponse
3
+ from config.database import admin_collection, user_collection,notification_collection,pothole_image_collection
4
+ from model.pothole_model import predict_pothole
5
+ from utils.auth import create_access_token, hash_password, verify_password, verify_token
6
+ from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
7
+ from schema.model import Admin, PoholeInfo, PotInfoById, PotholeFilters, PotholeModel, UpdatePotholeInfo, User, UserLogin, VerifyOtp
8
+ from utils.email_validator import send_activation_email
9
+ import requests
10
+ import uuid
11
+ from bson import ObjectId
12
+ import random
13
+ security_scheme = HTTPBearer()
14
+ router = APIRouter()
15
+ activation_tokens={}
16
+ import firebase_admin
17
+ from firebase_admin import credentials, storage
18
+
19
+ # Initialize Firebase Admin SDK
20
+ cred = credentials.Certificate("pothole-detection-c31a0-firebase-adminsdk-xirxs-4ad5f554ca.json")
21
+ firebase_admin.initialize_app(cred, {
22
+ 'storageBucket': 'pothole-detection-c31a0.appspot.com'
23
+ })
24
+
25
+ def generate_otp():
26
+ return str(random.randint(100000, 999999))
27
+
28
+ #----------------------------------------Admin Collections----------------------------------------
29
+ # Admin Registration Api
30
+ @router.post("/api/admin/registerAdmin", tags=["admin"])
31
+ async def register_admin(admin: Admin):
32
+ hashed_password = hash_password(admin.password)
33
+ admin_dict = admin.dict()
34
+ admin_dict['password'] = hashed_password
35
+ activation_otp = generate_otp()
36
+ admin_dict['otp'] = activation_otp
37
+ result = admin_collection.insert_one(admin_dict)
38
+ return {"message": "Admin created successfully", "user_id": str(result.inserted_id)}
39
+
40
+ #Api to get all admins
41
+ # @router.get("/api/admins/getAllAdmin", tags=["admin"])
42
+ # async def get_all_admins():
43
+ # admins = admin_collection.find({})
44
+ # admin_list = []
45
+ # for admin in admins:
46
+ # admin["_id"] = str(admin["_id"])
47
+ # admin_list.append(admin)
48
+ # return admin_list
49
+
50
+
51
+
52
+
53
+
54
+ #-------------------------------------------User Collections-------------------------------------------
55
+ # User Login API
56
+ @router.post("/api/user/userLogin", tags=["user"])
57
+ async def user_login(userLogin: UserLogin):
58
+ # Check if the provided username and password are for admin
59
+ if userLogin.userName == "admin001" and userLogin.password == "Admin@123":
60
+ collection = admin_collection
61
+ else:
62
+ collection = user_collection
63
+
64
+ user = collection.find_one({"userName": userLogin.userName})
65
+ if not user or not verify_password(userLogin.password, user.get('password', '')):
66
+ raise HTTPException(status_code=401, detail="Incorrect username or password")
67
+
68
+ # Check if the user is verified
69
+ if not user.get('isVerified'):
70
+ raise HTTPException(status_code=403, detail="Account is not verified")
71
+
72
+ # Extract additional user data
73
+ user_data = {
74
+ "email": user.get("email"),
75
+ "username": user.get("userName"),
76
+ "id": str(user.get("_id")),
77
+ "role":user.get("role")
78
+ }
79
+
80
+ token = create_access_token({"sub": userLogin.userName})
81
+ response_data = {"access_token": token, "token_type": "bearer", "user_data": user_data}
82
+ return response_data
83
+
84
+
85
+
86
+ # User Registration Api
87
+ @router.post("/api/user/registerUser", tags=["user"])
88
+ async def create_user(user: User):
89
+ hashed_password = hash_password(user.password)
90
+ user_dict = user.dict()
91
+ user_dict['password'] = hashed_password
92
+ activation_otp = generate_otp()
93
+ user_dict['otp'] = activation_otp
94
+ # Send activation email
95
+ send_activation_email(user.email, activation_otp)
96
+ # result = user_collection.insert_one(user_dict)
97
+ # return {"message": "User created successfully. Activation email sent.",}
98
+ result = user_collection.insert_one(user_dict)
99
+ return {"message": "User created successfully", "userId": str(result.inserted_id)}
100
+
101
+ # Otp Verification Api
102
+ @router.post("/api/user/verifyOtp", tags=["user"])
103
+ async def verify_otp(verifyOtp:VerifyOtp):
104
+ # Check if the user exists with the given email
105
+ user = user_collection.find_one({"email": verifyOtp.email})
106
+ if user is None:
107
+ raise HTTPException(status_code=404, detail="User not found")
108
+ # Check if the provided OTP matches the stored OTP
109
+ if user.get('otp') != verifyOtp.otp:
110
+ raise HTTPException(status_code=400, detail="Invalid OTP")
111
+ # Update the user's isVerified field to True
112
+ user_collection.update_one({"email": verifyOtp.email}, {"$set": {"isVerified": True}})
113
+ return {"message": "OTP verified successfully and user is verified"}
114
+
115
+
116
+ # Api to show the registered users list
117
+ @router.get("/api/user/getAllUsers", tags=["user"], dependencies=[Depends(security_scheme)])
118
+ async def get_all_user(token: HTTPAuthorizationCredentials = Depends(security_scheme)):
119
+ verify_token(token.credentials)
120
+ users = user_collection.find({})
121
+ user_list = []
122
+ for user in users:
123
+ user["_id"] = str(user["_id"]) # Convert ObjectId to string
124
+ user_list.append(user)
125
+ return user_list
126
+
127
+
128
+
129
+
130
+ #-------------------------------------------Pothole Collections-------------------------------------------
131
+
132
+ # Api to submit the information about pothole
133
+ @router.post("/api/information/submitInformation", tags=["pothole"], dependencies=[Depends(security_scheme)])
134
+ async def upload_file(potholeInfo:PoholeInfo,token: HTTPAuthorizationCredentials = Depends(security_scheme)):
135
+ verify_token(token.credentials)
136
+ pothole_info = potholeInfo.dict()
137
+ notification_collection.insert_one(pothole_info)
138
+
139
+ return {"message": "information submitted sucesssuccessfully"}
140
+
141
+
142
+ # Api to update the information about pothole
143
+ @router.put("/api/information/updateInformation", tags=["pothole"], dependencies=[Depends(security_scheme)])
144
+ async def update_pothole_information(update_data: UpdatePotholeInfo, token: HTTPAuthorizationCredentials = Depends(security_scheme)):
145
+ verify_token(token.credentials)
146
+
147
+ try:
148
+ # Update the pothole information in the collection
149
+ result = notification_collection.update_one({"_id": ObjectId(update_data.infoID)}, {"$set": {"status": update_data.status, "assignee": update_data.assignee}})
150
+
151
+ if result.modified_count == 1:
152
+ return {"message": "Pothole information updated successfully"}
153
+ else:
154
+ return {"message": "No changes were made. Pothole information remains unchanged."}
155
+ except Exception as e:
156
+ return JSONResponse(content={"message": f"Error occurred: {str(e)}"}, status_code=500)
157
+
158
+
159
+ # @router.get("/api/information/getAllFiles", tags=["pothole"], dependencies=[Depends(security_scheme)])
160
+ # async def get_all_user(token: HTTPAuthorizationCredentials = Depends(security_scheme)):
161
+ # verify_token(token.credentials)
162
+ # files = pothole_image_collection.find({})
163
+ # file_list = []
164
+ # for file in files:
165
+ # file["_id"] = str(file["_id"]) # Convert ObjectId to string
166
+ # file_list.append(file)
167
+ # return file_list
168
+
169
+
170
+ # Function to upload file to Firebase Storage
171
+ import os
172
+
173
+ async def upload_file_to_firebase(file: UploadFile, token: HTTPAuthorizationCredentials):
174
+ verify_token(token.credentials)
175
+ try:
176
+ # Generate a unique ID for the file
177
+ fileID = uuid.uuid4().hex
178
+ # Extract the filename from the path provided by Flutter
179
+ filename = os.path.basename(file.filename)
180
+ # Get reference to Firebase Storage bucket
181
+ bucket = storage.bucket()
182
+ # Create a blob object with the filename
183
+ blob = bucket.blob(fileID + "-" + filename)
184
+ # Upload the file
185
+ blob.upload_from_file(file.file)
186
+
187
+ blob.make_public()
188
+ # Get the public URL of the uploaded file
189
+ url = blob.public_url
190
+ # Insert the document into the collection with Firebase URL
191
+ pothole_image_collection.insert_one({"fileID": fileID, "url": url, "filename": filename})
192
+ return JSONResponse(content={"message": "file uploaded successfully", "file_id": fileID})
193
+ except Exception as e:
194
+ return JSONResponse(content={"message": f"Error occurred: {str(e)}"}, status_code=500)
195
+
196
+ @router.post("/api/information/fileUpload", tags=["pothole"])
197
+ async def upload_file_api(file: UploadFile = File(...), token: HTTPAuthorizationCredentials = Depends(security_scheme)):
198
+ return await upload_file_to_firebase(file, token)
199
+
200
+
201
+
202
+ # Api to get all information about pothole
203
+ @router.post("/api/information/getAllPotholeInformation", tags=["pothole"], dependencies=[Depends(security_scheme)])
204
+ async def get_all_info_with_filters(filters: PotholeFilters, token: HTTPAuthorizationCredentials = Depends(security_scheme)):
205
+ verify_token(token.credentials)
206
+ # Construct a filter query based on provided parameters
207
+ filter_query = {}
208
+ if filters.userID == "66142a506d8f2a0f116fd613":
209
+ # If userID is the specific ID, do not filter by userID
210
+ pass
211
+ elif filters.userID:
212
+ filter_query["userId"] = filters.userID
213
+ # Always apply the status filter
214
+ if filters.status:
215
+ filter_query["status"] = filters.status
216
+ # Apply the filter query to find relevant pothole information
217
+ pothole_informations = notification_collection.find(filter_query)
218
+ info_list = []
219
+ for info in pothole_informations:
220
+ info["_id"] = str(info["_id"])
221
+ file_id = info.get('fileID')
222
+ if file_id:
223
+ image_data = pothole_image_collection.find_one({"fileID": file_id})
224
+ if image_data:
225
+ info['image'] = image_data.get('url')
226
+ else:
227
+ print("No image data found for file ID:", file_id) # Debugging print
228
+ info_list.append(info)
229
+ return info_list
230
+
231
+
232
+
233
+ # Api to get information about pothole by unique id
234
+ @router.post("/api/information/getPotholeInformationById", tags=["pothole"], dependencies=[Depends(security_scheme)])
235
+ async def get_data_by_id(potHoleInfoById:PotInfoById,token: HTTPAuthorizationCredentials = Depends(security_scheme)):
236
+ verify_token(token.credentials)
237
+ try:
238
+ object_id = ObjectId(potHoleInfoById.infoID)
239
+ data = notification_collection.find_one({"_id": object_id})
240
+ if data:
241
+ data["_id"] = str(data["_id"])
242
+ image_data = pothole_image_collection.find_one({"fileID": data["fileID"]})
243
+ if image_data:
244
+ data["image"] = image_data["url"] # Append image to the response
245
+ return data
246
+ else:
247
+ raise HTTPException(status_code=404, detail="Data not found")
248
+ except:
249
+ raise HTTPException(status_code=400, detail="Invalid ID format")
250
+
251
+
252
+
253
+
254
+ # Api to to verify the given object has pothole or not
255
+ @router.post("/api/information/verifyPothole", tags=["pothole"], dependencies=[Depends(security_scheme)])
256
+ async def verify_pothole(potholeModel: PotholeModel, token: HTTPAuthorizationCredentials = Depends(security_scheme)):
257
+ verify_token(token.credentials)
258
+
259
+ try:
260
+ # Get image bytes from URL
261
+ response = requests.get(potholeModel.image)
262
+ image_bytes = response.content
263
+
264
+ # Pass image bytes to your model function
265
+ results = predict_pothole(image_bytes)
266
+
267
+ if results == 1:
268
+ return JSONResponse(content={"response": "Pothole"})
269
+ else:
270
+ return JSONResponse(content={"response": "notPothole"})
271
+ except Exception as e:
272
+ return JSONResponse(content={"response": f"{e}"})
273
+
schema/__pycache__/model.cpython-311.pyc ADDED
Binary file (3.22 kB). View file
 
schema/__pycache__/todos.cpython-311.pyc ADDED
Binary file (918 Bytes). View file
 
schema/model.py ADDED
@@ -0,0 +1,61 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pydantic import BaseModel
2
+ from typing import Optional
3
+
4
+ class Admin(BaseModel):
5
+ name : str
6
+ userName : str
7
+ email :str
8
+ password :str
9
+ phoneNo :str
10
+ otp : int
11
+ isVerified : bool = True
12
+ role:str = 'admin'
13
+
14
+
15
+ class User(BaseModel):
16
+ name : str
17
+ userName : str
18
+ email :str
19
+ password :str
20
+ phoneNo :str
21
+ otp : int
22
+ isVerified : bool = False
23
+ role:str = 'user'
24
+
25
+
26
+ class PoholeInfo(BaseModel):
27
+ userId : str
28
+ name: str
29
+ email: str
30
+ phoneNo :str
31
+ date: str
32
+ description: str
33
+ address: str
34
+ status: str
35
+ assignee: str
36
+ fileID: str
37
+ # isActive: bool
38
+ # version: int
39
+
40
+ class UpdatePotholeInfo(BaseModel):
41
+ infoID: str
42
+ status: str
43
+ assignee: str
44
+
45
+ class UserLogin(BaseModel):
46
+ userName : str
47
+ password : str
48
+
49
+ class VerifyOtp(BaseModel):
50
+ email : str
51
+ otp:str
52
+
53
+ class PotInfoById(BaseModel):
54
+ infoID : str
55
+
56
+ class PotholeModel(BaseModel):
57
+ image : str
58
+
59
+ class PotholeFilters(BaseModel):
60
+ userID: Optional[str]
61
+ status: Optional[str]
utils/__pycache__/auth.cpython-311.pyc ADDED
Binary file (2.53 kB). View file
 
utils/__pycache__/email_validator.cpython-311.pyc ADDED
Binary file (1.91 kB). View file
 
utils/auth.py ADDED
@@ -0,0 +1,40 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import HTTPException, status
2
+ from fastapi.security import HTTPBearer
3
+ from datetime import datetime, timedelta
4
+ from jose import JWTError, jwt
5
+ import bcrypt
6
+
7
+ security = HTTPBearer()
8
+
9
+ # Secret key for JWT token
10
+ SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
11
+ ALGORITHM = "HS256"
12
+ ACCESS_TOKEN_EXPIRE_MINUTES = 30
13
+
14
+ # Function to generate JWT token
15
+ def create_access_token(data: dict):
16
+ to_encode = data.copy()
17
+ expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
18
+ to_encode.update({"exp": expire})
19
+ encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
20
+ return encoded_jwt
21
+
22
+ # Function to hash a password
23
+ def hash_password(password: str) -> str:
24
+ hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
25
+ return hashed_password
26
+
27
+ # Function to verify a password
28
+ def verify_password(plain_password: str, hashed_password: bytes) -> bool:
29
+ return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password)
30
+
31
+
32
+ def verify_token(token: str):
33
+ try:
34
+ payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
35
+ return payload
36
+ except JWTError:
37
+ raise HTTPException(
38
+ status_code=status.HTTP_401_UNAUTHORIZED,
39
+ detail="Invalid or expired token"
40
+ )
utils/email_validator.py ADDED
@@ -0,0 +1,37 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from email.mime.text import MIMEText
2
+ from email.mime.multipart import MIMEMultipart
3
+ import smtplib
4
+
5
+ # Function to send activation email with HTML content
6
+ def send_activation_email(email: str, activation_otp: int):
7
+ sender_email = "chandratresoham@gmail.com" # Update with your email address
8
+ sender_password = "wbuc okcv hzzn iwyx" # Update with your email password
9
+ # Update with your website URL
10
+
11
+ # HTML content for the email body
12
+ email_body = f"""
13
+ <html>
14
+ <body>
15
+ <p>
16
+ Hello,<br><br>
17
+ You have successfully registered to the system.<br><br>
18
+ Please enter below otp to verify your accout<br><br>
19
+ <p style="font-size:17px; font-weight:bold">{activation_otp}</p><br><br>
20
+ Thank you!<br>
21
+ </p>
22
+ </body>
23
+ </html>
24
+ """
25
+
26
+ # Create MIMEText object with HTML content
27
+ message = MIMEMultipart("alternative")
28
+ message['From'] = sender_email
29
+ message['To'] = email
30
+ message['Subject'] = "Activate your account"
31
+ message.attach(MIMEText(email_body, 'html'))
32
+
33
+ # Connect to SMTP server and send email
34
+ with smtplib.SMTP('smtp.gmail.com', 587) as server: # Update with your SMTP server details
35
+ server.starttls()
36
+ server.login(sender_email, sender_password)
37
+ server.sendmail(sender_email, email, message.as_string())