|
from typing import Dict, Any |
|
import os |
|
import ast |
|
import importlib |
|
import inspect |
|
from aiflows.base_flows import AtomicFlow |
|
|
|
|
|
class MemoryReadingAtomicFlow(AtomicFlow): |
|
"""A flow to read memory from given files. |
|
|
|
Any composite flow that uses this flow should have |
|
memory_files: Dict[str, str] which maps memory name to its memory file location in the flow_state |
|
|
|
*Input Interface*: |
|
- `memory_files` : name of the Dict which maps the memory name to its file location e.g. {"plan": "examples/JARVIS/plan.txt"} |
|
|
|
*Output_Interface*: |
|
- corresponding memory content, for example, `code_library`. There could be multiple memory content returned. |
|
|
|
*Configuration Parameters*: |
|
- `input_interface`: input interface of the flow |
|
- `output_interface`: output interface of the flow |
|
|
|
""" |
|
|
|
def __init__(self, **kwargs): |
|
""" |
|
This is the constructor of the :class:`MemoryReadingAtomicFlow` class. |
|
:param kwargs: additional arguments to pass to the :class:`AtomicFlow` constructor |
|
:type kwargs: Dict[str, Any] |
|
""" |
|
super().__init__(**kwargs) |
|
self.supported_mem_name = ["plan", "logs", "code_library"] |
|
|
|
def _check_input_data(self, input_data: Dict[str, Any]): |
|
"""input data sanity check |
|
:param input_data: input data to the flow |
|
:type input_data: Dict[str, Any] |
|
:raises AssertionError: if the input data does not contain the required keys |
|
:raises AssertionError: if the memory file does not exist |
|
:raises AssertionError: if the memory file is not a file |
|
:raises AssertionError: if the memory name is not supported |
|
""" |
|
assert "memory_files" in input_data, "memory_files not passed to MemoryReadingAtomicFlow" |
|
|
|
for mem_name, mem_path in input_data["memory_files"].items(): |
|
assert mem_name in self.supported_mem_name, (f"{mem_name} is not supported in MemoryReadingAtomicFlow, " |
|
f"supported names are: {self.supported_mem_name}") |
|
assert os.path.exists(mem_path), f"{mem_path} does not exist." |
|
assert os.path.isfile(mem_path), f"{mem_path} is not a file." |
|
|
|
def _read_text(self, file_location): |
|
""" |
|
Read text from a file. |
|
:param file_location: the file location |
|
:type file_location: str |
|
:return: the content of the file |
|
:rtype: str |
|
""" |
|
with open(file_location, 'r', encoding='utf-8') as file: |
|
content = file.read() |
|
return content |
|
|
|
def _get_pyfile_functions_metadata_from_file(self, file_location): |
|
""" |
|
Get the metadata of all the functions in a python file. |
|
:param file_location: the file location |
|
:type file_location: str |
|
:return: the metadata of all the functions in a python file |
|
:rtype: List[Dict[str, Any]] |
|
""" |
|
def python_file_path_to_module_name(file_path): |
|
""" |
|
Convert a python file path to its module name. |
|
:param file_path: the file path |
|
:type file_path: str |
|
:return: the module name |
|
:rtype: str |
|
""" |
|
return os.path.basename(file_path).replace('.py', '') |
|
|
|
def extract_top_level_function_names(python_file_path): |
|
""" |
|
Extract the top level function names from a python file. |
|
:param python_file_path: the python file path |
|
:type python_file_path: str |
|
:return: the top level function names |
|
:rtype: List[str] |
|
""" |
|
with open(python_file_path, 'r') as file: |
|
file_content = file.read() |
|
tree = ast.parse(file_content) |
|
functions = filter(lambda node: isinstance(node, ast.FunctionDef), ast.iter_child_nodes(tree)) |
|
return [node.name for node in functions] |
|
|
|
def load_module_from_file(file_path): |
|
""" |
|
Load a module from a file. |
|
:param file_path: the file path |
|
:type file_path: str |
|
:return: the module |
|
:rtype: ModuleType |
|
""" |
|
module_name = python_file_path_to_module_name(file_path) |
|
spec = importlib.util.spec_from_file_location(module_name, file_path) |
|
module = importlib.util.module_from_spec(spec) |
|
spec.loader.exec_module(module) |
|
return module |
|
|
|
def get_function_from_name(function_name, module): |
|
""" |
|
Get a function from its name. |
|
:param function_name: the function name |
|
:type function_name: str |
|
:param module: the module |
|
:type module: ModuleType |
|
:return: the function |
|
:rtype: Callable |
|
""" |
|
return getattr(module, function_name) |
|
|
|
def function_to_dict(function): |
|
""" |
|
Convert a function to a dictionary. |
|
:param function: the function |
|
:type function: Callable |
|
:return: the dictionary |
|
:rtype: Dict[str, Any] |
|
""" |
|
if not callable(function): |
|
raise ValueError("Provided object is not a function.") |
|
|
|
function_dict = { |
|
"name": function.__name__, |
|
"doc": function.__doc__, |
|
"args": [] |
|
} |
|
|
|
signature = inspect.signature(function) |
|
for name, param in signature.parameters.items(): |
|
arg_info = { |
|
"name": name, |
|
"default": param.default if param.default is not inspect.Parameter.empty else None, |
|
"type": str(param.annotation) if param.annotation is not inspect.Parameter.empty else "unknown" |
|
} |
|
function_dict["args"].append(arg_info) |
|
|
|
return function_dict |
|
|
|
function_names = extract_top_level_function_names(file_location) |
|
module = load_module_from_file(file_location) |
|
functions = [get_function_from_name(name, module) for name in function_names] |
|
return [function_to_dict(function) for function in functions] |
|
|
|
def _format_metadata(self, metadata): |
|
""" |
|
Format the metadata. |
|
:param metadata: the metadata |
|
:type metadata: List[Dict[str, Any]] |
|
:return: the formatted metadata |
|
:rtype: str |
|
""" |
|
lines = [] |
|
for function_data in metadata: |
|
lines.append(f"Function: {function_data['name']}") |
|
lines.append(f"Documentation: {function_data['doc']}") |
|
|
|
args = function_data.get('args', []) |
|
if args: |
|
lines.append("Arguments:") |
|
for arg in args: |
|
default = f" (default: {arg['default']})" if arg['default'] is not None else "" |
|
lines.append(f" - {arg['name']} (type: {arg['type']}){default}") |
|
else: |
|
lines.append("Arguments: None") |
|
|
|
lines.append("#########") |
|
|
|
return '\n'.join(lines) |
|
def _read_py_code_library(self, file_location): |
|
""" |
|
Read the python code library from a file. |
|
:param file_location: the file location |
|
:type file_location: str |
|
:return: the python code library |
|
:rtype: str |
|
""" |
|
metadata = self._get_pyfile_functions_metadata_from_file(file_location) |
|
if len(metadata) == 0: |
|
return "No functions yet." |
|
formatted_metadata = self._format_metadata(metadata) |
|
return formatted_metadata |
|
|
|
def run( |
|
self, |
|
input_data: Dict[str, Any]): |
|
""" |
|
Run the flow. |
|
:param input_data: the input data |
|
:type input_data: Dict[str, Any] |
|
:return: the output data |
|
:rtype: Dict[str, Any] |
|
""" |
|
self._check_input_data(input_data) |
|
response = {} |
|
for mem_name, mem_path in input_data["memory_files"].items(): |
|
if mem_name in ['plan', 'logs']: |
|
response[mem_name] = self._read_text(mem_path) |
|
elif mem_name == 'code_library' and mem_path.endswith('.py'): |
|
response[mem_name] = self._read_py_code_library(mem_path) |
|
return response |
|
|