File size: 5,205 Bytes
6fbf1d8
 
6ba073a
 
 
bbb15db
6fbf1d8
 
 
 
 
 
 
 
7b3e6d8
 
6fbf1d8
7b3e6d8
 
 
6fbf1d8
 
10d8b43
 
6fbf1d8
 
 
 
10d8b43
 
 
6fbf1d8
 
 
 
 
 
 
 
 
 
6ba073a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6fbf1d8
6ba073a
6fbf1d8
 
6ba073a
 
6fbf1d8
 
 
 
 
 
10d8b43
6fbf1d8
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from typing import Dict, Any
import os
import ast
import importlib
import inspect
from aiflows.base_flows import AtomicFlow


class MemoryReadingAtomicFlow(AtomicFlow):
    """A flow to read memory from given files.

    Any composite flow that uses this flow should have
    memory_files: Dict[str, str] which maps memory name to its memory file location in the flow_state

    *Input Interface*:
    - `memory_files` : name of the Dict which maps the memory name to its file location e.g.
    {"plan": "examples/JARVIS/plan.txt"}

    *Output_Interface*:
    - corresponding memory content, for example, `code_library`. There could be multiple memory content returned.

    How each memory is read:
    - `plan` and `logs` are returned as the raw UTF-8 text of the file.
    - `code_library` is returned as a formatted summary (name, docstring, arguments) of the
      top-level functions of the file. It is only produced when the path ends with `.py`;
      for any other path no `code_library` key is present in the output.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Memory names this flow knows how to read; `_check_input_data` rejects anything else.
        self.supported_mem_name = ["plan", "logs", "code_library"]

    def _check_input_data(self, input_data: Dict[str, Any]):
        """Sanity-check the input before any file is read.

        Args:
            input_data: must contain `memory_files`, a mapping from memory name to file path.

        Raises:
            AssertionError: if `memory_files` is missing, a memory name is not in
                `self.supported_mem_name`, or a path does not exist or is not a regular file.
        """
        # Raised explicitly rather than via `assert` statements so the checks still run under
        # `python -O`; AssertionError is kept so callers observe the same exception type.
        if "memory_files" not in input_data:
            raise AssertionError("memory_files not passed to MemoryReadingAtomicFlow")

        for mem_name, mem_path in input_data["memory_files"].items():
            if mem_name not in self.supported_mem_name:
                raise AssertionError(
                    f"{mem_name} is not supported in MemoryReadingAtomicFlow, "
                    f"supported names are: {self.supported_mem_name}"
                )
            if not os.path.exists(mem_path):
                raise AssertionError(f"{mem_path} does not exist.")
            if not os.path.isfile(mem_path):
                raise AssertionError(f"{mem_path} is not a file.")

    def _read_text(self, file_location):
        """Return the full content of `file_location`, decoded as UTF-8."""
        with open(file_location, 'r', encoding='utf-8') as file:
            return file.read()

    def _get_pyfile_functions_metadata_from_file(self, file_location):
        """Collect metadata about the top-level functions defined in a Python file.

        The file is parsed with `ast` to find the names of its top-level `def` statements
        (in source order), then imported as a module so that each function can be inspected.

        Args:
            file_location: path to the Python source file.

        Returns:
            A list with one dict per function, each with the keys `name`, `doc` (the raw
            `__doc__`, possibly None) and `args`. Every entry of `args` has `name`,
            `default` (None when the parameter has no default), `has_default` and `type`
            (the stringified annotation, or "unknown").

        Raises:
            ImportError: if no module spec/loader can be created for the file.
            ValueError: if a top-level function name resolves to a non-callable object.
        """
        # `import importlib` alone does not guarantee the `importlib.util` submodule is loaded.
        import importlib.util

        def python_file_path_to_module_name(file_path):
            # splitext drops only the trailing extension; str.replace('.py', '') would also
            # remove any other ".py" occurrence inside the file name.
            return os.path.splitext(os.path.basename(file_path))[0]

        def extract_top_level_function_names(python_file_path):
            # Read bytes so ast.parse detects the source encoding itself (UTF-8 by default,
            # or a PEP 263 coding cookie) instead of relying on the locale's default encoding.
            with open(python_file_path, 'rb') as file:
                tree = ast.parse(file.read(), filename=python_file_path)
            return [node.name for node in ast.iter_child_nodes(tree) if isinstance(node, ast.FunctionDef)]

        def load_module_from_file(file_path):
            module_name = python_file_path_to_module_name(file_path)
            spec = importlib.util.spec_from_file_location(module_name, file_path)
            if spec is None or spec.loader is None:
                raise ImportError(f"Cannot load a Python module from {file_path}")
            module = importlib.util.module_from_spec(spec)
            # NOTE(security): exec_module runs the file's top-level code; only point this flow
            # at code library files that are trusted.
            spec.loader.exec_module(module)
            return module

        def function_to_dict(function):
            if not callable(function):
                raise ValueError("Provided object is not a function.")

            function_dict = {
                "name": function.__name__,
                "doc": function.__doc__,
                "args": []
            }

            signature = inspect.signature(function)
            for name, param in signature.parameters.items():
                has_default = param.default is not inspect.Parameter.empty
                has_annotation = param.annotation is not inspect.Parameter.empty
                function_dict["args"].append({
                    "name": name,
                    "default": param.default if has_default else None,
                    # Distinguishes "no default" from an explicit default of None.
                    "has_default": has_default,
                    "type": str(param.annotation) if has_annotation else "unknown"
                })

            return function_dict

        function_names = extract_top_level_function_names(file_location)
        module = load_module_from_file(file_location)
        functions = [getattr(module, name) for name in function_names]
        return [function_to_dict(function) for function in functions]

    def _format_metadata(self, metadata):
        """Render function metadata as human-readable text.

        Args:
            metadata: list of dicts as produced by `_get_pyfile_functions_metadata_from_file`.

        Returns:
            One text section per function (name, documentation, arguments), each terminated
            by a `#########` line, joined with newlines.
        """
        lines = []
        for function_data in metadata:
            lines.append(f"Function: {function_data['name']}")
            lines.append(f"Documentation: {function_data['doc']}")

            args = function_data.get('args', [])
            if args:
                lines.append("Arguments:")
                for arg in args:
                    # Entries without `has_default` fall back to the old "None means no
                    # default" rule, so older metadata dicts are still formatted as before.
                    has_default = arg.get('has_default', arg['default'] is not None)
                    default = f" (default: {arg['default']})" if has_default else ""
                    lines.append(f"  - {arg['name']} (type: {arg['type']}){default}")
            else:
                lines.append("Arguments: None")

            lines.append("#########")

        return '\n'.join(lines)

    def _read_py_code_library(self, file_location):
        """Return a formatted summary of the functions in a Python code library file.

        Returns the literal "No functions yet." when the file defines no top-level functions.
        """
        metadata = self._get_pyfile_functions_metadata_from_file(file_location)
        if not metadata:
            return "No functions yet."
        return self._format_metadata(metadata)

    def run(
            self,
            input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Read every requested memory file and return its content keyed by memory name.

        Args:
            input_data: must contain `memory_files`, a mapping from memory name to file path.

        Returns:
            A dict mapping each memory name that was read to its content.

        Raises:
            AssertionError: if the input fails the checks in `_check_input_data`.
        """
        self._check_input_data(input_data)
        response = {}
        for mem_name, mem_path in input_data["memory_files"].items():
            if mem_name in ['plan', 'logs']:
                response[mem_name] = self._read_text(mem_path)
            elif mem_name == 'code_library' and mem_path.endswith('.py'):
                response[mem_name] = self._read_py_code_library(mem_path)
            # NOTE: a `code_library` path that does not end with '.py' is skipped, so the
            # response then has no `code_library` key.
        return response