import json
import os
import time
from typing import Any, Dict, List, Literal, Optional

from dotenv import load_dotenv
from pydantic import BaseModel, Field

from ..core.logging import logger
from ..core.module import BaseModule
from ..models import BaseLLM, OpenAILLM, OpenAILLMConfig
from ..models.model_configs import LLMConfig
from ..prompts.workflow.workflow_editor import WORKFLOW_EDITOR_PROMPT

load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")


class MockLLMConfig(LLMConfig):
    """Mock LLM configuration for testing purposes."""

    llm_type: str = "MockLLM"
    model: str = "mock-model"


class MockLLM(BaseLLM):
    """Mock LLM implementation for testing that passes pydantic type validation."""

    def __init__(self, config: Optional[MockLLMConfig] = None, **kwargs):
        if config is None:
            config = MockLLMConfig(
                llm_type="MockLLM",
                model="mock-model",
                output_response=True
            )
        super().__init__(config, **kwargs)

    def init_model(self):
        """Initialize the mock model (no-op)."""
        pass

    def formulate_messages(self, prompts: List[str], system_messages: Optional[List[str]] = None) -> List[List[dict]]:
        """Build a chat-style message list for each prompt, prepending any system messages."""
        result = []
        for prompt in prompts:
            messages = []
            if system_messages:
                for sys_msg in system_messages:
                    messages.append({"role": "system", "content": sys_msg})
            messages.append({"role": "user", "content": prompt})
            result.append(messages)
        return result

    def single_generate(self, messages: List[dict], **kwargs) -> str:
        """Return a fixed, minimal workflow JSON response."""
        return '{"nodes": [], "edges": []}'

    def batch_generate(self, batch_messages: List[List[dict]], **kwargs) -> List[str]:
        """Generate a response for each message list in the batch."""
        return [self.single_generate(messages, **kwargs) for messages in batch_messages]

    async def single_generate_async(self, messages: List[dict], **kwargs) -> str:
        """Async wrapper around the synchronous mock generation."""
        return self.single_generate(messages, **kwargs)

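
# A minimal sketch of how MockLLM behaves, kept as a comment so the module
# stays import-safe; the values below are illustrative only:
#
#     llm = MockLLM()
#     msgs = llm.formulate_messages(["hello"], system_messages=["be brief"])
#     # msgs == [[{"role": "system", "content": "be brief"},
#     #           {"role": "user", "content": "hello"}]]
#     llm.single_generate(msgs[0])  # -> '{"nodes": [], "edges": []}'
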

def default_llm_config():
    """
    Create the default LLM instance. Returns a MockLLM in testing
    environments (pytest, CI) or when OPENAI_API_KEY is missing or empty;
    otherwise returns an OpenAILLM backed by gpt-4o.
    """
    is_testing = (
        os.getenv("PYTEST_CURRENT_TEST") is not None or
        os.getenv("CI") is not None or
        OPENAI_API_KEY is None or
        OPENAI_API_KEY.strip() == ""
    )

    if is_testing:
        mock_config = MockLLMConfig(
            llm_type="MockLLM",
            model="mock-model",
            output_response=True
        )
        return MockLLM(mock_config)
    else:
        llm_config = OpenAILLMConfig(
            model="gpt-4o",
            openai_key=OPENAI_API_KEY,
            stream=True,
            output_response=True
        )
        return OpenAILLM(llm_config)

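
# A short sketch of the fallback in practice, assuming the module was imported
# without OPENAI_API_KEY set (or under pytest/CI):
#
#     llm = default_llm_config()
#     isinstance(llm, MockLLM)  # True in such an environment
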

class WorkFlowEditorReturn(BaseModel):
    """
    The return value of the workflow editor.
    """

    status: Literal["success", "failed", "exceeded_max_retries"] = Field(
        description="The status of the workflow editing operation"
    )
    workflow_json: Optional[Dict[str, Any]] = Field(
        description="The workflow JSON structure after editing"
    )
    workflow_json_path: Optional[str] = Field(
        description="The file path where the workflow JSON is saved"
    )
    error_message: Optional[str] = Field(
        default=None,
        description="Error message if the operation failed"
    )

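
# A minimal sketch of a successful result, assuming pydantic v2
# (`model_dump`); the field values are illustrative only:
#
#     result = WorkFlowEditorReturn(
#         status="success",
#         workflow_json={"nodes": [], "edges": []},
#         workflow_json_path="/tmp/workflow.json",
#     )
#     result.model_dump()  # -> {"status": "success", ...}
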

class WorkFlowEditor(BaseModule):
    """
    An API-oriented version of HITLOutsideConversationAgent: it edits the
    workflow JSON structure in an interaction-free way.

    Attributes:
        save_dir (str): The directory to save the workflow json file.
        llm (BaseLLM): The LLM model to use for editing the workflow json file.
        max_retries (int): The maximum number of retries to edit the workflow json file.
    """

    save_dir: str
    # default_factory defers LLM construction to instantiation time instead of
    # building one shared instance at import time.
    llm: Optional[BaseLLM] = Field(default_factory=default_llm_config)
    max_retries: Optional[int] = Field(default=3)

    def init_module(self):
        pass

    async def edit_workflow(self, file_path: str, instruction: str, new_file_path: Optional[str] = None) -> WorkFlowEditorReturn:
        """
        Optimize or modify a workflow JSON file according to the instruction,
        using the configured LLM.

        Args:
            file_path (str): The path to the workflow json file, or a file name inside save_dir.
            instruction (str): The instruction describing how to edit the workflow json file.
            new_file_path (Optional[str]): The path for the edited workflow json file.
                If omitted, a timestamped file name is generated inside save_dir.

        Returns:
            WorkFlowEditorReturn: The status, the edited workflow JSON, the path it
            was saved to, and an error message if the operation failed.
        """
        if new_file_path is None:
            # Generate a timestamped file name inside save_dir.
            new_file_path = "new_json_for__" + os.path.basename(file_path) + "__" + time.strftime("%Y%m%d_%H%M%S") + ".json"
            new_file_path = os.path.join(self.save_dir, new_file_path)
        else:
            path_split = os.path.split(new_file_path)
            if not path_split[0]:
                # A bare file name was given: save it inside save_dir.
                new_file_path = os.path.join(self.save_dir, new_file_path)
            else:
                # A full path was given: the directory must exist and the file
                # name must end in ".json".
                if not (os.path.exists(path_split[0]) and path_split[1].endswith(".json")):
                    raise FileNotFoundError(f"The directory {path_split[0]} does not exist or the file name is not a json file name.")

        # Accept either a full path or a bare file name inside save_dir.
        if not os.path.exists(file_path):
            candidate_path = os.path.join(self.save_dir, file_path)
            if os.path.exists(candidate_path):
                file_path = candidate_path

        with open(file_path, "r") as f:
            workflow_json = json.load(f)

        optimization_prompt = WORKFLOW_EDITOR_PROMPT.format(
            current_workflow_json=json.dumps(workflow_json, indent=2, ensure_ascii=False),
            user_advice=instruction
        )
        messages = [
            {"role": "system", "content": "You are a helpful assistant that can optimize the workflow json structure."},
            {"role": "user", "content": optimization_prompt}
        ]
        try:
            response = await self.llm.single_generate_async(messages=messages, response_format={"type": "json_object"})
            optimized_json = json.loads(response)
        except Exception as e:
            logger.error(f"LLM optimization failed: {e}")
            optimized_json = None

        if not optimized_json:
            return WorkFlowEditorReturn(
                status="failed",
                workflow_json=None,
                workflow_json_path=None,
                error_message="LLM optimization failed"
            )

        # Validate the structure by building a graph and a workflow from it;
        # the objects are discarded once the check passes.
        try:
            from ..workflow.workflow import WorkFlow
            from ..workflow.workflow_graph import WorkFlowGraph

            graph = WorkFlowGraph.from_dict(optimized_json)
            workflow = WorkFlow(graph=graph, llm=self.llm)
        except Exception as e:
            logger.error(f"Workflow json structure check failed: {e}")
            return WorkFlowEditorReturn(
                status="failed",
                workflow_json=None,
                workflow_json_path=None,
                error_message="Workflow json structure check failed"
            )
        del workflow

        with open(new_file_path, "w") as f:
            json.dump(optimized_json, f, indent=2, ensure_ascii=False)

        return WorkFlowEditorReturn(
            status="success",
            workflow_json=optimized_json,
            workflow_json_path=new_file_path,
            error_message=None
        )
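

# A minimal, self-contained usage sketch, assuming WorkFlowEditor accepts
# keyword construction (pydantic-style) and that the relative imports resolve;
# the paths and instruction below are hypothetical:
#
#     import asyncio
#
#     editor = WorkFlowEditor(save_dir="./workflows")
#     result = asyncio.run(editor.edit_workflow(
#         file_path="my_workflow.json",
#         instruction="Merge the two retrieval nodes into a single node.",
#     ))
#     print(result.status, result.workflow_json_path)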