import datetime
from collections import defaultdict
from io import StringIO
from random import sample
from typing import Optional

from streamlit import progress as st_progress
from streamlit.elements import WIDGETS as ST_WIDGETS
from streamlit.runtime.uploaded_file_manager import UploadedFile

import utilities_language_general.esp_constants as esp_constants
from utilities_language_bert.esp_sentence_bert import SENTENCE
from utilities_language_bert.esp_sentence_bert import TASK
from utilities_language_general.esp_constants import BAD_USER_TARGET_WORDS
from utilities_language_general.esp_constants import load_bert
from utilities_language_general.esp_constants import st
from utilities_language_general.esp_utils import compute_frequency_dict
from utilities_language_general.esp_utils import prepare_target_words
from utilities_language_general.esp_utils import prepare_tasks


def main_workflow(
        file: Optional[UploadedFile],
        text: str,
        logs: ST_WIDGETS,
        progress: st_progress,
        progress_d: st_progress,
        level: str,
        tw_mode_automatic_mode: str,
        target_words: str,
        num_distractors: int,
        save_name: str,
        global_bad_target_words: Optional[list] = None):
    """
    This is the main course of the program.
    All processes and changes take place here.
    Partially works with the interface, displaying the success messages and download buttons.

    :param file: user's file to generate tasks in
    :param text: user's text input to generate tasks in
    :param logs: widget to output logs to
    :param progress: progress bar
    :param progress_d: sentences progress bar
    :param level: user's specification of CEFR level of text
    :param tw_mode_automatic_mode: how target words are chosen: by user or automatically
    :param target_words: raw user input with target words (used in manual mode)
    :param num_distractors: how many distractors does the user want the task to contain
    :param save_name: user specifies name to save file in cloud
    :param global_bad_target_words: collector for rejected target words;
        defaults to a fresh list per call (a shared module-level default
        would leak words between runs — mutable-default pitfall)
    :return: Dictionary with output data: filename, amount_mode, text_with_gaps,
        tasks_as_list, correct_answers, student_out, teacher_out, total_out, original_text
    """
    # Start every run with an empty rejected-words collector.  Only a
    # caller-supplied *empty* list is kept (and filled in place), matching
    # the original clear-on-entry behaviour.
    if global_bad_target_words is None or global_bad_target_words:
        global_bad_target_words = []

    # Define main global accumulators for this run
    GLOBAL_DISTRACTORS = set()
    MAX_FREQUENCY = 0

    logs.update(label='Загружаем языковые модели и другие данные', state='running')
    mask_filler = load_bert()

    # Get input text: uploaded file wins over the text area; neither → stop.
    if file is not None:
        stringio = StringIO(file.getvalue().decode("utf-8"))
        current_text = stringio.read()
    elif text != '':
        current_text = text
    else:
        esp_constants.st.warning('Вы и текст не вставили, и файл не выбрали 😢')
        current_text = ''
        esp_constants.st.stop()

    # Process target words: manual mode needs non-empty user input.
    if tw_mode_automatic_mode == 'Самостоятельно':
        if target_words == '':
            esp_constants.st.warning('Вы не ввели целевые слова')
            esp_constants.st.stop()
        # Cannot make up paradigm, so only USER_TARGET_WORDS is used
        USER_TARGET_WORDS = prepare_target_words(target_words)
        tw_mode_automatic_mode = False
    else:
        USER_TARGET_WORDS = None
        tw_mode_automatic_mode = True

    # Text preprocessing.  Newlines are encoded as the sentinel '%^&*' so the
    # sentence splitter sees one stream; they are restored before output.
    original_text = current_text
    current_text = (current_text.replace('.', '. ').replace('. . .', '...')
                    .replace(' ', ' ').replace('…', '...').replace('…', '...')
                    .replace('—', '-').replace('\u2014', '-').replace('—', '-')
                    .replace('-\n', '').replace('\n', '%^&*'))
    current_text_sentences = [sent.text.strip() for sent in esp_constants.nlp(current_text).sents]
    logs.update(label='Получили Ваш текст!', state='running')
    progress.progress(10)

    # Compute frequency dict and take the top-5% barrier frequency:
    # the smallest frequency among the first 5% of entries.
    FREQ_DICT = compute_frequency_dict(current_text)
    _frequency_barrier_percent = 0.05
    _barrier = len(FREQ_DICT) * _frequency_barrier_percent
    for j, (_, _freq) in enumerate(FREQ_DICT.items()):
        if j >= _barrier:
            break
        MAX_FREQUENCY = _freq
    MAX_FREQUENCY = 3 if MAX_FREQUENCY < 3 else MAX_FREQUENCY
    logs.update(label="Посчитали немного статистики!", state='running')
    progress.progress(15)

    # Choose necessary language minimum according to user's input.
    _LEVEL_MINIMUMS = {
        'A1': (esp_constants.a1_target_set, esp_constants.a1_distractor_set),
        'A2': (esp_constants.a2_target_set, esp_constants.a2_distractor_set),
        'B1': (esp_constants.b1_target_set, esp_constants.b1_distractor_set),
        'B2': (esp_constants.b2_target_set, esp_constants.b2_distractor_set),
        'C1': (esp_constants.c1_target_set, esp_constants.c1_distractor_set),
        'C2': (esp_constants.c2_target_set, esp_constants.c2_distractor_set),
    }
    if level in _LEVEL_MINIMUMS:
        target_minimum, distractor_minimum = _LEVEL_MINIMUMS[level]
    elif level == 'Без уровня':
        target_minimum = None
        distractor_minimum = None
    else:
        target_minimum = None
        distractor_minimum = None
        logs.error('Вы не выбрали языковой уровень!')
        st.stop()

    # Start generation process
    workflow = [SENTENCE(original=sent.strip(), n_sentence=num, max_num_distractors=num_distractors)
                for num, sent in enumerate(current_text_sentences)]
    logs.update(label="Запускаем процесс генерации заданий!", state='running')
    progress.progress(20)

    for sentence in workflow:
        sentence.lemmatize_sentence()
    for sentence in workflow:
        sentence.bind_phrases()
    logs.update(label="Подготовили предложения для дальнейшей работы!", state='running')
    progress.progress(30)

    for j, sentence in enumerate(workflow):
        sentence.search_target_words(target_words_automatic_mode=tw_mode_automatic_mode,
                                     target_minimum=target_minimum,
                                     user_target_words=USER_TARGET_WORDS,
                                     frequency_dict=FREQ_DICT)
        progress.progress(int(30 + (j * (20 / len(workflow)))))
    progress.progress(50)

    # Deduplicate target words across sentences: keep one random occurrence
    # per lemma, reject the rest.
    DUPLICATE_TARGET_WORDS = defaultdict(list)
    for sentence in workflow:
        for target_word in sentence.target_words:
            DUPLICATE_TARGET_WORDS[target_word['lemma']].append(target_word)
    RESULT_TW = []
    for tw_lemma, tw_data in DUPLICATE_TARGET_WORDS.items():
        RESULT_TW.append(sample(tw_data, 1)[0])
    for sentence in workflow:
        # Rebuild instead of remove-while-iterating: removing items from the
        # list being iterated skips the element after each removal.
        kept_target_words = []
        for target_word in sentence.target_words:
            if target_word in RESULT_TW:
                kept_target_words.append(target_word)
            else:
                global_bad_target_words.append(target_word['original_text'])
        sentence.target_words = kept_target_words
    progress.progress(55)
    logs.update(label='Выбрали слова-пропуски!', state='running')

    # Embed each masked sentence back into the full text for the model.
    for sentence in workflow:
        for i, target_word in enumerate(sentence.target_words):
            temp = current_text_sentences[:]
            temp[sentence.n_sentence] = target_word['masked_sentence']
            masked_text = ' '.join(temp).replace('%^&*', '\n')
            sentence.text_with_masked_task = masked_text
            sentence.target_words[i]['text_with_masked_task'] = masked_text

    for sentence in workflow:
        sentence.filter_target_words(target_words_automatic_mode=tw_mode_automatic_mode)
    progress.progress(60)

    RESULT_TASKS = []
    for sentence in workflow:
        for target_word in sentence.target_words:
            task = TASK(task_data=target_word, max_num_distractors=num_distractors)
            RESULT_TASKS.append(task)

    for num, task in enumerate(RESULT_TASKS):
        task.attach_distractors_to_target_word(model=mask_filler,
                                               level_name=level,
                                               global_distractors=GLOBAL_DISTRACTORS,
                                               distractor_minimum=distractor_minimum,
                                               max_frequency=MAX_FREQUENCY)
        progress_d.progress(num / len(RESULT_TASKS))
        logs.update(label=f'Обработали {num}/{len(RESULT_TASKS)} целевых слов!', state='running')
    logs.update(label=f'Обработали {len(RESULT_TASKS)}/{len(RESULT_TASKS)} целевых слов!', state='running')
    progress_d.progress(100)
    progress.progress(70)
    logs.update(label='Подобрали неправильные варианты!', state='running')

    for task in RESULT_TASKS:
        task.inflect_distractors()
    progress.progress(80)
    logs.update(label='Просклоняли и проспрягали неправильные варианты!', state='running')

    for task in RESULT_TASKS:
        task.sample_distractors(num_distractors=num_distractors)
    progress.progress(85)
    # Drop tasks whose target word was rejected during distractor generation.
    # (A second removal pass over the same flag would be redundant.)
    RESULT_TASKS = [task for task in RESULT_TASKS if not task.bad_target_word]

    # Compute number of final tasks: snap down to 20 / 15 / 10, else take all.
    if len(RESULT_TASKS) >= 20:
        NUMBER_TASKS = 20
    elif len(RESULT_TASKS) >= 15:
        NUMBER_TASKS = 15
    elif len(RESULT_TASKS) >= 10:
        NUMBER_TASKS = 10
    else:
        NUMBER_TASKS = len(RESULT_TASKS)
    RESULT_TASKS = sample(RESULT_TASKS, NUMBER_TASKS)
    RESULT_TASKS = sorted(RESULT_TASKS, key=lambda t: (t.sentence_number, t.position_in_sentence))

    for task in RESULT_TASKS:
        task.compile_task(max_num_distractors=num_distractors)
    progress.progress(90)
    logs.update(label='Отобрали лучшие задания!', state='running')

    # Build the gapped text: replace each task's target word with a numbered gap.
    TEXT_WITH_GAPS = []
    VARIANTS = []
    tasks_counter = 1
    for i, sentence in enumerate(current_text_sentences):
        for task in RESULT_TASKS:
            if task.sentence_text == sentence:
                sentence = sentence.replace(task.original_text, f'__________({tasks_counter})')
                VARIANTS.append(task.variants)
                tasks_counter += 1
        TEXT_WITH_GAPS.append(sentence)
    del RESULT_TASKS
    TEXT_WITH_GAPS = ' '.join(TEXT_WITH_GAPS).replace('%^&*', '\n')

    PREPARED_TASKS = prepare_tasks(VARIANTS)
    STUDENT_OUT = f'{TEXT_WITH_GAPS}\n\n{"=" * 70}\n\n{PREPARED_TASKS["TASKS_STUDENT"]}'
    TEACHER_OUT = f'{TEXT_WITH_GAPS}\n\n{"=" * 70}\n\n{PREPARED_TASKS["TASKS_TEACHER"]}\n\n{"=" * 70}\n\n' \
                  f'{PREPARED_TASKS["KEYS_ONLY"]}'
    TOTAL_OUT = f'{original_text}\n\n{"$" * 70}\n\n{STUDENT_OUT}\n\n{"=" * 70}\n\n{PREPARED_TASKS["TASKS_TEACHER"]}' \
                f'\n\n{"$" * 70}\n\n{PREPARED_TASKS["KEYS_ONLY"]}'
    logs.update(label='Сейчас все будет готово!', state='running')
    progress.progress(95)

    # Default save name: timestamp (seconds precision) + first 20 chars of text.
    save_name = save_name if save_name != '' else f'{str(datetime.datetime.now())[:-7]}_{original_text[:20]}'
    out = {
        'name': save_name,
        'STUDENT_OUT': STUDENT_OUT,
        'TEACHER_OUT': TEACHER_OUT,
        'TEXT_WITH_GAPS': TEXT_WITH_GAPS,
        'TASKS_ONLY': PREPARED_TASKS["RAW_TASKS"],
        'KEYS_ONLY': PREPARED_TASKS["KEYS_ONLY"],
        'KEYS_ONLY_RAW': PREPARED_TASKS["RAW_KEYS_ONLY"],
        'TOTAL_OUT': TOTAL_OUT,
        'ORIGINAL': original_text,
        'BAD_USER_TARGET_WORDS': sorted(set(global_bad_target_words))
    }
    return out