import ast
import importlib
import importlib.util
import inspect
import os
from typing import Dict, Any

from aiflows.base_flows import AtomicFlow


class MemoryReadingAtomicFlow(AtomicFlow):
    """A flow that reads memory content from given files.

    Any composite flow that uses this flow should keep, in its flow_state, a
    ``memory_files: Dict[str, str]`` which maps each memory name to the
    location of its memory file.

    *Input Interface*:

    - `memory_files` : the Dict which maps the memory name to its file
      location, e.g. {"plan": "examples/JARVIS/plan.txt"}

    *Output Interface*:

    - the corresponding memory content keyed by memory name, for example
      `code_library`. There could be multiple memory contents returned.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Memory names this flow accepts; _check_input_data rejects any other.
        self.supported_mem_name = ["plan", "logs", "code_library"]

    def _check_input_data(self, input_data: Dict[str, Any]):
        """Sanity-check the input data.

        :param input_data: must contain ``memory_files``, a dict mapping
            supported memory names to paths of existing regular files.
        :raises AssertionError: if ``memory_files`` is missing, a memory name
            is not supported, or a path does not exist or is not a file.
        """
        assert "memory_files" in input_data, "memory_files not passed to MemoryReadingAtomicFlow"
        for mem_name, mem_path in input_data["memory_files"].items():
            assert mem_name in self.supported_mem_name, (f"{mem_name} is not supported in MemoryReadingAtomicFlow, "
                                                         f"supported names are: {self.supported_mem_name}")
            assert os.path.exists(mem_path), f"{mem_path} does not exist."
            assert os.path.isfile(mem_path), f"{mem_path} is not a file."

    def _read_text(self, file_location):
        """Return the whole content of ``file_location`` decoded as UTF-8."""
        with open(file_location, 'r', encoding='utf-8') as file:
            return file.read()

    def _get_pyfile_functions_metadata_from_file(self, file_location):
        """Collect metadata for every top-level function of a Python file.

        The file is first parsed with ``ast`` to find the names of its
        top-level ``def`` statements, then loaded as a module so each
        function's docstring and signature can be inspected.

        NOTE: loading the module executes the file's top-level code. Only
        point this at code-library files that are trusted.

        :param file_location: path of the Python source file.
        :return: a list with one dict per function, holding ``name``, ``doc``
            and ``args`` (a list of dicts with ``name``, ``default``, ``type``).
        :raises ImportError: if no import spec/loader can be built for the file.
        :raises ValueError: if a top-level function name no longer refers to a
            callable once the module has been executed.
        """

        def module_name_from_path(file_path):
            # splitext drops only the trailing extension; a plain
            # str.replace('.py', '') would also eat ".py" inside the name.
            return os.path.splitext(os.path.basename(file_path))[0]

        def extract_top_level_function_names(python_file_path):
            # Explicit UTF-8 keeps this consistent with _read_text instead of
            # depending on the platform's default encoding.
            with open(python_file_path, 'r', encoding='utf-8') as file:
                tree = ast.parse(file.read())
            return [
                node.name
                for node in ast.iter_child_nodes(tree)
                if isinstance(node, ast.FunctionDef)
            ]

        def load_module_from_file(file_path):
            module_name = module_name_from_path(file_path)
            spec = importlib.util.spec_from_file_location(module_name, file_path)
            # spec_from_file_location returns None when it cannot determine a
            # loader for the path; fail with a clear message in that case.
            if spec is None or spec.loader is None:
                raise ImportError(f"Cannot load a Python module from {file_path}")
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            return module

        def function_to_dict(function):
            if not callable(function):
                raise ValueError("Provided object is not a function.")
            empty = inspect.Parameter.empty
            args = [
                {
                    "name": name,
                    # A parameter without a default is reported as None.
                    "default": param.default if param.default is not empty else None,
                    "type": str(param.annotation) if param.annotation is not empty else "unknown",
                }
                for name, param in inspect.signature(function).parameters.items()
            ]
            return {
                "name": function.__name__,
                "doc": function.__doc__,
                "args": args,
            }

        function_names = extract_top_level_function_names(file_location)
        module = load_module_from_file(file_location)
        return [function_to_dict(getattr(module, name)) for name in function_names]

    def _format_metadata(self, metadata):
        """Render function metadata as a human-readable text block.

        :param metadata: list of dicts as produced by
            ``_get_pyfile_functions_metadata_from_file``.
        :return: one newline-joined string; each function's section ends with
            a ``#########`` separator line.
        """
        lines = []
        for function_data in metadata:
            lines.append(f"Function: {function_data['name']}")
            lines.append(f"Documentation: {function_data['doc']}")
            args = function_data.get('args', [])
            if args:
                lines.append("Arguments:")
                for arg in args:
                    # The default is shown only when it is not None.
                    default = f" (default: {arg['default']})" if arg['default'] is not None else ""
                    lines.append(f" - {arg['name']} (type: {arg['type']}){default}")
            else:
                lines.append("Arguments: None")
            lines.append("#########")
        return '\n'.join(lines)

    def _read_py_code_library(self, file_location):
        """Return a formatted summary of the functions in a Python code library.

        :param file_location: path of the code-library ``.py`` file.
        :return: the formatted metadata, or ``"No functions yet."`` when the
            file defines no top-level function.
        """
        metadata = self._get_pyfile_functions_metadata_from_file(file_location)
        if not metadata:
            return "No functions yet."
        return self._format_metadata(metadata)

    def run(
            self,
            input_data: Dict[str, Any]):
        """Read every requested memory file and return its content.

        ``plan`` and ``logs`` are returned as raw text. ``code_library`` is
        summarised function by function, but only when its path ends with
        ``.py``; otherwise it is left out of the response.

        :param input_data: dict containing ``memory_files`` (memory name ->
            file path).
        :return: dict mapping each memory name that was read to its content.
        :raises AssertionError: if the input fails ``_check_input_data``.
        """
        self._check_input_data(input_data)
        response = {}
        for mem_name, mem_path in input_data["memory_files"].items():
            if mem_name in ('plan', 'logs'):
                response[mem_name] = self._read_text(mem_path)
            elif mem_name == 'code_library' and mem_path.endswith('.py'):
                response[mem_name] = self._read_py_code_library(mem_path)
        return response