| from datetime import datetime, timedelta, timezone |
|
|
| from jose import JWTError, jwt |
| from passlib.context import CryptContext |
|
|
| from app.config import get_settings |
|
|
|
|
| |
| |
# Shared passlib context used by hash_password() and verify_password().
# Two schemes are configured: "pbkdf2_sha256" (listed first) and "bcrypt".
# NOTE(review): per passlib's CryptContext docs, the first scheme is the
# default for new hashes and deprecated="auto" flags the others as
# deprecated -- confirm against the installed passlib version.
pwd_context = CryptContext(schemes=["pbkdf2_sha256", "bcrypt"], deprecated="auto")

# Application settings, resolved once when this module is imported.
settings = get_settings()
|
|
|
|
def hash_password(password: str) -> str:
    """Return a hash of *password* produced by the module-level ``pwd_context``.

    Args:
        password: The plaintext password to hash.

    Returns:
        The hash string returned by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
|
|
|
|
def verify_password(password: str, password_hash: str) -> bool:
    """Check *password* against a previously stored hash.

    Args:
        password: The plaintext password supplied by the caller.
        password_hash: The stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(password, password_hash)
    return is_match
|
|
|
|
def create_access_token(subject: str) -> str:
    """Create a signed JWT access token for *subject*.

    The token carries two claims: ``sub`` (the given subject) and ``exp``
    (the current UTC time plus ``settings.access_token_expire_minutes``).
    It is signed with ``settings.secret_key`` using ``settings.algorithm``.

    Args:
        subject: Value stored in the token's ``sub`` claim.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    lifetime = timedelta(minutes=settings.access_token_expire_minutes)
    claims = {
        "sub": subject,
        # Timezone-aware UTC timestamp, offset by the configured lifetime.
        "exp": datetime.now(timezone.utc) + lifetime,
    }
    return jwt.encode(
        claims,
        settings.secret_key,
        algorithm=settings.algorithm,
    )
|
|
|
|
def decode_access_token(token: str) -> str | None:
    """Extract the subject from an access token.

    The token is decoded with ``settings.secret_key`` and restricted to
    ``settings.algorithm``.

    Args:
        token: The encoded JWT to decode.

    Returns:
        The ``sub`` claim converted to ``str``, or ``None`` when decoding
        raises ``JWTError`` or the payload has no ``sub`` claim.
    """
    try:
        claims = jwt.decode(
            token,
            settings.secret_key,
            algorithms=[settings.algorithm],
        )
    except JWTError:
        # Any decoding failure reported by the JWT library means "no subject".
        return None

    sub_claim = claims.get("sub")
    if sub_claim is None:
        return None
    return str(sub_claim)
|
|