# intent_recog/orchestrator.py
import json
import logging
from enum import Enum, auto
from llm_client import LLMClient
from prompts import Prompts
# ==============================================================================
# --- Logging configuration (dual-logger setup) ---
# ==============================================================================
# 1. Debug log (orchestrator.log) - records all technical details.
#    Intended for developer debugging: raw LLM responses, error stack traces, etc.
debug_logger = logging.getLogger('orchestrator_logger')
debug_logger.setLevel(logging.INFO)
if not debug_logger.handlers:
    # Use mode='a' to append to the log instead of overwriting it
file_handler = logging.FileHandler('orchestrator.log', mode='a', encoding='utf-8')
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
debug_logger.addHandler(file_handler)
# Prevent propagation to the root logger so nothing is echoed to the console
debug_logger.propagate = False
# Write a separator on every program start to make it easy to tell run sessions apart
debug_logger.info("\n" + "="*20 + " APPLICATION STARTED " + "="*20)
# 2. Demo log (demo_show.log) - records only user inputs and state changes.
#    Intended for demos and for quickly reviewing the conversation flow.
demo_logger = logging.getLogger('demo_logger')
demo_logger.setLevel(logging.INFO)
if not demo_logger.handlers:
    # Use mode='a' to append to the log
demo_file_handler = logging.FileHandler('demo_show.log', mode='a', encoding='utf-8')
    # Use a more compact format containing only a timestamp and the message
demo_formatter = logging.Formatter('%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
demo_file_handler.setFormatter(demo_formatter)
demo_logger.addHandler(demo_file_handler)
# Likewise, prevent propagation to the root logger
demo_logger.propagate = False
# Also write a session-start separator to the demo log
demo_logger.info("\n" + "="*20 + " NEW DEMO SESSION STARTED " + "="*20)
# ==============================================================================
# --- Core code ---
# ==============================================================================
class DialogueState(Enum):
"""对话状态枚举"""
    REQUIREMENT_ELICITATION = auto()  # Requirements elicitation
    AI_MODELING = auto()  # AI modeling
class Orchestrator:
"""
    Dialogue orchestrator: manages the conversation flow and state, and invokes the discriminator.
"""
def __init__(self, model="gpt-4-turbo"):
self.llm_client = LLMClient(model=model)
self.conversation_history = []
self.state = DialogueState.REQUIREMENT_ELICITATION
self.prompts = Prompts()
        # Record initialization in each of the two logs
debug_logger.info(f"Orchestrator initialized. Initial state: {self.state.name}")
demo_logger.info(f"Initial State: {self.state.name}")
def _format_history_for_prompt(self) -> str:
"""将对话历史格式化为字符串"""
if not self.conversation_history:
return "对话尚未开始。"
return "\n".join([f"{msg['role']}: {msg['content']}" for msg in self.conversation_history])
def _check_information_sufficiency(self) -> bool:
"""
        [Discriminator - implicit trigger]
        Use the LLM to judge whether the conversation history already contains enough information to start modeling.
"""
if len(self.conversation_history) < 2:
return False
history_str = self._format_history_for_prompt()
prompt = self.prompts.INFORMATION_SUFFICIENCY_CHECK.format(conversation_history=history_str)
try:
response_str = self.llm_client.chat(messages=[{"role": "user", "content": prompt}], temperature=0.1)
debug_logger.info(f"Sufficiency check response: {response_str}")
json_response = json.loads(response_str.strip())
if json_response.get("sufficient") is True:
debug_logger.info(f"Information deemed sufficient. Reason: {json_response.get('reason')}")
return True
else:
debug_logger.info(f"Information insufficient. Missing: {json_response.get('missing_elements')}")
return False
        except Exception as e:  # includes json.JSONDecodeError and KeyError
debug_logger.error(f"Error during sufficiency check: {e}")
return False
def _check_explicit_trigger(self, user_input: str) -> bool:
"""
        [Discriminator - explicit trigger]
        Use the LLM to judge whether the user has explicitly asked to start modeling.
"""
prompt = self.prompts.EXPLICIT_TRIGGER_CHECK.format(user_input=user_input)
try:
intent = self.llm_client.identify_intent(prompt, temperature=0.1)
debug_logger.info(f"Explicit trigger check intent: '{intent}'")
if "StartModeling" in intent:
debug_logger.info("Explicit trigger to model detected.")
return True
return False
except Exception as e:
debug_logger.error(f"Error during explicit trigger check: {e}")
return False
def _discriminator(self, user_input: str) -> bool:
"""
        Main discriminator function.
        Decides whether to switch from "requirements elicitation" to "AI modeling".
"""
        # Rule 1 (highest priority): check whether the user explicitly asked to start modeling
if self._check_explicit_trigger(user_input):
return True
        # Threshold for deciding whether the input counts as "long text"
        LONG_TEXT_THRESHOLD = 200  # in characters; adjust as needed
        # Rule 2: if the user input is long text, use the dedicated single-text analyzer
if len(user_input) > LONG_TEXT_THRESHOLD:
debug_logger.info(f"Long input detected (length: {len(user_input)}), using single text sufficiency check.")
if self._check_single_text_sufficiency(user_input):
return True
        # Rule 3: for regular, incremental dialogue, use the history-based analyzer.
        # Note: this is reached only when the input is not long text, or when the long-text check found it insufficient.
if self._check_information_sufficiency():
return True
return False
def _check_single_text_sufficiency(self, text_block: str) -> bool:
"""
        [Discriminator - single-text analysis]
        Use the LLM to judge whether a single large block of text contains enough information to start modeling.
"""
prompt = self.prompts.SINGLE_TEXT_SUFFICIENCY_CHECK.format(text_block=text_block)
try:
response_str = self.llm_client.chat(messages=[{"role": "user", "content": prompt}], temperature=0.1)
debug_logger.info(f"Single text sufficiency check response: {response_str}")
json_response = json.loads(response_str.strip())
if json_response.get("sufficient") is True:
debug_logger.info(f"Information in single text deemed sufficient. Reason: {json_response.get('reason')}")
return True
else:
debug_logger.info(f"Information in single text insufficient. Missing: {json_response.get('missing_elements')}")
return False
        except Exception as e:  # includes json.JSONDecodeError and KeyError
debug_logger.error(f"Error during single text sufficiency check: {e}")
return False
def process_user_message(self, user_input: str) -> str:
"""
        Process a single turn of user input and return the assistant's reply.
"""
        # Record the user input in the demo log
demo_logger.info(f"User Input: {user_input}")
self.conversation_history.append({"role": "user", "content": user_input})
        # If already in the modeling phase, return a notice right away
if self.state == DialogueState.AI_MODELING:
response_content = "当前已处于建模阶段,请等待模型生成结果。"
            # Record the current state in the demo log
demo_logger.info(f"Current State: {self.state.name} (No change)")
return response_content
        # Invoke the discriminator to decide whether to switch state
should_switch = self._discriminator(user_input)
if should_switch:
self.state = DialogueState.AI_MODELING
            # Record the state change in both log files
debug_logger.info(f"State changed to: {self.state.name}")
demo_logger.info(f"State Changed To: {self.state.name}")
response_content = self.prompts.AI_MODELING_NOTICE
else:
            # Likewise, record that the state is unchanged in both logs
debug_logger.info(f"State remains: {self.state.name}")
demo_logger.info(f"State Remains: {self.state.name}")
system_prompt = self.prompts.REQUIREMENT_ELICITATION_SYSTEM_PROMPT
messages = [{"role": "system", "content": system_prompt}] + self.conversation_history
response_content = self.llm_client.chat(messages=messages)
self.conversation_history.append({"role": "assistant", "content": response_content})
#demo_logger.info(f"Assistant Response: {response_content}")
return response_content
def reset(self):
"""重置对话"""
self.conversation_history = []
self.state = DialogueState.REQUIREMENT_ELICITATION
        # Record the reset event in both logs
debug_logger.info("Orchestrator has been reset.")
demo_logger.info("="*20 + " SESSION RESET " + "="*20)
demo_logger.info(f"Initial State: {self.state.name}")