# evoagentx/hitl/workflow_editor.py
import json
import os
import time
from typing import Any, Dict, List, Literal, Optional

from dotenv import load_dotenv
from pydantic import BaseModel, Field

from ..core.logging import logger
from ..core.module import BaseModule
from ..models import BaseLLM, OpenAILLM, OpenAILLMConfig
from ..models.model_configs import LLMConfig
from ..prompts.workflow.workflow_editor import WORKFLOW_EDITOR_PROMPT

load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")


class MockLLMConfig(LLMConfig):
    """Mock LLM configuration for testing purposes."""
    llm_type: str = "MockLLM"
    model: str = "mock-model"


class MockLLM(BaseLLM):
    """Mock LLM implementation for testing purposes that passes pydantic type validation."""

    def __init__(self, config: Optional[MockLLMConfig] = None, **kwargs):
        if config is None:
            config = MockLLMConfig(
                llm_type="MockLLM",
                model="mock-model",
                output_response=True
            )
        super().__init__(config, **kwargs)

    def init_model(self):
        """Initialize the mock model (no-op)."""
        pass

    def formulate_messages(self, prompts: List[str], system_messages: Optional[List[str]] = None) -> List[List[dict]]:
        """Mock implementation of formulate_messages: builds one chat-style message list per prompt."""
        result = []
        for prompt in prompts:
            messages = []
            if system_messages:
                for sys_msg in system_messages:
                    messages.append({"role": "system", "content": sys_msg})
            messages.append({"role": "user", "content": prompt})
            result.append(messages)
        return result

    def single_generate(self, messages: List[dict], **kwargs) -> str:
        """Mock implementation that ignores its input and returns a fixed JSON response."""
        return '{"nodes": [], "edges": []}'

    def batch_generate(self, batch_messages: List[List[dict]], **kwargs) -> List[str]:
        """Mock implementation for batch generation: one mock response per message list."""
        return [self.single_generate(messages, **kwargs) for messages in batch_messages]

    async def single_generate_async(self, messages: List[dict], **kwargs) -> str:
        """Mock async implementation: wraps the synchronous mock generation."""
        return self.single_generate(messages, **kwargs)
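

# Illustrative helper (an addition for this sketch; `_mock_llm_demo` is not part
# of the original module): exercises MockLLM end to end without any network
# access or API key.
def _mock_llm_demo() -> str:
    llm = MockLLM()
    msgs = llm.formulate_messages(["hello"], system_messages=["be brief"])
    # single_generate ignores its input and always returns the fixed JSON stub
    return llm.single_generate(msgs[0])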


def default_llm_config():
    """
    Create the default LLM instance (despite the name, this returns an LLM,
    not a config). Uses MockLLM in testing environments or when
    OPENAI_API_KEY is not available.
    """
    # Check if we're in a testing environment or if the API key is missing
    is_testing = (
        os.getenv("PYTEST_CURRENT_TEST") is not None or  # pytest environment
        os.getenv("CI") is not None or                   # CI environment
        OPENAI_API_KEY is None or                        # no API key
        OPENAI_API_KEY.strip() == ""                     # empty API key
    )
    if is_testing:
        # Return MockLLM for testing environments
        mock_config = MockLLMConfig(
            llm_type="MockLLM",
            model="mock-model",
            output_response=True
        )
        return MockLLM(mock_config)
    else:
        # Return a real OpenAI LLM for production environments
        llm_config = OpenAILLMConfig(
            model="gpt-4o",
            openai_key=OPENAI_API_KEY,
            stream=True,
            output_response=True
        )
        return OpenAILLM(llm_config)
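
# Usage sketch (hypothetical snippet, not executed here): OPENAI_API_KEY is
# captured at import time, so tests can force the mock branch by clearing the
# key before importing this module:
#   os.environ.pop("OPENAI_API_KEY", None)
#   from evoagentx.hitl.workflow_editor import default_llm_config
#   assert isinstance(default_llm_config(), MockLLM)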


class WorkFlowEditorReturn(BaseModel):
    """
    The return value of the workflow editor.
    """
    status: Literal["success", "failed", "exceeded_max_retries"] = Field(
        description="The status of the workflow editing operation"
    )
    workflow_json: Optional[Dict[str, Any]] = Field(
        description="The workflow JSON structure after editing"
    )
    workflow_json_path: Optional[str] = Field(
        description="The file path where the workflow JSON is saved"
    )
    error_message: Optional[str] = Field(
        default=None,
        description="Error message if the operation failed"
    )
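
# Shape sketch (illustrative values only) of a successful result:
#   WorkFlowEditorReturn(
#       status="success",
#       workflow_json={"nodes": [...], "edges": [...]},
#       workflow_json_path="<save_dir>/new_json_for__example.json__20250101_120000.json",
#   )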


class WorkFlowEditor(BaseModule):
    """
    An API-oriented version of HITLOutsideConversationAgent: it edits the
    workflow json structure in an interaction-free way.

    Attributes:
        save_dir (str): The directory to save the workflow json file.
        llm (BaseLLM): The LLM model to use for editing the workflow json file.
        max_retries (int): The maximum number of retries to edit the workflow json file.
    """
    save_dir: str
    # default_factory defers construction until instantiation instead of
    # building one shared LLM at import time
    llm: Optional[BaseLLM] = Field(default_factory=default_llm_config)
    # NOTE: reserved for retry handling; edit_workflow currently makes a single attempt
    max_retries: Optional[int] = Field(default=3)

    def init_module(self):
        pass

    async def edit_workflow(self, file_path: str, instruction: str, new_file_path: Optional[str] = None) -> WorkFlowEditorReturn:
        """
        Optimize or modify the workflow json file according to the instruction, using the LLM's ability.

        Args:
            file_path (str): The path to the workflow json file, or a bare file name resolved against save_dir.
            instruction (str): The instruction to edit the workflow json file.
            new_file_path (Optional[str]): The path to the new workflow json file.

        Returns:
            WorkFlowEditorReturn: The status, the edited workflow JSON, and the path where it was saved.
        """
        if new_file_path is None:
            new_file_path = "new_json_for__" + os.path.split(file_path)[-1] + "__" + time.strftime("%Y%m%d_%H%M%S") + ".json"
            new_file_path = os.path.join(self.save_dir, new_file_path)
        else:
            # check whether new_file_path is a bare file name or a full path
            path_split = os.path.split(new_file_path)
            if not path_split[0]:
                new_file_path = os.path.join(self.save_dir, new_file_path)
            elif not (os.path.exists(path_split[0]) and path_split[1].endswith(".json")):
                raise FileNotFoundError(f"The directory {path_split[0]} does not exist or the file name is not a json file name.")
        # resolve a bare input file name against save_dir, as documented above
        if not os.path.split(file_path)[0]:
            file_path = os.path.join(self.save_dir, file_path)
        # load the workflow json file
        with open(file_path, "r") as f:
            workflow_json = json.load(f)
        optimization_prompt = WORKFLOW_EDITOR_PROMPT.format(
            current_workflow_json=json.dumps(workflow_json, indent=2, ensure_ascii=False),
            user_advice=instruction
        )
        messages = [
            {"role": "system", "content": "You are a helpful assistant that can optimize the workflow json structure."},
            {"role": "user", "content": optimization_prompt}
        ]
        try:
            response = await self.llm.single_generate_async(messages=messages, response_format={"type": "json_object"})
            # try to parse the LLM response
            optimized_json = json.loads(response)
        except Exception as e:
            logger.error(f"LLM optimization failed: {e}")
            optimized_json = None
        if not optimized_json:
            return WorkFlowEditorReturn(
                status="failed",
                workflow_json=None,
                workflow_json_path=None,
                error_message="LLM optimization failed"
            )
        # check the workflow json structure
        try:
            # local import: WorkFlow/WorkFlowGraph are only needed for this validation step
            from ..workflow.workflow import WorkFlow
            from ..workflow.workflow_graph import WorkFlowGraph
            # create the workflow graph from the json
            graph = WorkFlowGraph.from_dict(optimized_json)
            # instantiate the workflow purely to validate the structure
            workflow = WorkFlow(graph=graph, llm=self.llm)
        except Exception as e:
            logger.error(f"Workflow json structure check failed: {e}")
            return WorkFlowEditorReturn(
                status="failed",
                workflow_json=None,
                workflow_json_path=None,
                error_message="Workflow json structure check failed"
            )
        del workflow
        # save the workflow json file
        with open(new_file_path, "w") as f:
            json.dump(optimized_json, f, indent=2, ensure_ascii=False)
        return WorkFlowEditorReturn(
            status="success",
            workflow_json=optimized_json,
            workflow_json_path=new_file_path,
            error_message=None
        )
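

if __name__ == "__main__":
    # Minimal usage sketch, not part of the library API: the paths and the
    # instruction below are hypothetical placeholders. In a test/CI environment
    # (or without OPENAI_API_KEY) the MockLLM default is used, so this needs no
    # network access; the resulting status depends on the backend selected.
    import asyncio

    editor = WorkFlowEditor(save_dir="./workflows")  # hypothetical directory
    result = asyncio.run(
        editor.edit_workflow(
            file_path="./workflows/example_workflow.json",  # hypothetical input file
            instruction="Merge redundant nodes and simplify the edges.",
        )
    )
    print(result.status, result.workflow_json_path)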