Kudasai / kudasai.py
Bikatr7's picture
moved to beta 3.4.0
ba92d48 verified
raw history blame
No virus
14.3 kB
## built-in libraries
import os
import sys
import json
import asyncio
import re
import typing
## third-party libraries
from kairyou import Kairyou
from kairyou import Indexer
from kairyou.types import NameAndOccurrence
## custom modules
from models.kaiseki import Kaiseki
from models.kijiku import Kijiku
from handlers.json_handler import JsonHandler
from modules.common.toolkit import Toolkit
from modules.common.file_ensurer import FileEnsurer
from modules.common.logger import Logger
##-------------------start-of-Kudasai---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
class Kudasai:
"""
Kudasai class is the main class for the Kudasai program. It handles all logic for CLI & Console versions of Kudasai.
"""
connection:bool
text_to_preprocess:str
replacement_json:dict
need_to_run_kairyou:bool = True
##-------------------start-of-boot()---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
@staticmethod
def boot() -> None:
"""
Does some logging and sets up the console window, regardless of whether the user is running the CLI, WebGUI, or Console version of Kudasai.
"""
os.system("title " + "Kudasai")
Toolkit.clear_console()
FileEnsurer.setup_needed_files()
Logger.log_barrier()
Logger.log_action("Kudasai started")
Logger.log_action("Current version: " + Toolkit.CURRENT_VERSION)
Logger.log_barrier()
try:
with open(FileEnsurer.config_kijiku_rules_path, "r") as kijiku_rules_file:
JsonHandler.current_kijiku_rules = json.load(kijiku_rules_file)
JsonHandler.validate_json()
assert JsonHandler.current_kijiku_rules != FileEnsurer.INVALID_KIJIKU_RULES_PLACEHOLDER
except:
print("Invalid kijiku_rules.json file. Please check the file for errors. If you are unsure, delete the file and run Kudasai again. Your kijiku rules file is located at: " + FileEnsurer.config_kijiku_rules_path)
Toolkit.pause_console()
raise Exception("Invalid kijiku_rules.json file. Please check the file for errors. If you are unsure, delete the file and run Kudasai again. Your kijiku rules file is located at: " + FileEnsurer.config_kijiku_rules_path)
##-------------------start-of-run_kairyou_indexer()---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
@staticmethod
def run_kairyou_indexer(text_to_index:str, replacement_json:typing.Union[dict,str]) -> typing.Tuple[str, str]:
"""
Runs the Kairyou Indexer.
Parameters:
text_to_index (str): The text to index.
replacement_json (dict): The replacement json.
Returns:
text_to_index (str): The indexed text.
indexing_log (str): The log of the indexing process.
"""
Toolkit.clear_console()
knowledge_base = input("Please enter the path to the knowledge base you would like to use for the indexer (can be text, a path to a txt file, or a path to a directory of txt files):\n").strip('"')
## unique names is a list of named tuples, with the fields name and occurrence
unique_names, indexing_log = Indexer.index(text_to_index, knowledge_base, replacement_json)
## for each name in unique_names, we need to replace that name in the text_to_process with >>>name<<<
## but since it returns the occurrence of the name, we only need to replace that occurrence of the name in the text_to_process
## So if a name has 42 occurrences, but only the 3rd and 4th occurrence were flagged, we only need to replace the 3rd and 4th occurrence of the name in the text_to_process
text_to_index = Kudasai.mark_indexed_names(text_to_index, unique_names)
return text_to_index, indexing_log
##-------------------start-of-mark_indexed_names()---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
@staticmethod
def mark_indexed_names(text:str, unique_names:typing.List[NameAndOccurrence]) -> str:
"""
Marks indexed names in the text.
This does not mark all names, but rather the specific occurrences of the names that were flagged by the indexer.
Parameters:
text (str): The text to mark.
unique_names (list - NameAndOccurrence): The list of unique names.
Returns:
str: The marked text.
"""
for name_tuple in unique_names:
name = name_tuple.name
pattern = re.compile(re.escape(name)) ## Prepare regex pattern, escaping the name to handle special characters
current_pos = 0
new_text = ""
last_end = 0
for match in pattern.finditer(text):
current_pos += 1
if(current_pos == name_tuple.occurrence):
new_text += text[last_end:match.start()] + f">>>{name}<<<"
last_end = match.end()
new_text += text[last_end:] # Append the rest of the text
text = new_text
return text
##-------------------start-of-run_kudasai()---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
@staticmethod
async def run_kudasai() -> None:
"""
Runs the Kudasai program. Used for CLI and Console versions of Kudasai. Not used for the WebGUI version of Kudasai.
"""
Kudasai.handle_update_check()
if(Kudasai.need_to_run_kairyou):
indexing_log = ""
if(Kudasai.replacement_json not in ["", FileEnsurer.blank_rules_path, FileEnsurer.standard_read_json(FileEnsurer.blank_rules_path)] and input("Would you like to use Kairyou's Indexer to index the preprocessed text? (1 for yes, 2 for no)\n") == "1"):
Kudasai.text_to_preprocess, indexing_log = Kudasai.run_kairyou_indexer(Kudasai.text_to_preprocess, Kudasai.replacement_json)
preprocessed_text, preprocessing_log, error_log = Kairyou.preprocess(Kudasai.text_to_preprocess, Kudasai.replacement_json)
## Need to set this so auto-translation can use the preprocessed text
Kudasai.text_to_preprocess = preprocessed_text
## add index log to preprocessing log
if(indexing_log != ""):
preprocessing_log = indexing_log + "\n\n" + preprocessing_log
print(preprocessing_log)
timestamp = Toolkit.get_timestamp(is_archival=True)
FileEnsurer.write_kairyou_results(preprocessed_text, preprocessing_log, error_log, timestamp)
Toolkit.pause_console("\nPress any key to continue to Auto-Translation...")
Toolkit.clear_console()
else:
print("(Preprocessing skipped)")
await Kudasai.determine_autotranslation_module()
Toolkit.pause_console("\nPress any key to exit...")
##-------------------start-of-handle_update_check()---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
@staticmethod
def handle_update_check() -> None:
"""
Checks for updates and prompts the user to update if there is an update available.
"""
Kudasai.connection, update_prompt = Toolkit.check_update()
if(update_prompt != ""):
print(update_prompt)
Toolkit.pause_console()
Toolkit.clear_console()
##-------------------start-of-determine_autotranslation_module()---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
@staticmethod
async def determine_autotranslation_module() -> None:
"""
If the user is running the CLI or Console version of Kudasai, this function is called to determine which autotranslation module to use.
"""
if(not Kudasai.connection):
Toolkit.clear_console()
print("You are not connected to the internet. Please connect to the internet to use the autotranslation feature.\n")
Toolkit.pause_console()
exit()
pathing = ""
pathing_msg = "Please select an auto-translation module:\n\n1.Kaiseki (deepL)\n2.Kijiku (OpenAI/Gemini)\n3.Exit\n\n"
pathing = input(pathing_msg)
Toolkit.clear_console()
if(pathing == "1"):
Kudasai.run_kaiseki()
elif(pathing == "2"):
await Kudasai.run_kijiku()
else:
Toolkit.clear_console()
exit()
##-------------------start-of-run_kaiseki()---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
@staticmethod
def run_kaiseki() -> None:
"""
If the user is running the CLI or Console version of Kudasai, this function is called to run the Kaiseki module.
"""
Logger.log_action("--------------------")
Logger.log_action("Kaiseki started")
Logger.log_action("--------------------")
Kaiseki.text_to_translate = [line for line in Kudasai.text_to_preprocess.splitlines()]
Kaiseki.translate()
Toolkit.clear_console()
print(Kaiseki.translation_print_result)
Kaiseki.write_kaiseki_results()
##-------------------start-of-run_kijiku()---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
@staticmethod
async def run_kijiku() -> None:
"""
If the user is running the CLI or Console version of Kudasai, this function is called to run the Kijiku module.
"""
Logger.log_action("--------------------")
Logger.log_action("Kijiku started")
Logger.log_action("--------------------")
Toolkit.clear_console()
Kijiku.text_to_translate = [line for line in Kudasai.text_to_preprocess.splitlines()]
await Kijiku.translate()
Toolkit.clear_console()
print(Kijiku.translation_print_result)
Kijiku.write_kijiku_results()
##-------------------start-of-main()---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
async def main() -> None:
"""
The main function of the Kudasai program.
"""
try:
Kudasai.boot()
Toolkit.clear_console()
if(len(sys.argv) <= 1):
await run_console_version()
elif(len(sys.argv) in [2, 3]):
await run_cli_version()
else:
print_usage_statement()
except Exception as e:
FileEnsurer.handle_critical_exception(e)
##-------------------start-of-run_console_version()---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
async def run_console_version():
"""
Runs the console version of Kudasai.
"""
try:
path_to_text_to_preprocess = input("Please enter the path to the input file to be processed:\n").strip('"')
Kudasai.text_to_preprocess = FileEnsurer.standard_read_file(path_to_text_to_preprocess)
Toolkit.clear_console()
path_to_replacement_json = input("Please enter the path to the replacement json file:\n").strip('"')
Kudasai.replacement_json = FileEnsurer.standard_read_json(path_to_replacement_json if path_to_replacement_json else FileEnsurer.blank_rules_path)
Toolkit.clear_console()
except Exception as e:
print_usage_statement()
raise e
await Kudasai.run_kudasai()
Logger.push_batch()
##-------------------start-of-run_cli_version()---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
async def run_cli_version():
"""
Runs the CLI version of Kudasai.
"""
try:
Kudasai.text_to_preprocess = FileEnsurer.standard_read_file(sys.argv[1].strip('"'))
Kudasai.replacement_json = FileEnsurer.standard_read_json(sys.argv[2].strip('"') if(len(sys.argv) == 3) else FileEnsurer.blank_rules_path)
except Exception as e:
print_usage_statement()
raise e
if(len(sys.argv) == 2):
Kudasai.need_to_run_kairyou = False
await Kudasai.run_kudasai()
Logger.push_batch()
##-------------------start-of-print_usage_statement()---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
def print_usage_statement():
"""
Prints the usage statement for the CLI version of Kudasai.
"""
Logger.log_action("Usage: python Kudasai.py <input_file> <replacement_json>", output=True, omit_timestamp=True)
Logger.log_action("or run Kudasai.py without any arguments to run the console version.", output=True, omit_timestamp=True)
print("\n")
##-------------------start-of-submain()---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
if(__name__ == '__main__'):
asyncio.run(main())