Spaces:
Runtime error
Runtime error
from datetime import datetime, timedelta | |
from jose import JWTError, jwt, ExpiredSignatureError | |
# from passlib.context import CryptContext | |
class AuthService: | |
def __init__(self, account_repo, secret_key="123"): | |
assert account_repo is not None | |
assert secret_key is not None | |
self.account_repo = account_repo | |
self.secret_key = secret_key | |
self.encode_alg = "HS256" | |
# self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
async def authenticate_user(self, email, password): | |
key, user = self.account_repo.find({"email": email}) | |
if not user: | |
return False | |
assert 'password' in user.keys() | |
if user['password'] == password: | |
return True | |
# if not self.pwd_context.verify(password, user["hashed_password"]): | |
# return False | |
return user | |
async def create_token(self, email): | |
expire = datetime.utcnow() + timedelta(minutes=30) | |
encoded_jwt = jwt.encode( | |
{"sub": email, "exp": expire}, | |
self.secret_key, | |
algorithm=self.encode_alg | |
) | |
return encoded_jwt | |
async def validate_token(self, encoded_token): | |
try: | |
decoded_token = jwt.decode(encoded_token, self.secret_key, algorithms=[self.encode_alg]) | |
key, user = self.account_repo.find({"email": decoded_token['sub']}) | |
return user['email'] | |
except ExpiredSignatureError: | |
return False # Expired | |
except JWTError: | |
return False | |
except Exception as e: | |
raise e | |