File size: 3,054 Bytes
971ac20
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import time
import importlib
from toolbox import trimmed_format_exc, gen_time_str, get_log_folder
from toolbox import CatchException, update_ui, gen_time_str, trimmed_format_exc, is_the_upload_folder
from toolbox import promote_file_to_downloadzone, get_log_folder, update_ui_lastest_msg
import multiprocessing

def get_class_name(class_string):
    import re
    # Use regex to extract the class name
    class_name = re.search(r'class (\w+)\(', class_string).group(1)
    return class_name

def try_make_module(code, chatbot):
    module_file = 'gpt_fn_' + gen_time_str().replace('-','_')
    fn_path = f'{get_log_folder(plugin_name="gen_plugin_verify")}/{module_file}.py'
    with open(fn_path, 'w', encoding='utf8') as f: f.write(code)
    promote_file_to_downloadzone(fn_path, chatbot=chatbot)
    class_name = get_class_name(code)
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    p = multiprocessing.Process(target=is_function_successfully_generated, args=(fn_path, class_name, return_dict))
    # only has 10 seconds to run
    p.start(); p.join(timeout=10)
    if p.is_alive(): p.terminate(); p.join()
    p.close()
    return return_dict["success"], return_dict['traceback']

# check is_function_successfully_generated
def is_function_successfully_generated(fn_path, class_name, return_dict):
    return_dict['success'] = False
    return_dict['traceback'] = ""
    try:
        # Create a spec for the module
        module_spec = importlib.util.spec_from_file_location('example_module', fn_path)
        # Load the module
        example_module = importlib.util.module_from_spec(module_spec)
        module_spec.loader.exec_module(example_module)
        # Now you can use the module
        some_class = getattr(example_module, class_name)
        # Now you can create an instance of the class
        instance = some_class()
        return_dict['success'] = True
        return 
    except:
        return_dict['traceback'] = trimmed_format_exc()
        return
    
def subprocess_worker(code, file_path, return_dict):
    return_dict['result'] = None
    return_dict['success'] = False
    return_dict['traceback'] = ""
    try:
        module_file = 'gpt_fn_' + gen_time_str().replace('-','_')
        fn_path = f'{get_log_folder(plugin_name="gen_plugin_run")}/{module_file}.py'
        with open(fn_path, 'w', encoding='utf8') as f: f.write(code)
        class_name = get_class_name(code)
        # Create a spec for the module
        module_spec = importlib.util.spec_from_file_location('example_module', fn_path)
        # Load the module
        example_module = importlib.util.module_from_spec(module_spec)
        module_spec.loader.exec_module(example_module)
        # Now you can use the module
        some_class = getattr(example_module, class_name)
        # Now you can create an instance of the class
        instance = some_class()
        return_dict['result'] = instance.run(file_path)
        return_dict['success'] = True
    except:
        return_dict['traceback'] = trimmed_format_exc()