# ishaq101's picture
# feat: chat history, room soft-delete, and user migration to PostgreSQL
# 08df5ae
import os
import re
import sys
import bcrypt
import dotenv
from src.config.env_constant import EnvFilepath
dotenv.load_dotenv(EnvFilepath.ENVPATH)
# from passlib.hash import bcrypt
from typing import Dict, Any
from src.models.security import ValidatePassword
# from app.utils.decorator import trace_runtime
# @trace_runtime
def validate_password(password: str) -> Dict[str, Any]:
    """
    Validate a password against the complexity policy.

    A password is accepted when it:
    - has at least 8 characters,
    - contains at least one uppercase letter (A-Z),
    - contains at least one digit,
    - contains at least one special character,
    - contains no newline character.

    Args:
        password (str): Plain-text password to check.

    Returns:
        dict: The ``ValidatePassword`` model dumped with ``model_dump()``.
            The model is built with ``status=200, data=True, error=None``
            for an accepted password, and with ``status=400, data=False``
            plus an explanatory ``error`` message otherwise.
    """
    # Single pattern that checks every criterion at once:
    #   ^               start of string
    #   (?=.*[A-Z])     at least one uppercase letter
    #   (?=.*\d)        at least one digit
    #   (?=.*[...])     at least one special character from the listed set
    #   .{8,}           minimum 8 characters ('.' never matches a newline)
    #   \Z              true end of string; '$' would also accept a
    #                   password that ends with a trailing newline
    policy = r"^(?=.*[A-Z])(?=.*\d)(?=.*[!@#$%^&*()_+\-=\[\]{};':\"\\|,.<>/?]).{8,}\Z"

    if re.match(policy, password) is None:
        response = ValidatePassword(
            status=400,
            data=False,
            error="Password must be at least 8 characters long and contain at least one uppercase letter, one digit, and one special character."
        )
    else:
        response = ValidatePassword(
            status=200,
            data=True,
            error=None
        )

    # Callers receive the plain-dict form of the model, not the model itself.
    return response.model_dump()
# @trace_runtime
def hash_password(password: str) -> str:
    """
    Hash a password with bcrypt using the salt configured in the environment.

    The salt is read from the ``emarcal__bcrypt__salt`` environment variable,
    so for a given configuration the same password always yields the same
    hash.

    Args:
        password (str): Plain-text password.

    Returns:
        str: The bcrypt hash, decoded from bytes to a UTF-8 string.

    Raises:
        RuntimeError: If ``emarcal__bcrypt__salt`` is not set.
    """
    configured_salt = os.environ.get('emarcal__bcrypt__salt')
    if configured_salt is None:
        # Fail with an actionable message instead of the opaque TypeError
        # that bytes(None, ...) would raise.
        raise RuntimeError(
            "Environment variable 'emarcal__bcrypt__salt' is not set; "
            "cannot hash passwords without a bcrypt salt."
        )

    # NOTE(review): one shared salt for every password is weaker than a
    # per-password salt from bcrypt.gensalt(). It is kept as-is because
    # existing stored hashes and re-hash based verification depend on it.
    salt = bytes(configured_salt, encoding='utf-8')
    bpassword = bytes(password, encoding='utf-8')
    hashed = bcrypt.hashpw(bpassword, salt=salt)
    return hashed.decode('utf-8')  # bytes -> str
# @trace_runtime
def verify_password(password: str, hashed_password: str) -> bool:
    """
    Verify a plain-text password against a previously stored hash.

    The candidate password is re-hashed with ``hash_password`` and the result
    is compared with ``hashed_password`` in constant time.

    Args:
        password (str): Plain-text password to verify.
        hashed_password (str): Stored hash, typically read from the database.

    Returns:
        bool: True if the password matches the stored hash, False otherwise
            (including when hashing or encoding raises ``ValueError``).
    """
    import hmac  # function-scope import keeps this block self-contained

    try:
        candidate = hash_password(password)
        if not isinstance(hashed_password, str):
            # A missing or non-string stored value can never match.
            return False
        # compare_digest avoids leaking, through timing, how many leading
        # characters of the two hashes agree. Comparing bytes sidesteps its
        # ASCII-only restriction on str arguments.
        return hmac.compare_digest(
            candidate.encode('utf-8'),
            hashed_password.encode('utf-8'),
        )
    except ValueError:
        # Presumably raised by the hashing step (e.g. an unusable salt) or
        # by encoding a malformed string; treated as "does not match".
        return False
import jwt
import uuid
from datetime import datetime
from jwt.exceptions import ExpiredSignatureError, DecodeError
from sqlalchemy import select
from src.db.postgres.connection import AsyncSessionLocal
from src.db.postgres.models import User
# @trace_runtime
def encode_jwt(input:Dict) -> str:
    """
    Encode a payload as a signed JSON Web Token.

    Top-level values that are ``uuid.UUID`` or ``datetime`` instances are
    converted with ``str()`` first; every other value is passed through
    unchanged. The signing key and algorithm are read from the
    ``emarcal__jwt__secret_key`` and ``emarcal__jwt__algorithm`` environment
    variables.

    Args:
        input (Dict): Claims to place in the token.

    Returns:
        str: The token produced by ``jwt.encode``.
    """
    claims = {}
    for name, value in input.items():
        if isinstance(value, (uuid.UUID, datetime)):
            value = str(value)
        claims[name] = value

    signing_key = os.environ.get("emarcal__jwt__secret_key")
    signing_algorithm = os.environ.get("emarcal__jwt__algorithm")
    return jwt.encode(claims, signing_key, algorithm=signing_algorithm)
# @trace_runtime
def decode_jwt(encoded_input:str) -> Any:
    """
    Decode and verify a JSON Web Token.

    The key and the single accepted algorithm are read from the
    ``emarcal__jwt__secret_key`` and ``emarcal__jwt__algorithm`` environment
    variables. Nothing is caught here, so any exception raised by
    ``jwt.decode`` propagates to the caller.

    Args:
        encoded_input (str): The encoded token.

    Returns:
        Any: The payload returned by ``jwt.decode``.
    """
    verification_key = os.environ.get("emarcal__jwt__secret_key")
    accepted_algorithms = [os.environ.get("emarcal__jwt__algorithm")]
    return jwt.decode(encoded_input, verification_key, algorithms=accepted_algorithms)
async def get_user(email: str) -> dict | None:
    """
    Fetch the first user whose email equals ``email``.

    Args:
        email (str): Email address to look up.

    Returns:
        dict | None: A dict mapping each mapped column attribute name of the
            ``User`` row to its value, or None when no row matches.

    Raises:
        Exception: Any error raised during the lookup is printed (message,
            exception type, file name and line number) and then re-raised.
    """
    try:
        async with AsyncSessionLocal() as db:
            query = select(User).where(User.email == email)
            rows = await db.execute(query)
            record = rows.scalars().first()
            if not record:
                return None
            # Copy column values into a plain dict for the caller.
            return {
                attr.key: getattr(record, attr.key)
                for attr in record.__mapper__.column_attrs
            }
    except Exception as err:
        print(f"❌ get user error, {err}")
        # Report where the failure surfaced before handing the error back.
        err_type, _, trace = sys.exc_info()
        source_file = os.path.basename(trace.tb_frame.f_code.co_filename)
        print(err_type, source_file, trace.tb_lineno)
        raise