| | |
| | |
| |
|
| | import os |
| | import sys |
| | import json |
| | import getpass |
| | from datetime import datetime |
| | from pathlib import Path |
| | from typing import Dict, List, Optional, Any |
| | from dataclasses import dataclass, field |
| | import uuid |
| |
|
| | |
| | from auth.user_manager import UnifiedUserManager |
| | from auth.session_handler import SessionHandler |
| | from agents.agent_registry import AgentRegistry |
| | from discussion.discussion_engine import ClinicalDiscussionEngine |
| | from storage.discussion_storage import DiscussionStorage |
| | from storage.user_data import UserDataManager |
| | from utils.config import ClinicalConfig |
| | from utils.logger import setup_logger |
| | from auth import User |
| |
|
| | project_root = Path(__file__).parent.parent |
| | sys.path.insert(0, str(project_root)) |
| |
|
| | class ClinicalCLI: |
| | """临床多智能体讨论系统命令行界面""" |
| | |
| | def __init__(self, config_path: str = "config.json"): |
| | |
| | self.config = ClinicalConfig(config_path) |
| | self.cli_interface = CLIInterface(self) |
| |
|
| | |
| | self.logger = setup_logger("clinical_cli") |
| | |
| | |
| | try: |
| | self.user_manager = UnifiedUserManager() |
| | self.session_handler = SessionHandler() |
| | self.agent_registry = AgentRegistry() |
| | self.discussion_storage = DiscussionStorage() |
| | self.user_data_manager = UserDataManager() |
| | except Exception as e: |
| | self.logger.error(f"组件初始化失败: {e}") |
| | |
| | if "api_base" in str(e): |
| | self.logger.error("请检查配置文件中模型API设置是否正确") |
| | raise |
| | |
| | |
| | self.current_session = None |
| | self.current_user = None |
| | self.current_discussion = None |
| | |
| | |
| | self.discussion_config = { |
| | "rounds": self.config.discussion.default_rounds, |
| | "user_participation": False, |
| | "auto_save": True, |
| | "export_format": "json" |
| | } |
| |
|
| | def get_model_args(self): |
| | """获取模型参数 - 修复这个方法""" |
| | |
| | class Args: |
| | def __init__(self, config): |
| | self.model = config.model.engine |
| | self.llm_name = config.model.model_name |
| | self.url = config.model.api_base |
| | self.temp = config.model.temperature |
| | self.discussion_rounds = getattr(config.discussion, 'default_rounds', 3) |
| | |
| | return Args(self.config) |
| | |
| | def clear_screen(self): |
| | """清屏""" |
| | os.system('cls' if os.name == 'nt' else 'clear') |
| | |
| | def print_header(self, title: str): |
| | """打印标题""" |
| | self.clear_screen() |
| | print("=" * 60) |
| | print(f"临床MDT智能模拟助手 - {title}") |
| | print("=" * 60) |
| | print() |
| | |
| | def wait_for_enter(self, message: str = "按回车键继续..."): |
| | """等待用户按回车""" |
| | input(f"\n{message}") |
| | |
| | def authenticate_user(self) -> bool: |
| | """用户认证流程""" |
| | while True: |
| | self.print_header("用户认证") |
| | print("1. 用户登录") |
| | print("2. 用户注册") |
| | print("3. 退出系统") |
| | print() |
| | |
| | choice = self.cli_interface.get_user_input("请选择操作: ", required=False) |
| | |
| | if choice == "1": |
| | return self.user_login() |
| | elif choice == "2": |
| | self.user_register() |
| | elif choice == "3" or choice.lower() == "exit": |
| | print("感谢使用临床多智能体讨论系统!") |
| | sys.exit(0) |
| | else: |
| | print("无效选择,请重新输入。") |
| | self.wait_for_enter() |
| | |
| | def user_login(self) -> bool: |
| | """用户登录""" |
| | self.print_header("用户登录") |
| | |
| | username = self.cli_interface.get_user_input("用户名: ") |
| | password = self.cli_interface.get_user_input("密码: ", password=False) |
| | |
| | try: |
| | user_info = self.user_manager.authenticate(username, password) |
| | if user_info: |
| | |
| | self.current_user = { |
| | 'user_id': user_info.user_id, |
| | 'username': user_info.username, |
| | 'full_name': getattr(user_info, 'full_name', ''), |
| | 'department': getattr(user_info, 'department', ''), |
| | 'role': getattr(user_info, 'role', 'user') |
| | } |
| | |
| | self.current_session = self.session_handler.create_session(user_info.user_id) |
| | |
| | print(f"\n登录成功!欢迎回来,{user_info.username}!") |
| | self.logger.info(f"用户 {username} 登录成功") |
| | self.wait_for_enter() |
| | return True |
| | else: |
| | print("\n登录失败:用户名或密码错误。") |
| | self.wait_for_enter() |
| | return False |
| | |
| | except Exception as e: |
| | print(f"\n登录失败:{e}") |
| | self.wait_for_enter() |
| | return False |
| |
|
| | def user_register(self): |
| | """用户注册""" |
| | self.print_header("用户注册") |
| | |
| | username = self.cli_interface.get_user_input("请输入用户名: ") |
| | |
| | |
| | if self.user_manager.user_exists(username): |
| | print("该用户名已存在,请选择其他用户名。") |
| | self.wait_for_enter() |
| | return |
| | |
| | password = self.cli_interface.get_user_input("请输入密码: ", password=False) |
| | confirm_password = self.cli_interface.get_user_input("请确认密码: ", password=False) |
| | |
| | if password != confirm_password: |
| | print("密码不一致,请重新输入。") |
| | self.wait_for_enter() |
| | return |
| | |
| | |
| | full_name = self.cli_interface.get_user_input("请输入真实姓名(可选): ", required=False) |
| | department = self.cli_interface.get_user_input("请输入所在科室(可选): ", required=False) |
| | |
| | try: |
| | |
| | success, result = self.user_manager.create_user( |
| | username=username, |
| | password=password, |
| | full_name=full_name, |
| | department=department |
| | ) |
| | |
| | if success: |
| | print(f"\n注册成功!欢迎使用临床多智能体讨论系统,{username}!") |
| | self.logger.info(f"新用户注册: {username}") |
| | else: |
| | print(f"\n注册失败:{result}") |
| | |
| | self.wait_for_enter() |
| | |
| | except Exception as e: |
| | print(f"\n注册失败:{e}") |
| | self.wait_for_enter() |
| |
|
| | def show_main_menu(self): |
| | """显示主菜单""" |
| | while True: |
| | self.print_header("主菜单") |
| | print(f"当前用户: {self.current_user['username']}") |
| | if self.current_session: |
| | print(f"会话ID: {self.current_session[:8]}...") |
| | print() |
| | |
| | print("1. 开始新的讨论") |
| | print("2. 查看历史讨论") |
| | print("3. 管理智能体") |
| | print("4. 系统设置") |
| | print("5. 用户信息") |
| | print("6. 退出系统") |
| | print() |
| | |
| | choice = self.cli_interface.get_user_input("请选择操作: ", required=False) |
| | |
| | if choice == "1": |
| | self.start_new_discussion() |
| | elif choice == "2": |
| | self.view_discussion_history() |
| | elif choice == "3": |
| | self.manage_agents() |
| | elif choice == "4": |
| | self.system_settings() |
| | elif choice == "5": |
| | self.user_information() |
| | elif choice == "5": |
| | self.user_information() |
| | elif choice == "6": |
| | if self.cli_interface.confirm_action("确定要退出系统吗?"): |
| | print("感谢使用临床多智能体讨论系统!") |
| | break |
| | else: |
| | print("无效选择,请重新输入。") |
| | self.wait_for_enter() |
| |
|
| | def start_new_discussion(self): |
| | """开始新的讨论""" |
| | self.print_header("开始新的讨论") |
| | |
| | |
| | available_agents = self.agent_registry.get_available_agents(self.current_session) |
| | agent_names = list(available_agents.keys()) |
| | |
| | if not agent_names: |
| | print("当前没有可用的智能体,请先添加智能体。") |
| | self.wait_for_enter() |
| | return |
| | |
| | print("步骤1: 选择参与讨论的智能体") |
| | print("提示:可以输入多个编号,用空格或逗号分隔,如:1 3 5 或 1,3,5") |
| | |
| | selected_agents = self.cli_interface.select_from_list( |
| | agent_names, |
| | "请选择要参与讨论的智能体(可多选):", |
| | allow_multiple=True |
| | ) |
| | |
| | if not selected_agents: |
| | print("未选择任何智能体,取消讨论。") |
| | self.wait_for_enter() |
| | return |
| | |
| | print(f"已选择 {len(selected_agents)} 个智能体: {', '.join(selected_agents)}") |
| |
|
| | |
| | self.print_header("输入病历信息") |
| | print("步骤2: 输入病历信息") |
| | print("请输入患者的病历信息,包括主诉、现病史、既往史、体格检查、辅助检查等。") |
| | print("(输入完成后请双击回车结束输入)") |
| | print() |
| | |
| | medical_record = self.cli_interface.get_multiline_input("病历信息:") |
| | |
| | if not medical_record.strip(): |
| | print("病历信息不能为空。") |
| | self.wait_for_enter() |
| | return |
| | |
| | |
| | self.print_header("输入讨论问题") |
| | print("步骤3: 输入讨论问题") |
| | question = self.cli_interface.get_user_input("请输入需要讨论的临床问题: ") |
| | |
| | if not question.strip(): |
| | print("讨论问题不能为空。") |
| | self.wait_for_enter() |
| | return |
| | |
| | |
| | self.configure_discussion_settings() |
| | |
| | |
| | self.print_header("确认讨论信息") |
| | print("请确认以下讨论信息:") |
| | print(f"参与智能体: {', '.join(selected_agents)}") |
| | print(f"讨论问题: {question}") |
| | print(f"讨论轮数: {self.discussion_config['rounds']}") |
| | print(f"用户参与: {'是' if self.discussion_config['user_participation'] else '否'}") |
| | print() |
| | |
| | if not self.cli_interface.confirm_action("确认开始讨论吗?"): |
| | print("讨论已取消。") |
| | self.wait_for_enter() |
| | return |
| | |
| | |
| | try: |
| | self.run_discussion(selected_agents, medical_record, question) |
| | except Exception as e: |
| | print(f"讨论过程中出现错误: {e}") |
| | self.logger.error(f"讨论错误: {e}") |
| | self.wait_for_enter() |
| |
|
| | def configure_discussion_settings(self): |
| | """配置讨论设置""" |
| | self.print_header("讨论设置") |
| | |
| | print("当前设置:") |
| | print(f"1. 讨论轮数: {self.discussion_config['rounds']}") |
| | print(f"2. 用户参与讨论: {'是' if self.discussion_config['user_participation'] else '否'}") |
| | print(f"3. 自动保存: {'是' if self.discussion_config['auto_save'] else '否'}") |
| | print(f"4. 导出格式: {self.discussion_config['export_format']}") |
| | print() |
| | |
| | change_settings = self.cli_interface.confirm_action("是否修改讨论设置?") |
| | |
| | if change_settings: |
| | |
| | try: |
| | rounds_input = self.cli_interface.get_user_input( |
| | f"讨论轮数 (当前: {self.discussion_config['rounds']}, 范围: 1-5): ", |
| | required=False |
| | ) |
| | if rounds_input: |
| | new_rounds = int(rounds_input) |
| | if 1 <= new_rounds <= 5: |
| | self.discussion_config['rounds'] = new_rounds |
| | print(f"讨论轮数已设置为: {new_rounds}") |
| | except ValueError: |
| | print("讨论轮数必须在1-5之间,将使用默认值。") |
| | |
| | |
| | print("\n用户参与设置:") |
| | print("如果选择'是',讨论过程中会提示您介入;") |
| | print("如果选择'否',系统将自动完成整个讨论过程。") |
| | participation = self.cli_interface.confirm_action("是否允许用户参与讨论?") |
| | self.discussion_config['user_participation'] = participation |
| | print(f"用户参与已设置为: {'是' if participation else '否'}") |
| | |
| | |
| | auto_save = self.cli_interface.confirm_action("是否自动保存讨论记录?") |
| | self.discussion_config['auto_save'] = auto_save |
| | |
| | |
| | formats = ["json", "docx", "txt"] |
| | print("可选导出格式: " + ", ".join(formats)) |
| | export_format = self.cli_interface.get_user_input(f"导出格式 ({self.discussion_config['export_format']}): ", required=False) |
| | if export_format and export_format.lower() in formats: |
| | self.discussion_config['export_format'] = export_format.lower() |
| |
|
| | def show_discussion_result(self, discussion_result: Dict): |
| | """显示讨论结果 - 修复未定义字段问题""" |
| | print("\n" + "=" * 80) |
| | print("临床多智能体讨论汇总报告") |
| | print("=" * 80) |
| |
|
| | |
| | |
| | status = discussion_result.get("status", "completed") |
| | if status == "interrupted": |
| | print("讨论被用户中断") |
| | metadata = discussion_result.get("metadata", {}) |
| | rounds_completed = metadata.get("rounds_completed", 0) |
| | print(f"已完成轮次: {rounds_completed}") |
| | return |
| | elif status == "error": |
| | error_msg = discussion_result.get("error", "未知错误") |
| | print(f"讨论过程中出现错误: {error_msg}") |
| | return |
| |
|
| | |
| | metadata = discussion_result.get('metadata', {}) |
| | print(f"讨论ID: {metadata.get('discussion_id', '未知')}") |
| | print(f"参与智能体: {', '.join(metadata.get('agents_used', []))}") |
| | print(f"讨论轮数: {metadata.get('rounds', 0)}") |
| | print(f"生成时间: {metadata.get('timestamp', '未知')}") |
| | print("-" * 80) |
| |
|
| | |
| | clinical_summary = discussion_result.get('clinical_summary', {}) |
| | if clinical_summary: |
| | print("\n临床总结:") |
| | print("-" * 40) |
| | |
| | |
| | primary_diagnosis = clinical_summary.get('primary_diagnosis', |
| | clinical_summary.get('diagnosis', |
| | clinical_summary.get('final_decision', '未知'))) |
| | print(f"主要诊断: {primary_diagnosis}") |
| | |
| | |
| | treatment_plan = clinical_summary.get('treatment_plan', {}) |
| | if treatment_plan: |
| | print("\n治疗方案:") |
| | if isinstance(treatment_plan, dict): |
| | for category, plans in treatment_plan.items(): |
| | if plans: |
| | print(f" {category}:") |
| | if isinstance(plans, list): |
| | for plan in plans: |
| | print(f" • {plan}") |
| | else: |
| | print(f" • {plans}") |
| | elif isinstance(treatment_plan, list): |
| | for plan in treatment_plan: |
| | print(f" • {plan}") |
| | elif isinstance(treatment_plan, str): |
| | print(f" {treatment_plan}") |
| | |
| | |
| | quality_assessment = discussion_result.get('evaluation_metrics', {}) |
| | if quality_assessment: |
| | print("\n质量评估:") |
| | for metric, score in quality_assessment.items(): |
| | if isinstance(score, (int, float)): |
| | print(f" {metric}: {score}/100") |
| | |
| | |
| | discussion_process = discussion_result.get('discussion_process', {}) |
| | discussion_log = discussion_process.get('discussion_log', []) |
| | if discussion_log: |
| | total_contributions = 0 |
| | for round_data in discussion_log: |
| | contributions = round_data.get('contributions', []) |
| | total_contributions += len(contributions) |
| | |
| | print(f"\n讨论统计: 共{len(discussion_log)}轮,{total_contributions}次发言") |
| | |
| | print("\n" + "=" * 80) |
| |
|
| | def run_discussion(self, agent_names: List[str], medical_record: str, question: str): |
| | """运行讨论 - 修复导出方法调用""" |
| | self.print_header("讨论进行中") |
| | |
| | print("正在初始化讨论环境...") |
| | |
| | |
| | discussion_engine = ClinicalDiscussionEngine( |
| | args=self.get_model_args(), |
| | user_session={ |
| | 'session_id': self.current_session, |
| | 'user_id': self.current_user['user_id'] |
| | }, |
| | interface=self.cli_interface |
| | ) |
| |
|
| | if hasattr(discussion_engine, 'set_discussion_config'): |
| | discussion_engine.set_discussion_config(self.discussion_config) |
| | discussion_engine.discussion_config = self.discussion_config |
| | discussion_engine.max_rounds = self.discussion_config['rounds'] |
| |
|
| | discussion_engine.initialize_discussion(medical_record, question, agent_names) |
| | |
| | print("讨论开始...") |
| | print(f"病历信息: {medical_record}") |
| | print(f"讨论问题: {question}") |
| | print(f"参与智能体: {', '.join(agent_names)}") |
| | print(f"计划讨论轮数: {self.discussion_config['rounds']}") |
| | print("-" * 60) |
| | |
| | |
| | self.logger.info(f"开始讨论,参与智能体: {agent_names}") |
| | discussion_result = discussion_engine.start_discussion() |
| | |
| | |
| | self.logger.info(f"讨论结果类型: {type(discussion_result)}") |
| | if isinstance(discussion_result, dict): |
| | self.logger.info(f"讨论结果键: {list(discussion_result.keys())}") |
| | |
| | |
| | if self.discussion_config['auto_save'] and isinstance(discussion_result, dict): |
| | try: |
| | |
| | save_data = { |
| | "agents": agent_names, |
| | "rounds": self.discussion_config['rounds'], |
| | "medical_record": medical_record, |
| | "question": question, |
| | "log": discussion_result.get("discussion_process", {}).get("discussion_log", []), |
| | "summary": discussion_result.get("clinical_summary", {}), |
| | "interventions": discussion_result.get("discussion_process", {}).get("user_interventions", []), |
| | "metrics": discussion_result.get("evaluation_metrics", {}), |
| | "metadata": discussion_result.get("metadata", { |
| | "discussion_id": str(uuid.uuid4())[:8], |
| | "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"), |
| | "agents_used": agent_names, |
| | "rounds": self.discussion_config['rounds'] |
| | }), |
| | "medical_context": discussion_result.get("medical_context", { |
| | "medical_record": medical_record, |
| | "question": question |
| | }), |
| | "discussion_process": discussion_result.get("discussion_process", { |
| | "discussion_log": discussion_result.get("discussion_process", {}).get("discussion_log", []), |
| | "user_interventions": discussion_result.get("discussion_process", {}).get("user_interventions", []) |
| | }) |
| | } |
| | |
| | discussion_id = self.discussion_storage.save_discussion( |
| | self.current_user['user_id'], |
| | save_data |
| | ) |
| | discussion_result['discussion_id'] = discussion_id |
| | self.logger.info(f"讨论记录已保存,ID: {discussion_id}") |
| | |
| | except Exception as e: |
| | self.logger.error(f"保存讨论记录失败: {e}") |
| | discussion_result['save_error'] = str(e) |
| | |
| | |
| | self.show_discussion_result(discussion_result) |
| | |
| | |
| | self.handle_discussion_export(discussion_result) |
| | |
| | self.current_discussion = discussion_result |
| | self.wait_for_enter("讨论结束,按回车键返回主菜单...") |
| |
|
| | def handle_discussion_export(self, discussion_result: Dict): |
| | """处理讨论导出 - 新增方法,替代export_discussion_result""" |
| | if not self.cli_interface.confirm_action("是否导出讨论结果?"): |
| | return |
| | |
| | |
| | export_formats = [ |
| | {"name": "JSON格式", "value": "json"}, |
| | {"name": "Word文档", "value": "docx"}, |
| | {"name": "HTML标准版", "value": "html"}, |
| | {"name": "HTML简洁版", "value": "simple_html"}, |
| | {"name": "文本文件", "value": "txt"} |
| | ] |
| | |
| | format_names = [fmt["name"] for fmt in export_formats] |
| | selected_formats = self.cli_interface.select_from_list( |
| | format_names, |
| | "请选择导出格式(可多选):", |
| | allow_multiple=True |
| | ) |
| | |
| | if not selected_formats: |
| | print("取消导出。") |
| | return |
| | |
| | |
| | default_filename = f"clinical_discussion_{datetime.now().strftime('%Y%m%d_%H%M%S')}" |
| | export_base_path = self.cli_interface.get_user_input( |
| | f"导出路径(默认: ./exports/{default_filename}): ", |
| | required=False |
| | ) |
| | |
| | if not export_base_path: |
| | export_base_path = f"./exports/{default_filename}" |
| | |
| | |
| | os.makedirs(os.path.dirname(export_base_path), exist_ok=True) |
| | |
| | success_exports = [] |
| | failed_exports = [] |
| | |
| | try: |
| | for format_name in selected_formats: |
| | format_value = next(fmt["value"] for fmt in export_formats if fmt["name"] == format_name) |
| | |
| | try: |
| | export_file = self.discussion_storage.export_discussion( |
| | discussion_result, |
| | format_value, |
| | export_base_path + f".{format_value}" |
| | ) |
| | |
| | success_exports.append((format_name, export_file)) |
| | print(f"✅ 成功导出为 {format_name}: {export_file}") |
| | |
| | except Exception as e: |
| | failed_exports.append((format_name, str(e))) |
| | print(f"❌ 导出 {format_name} 失败: {e}") |
| | |
| | |
| | print("\n" + "="*50) |
| | print("导出结果汇总:") |
| | print("="*50) |
| | |
| | if success_exports: |
| | print("✅ 成功导出的格式:") |
| | for format_name, filepath in success_exports: |
| | print(f" - {format_name}: {filepath}") |
| | |
| | if failed_exports: |
| | print("❌ 导出失败的格式:") |
| | for format_name, error in failed_exports: |
| | print(f" - {format_name}: {error}") |
| | |
| | self.wait_for_enter() |
| | |
| | except Exception as e: |
| | print(f"❌ 导出过程中发生未知错误: {e}") |
| | self.wait_for_enter() |
| | |
| | def prompt_for_intervention_with_timeout(self, prompt: str, timeout: int = 5) -> Optional[str]: |
| | """ |
| | 带超时的用户介入提示 |
| | """ |
| | import threading |
| | from queue import Queue |
| | |
| | |
| | print(f"\n{prompt}", end='', flush=True) |
| | |
| | result_queue = Queue() |
| | |
| | def input_with_timeout(): |
| | try: |
| | user_input = input() |
| | result_queue.put(user_input) |
| | except: |
| | result_queue.put(None) |
| | |
| | |
| | input_thread = threading.Thread(target=input_with_timeout) |
| | input_thread.daemon = True |
| | input_thread.start() |
| | |
| | |
| | input_thread.join(timeout) |
| | |
| | if not result_queue.empty(): |
| | return result_queue.get() |
| | else: |
| | print(f"\n⏰ {timeout}秒超时,继续自动讨论...") |
| | return None |
| |
|
| | def get_structured_intervention_prompt(self) -> Dict[str, Any]: |
| | """获取结构化的介入提示选项""" |
| | print((" - 1 向特定智能体提问 (q); 2. 向所有智能体提问 (a); 3. 补充病例信息 (i); 4. 跳过当前轮次 (s); 5. 终止讨论 (x); 6. 继续自动讨论 (回车) ")) |
| | |
| | |
| | print("请选择介入方式: ", end='', flush=True) |
| | |
| | try: |
| | choice = input().strip().lower() |
| | except: |
| | choice = "" |
| | |
| | intervention_map = { |
| | '1': 'question_to_agent', |
| | 'q': 'question_to_agent', |
| | '2': 'broadcast_question', |
| | 'a': 'broadcast_question', |
| | '3': 'add_information', |
| | 'i': 'add_information', |
| | '4': 'skip_round', |
| | 's': 'skip_round', |
| | '5': 'interrupt', |
| | 'x': 'interrupt' |
| | } |
| | |
| | intervention_type = intervention_map.get(choice) |
| | if intervention_type: |
| | return self._get_intervention_details(intervention_type) |
| | |
| | return None |
| |
|
| | def _get_intervention_details(self, intervention_type: str) -> Dict[str, Any]: |
| | """获取介入详细信息 - 立即提示输入""" |
| | try: |
| | if intervention_type == 'question_to_agent': |
| | print("请选择目标智能体: ", end='', flush=True) |
| | |
| | available_agents = list(self.agents.keys()) |
| | for i, agent in enumerate(available_agents, 1): |
| | print(f"{i}. {agent}") |
| | print("请输入智能体编号或名称: ", end='', flush=True) |
| | |
| | agent_input = input().strip() |
| | |
| | if agent_input.isdigit() and 1 <= int(agent_input) <= len(available_agents): |
| | target_agent = available_agents[int(agent_input) - 1] |
| | else: |
| | target_agent = agent_input |
| | |
| | print("请输入问题: ", end='', flush=True) |
| | question = input().strip() |
| | |
| | return { |
| | 'type': 'question_to_agent', |
| | 'target_agent': target_agent, |
| | 'question': question |
| | } |
| | |
| | elif intervention_type == 'broadcast_question': |
| | print("请输入要向所有智能体提问的问题: ", end='', flush=True) |
| | question = input().strip() |
| | |
| | return { |
| | 'type': 'broadcast_question', |
| | 'question': question |
| | } |
| | |
| | elif intervention_type == 'add_information': |
| | print("请输入要补充的病例信息: ", end='', flush=True) |
| | information = input().strip() |
| | |
| | return { |
| | 'type': 'add_information', |
| | 'information': information |
| | } |
| | |
| | elif intervention_type == 'skip_round': |
| | return {'type': 'skip_round'} |
| | |
| | elif intervention_type == 'interrupt': |
| | return {'type': 'interrupt'} |
| | |
| | except Exception as e: |
| | self.logger.error(f"获取介入详情失败: {e}") |
| | |
| | return None |
| |
|
| | def view_discussion_history(self): |
| | """查看历史讨论""" |
| | self.print_header("历史讨论记录") |
| | |
| | try: |
| | discussions = self.discussion_storage.get_user_discussions(self.current_user['user_id']) |
| | |
| | if not discussions: |
| | print("暂无讨论记录。") |
| | self.wait_for_enter() |
| | return |
| | |
| | |
| | discussion_list = [] |
| | for disc in discussions: |
| | discussion_list.append({ |
| | 'id': disc['metadata']['discussion_id'], |
| | 'date': disc['metadata']['timestamp'], |
| | 'question': disc['medical_context']['question'][:50] + '...' if len(disc['medical_context']['question']) > 50 else disc['medical_context']['question'], |
| | 'agents': ', '.join(disc['metadata']['agents_used'][:3]) + ('...' if len(disc['metadata']['agents_used']) > 3 else '') |
| | }) |
| | |
| | while True: |
| | |
| | print("历史讨论记录:") |
| | print("-" * 80) |
| | print(f"{'序号':<4} {'日期':<12} {'问题':<30} {'参与智能体':<30}") |
| | print("-" * 80) |
| | |
| | for i, disc in enumerate(discussion_list, 1): |
| | print(f"{i:<4} {disc['date']:<12} {disc['question']:<30} {disc['agents']:<30}") |
| | |
| | print("-" * 80) |
| | print("\n操作选项:") |
| | print("1. 查看详细记录") |
| | print("2. 导出讨论记录") |
| | print("3. 删除讨论记录") |
| | print("4. 返回主菜单") |
| | print() |
| | |
| | choice = self.cli_interface.get_user_input("请选择操作: ", required=False) |
| | |
| | if choice == "1": |
| | self.view_discussion_details(discussions) |
| | elif choice == "2": |
| | self.export_discussion_batch(discussions) |
| | elif choice == "3": |
| | self.delete_discussions(discussions) |
| | elif choice == "4": |
| | break |
| | else: |
| | print("无效选择,请重新输入。") |
| | self.wait_for_enter() |
| | |
| | except Exception as e: |
| | print(f"获取历史记录失败: {e}") |
| | self.wait_for_enter() |
| | |
| | def view_discussion_details(self, discussions: List[Dict]): |
| | """查看讨论详情""" |
| | discussion_ids = [disc['discussion_id'] for disc in discussions] |
| | selected_id = self.cli_interface.select_from_list( |
| | discussion_ids, |
| | "请选择要查看的讨论记录:", |
| | allow_multiple=False |
| | ) |
| | |
| | if selected_id: |
| | discussion_id = selected_id[0] |
| | discussion = next(disc for disc in discussions if disc['discussion_id'] == discussion_id) |
| | |
| | self.show_discussion_result(discussion) |
| | self.wait_for_enter() |
| | |
| | def export_discussion_batch(self, discussions: List[Dict]): |
| | """批量导出讨论记录""" |
| | discussion_ids = [disc['discussion_id'] for disc in discussions] |
| | selected_ids = self.cli_interface.select_from_list( |
| | discussion_ids, |
| | "请选择要导出的讨论记录(可多选):", |
| | allow_multiple=True |
| | ) |
| | |
| | if not selected_ids: |
| | return |
| | |
| | selected_discussions = [disc for disc in discussions if disc['discussion_id'] in selected_ids] |
| | |
| | export_formats = ["json", "docx", "html", "simple_html", "txt"] |
| | format_choice = self.cli_interface.select_from_list( |
| | export_formats, |
| | "请选择导出格式:", |
| | allow_multiple=False |
| | ) |
| | |
| | if format_choice: |
| | export_format = format_choice[0] |
| | export_path = self.cli_interface.get_user_input("导出路径: ", required=False) or "./exports/batch_export" |
| | |
| | try: |
| | export_files = self.discussion_storage.export_discussions_batch( |
| | selected_discussions, |
| | export_format, |
| | export_path |
| | ) |
| | |
| | print(f"成功导出 {len(export_files)} 个文件。") |
| | for file in export_files: |
| | print(f" - {file}") |
| | |
| | self.wait_for_enter() |
| | |
| | except Exception as e: |
| | print(f"批量导出失败: {e}") |
| | self.wait_for_enter() |
| | |
| | def delete_discussions(self, discussions: List[Dict]): |
| | """删除讨论记录""" |
| | discussion_ids = [disc['discussion_id'] for disc in discussions] |
| | selected_ids = self.cli_interface.select_from_list( |
| | discussion_ids, |
| | "请选择要删除的讨论记录(可多选):", |
| | allow_multiple=True |
| | ) |
| | |
| | if not selected_ids: |
| | return |
| | |
| | if self.cli_interface.confirm_action(f"确定要删除选中的 {len(selected_ids)} 条讨论记录吗?此操作不可恢复。"): |
| | try: |
| | deleted_count = self.user_data_manager.delete_discussions( |
| | self.current_user['user_id'], |
| | selected_ids |
| | ) |
| | |
| | print(f"成功删除 {deleted_count} 条讨论记录。") |
| | self.wait_for_enter() |
| | |
| | except Exception as e: |
| | print(f"删除失败: {e}") |
| | self.wait_for_enter() |
| |
|
| | def manage_agents(self): |
| | """管理智能体 - 增强版本""" |
| | self.print_header("智能体管理") |
| | |
| | while True: |
| | available_agents = self.agent_registry.get_available_agents(self.current_session) |
| | builtin_agents = self.agent_registry.get_builtin_agents() |
| | custom_agents = self.agent_registry.get_custom_agents(self.current_session) |
| | |
| | print("当前可用智能体:") |
| | print("-" * 60) |
| | |
| | |
| | print("📚 内置智能体:") |
| | categorized_agents = self.agent_registry.get_agents_by_category(self.current_session) |
| | for category, agents in categorized_agents.items(): |
| | if any(agent.get('is_builtin', False) for agent in agents): |
| | print(f" 📂 {category}:") |
| | for agent in agents: |
| | if agent.get('is_builtin', False): |
| | print(f" • {agent['name']} - {agent['description']}") |
| | |
| | |
| | print("\n🎯 自定义智能体:") |
| | if custom_agents: |
| | for agent_name, agent_info in custom_agents.items(): |
| | print(f" • {agent_name} - {agent_info.get('description', '自定义智能体')}") |
| | else: |
| | print(" 暂无自定义智能体") |
| | |
| | print("-" * 60) |
| | print("\n🛠️ 操作选项:") |
| | print("1. 添加自定义智能体") |
| | print("2. 编辑自定义智能体") |
| | print("3. 删除自定义智能体") |
| | print("4. 搜索智能体") |
| | print("5. 查看智能体详情") |
| | print("6. 返回主菜单") |
| | print() |
| | |
| | choice = self.cli_interface.get_user_input("请选择操作: ", required=False) |
| | |
| | if choice == "1": |
| | self.add_custom_agent() |
| | elif choice == "2": |
| | self.edit_custom_agent(custom_agents) |
| | elif choice == "3": |
| | self.delete_custom_agent(custom_agents) |
| | elif choice == "4": |
| | self.search_agents(available_agents) |
| | elif choice == "5": |
| | self.view_agent_details(available_agents) |
| | elif choice == "6": |
| | break |
| | else: |
| | print("无效选择,请重新输入。") |
| | self.wait_for_enter() |
| |
|
| | def add_custom_agent(self): |
| | """添加自定义智能体 - 增强版本""" |
| | self.print_header("添加自定义智能体") |
| | |
| | print("🎯 创建新的自定义智能体") |
| | print("=" * 50) |
| | |
| | agent_name = self.cli_interface.get_user_input("智能体名称: ") |
| | |
| | |
| | available_agents = self.agent_registry.get_available_agents(self.current_session) |
| | if agent_name in available_agents: |
| | print("❌ 该名称已存在,请使用其他名称。") |
| | self.wait_for_enter() |
| | return |
| | |
| | print("\n📝📝 请输入智能体的专业描述:") |
| | description = self.cli_interface.get_user_input("智能体描述: ") |
| | print("\n请输入智能体的专业提示词:") |
| | print("(这将决定智能体的专业领域和行为方式)") |
| | print("提示:可以描述智能体的专业背景、分析风格、回答特点等") |
| | agent_prompt = self.cli_interface.get_multiline_input("智能体提示词:") |
| | |
| | if not agent_prompt.strip(): |
| | print("❌ 提示词不能为空。") |
| | self.wait_for_enter() |
| | return |
| | |
| | |
| | categories = ["内科", "外科", "医技科", "药学", "辅助科室", "自定义"] |
| | print("\n📂 请选择分类:") |
| | for i, category in enumerate(categories, 1): |
| | print(f" {i}. {category}") |
| | |
| | category_choice = self.cli_interface.get_user_input(f"选择分类 (1-{len(categories)}): ", required=False) |
| | try: |
| | category_index = int(category_choice) - 1 |
| | category = categories[category_index] if 0 <= category_index < len(categories) else "自定义" |
| | except: |
| | category = "自定义" |
| | |
| | try: |
| | success = self.agent_registry.create_custom_agent( |
| | self.current_session, |
| | agent_name, |
| | agent_prompt, |
| | description=description, |
| | category=category |
| | ) |
| | |
| | if success: |
| | print(f"✅ 成功添加自定义智能体: {agent_name}") |
| | print(f"📂 分类: {category}") |
| | print(f"📝 描述: {description}") |
| | self.logger.info(f"用户 {self.current_user['username']} 添加自定义智能体: {agent_name}") |
| | else: |
| | print("❌ 添加智能体失败") |
| | |
| | self.wait_for_enter() |
| | |
| | except Exception as e: |
| | print(f"❌ 添加智能体失败: {e}") |
| | self.wait_for_enter() |
| |
|
| | def search_agents(self, available_agents: Dict): |
| | """搜索智能体""" |
| | self.print_header("搜索智能体") |
| | |
| | query = self.cli_interface.get_user_input("请输入搜索关键词: ", required=False) |
| | if not query: |
| | return |
| | |
| | results = self.agent_registry.search_agents(query, self.current_session) |
| | |
| | if results: |
| | print(f"🔍 找到 {len(results)} 个相关智能体:") |
| | print("-" * 60) |
| | for i, result in enumerate(results, 1): |
| | type_icon = "📚" if result.get('is_builtin') else "🎯" |
| | print(f"{i}. {type_icon} {result['name']}") |
| | print(f" 分类: {result.get('category', '未知')}") |
| | print(f" 描述: {result.get('description', '')}") |
| | print() |
| | else: |
| | print("❌ 未找到相关智能体") |
| | |
| | self.wait_for_enter() |
| |
|
| | def delete_custom_agent(self, custom_agents: Dict): |
| | """删除自定义智能体""" |
| | if not custom_agents: |
| | print("当前没有自定义智能体可删除。") |
| | self.wait_for_enter() |
| | return |
| | |
| | agent_names = list(custom_agents.keys()) |
| | selected_agents = self.cli_interface.select_from_list( |
| | agent_names, |
| | "请选择要删除的自定义智能体(可多选):", |
| | allow_multiple=True |
| | ) |
| | |
| | if not selected_agents: |
| | return |
| | |
| | if self.cli_interface.confirm_action(f"确定要删除选中的 {len(selected_agents)} 个自定义智能体吗?"): |
| | try: |
| | for agent_name in selected_agents: |
| | self.agent_registry.delete_custom_agent( |
| | self.current_session, |
| | agent_name |
| | ) |
| | print(f"已删除智能体: {agent_name}") |
| | |
| | self.logger.info(f"用户 {self.current_user['username']} 删除自定义智能体: {selected_agents}") |
| | self.wait_for_enter() |
| | |
| | except Exception as e: |
| | print(f"删除智能体失败: {e}") |
| | self.wait_for_enter() |
| | |
| | def view_agent_details(self, available_agents: Dict): |
| | """查看智能体详情""" |
| | agent_names = list(available_agents.keys()) |
| | selected_agent = self.cli_interface.select_from_list( |
| | agent_names, |
| | "请选择要查看的智能体:", |
| | allow_multiple=False |
| | ) |
| | |
| | if not selected_agent: |
| | return |
| | |
| | agent_name = selected_agent[0] |
| | agent_info = available_agents[agent_name] |
| | |
| | self.print_header(f"智能体详情 - {agent_name}") |
| | print(f"类型: {'内置' if agent_name in self.agent_registry.get_builtin_agents() else '自定义'}") |
| | print(f"专业领域: {agent_info.get('specialty', '未指定')}") |
| | print("\n提示词:") |
| | print("-" * 60) |
| | print(agent_info.get('prompt', '无提示词信息')) |
| | print("-" * 60) |
| | |
| | self.wait_for_enter() |
| | |
| | def system_settings(self): |
| | """系统设置""" |
| | self.print_header("系统设置") |
| | |
| | print("当前系统设置:") |
| | print(f"1. 默认讨论轮数: {self.config.DEFAULT_ROUNDS}") |
| | print(f"2. 最大自定义智能体数: {self.config.MAX_CUSTOM_AGENTS}") |
| | print(f"3. 会话超时时间: {self.config.SESSION_TIMEOUT} 秒") |
| | print(f"4. 默认导出格式: {self.config.DEFAULT_EXPORT_FORMAT}") |
| | print() |
| | |
| | if self.current_user.get('role') == 'admin': |
| | print("管理员选项:") |
| | print("5. 修改系统设置") |
| | print("6. 查看系统日志") |
| | print("7. 管理所有用户") |
| | print() |
| | |
| | print("8. 返回主菜单") |
| | print() |
| | |
| | choice = self.cli_interface.get_user_input("请选择操作: ", required=False) |
| | |
| | if choice == "1" and self.current_user.get('role') == 'admin': |
| | self.change_default_rounds() |
| | elif choice == "2" and self.current_user.get('role') == 'admin': |
| | self.change_max_custom_agents() |
| | elif choice == "3" and self.current_user.get('role') == 'admin': |
| | self.change_session_timeout() |
| | elif choice == "4" and self.current_user.get('role') == 'admin': |
| | self.change_default_export_format() |
| |
|
| | elif choice == "5" and self.current_user.get('role') == 'admin': |
| | print("当前系统设置:") |
| | print(f"1. 模型引擎: {self.config.model.engine}") |
| | print(f"2. API端点: {self.config.model.api_base}") |
| | print(f"3. 模型名称: {self.config.model.model_name}") |
| | print(f"4. 温度参数: {self.config.model.temperature}") |
| | print("\n模型配置管理:") |
| | print("5. 重新加载模型配置") |
| | print("6. 测试模型连接") |
| | print("7. 编辑模型配置文件") |
| | |
| | choice = self.cli_interface.get_user_input("请选择操作: ", required=False) |
| | |
| | if choice == "5" and self.current_user.get('role') == 'admin': |
| | self.reload_model_config() |
| | elif choice == "6": |
| | self.test_model_connection() |
| | elif choice == "7" and self.current_user.get('role') == 'admin': |
| | self.edit_model_config() |
| |
|
| | elif choice == "6" and self.current_user.get('role') == 'admin': |
| | self.view_system_logs() |
| | elif choice == "7" and self.current_user.get('role') == 'admin': |
| | self.manage_all_users() |
| | elif choice == "8": |
| | |
| | pass |
| | else: |
| | print("无权限或无效选择。") |
| | self.wait_for_enter() |
| | |
| | def change_default_rounds(self): |
| | """修改默认讨论轮数""" |
| | new_rounds = self.get_user_input(f"新的默认讨论轮数 ({self.config.DEFAULT_ROUNDS}): ") |
| | try: |
| | new_rounds = int(new_rounds) |
| | if 1 <= new_rounds <= 10: |
| | self.config.DEFAULT_ROUNDS = new_rounds |
| | self.config.save() |
| | print("默认讨论轮数已更新。") |
| | else: |
| | print("讨论轮数必须在1-10之间。") |
| | except ValueError: |
| | print("请输入有效的数字。") |
| | self.wait_for_enter() |
| |
|
| | def reload_model_config(self): |
| | """重新加载模型配置""" |
| | try: |
| | self.config = ClinicalConfig.reload_config() |
| | print("✅ 模型配置已重新加载") |
| | except Exception as e: |
| | print(f"❌ 重新加载配置失败: {e}") |
| |
|
| | def test_model_connection(self): |
| | """测试模型连接""" |
| | print("测试模型连接...") |
| | |
| | pass |
| |
|
| | def edit_model_config(self): |
| | """编辑模型配置文件""" |
| | config_file = "model_config.json" |
| | if not os.path.exists(config_file): |
| | |
| | self.create_default_model_config() |
| | |
| | print(f"编辑配置文件: {config_file}") |
| | |
| | pass |
| |
|
| | def change_max_custom_agents(self): |
| | """修改最大自定义智能体数""" |
| | new_max = self.get_user_input(f"新的最大自定义智能体数 ({self.config.MAX_CUSTOM_AGENTS}): ") |
| | try: |
| | new_max = int(new_max) |
| | if new_max >= 1: |
| | self.config.MAX_CUSTOM_AGENTS = new_max |
| | self.config.save() |
| | print("最大自定义智能体数已更新。") |
| | else: |
| | print("数值必须大于0。") |
| | except ValueError: |
| | print("请输入有效的数字。") |
| | self.wait_for_enter() |
| | |
| | def change_session_timeout(self): |
| | """修改会话超时时间""" |
| | new_timeout = self.get_user_input(f"新的会话超时时间(秒) ({self.config.SESSION_TIMEOUT}): ") |
| | try: |
| | new_timeout = int(new_timeout) |
| | if new_timeout >= 60: |
| | self.config.SESSION_TIMEOUT = new_timeout |
| | self.config.save() |
| | print("会话超时时间已更新。") |
| | else: |
| | print("超时时间必须至少60秒。") |
| | except ValueError: |
| | print("请输入有效的数字。") |
| | self.wait_for_enter() |
| | |
| | def change_llm_api(self): |
| | """修改LLM API地址""" |
| | new_api = self.get_user_input(f"新的LLM API地址 ({self.config.LLM_API_BASE}): ") |
| | if new_api: |
| | self.config.LLM_API_BASE = new_api |
| | self.config.save() |
| | print("LLM API地址已更新。") |
| | self.wait_for_enter() |
| | |
| | def change_default_export_format(self): |
| | """修改默认导出格式""" |
| | formats = ["json", "docx", "txt", "pdf"] |
| | print("可选格式: " + ", ".join(formats)) |
| | new_format = self.get_user_input(f"新的默认导出格式 ({self.config.DEFAULT_EXPORT_FORMAT}): ") |
| | if new_format and new_format.lower() in formats: |
| | self.config.DEFAULT_EXPORT_FORMAT = new_format.lower() |
| | self.config.save() |
| | print("默认导出格式已更新。") |
| | self.wait_for_enter() |
| | |
| | def view_system_logs(self): |
| | """查看系统日志""" |
| | self.print_header("系统日志") |
| | |
| | log_files = self.get_log_files() |
| | if not log_files: |
| | print("暂无日志文件。") |
| | self.wait_for_enter() |
| | return |
| | |
| | selected_file = self.cli_interface.select_from_list( |
| | log_files, |
| | "请选择要查看的日志文件:", |
| | allow_multiple=False |
| | ) |
| | |
| | if selected_file: |
| | try: |
| | with open(selected_file[0], 'r', encoding='utf-8') as f: |
| | content = f.read() |
| | |
| | print(f"日志文件: {selected_file[0]}") |
| | print("-" * 80) |
| | print(content) |
| | print("-" * 80) |
| | |
| | self.wait_for_enter() |
| | |
| | except Exception as e: |
| | print(f"读取日志文件失败: {e}") |
| | self.wait_for_enter() |
| | |
| | def get_log_files(self): |
| | """获取日志文件列表""" |
| | log_dir = "logs" |
| | if not os.path.exists(log_dir): |
| | return [] |
| | |
| | log_files = [] |
| | for file in os.listdir(log_dir): |
| | if file.endswith(".log"): |
| | log_files.append(os.path.join(log_dir, file)) |
| | |
| | return sorted(log_files, reverse=True) |
| | |
| | def manage_all_users(self): |
| | """管理所有用户""" |
| | self.print_header("用户管理") |
| | |
| | try: |
| | all_users = self.user_manager.get_all_users() |
| | |
| | print("所有用户列表:") |
| | print("-" * 80) |
| | print(f"{'用户名':<15} {'真实姓名':<15} {'科室':<15} {'角色':<10} {'最后登录':<20}") |
| | print("-" * 80) |
| | |
| | for user in all_users: |
| | print(f"{user['username']:<15} {user.get('full_name', ''):<15} " |
| | f"{user.get('department', ''):<15} {user.get('role', 'user'):<10} " |
| | f"{user.get('last_login', '从未登录'):<20}") |
| | |
| | print("-" * 80) |
| | print("\n操作选项:") |
| | print("1. 查看用户详情") |
| | print("2. 修改用户角色") |
| | print("3. 重置用户密码") |
| | print("4. 删除用户") |
| | print("5. 返回系统设置") |
| | print() |
| | |
| | choice = self.cli_interface.get_user_input("请选择操作: ", required=False) |
| | |
| | if choice == "1": |
| | self.view_user_details(all_users) |
| | elif choice == "2": |
| | self.change_user_role(all_users) |
| | elif choice == "3": |
| | self.reset_user_password(all_users) |
| | elif choice == "4": |
| | self.delete_user(all_users) |
| | elif choice == "5": |
| | return |
| | else: |
| | print("无效选择,请重新输入。") |
| | self.wait_for_enter() |
| | |
| | except Exception as e: |
| | print(f"获取用户列表失败: {e}") |
| | self.wait_for_enter() |
| | |
| | def view_user_details(self, users: List[Dict]): |
| | """查看用户详情""" |
| | usernames = [user['username'] for user in users] |
| | selected_user = self.cli_interface.select_from_list( |
| | usernames, |
| | "请选择要查看的用户:", |
| | allow_multiple=False |
| | ) |
| | |
| | if not selected_user: |
| | return |
| | |
| | username = selected_user[0] |
| | user_info = next(user for user in users if user['username'] == username) |
| | |
| | self.print_header(f"用户详情 - {username}") |
| | print(f"用户名: {user_info['username']}") |
| | print(f"真实姓名: {user_info.get('full_name', '未设置')}") |
| | print(f"科室: {user_info.get('department', '未设置')}") |
| | print(f"角色: {user_info.get('role', 'user')}") |
| | print(f"创建时间: {user_info.get('created_at', '未知')}") |
| | print(f"最后登录: {user_info.get('last_login', '从未登录')}") |
| | print(f"讨论记录数: {user_info.get('discussion_count', 0)}") |
| | |
| | self.wait_for_enter() |
| | |
| | def change_user_role(self, users: List[Dict]): |
| | """修改用户角色""" |
| | usernames = [user['username'] for user in users] |
| | selected_user = self.cli_interface.select_from_list( |
| | usernames, |
| | "请选择要修改角色的用户:", |
| | allow_multiple=False |
| | ) |
| | |
| | if not selected_user: |
| | return |
| | |
| | username = selected_user[0] |
| | current_role = next(user['role'] for user in users if user['username'] == username) |
| | |
| | roles = ["user", "admin"] |
| | new_role = self.cli_interface.select_from_list( |
| | roles, |
| | f"请选择新角色 (当前: {current_role}):", |
| | allow_multiple=False |
| | ) |
| | |
| | if new_role and new_role[0] != current_role: |
| | try: |
| | self.user_manager.change_user_role(username, new_role[0]) |
| | print(f"用户 {username} 的角色已从 {current_role} 改为 {new_role[0]}。") |
| | self.logger.info(f"管理员 {self.current_user['username']} 修改用户 {username} 角色为 {new_role[0]}") |
| | except Exception as e: |
| | print(f"修改角色失败: {e}") |
| | |
| | self.wait_for_enter() |
| | |
| | def reset_user_password(self, users: List[Dict]): |
| | """重置用户密码""" |
| | usernames = [user['username'] for user in users] |
| | selected_user = self.cli_interface.select_from_list( |
| | usernames, |
| | "请选择要重置密码的用户:", |
| | allow_multiple=False |
| | ) |
| | |
| | if not selected_user: |
| | return |
| | |
| | username = selected_user[0] |
| | |
| | if self.cli_interface.confirm_action(f"确定要重置用户 {username} 的密码吗?"): |
| | new_password = self.get_user_input("请输入新密码: ", password=True) |
| | confirm_password = self.get_user_input("请确认新密码: ", password=True) |
| | |
| | if new_password != confirm_password: |
| | print("密码不一致,重置取消。") |
| | self.wait_for_enter() |
| | return |
| | |
| | try: |
| | self.user_manager.reset_user_password(username, new_password) |
| | print(f"用户 {username} 的密码已重置。") |
| | self.logger.info(f"管理员 {self.current_user['username']} 重置用户 {username} 的密码") |
| | except Exception as e: |
| | print(f"重置密码失败: {e}") |
| | |
| | self.wait_for_enter() |
| | |
| | def delete_user(self, users: List[Dict]): |
| | """删除用户""" |
| | usernames = [user['username'] for user in users if user['username'] != self.current_user['username']] |
| | |
| | if not usernames: |
| | print("没有其他用户可删除。") |
| | self.wait_for_enter() |
| | return |
| | |
| | selected_user = self.cli_interface.select_from_list( |
| | usernames, |
| | "请选择要删除的用户:", |
| | allow_multiple=False |
| | ) |
| | |
| | if not selected_user: |
| | return |
| | |
| | username = selected_user[0] |
| | |
| | if self.cli_interface.confirm_action(f"确定要删除用户 {username} 吗?此操作不可恢复。"): |
| | try: |
| | self.user_manager.delete_user(username) |
| | print(f"用户 {username} 已删除。") |
| | self.logger.info(f"管理员 {self.current_user['username']} 删除用户 {username}") |
| | except Exception as e: |
| | print(f"删除用户失败: {e}") |
| | |
| | self.wait_for_enter() |
| | |
| | def user_information(self): |
| | """用户信息""" |
| | self.print_header("用户信息") |
| | |
| | print(f"用户名: {self.current_user['username']}") |
| | print(f"真实姓名: {self.current_user.get('full_name', '未设置')}") |
| | print(f"科室: {self.current_user.get('department', '未设置')}") |
| | print(f"角色: {self.current_user.get('role', 'user')}") |
| | print(f"注册时间: {self.current_user.get('created_at', '未知')}") |
| | print(f"最后登录: {self.current_user.get('last_login', '未知')}") |
| | |
| | |
| | try: |
| | discussion_count = self.user_data_manager.get_user_discussion_count(self.current_user['user_id']) |
| | print(f"讨论记录数: {discussion_count}") |
| | except: |
| | print("讨论记录数: 无法获取") |
| | |
| | print("\n操作选项:") |
| | print("1. 修改个人信息") |
| | print("2. 修改密码") |
| | print("3. 返回主菜单") |
| | print() |
| | |
| | choice = self.cli_interface.get_user_input("请选择操作: ", required=False) |
| | |
| | if choice == "1": |
| | self.update_profile() |
| | elif choice == "2": |
| | self.change_password() |
| | elif choice == "3": |
| | return |
| | else: |
| | print("无效选择,请重新输入。") |
| | self.wait_for_enter() |
| | |
| | def update_profile(self): |
| | """修改个人信息""" |
| | self.print_header("修改个人信息") |
| | |
| | current_full_name = self.current_user.get('full_name', '') |
| | current_department = self.current_user.get('department', '') |
| | |
| | new_full_name = self.get_user_input(f"真实姓名 ({current_full_name}): ", required=False) |
| | new_department = self.get_user_input(f"科室 ({current_department}): ", required=False) |
| | |
| | if not new_full_name and not new_department: |
| | print("未修改任何信息。") |
| | self.wait_for_enter() |
| | return |
| | |
| | try: |
| | updates = {} |
| | if new_full_name: |
| | updates['full_name'] = new_full_name |
| | if new_department: |
| | updates['department'] = new_department |
| | |
| | self.user_manager.update_user_profile(self.current_user['user_id'], updates) |
| | |
| | |
| | if new_full_name: |
| | self.current_user['full_name'] = new_full_name |
| | if new_department: |
| | self.current_user['department'] = new_department |
| | |
| | print("个人信息已更新。") |
| | self.logger.info(f"用户 {self.current_user['username']} 更新个人信息") |
| | |
| | except Exception as e: |
| | print(f"更新个人信息失败: {e}") |
| | |
| | self.wait_for_enter() |
| | |
| | def change_password(self): |
| | """修改密码""" |
| | self.print_header("修改密码") |
| | |
| | current_password = self.get_user_input("当前密码: ", password=True) |
| | |
| | |
| | if not self.user_manager.verify_password(self.current_user['user_id'], current_password): |
| | print("当前密码不正确。") |
| | self.wait_for_enter() |
| | return |
| | |
| | new_password = self.get_user_input("新密码: ", password=True) |
| | confirm_password = self.get_user_input("确认新密码: ", password=True) |
| | |
| | if new_password != confirm_password: |
| | print("新密码不一致,修改取消。") |
| | self.wait_for_enter() |
| | return |
| | |
| | try: |
| | self.user_manager.change_user_password(self.current_user['user_id'], new_password) |
| | print("密码已修改。") |
| | self.logger.info(f"用户 {self.current_user['username']} 修改密码") |
| | except Exception as e: |
| | print(f"修改密码失败: {e}") |
| | |
| | self.wait_for_enter() |
| | |
| | def run(self): |
| | """运行CLI主循环""" |
| | try: |
| | |
| | self.clear_screen() |
| | print("=" * 60) |
| | print(" 欢迎使用临床多智能体讨论系统") |
| | print("=" * 60) |
| | print() |
| | self.wait_for_enter("按回车键开始...") |
| | |
| | |
| | if not self.authenticate_user(): |
| | return |
| | |
| | |
| | self.show_main_menu() |
| | |
| | except KeyboardInterrupt: |
| | print("\n\n感谢使用临床多智能体讨论系统!") |
| | except Exception as e: |
| | print(f"系统错误: {e}") |
| | self.logger.error(f"系统错误: {e}") |
| | self.wait_for_enter() |
| |
|
| | class CLIInterface: |
| | """CLI接口适配器 - 统一输入处理""" |
| | |
| | def __init__(self, cli_instance): |
| | self.cli = cli_instance |
| | |
| | def get_user_input(self, prompt: str = "", required: bool = True, password: bool = False) -> str: |
| | """获取用户输入""" |
| | while True: |
| | try: |
| | if password: |
| | user_input = getpass.getpass(prompt) |
| | else: |
| | user_input = input(prompt).strip() |
| | |
| | if required and not user_input: |
| | print("请输入:") |
| | continue |
| | |
| | return user_input |
| | except KeyboardInterrupt: |
| | print("\n\n操作已取消。") |
| | sys.exit(0) |
| | except Exception as e: |
| | print(f"输入错误: {e}") |
| | continue |
| |
|
| | def get_multiline_input(self, prompt: str) -> str: |
| | """获取多行输入""" |
| | print(prompt) |
| | print("(输入空行结束输入)") |
| | lines = [] |
| | while True: |
| | try: |
| | line = input() |
| | if line.strip() == "": |
| | break |
| | lines.append(line) |
| | except EOFError: |
| | break |
| | except KeyboardInterrupt: |
| | return "" |
| | return "\n".join(lines) |
| | |
| | def confirm_action(self, prompt: str) -> bool: |
| | """确认操作""" |
| | response = self.get_user_input(f"{prompt} (y/n): ", required=False) |
| | return response and response.lower() in ['y', 'yes', '是', '确认'] |
| | |
| | def select_from_list(self, items: List, prompt: str, allow_multiple: bool = False) -> List: |
| | """从列表中选择项目""" |
| | if not items: |
| | print("暂无选项可用。") |
| | return [] |
| | |
| | print(f"\n{prompt}") |
| | print("-" * 40) |
| | |
| | for i, item in enumerate(items, 1): |
| | if isinstance(item, dict): |
| | display_text = item.get('name', str(item)) |
| | else: |
| | display_text = str(item) |
| | print(f"{i}. {display_text}", end=" ") |
| | if i % 6 == 0: |
| | print() |
| | |
| | print() |
| | |
| | if allow_multiple: |
| | print(f"{len(items) + 1}. 全选") |
| | print(f"{len(items) + 2}. 取消选择") |
| | print("\n提示:可以输入多个编号,用空格、逗号或中文逗号分隔") |
| | else: |
| | print(f"{len(items) + 1}. 返回上级") |
| | |
| | print("-" * 40) |
| | |
| | while True: |
| | try: |
| | choice = self.get_user_input("请选择编号: ", required=False) |
| | |
| | if not choice: |
| | return [] |
| | |
| | if allow_multiple: |
| | if choice.lower() == 'all' or choice == str(len(items) + 1): |
| | return items |
| | if choice.lower() in ['cancel', '取消'] or choice == str(len(items) + 2): |
| | return [] |
| | |
| | |
| | import re |
| | choice = re.sub(r'[,\s]+', ',', choice) |
| | choice = choice.strip(',') |
| | |
| | if ',' in choice: |
| | indices = [] |
| | for part in choice.split(','): |
| | part = part.strip() |
| | if part.isdigit(): |
| | indices.append(int(part)) |
| | elif '-' in part: |
| | range_parts = part.split('-') |
| | if len(range_parts) == 2 and range_parts[0].isdigit() and range_parts[1].isdigit(): |
| | start, end = int(range_parts[0]), int(range_parts[1]) |
| | indices.extend(range(start, end + 1)) |
| | else: |
| | indices = [int(choice)] if choice.isdigit() else [] |
| | |
| | valid_indices = [i for i in indices if 1 <= i <= len(items)] |
| | if valid_indices: |
| | selected_items = [] |
| | for i in valid_indices: |
| | if i <= len(items): |
| | selected_items.append(items[i-1]) |
| | return selected_items |
| | else: |
| | print("无效的选择,请重新输入。") |
| | |
| | else: |
| | index = int(choice) |
| | if 1 <= index <= len(items): |
| | return [items[index-1]] |
| | elif index == len(items) + 1: |
| | return [] |
| | else: |
| | print("无效的选择,请重新输入。") |
| | |
| | except ValueError: |
| | print("请输入有效的数字。") |
| | except KeyboardInterrupt: |
| | return [] |
| | |
| | def has_user_input(self, timeout: float = 0) -> bool: |
| | """检查是否有用户输入 - 简化实现""" |
| | |
| | return False |
| | |
| | def should_prompt_for_intervention(self) -> bool: |
| | """是否应该提示用户介入""" |
| | return False |
| |
|
| | |
| | def main(): |
| | """主函数""" |
| | try: |
| | cli = ClinicalCLI() |
| | cli.run() |
| | except Exception as e: |
| | print(f"程序启动失败: {e}") |
| | import traceback |
| | traceback.print_exc() |
| |
|
| |
|
| | if __name__ == "__main__": |
| | main() |