import re import secrets import pandas as pd import streamlit as st from trycourier import Courier from argon2 import PasswordHasher from argon2.exceptions import VerifyMismatchError ph = PasswordHasher() def check_usr_pass(user_log_in_database, user_name: str, password: str) -> bool: """ Authenticates the user_name and password. """ registered_user = user_log_in_database.fetch({'user_name': user_name}).items try: passwd_verification_bool = ph.verify(registered_user[0]['password'], password) if passwd_verification_bool: return True return False except VerifyMismatchError: pass return False def check_valid_name(name_sign_up: str) -> bool: """ Checks if the user entered a valid name while creating the account. """ name_regex_eng = r'^[A-Za-z_]\w *' name_regex_rus = r'^[А-Яа-я_][А-Яа-я0-9_] *' if re.search(name_regex_eng, name_sign_up) or re.search(name_regex_rus, name_sign_up): return True return False def check_valid_email(email_sign_up: str) -> bool: """ Checks if the user entered a valid e-mail while creating the account. """ regex = re.compile(r'([A-Za-z0-9]+[.-_])*[A-Za-z0-9]+@[A-Za-z0-9-]+(\.[A-Z|a-z]{2,})+') if re.fullmatch(regex, email_sign_up): return True return False def check_unique_email(user_log_in_database, email_sign_up: str) -> bool: """ Checks if the e-mail already exists (since e-mail needs to be unique). """ authorized_users_data = user_log_in_database.fetch({'e-mail': email_sign_up}).items if len(authorized_users_data) == 0: return True return False def non_empty_str_check(user_name_sign_up: str) -> bool: """ Checks for non-empty strings. """ empty_count = 0 for i in user_name_sign_up: if i == ' ': empty_count = empty_count + 1 if empty_count == len(user_name_sign_up): return False if not user_name_sign_up: return False return True def check_unique_usr(user_log_in_database, user_name_sign_up: str): """ Checks if the user_name already exists (since user_name needs to be unique), also checks for non-empty user_name. """ authorized_users_data = user_log_in_database.fetch({'user_name': user_name_sign_up}).items if len(authorized_users_data) != 0: return False if user_name_sign_up in authorized_users_data: return False non_empty_check = non_empty_str_check(user_name_sign_up) if not non_empty_check: return None return True def register_new_usr(user_log_in_database, name_sign_up: str, email_sign_up: str, user_name_sign_up: str, password_sign_up: str, professional_level: str, timestamp: str) -> None: """ Saves the information of the new user in the _secret_auth.json file. """ new_usr_data = {'user_name': user_name_sign_up, 'name': name_sign_up, 'e-mail': email_sign_up, 'password': ph.hash(password_sign_up), 'professional_level': professional_level, 'time_stamp': timestamp} return user_log_in_database.put(new_usr_data) def check_user_name_exists(user_log_in_database, user_name: str) -> bool: """ Checks if the user_name exists in the _secret_auth.json file. """ authorized_users_data = user_log_in_database.fetch({'user_name': user_name}).items if len(authorized_users_data) == 1: return True return False def check_email_exists(user_log_in_database, email_forgot_passwd: str): """ Checks if the e-mail entered is present in the _secret_auth.json file. """ authorized_users_data = user_log_in_database.fetch({'e-mail': email_forgot_passwd}).items if len(authorized_users_data) == 1: return True, authorized_users_data[0]['user_name'] return False, None def generate_random_passwd() -> str: """ Generates a random password to be sent in e-mail. """ password_length = 10 return secrets.token_urlsafe(password_length) def send_passwd_in_email(auth_token: str, user_name_forgot_passwd: str, email_forgot_passwd: str, company_name: str, random_password: str) -> None: """ Triggers an e-mail to the user containing the randomly generated password. """ client = Courier(auth_token=auth_token) client.send_message( message={ "to": { "email": email_forgot_passwd }, "content": { "title": f'{company_name}: Login Password!', "body": f'Hi! {user_name_forgot_passwd},\n\nYour temporary login password is: {random_password}\n\n' + '{{info}}' }, "data": { "info": "Please reset your password at the earliest for security reasons." } } ) def change_passwd(user_log_in_database, email_forgot_passwd: str, random_password: str) -> None: """ Replaces the old password with the newly generated password. """ user_key = user_log_in_database.fetch({'e-mail': email_forgot_passwd}).items[0]['key'] updates = {'password': ph.hash(random_password)} return user_log_in_database.update(updates, user_key) def check_current_passwd(user_log_in_database, email_reset_passwd: str, current_passwd: str = None) -> bool: """ Authenticates the password entered against the user_name when resetting the password. """ authorized_user_data = user_log_in_database.fetch({'e-mail': email_reset_passwd}).items[0] if current_passwd is None: current_passwd = 'b' try: if ph.verify(authorized_user_data['password'], current_passwd): return True except VerifyMismatchError: pass return False def save_data_in_database(user_task_database, save_type, save_name, cefr_level, time_stamp, creator_name=None, generated_result=None, test_taker_name=None, test_taker_answers=None, test_taker_result=None, comments=None, distractor_model=None, allow=False): already_saved_names = user_task_database.fetch({'creator_name': creator_name, 'save_name': save_name, 'cefr_level': cefr_level}).items already_saved_tasks = user_task_database.fetch({'creator_name': creator_name, 'generated_result': generated_result, 'cefr_level': cefr_level}).items already_saved_tests = user_task_database.fetch({'test_taker_name': test_taker_name, 'save_name': save_name, 'cefr_level': cefr_level}).items if save_name == '' and save_type == 'download': save_name = generated_result['name'] if len(already_saved_names) != 0 and save_type == 'download': return st.success('Файл с таким названием уже существует! Введите другое название и повторите попытку.') elif len(already_saved_tasks) != 0 and save_type == 'download': return st.error(f'Вы уже сохраняли эти задания под именем {already_saved_tasks[0]["save_name"]}. ') elif (len(already_saved_tests) != 0 and save_type == 'online_test'): # and int(test_taker_result) == int(already_saved_tests[0]["user_points"]) return st.error('Вы уже решали данный тест!') else: if save_type == 'download': new_save_data = { 'save_type': save_type, 'save_name': save_name, 'cefr_level': cefr_level, 'time_stamp': time_stamp, 'creator_name': creator_name, 'generated_result': generated_result, 'distractor_model': distractor_model } else: new_save_data = { 'save_type': save_type, 'save_name': save_name, 'cefr_level': cefr_level, 'time_stamp': time_stamp, 'creator_name': creator_name, 'test_taker_name': test_taker_name, 'test_taker_answers': test_taker_answers, 'generated_result': generated_result, 'test_taker_result': test_taker_result, 'comments': comments} user_task_database.put(new_save_data) if save_type == 'download': if allow: return st.success('Задания успешно сохранены! Можете переходить на следующие вкладки') elif save_type == 'online_test': return st.success('Ответы успешно сохранены!') def load_user_tasks_data(user_task_database, save_type, creator_name=None, test_taker_name=None): if save_type == 'download': user_data = user_task_database.fetch({'creator_name': creator_name, 'save_type': save_type}).items names = [item['save_name'] for item in user_data] cefr_level = [item['cefr_level'] for item in user_data] time_stamps = [item['time_stamp'] for item in user_data] return_data = pd.DataFrame([names, cefr_level, time_stamps]).transpose() return_data.columns = ['Название', 'Уровень', 'Время создания'] else: user_data = user_task_database.fetch({'test_taker_name': test_taker_name, 'save_type': save_type}).items names = [item['save_name'] for item in user_data] cefr_level = [item['cefr_level'] for item in user_data] time_stamps = [item['time_stamp'] for item in user_data] creator_name = [item['creator_name'] for item in user_data] test_taker_result = [item['test_taker_result'] for item in user_data] return_data = pd.DataFrame([names, cefr_level, test_taker_result, time_stamps, creator_name]).transpose() return_data.columns = ['Название', 'Уровень', 'Оценка', 'Дата прохождения', 'Автор заданий'] return return_data def load_users_particular_task(user_task_database, load_mode, creator_name, save_name, cefr_level,): return_data = user_task_database.fetch({'creator_name': creator_name, 'save_name': save_name, 'save_type': load_mode, 'cefr_level': cefr_level}).items[0]['generated_result'] return return_data