import random import asyncio from .db import * from .events import Events from .constants import Constants class ResponseLogic: @staticmethod async def create_response( text: str, name: str, chat_id: int, language_code: str ) -> str: # Edge case check for "add\nSentence" line_split_text = text.split("\n") line_split_first_word = line_split_text[0] split_text = text.split(" ") first_word = split_text[0] user = UserCreate( name=name, telegram_chat_id=chat_id, ) if ResponseLogic.check_command_type( first_word, "add" ) or ResponseLogic.check_command_type(line_split_first_word, "add"): if ResponseLogic.check_command_type(line_split_first_word, "add"): memory = "\n".join(split_text[1:]) else: memory = " ".join(split_text[1:]) if str.isspace(memory) or memory == "": return Constants.Add.no_sentence(name, language_code) else: reminder = add_memory(user, memory) if reminder is None: return Constants.Common.inactive_user(name, language_code) elif reminder is False: return Constants.Add.already_added(name, language_code) else: memory = reminder.reminder return Constants.Add.success(name, language_code, memory) elif ResponseLogic.check_command_type(first_word, "start"): user = join_user(user) await Events.send_a_message_to_user(chat_id, Constants.hello) return Constants.Start.start_message(name, language_code) elif ResponseLogic.check_command_type(first_word, "help"): message = Constants.Help.help_message(name, language_code) return message elif ResponseLogic.check_command_type(first_word, "join"): user = join_user(user) if user is not None: return Constants.Join.successful_join(name, language_code) else: return Constants.Join.already_joined(name, language_code) elif ResponseLogic.check_command_type(first_word, "leave"): user = leave_user(user) if user is not None and user.telegram_chat_id == chat_id: return Constants.Leave.successful_leave(name, language_code) else: return Constants.Leave.already_left(name, language_code) elif ResponseLogic.check_command_type(first_word, "send"): if len(split_text) == 1: # send memory = select_random_memory(user) if memory is None: return Constants.Common.inactive_user(name, language_code) elif memory is False: return Constants.Common.no_memory_found(name, language_code) else: return memory.reminder else: # send number try: number_of_sending = int(split_text[1]) if not (1 <= number_of_sending < 50): return Constants.Send.send_count_out_of_bound( name, language_code ) except: return Constants.Send.send_count_out_of_bound(name, language_code) all_memories = list_memories(user) if len(all_memories) == 0: return Constants.Common.no_memory_found(name, language_code) send_count = min(len(all_memories), number_of_sending) selected_memories = random.sample(all_memories, send_count) for memory in selected_memories: # Send the memories in background asyncio.create_task( Events.send_a_message_to_user( user.telegram_chat_id, memory.reminder ) ) return "" elif ResponseLogic.check_command_type(first_word, "list"): memories = list_memories(user) memory_count = len(memories) if memories is None: return Constants.Common.inactive_user(name, language_code) elif memory_count == 0: return Constants.Common.no_memory_found(name, language_code) else: background_message_list = [] for message_id, reminder in enumerate(memories): background_message_list.append( f"\n{message_id}: {reminder.reminder}" ) asyncio.create_task( Events.send_message_list_at_background( telegram_chat_id=chat_id, message_list=background_message_list ) ) response_message = Constants.List.list_messages( name, memory_count, language_code ) return response_message elif ResponseLogic.check_command_type(first_word, "delete"): if len(split_text) < 2: return Constants.Delete.no_id(name, language_code) else: try: memory_id = int(split_text[1]) if memory_id < 0: return Constants.Delete.no_id(name, language_code) else: response = delete_memory(user, memory_id) if response is None: return Constants.Common.inactive_user(name, language_code) elif response is False: return Constants.Delete.no_id(name, language_code) else: memory = response return Constants.Delete.success(name, language_code, memory) except Exception as ex: return Constants.Delete.no_id(name, language_code) elif ResponseLogic.check_command_type(first_word, "schedule"): if len(split_text) == 1: schedule = get_schedule(user) if schedule is None: return Constants.Common.inactive_user(name, language_code) elif schedule == "": return Constants.Schedule.empty_schedule(name, language_code) else: return Constants.Schedule.success(name, language_code, schedule) elif len(split_text) > 1: if split_text[1] == "reset": new_schedule = reset_schedule(user) if new_schedule is None: return Constants.Common.inactive_user(name, language_code) else: return Constants.Schedule.success( name, language_code, new_schedule ) elif split_text[1] == "add": str_numbers = [] preceding_text = " ".join(split_text[2:]) if str.isspace(preceding_text) or preceding_text == "": return Constants.Schedule.no_number_found(name, language_code) number_check = True try: for str_number in split_text[2:]: number = int(str_number) str_numbers.append(number) if not (0 <= number <= 23): number_check = False break except: return Constants.Schedule.add_incorrect_number_input( name, language_code ) if not number_check: return Constants.Schedule.add_incorrect_number_input( name, language_code ) else: new_schedule = add_hours_to_the_schedule(user, str_numbers) if new_schedule is None: return Constants.Common.inactive_user(name, language_code) else: return Constants.Schedule.success( name, language_code, new_schedule ) elif split_text[1] == "remove": preceding_text = " ".join(split_text[2:]) number_check = True if str.isspace(preceding_text) or preceding_text == "": return Constants.Schedule.remove_incorrect_number_input( name, language_code ) else: try: str_number = split_text[2] number = int(str_number) if not (0 <= number <= 23): number_check = False except: return Constants.Schedule.remove_incorrect_number_input( name, language_code ) if not number_check: return Constants.Schedule.remove_incorrect_number_input( name, language_code ) else: new_schedule = remove_hour_from_schedule( user, int(str_number) ) if new_schedule is None: return Constants.Common.inactive_user( name, language_code ) else: return Constants.Schedule.success( name, language_code, new_schedule ) else: return Constants.Schedule.unknown_command(name, language_code) elif ResponseLogic.check_command_type(first_word, "gmt"): try: gmt = int(split_text[1]) except Exception as ex: return Constants.Gmt.incorrect_timezone(name, language_code) if not (-12 <= gmt <= 12): return Constants.Gmt.incorrect_timezone(name, language_code) else: user = update_gmt(user, gmt) if user is None: return Constants.Common.inactive_user(name, language_code) else: return Constants.Gmt.success(name, language_code, user.gmt) elif ResponseLogic.check_command_type(first_word, "broadcast"): if not chat_id == Constants.BROADCAST_CHAT_ID: return Constants.Broadcast.no_right(name, language_code) else: normalized_text = " ".join(split_text[1:]) if str.isspace(normalized_text) or normalized_text == "": return Constants.Broadcast.no_sentence_found(name, language_code) else: await Events.broadcast_message(normalized_text) return Constants.Broadcast.success(name, language_code) elif ResponseLogic.check_command_type(first_word, "status"): gmt, active = get_user_status(chat_id) if gmt is None: return Constants.Common.inactive_user(name, language_code) else: schedule = get_schedule(user) memory_count = len(list_memories(user)) return Constants.Status.get_status( name, language_code, gmt, active, schedule, memory_count ) elif ResponseLogic.check_command_type(first_word, "feedback"): message = " ".join(split_text[1:]) message_with_user_information = ( message + f"\nFrom the user: *{name}* \nchat id: *{chat_id}*" ) if str.isspace(message) or message == "": return Constants.Feedback.no_message(name, language_code) else: success = await Events.send_a_message_to_user( Constants.FEEDBACK_FORWARD_CHAT_ID, message_with_user_information ) if success: return Constants.Feedback.success(name, language_code, message) else: return Constants.Feedback.fail(name, language_code) elif ResponseLogic.check_command_type(first_word, "deletelast"): response = delete_last_memory(user) if response is None: return Constants.Common.inactive_user(name, language_code) elif response is False: return Constants.Common.no_memory_found(name, language_code) else: return Constants.Delete.success(name, language_code, response) elif ResponseLogic.check_command_type(first_word, "support"): return Constants.Support.support(name, language_code) elif ResponseLogic.check_command_type(first_word, "tutorial1"): return Constants.Tutorial.tutorial_1(name, language_code) elif ResponseLogic.check_command_type(first_word, "tutorial2"): return Constants.Tutorial.tutorial_2(name, language_code) elif ResponseLogic.check_command_type(first_word, "tutorial3"): return Constants.Tutorial.tutorial_3(name, language_code) else: return Constants.Common.unknown_command(name, language_code) @staticmethod def check_command_type(input_command: str, correct_command: str) -> bool: """ Checks the first word of the command, which starts the decision tree. Args: input_command: correct_command: Returns: bool """ input_command = input_command.lower() if ( input_command == correct_command or input_command == f"/{correct_command}" or input_command == f"/{correct_command}@memory_vault_bot" ): return True else: return False