File size: 2,761 Bytes
50553ea
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from datetime import datetime, timedelta, timezone

import anyio
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import jwt, JWTError
from passlib.context import CryptContext

from trauma.api.account.model import AccountModel
from trauma.core.config import settings


def verify_password(plain_password, hashed_password) -> bool:
    """Check a plaintext password against a stored hash.

    A new ``CryptContext`` restricted to the ``bcrypt`` scheme is built on
    every call and asked to verify the pair.

    Args:
        plain_password: The password as supplied by the user.
        hashed_password: The previously stored hash to compare against.

    Returns:
        Whatever ``CryptContext.verify`` reports for the pair.
    """
    bcrypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    return bcrypt_context.verify(plain_password, hashed_password)


def create_access_token(
        email: str,
        account_id: str,
        expires_delta: timedelta = timedelta(days=30),
):
    """Build and sign a JWT access token for an account.

    Args:
        email: Stored under the ``.../identity/claims/name`` claim.
        account_id: Stored under both the ``.../identity/claims/nameidentifier``
            claim and the ``accountId`` claim.
        expires_delta: Lifetime of the token; added to the current UTC time to
            form the ``exp`` claim. Defaults to 30 days.

    Returns:
        The result of ``jwt.encode`` for the payload, signed with
        ``settings.SECRET_KEY`` using HS256.
    """
    # Use an aware UTC timestamp: datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    expires_at = datetime.now(timezone.utc) + expires_delta
    payload = {
        "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name": email,
        "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier": account_id,
        "accountId": account_id,
        "iss": settings.Issuer,
        "aud": settings.Audience,
        "exp": expires_at,
    }
    return jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")


class PermissionDependency:
    """Callable dependency that resolves a bearer JWT to an account.

    The instance is called with the credentials extracted by
    ``HTTPBearer(auto_error=False)``. Missing credentials (unless the
    dependency is public), JWT errors, missing claims and a falsy account all
    result in an HTTP 403 "Permission denied".
    """

    def __init__(self, is_public: bool = False):
        # When True, a request without credentials is allowed through.
        self.is_public = is_public

    def __call__(
            self, credentials: HTTPAuthorizationCredentials | None = Depends(HTTPBearer(auto_error=False))
    ) -> AccountModel | None:
        """Authenticate the request and return the matching account.

        Returns:
            ``None`` when no credentials were sent and ``is_public`` is set;
            otherwise the account loaded for the token's account id.

        Raises:
            HTTPException: 403 when credentials are missing on a non-public
                dependency, when a ``JWTError`` is raised while processing the
                token, or when one of the helper checks rejects the request.
                Note that ``is_public`` is only consulted when credentials are
                absent; a supplied token is always validated.
        """
        if credentials is None:
            if not self.is_public:
                raise HTTPException(status_code=403, detail="Permission denied")
            return None

        try:
            token_account_id = self.authenticate_jwt_token(credentials.credentials)
            # This method is synchronous, so the coroutine is driven through
            # anyio's from-thread bridge.
            # NOTE(review): presumably relies on being executed in a worker
            # thread started by the framework - confirm.
            account = anyio.from_thread.run(self.get_account_by_id, token_account_id)
            self.check_account_health(account)
        except JWTError:
            raise HTTPException(status_code=403, detail="Permission denied")
        return account

    async def get_account_by_id(self, account_id: str) -> AccountModel:
        """Look up the ``accounts`` document whose ``id`` equals ``account_id``.

        The raw result of ``find_one`` is handed to ``AccountModel.from_mongo``
        and that value is returned unchanged.
        """
        document = await settings.DB_CLIENT.accounts.find_one({"id": account_id})
        return AccountModel.from_mongo(document)

    @staticmethod
    def check_account_health(account: AccountModel):
        """Reject the request with HTTP 403 when ``account`` is falsy."""
        if account:
            return
        raise HTTPException(status_code=403, detail="Permission denied")

    def authenticate_jwt_token(self, token: str) -> str:
        """Decode ``token`` and return the value of its ``accountId`` claim.

        The token is decoded with ``settings.SECRET_KEY``, HS256 and
        ``settings.Audience`` as the expected audience.
        NOTE(review): no ``issuer`` argument is passed to ``jwt.decode`` here -
        confirm whether issuer verification is intentionally skipped.

        Raises:
            HTTPException: 403 when either the name claim or the ``accountId``
                claim is absent from the decoded payload.
        """
        claims = jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms="HS256",
            audience=settings.Audience,
        )
        name_claim = claims.get("http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name")
        account_id = claims.get("accountId")

        # Both claims must be present for the token to be accepted.
        if name_claim is not None and account_id is not None:
            return account_id
        raise HTTPException(status_code=403, detail="Permission denied")