File size: 3,415 Bytes
5421e10
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7bb2097
5421e10
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67ed1fb
 
 
 
 
6ec4644
67ed1fb
 
 
5421e10
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import os
from typing import Dict, Any
from flow_modules.Tachi67.CodeFileEditFlowModule import CodeFileEditAtomicFlow

class RunCodeFileEditAtomicFlow(CodeFileEditAtomicFlow):

    def _generate_content(self, code_str, comment_sign) -> str:
        if comment_sign == "<!-- -->":
            instruction = (
                "<!-- The below code will be executed.\n"
                "Change the code if you consider necessary.\n"
                "Shut this vscode session completely to start running the code. -->\n"
            )
        else:
            instruction = (
                f"{comment_sign} The below code will be executed.\n"
                f"{comment_sign} Change the code if you consider necessary.\n"
                f"{comment_sign} Shut this vscode session completely to start running the code.\n"
            )
        content = (
                instruction +
                "###########\n"
                "# Code:\n" +
                code_str +
                "\n############\n"
        )
        return content

    def _generate_temp_file_location(self, code_lib_location, file_extension):
        directory = os.path.dirname(code_lib_location)
        ret = os.path.join(directory, 'run_code_temp'+file_extension)
        return ret

    def _check_input(self, input_data: Dict[str, Any]):
        assert "code" in input_data, "code is not passed to RunCodeFileEditAtomicFlow"
        assert "memory_files" in input_data, "memory_files is not passed to RunCodeFileEditAtomicFlow"
        assert "language" in input_data, "language is not passed to RunCodeFileEditAtomicFlow"

    def _generate_file_extension_and_comment_sign(self, language: str):
        details = {
            "python": {"extension": ".py", "comment": "#"},
            "bash": {"extension": ".sh", "comment": "#"},
            "shell": {"extension": ".sh", "comment": "#"},
            "javascript": {"extension": ".js", "comment": "//"},
            "html": {"extension": ".html", "comment": "<!-- -->"},
            "applescript": {"extension": ".scpt", "comment": "--"},
            "r": {"extension": ".r", "comment": "#"},
            "powershell": {"extension": ".ps1", "comment": "#"}
        }
        if language.lower() not in details:
            raise NotImplemented
        return details.get(language.lower())

    def _generate_input_to_writer(self, input_data: Dict[str, Any]):
        code_str = input_data['code']
        code_lib_location = input_data["memory_files"]["code_library"]
        
        # if we are running python code and if the code is importing the code library after one modification, we need
        # to re-import the library.
        code_lang = input_data["language"].lower()
        code_lib_name = os.path.splitext(os.path.basename(code_lib_location))[0]
        if code_lang == 'python' and code_lib_name in code_str and 'importlib.reload' not in code_str:
            prepend_code = f"import importlib\nimport {code_lib_name}\nimportlib.reload({code_lib_name})\n"
            code_str = prepend_code + code_str
        
        lang_details = self._generate_file_extension_and_comment_sign(input_data["language"])
        content_to_write = self._generate_content(code_str, lang_details["comment"])
        file_location_to_write = self._generate_temp_file_location(code_lib_location, lang_details["extension"])
        return content_to_write, file_location_to_write