File size: 3,792 Bytes
b7a7f32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import binascii
import json
import os
import secrets
from hashlib import sha1

from fastapi import Request
from passlib.context import CryptContext
from starlette.exceptions import HTTPException
from starlette.status import HTTP_102_PROCESSING, HTTP_404_NOT_FOUND, HTTP_425_TOO_EARLY

from core.config import settings
from core.db import redis_session_client
from models import User

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


async def create_sesssion_token(user: User, remember_me: bool, request: Request) -> str:
    session_token = secrets.token_hex(nbytes=16)
    expire_time = (
        settings.SESSION_EXPIRE_TIME_EXTENDED
        if remember_me
        else settings.SESSION_EXPIRE_TIME
    )

    active_sessions = await redis_session_client.client.get(
        f"user_{user.id}_sessions", encoding="utf-8"
    )

    if active_sessions:
        active_sessions = json.loads(active_sessions)
    else:
        active_sessions = {"uid": user.id, "sessions": []}

    active_sessions["sessions"].append(
        {
            "token": session_token,
            "ua": request.headers.get("user-agent"),
            "ip": request.client.host,
        }
    )

    data = {
        session_token: user.id,
        f"user_{user.id}_sessions": json.dumps(active_sessions),
    }

    await redis_session_client.client.mset(data)
    await redis_session_client.client.expire(session_token, expire_time)
    return session_token


async def create_2fa_temp_token(user: User, remember_me: bool) -> str:
    session_token = secrets.token_hex(nbytes=16)

    await redis_session_client.client.setex(
        f"two_fa_temp_{session_token}",
        settings.TWO_FA_TIMEOUT * 1000,
        json.dumps({"user": user.id, "remember_me": remember_me}),
    )

    return session_token


async def create_passwordless_create_token() -> str:
    token = secrets.token_hex(nbytes=16)

    await redis_session_client.client.setex(
        f"password_less_{token}",
        settings.PASSWORD_LESS_CREATE_TIMEOUT * 1000,
        "-1",
    )

    return token


async def authorize_passwordless_token(user: User, token: str) -> bool:
    value = await redis_session_client.client.get(
        f"password_less_{token}",
    )

    if value == None:
        raise HTTPException(status_code=HTTP_404_NOT_FOUND,
                            detail="Invalid token!")
    elif int(value) == -1:
        await redis_session_client.client.setex(
            f"password_less_{token}",
            settings.PASSWORD_LESS_CREATE_TIMEOUT * 1000,
            user.id,
        )
        return True


async def verify_passwordless_token(token: str) -> int:
    value = (await redis_session_client.client.get(
        f"password_less_{token}",
    )).decode("UTF-8")

    if value == None:
        raise HTTPException(
            status_code=HTTP_404_NOT_FOUND,
            detail="Invalid token!"
        )

    elif value == "-1":
        raise HTTPException(
            status_code=HTTP_425_TOO_EARLY,
            detail="Waiting for authorization!"
        )
    else:
        await redis_session_client.client.delete(
            f"password_less_{token}",
        )
        return int(value)


async def create_2fa_enable_temp_token(user: User, totp_secret: str):
    await redis_session_client.client.setex(
        f"two_fa_enable_temp_{user.id}",
        settings.TWO_FA_TIMEOUT * 1000,
        totp_secret
    )
    return


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)


def get_uid_hash(uid: str) -> str:
    hasher = sha1()
    hasher.update(bytes(f"uid_{uid}", "utf-8"))
    return hasher.hexdigest()[3:10]