| import sys |
| sys.path.append('../core') |
|
|
|
|
| import os |
| import shutil |
| from datetime import datetime |
| import json |
| import re |
|
|
| |
| from llm.llm import LLM |
| |
| from input.problem import problem_input |
| from agent.problem_analysis import ProblemAnalysis |
| |
| from agent.problem_modeling import ProblemModeling |
| from agent.task_decompse import TaskDecompose |
| from agent.task import Task |
| from agent.create_charts import Chart |
| from agent.coordinator import Coordinator |
| from utils.utils import read_json_file, write_json_file, write_text_file, json_to_markdown |
| from prompt.template import TASK_ANALYSIS_APPEND_PROMPT, TASK_FORMULAS_APPEND_PROMPT, TASK_MODELING_APPEND_PROMPT |
| from utils.generate_paper import generate_paper_from_json |
| |
| from prompt.constants import modeling_methods |
|
|
| def mkdir_output(path): |
| """Creates the necessary output directories.""" |
| os.makedirs(path, exist_ok=True) |
| os.makedirs(os.path.join(path, 'json'), exist_ok=True) |
| os.makedirs(os.path.join(path, 'markdown'), exist_ok=True) |
| os.makedirs(os.path.join(path, 'latex'), exist_ok=True) |
| os.makedirs(os.path.join(path, 'code'), exist_ok=True) |
| os.makedirs(os.path.join(path, 'usage'), exist_ok=True) |
| os.makedirs(os.path.join(path, 'intermediate'), exist_ok=True) |
|
|
| class ModelingAgentSystem: |
| """ |
| Manages the step-by-step generation of a mathematical modeling report. |
| Allows for granular control over section generation and tracks progress. |
| """ |
| def __init__(self, problem_path: str, config: dict, dataset_path: str, output_path: str, name: str): |
| """ |
| Initializes the Modeling Agent System. |
| |
| Args: |
| problem_path: Path to the problem description file (e.g., JSON). |
| config: Dictionary containing configuration parameters (model_name, rounds, etc.). |
| dataset_path: Path to the dataset directory associated with the problem. |
| output_path: Path where generated outputs (json, md, code, etc.) will be saved. |
| name: A unique name for this run/problem (used in filenames). |
| """ |
| self.problem_path = problem_path |
| self.config = config |
| self.dataset_path = dataset_path |
| self.output_path = output_path |
| self.name = name |
|
|
| |
| self.paper = {'tasks': []} |
| self.completed_steps = set() |
| self.planned_steps = [] |
| self.dependencies = self._define_dependencies() |
|
|
| |
| self.llm = LLM(config['model_name']) |
| self.pa = ProblemAnalysis(self.llm) |
| |
| self.pm = ProblemModeling(self.llm) |
| self.td = TaskDecompose(self.llm) |
| self.task = Task(self.llm) |
| self.chart = Chart(self.llm) |
| self.coordinator = Coordinator(self.llm) |
|
|
| |
| self.problem_str: str | None = None |
| self.problem: dict | None = None |
| self.problem_type: str | None = None |
| self.problem_year: str | None = None |
| self.problem_analysis: str | None = None |
| self.modeling_solution: str | None = None |
| self.task_descriptions: list[str] | None = None |
| self.order: list[int] | None = None |
| self.with_code: bool = False |
|
|
| |
| mkdir_output(self.output_path) |
| self._initialize_problem_and_steps() |
| print(f"Initialization complete. Starting steps: {self.planned_steps}") |
| print(f"Already completed: {self.completed_steps}") |
|
|
|
|
| def _define_dependencies(self): |
| """Defines the prerequisite steps for each generation step.""" |
| |
| deps = { |
| 'Problem Background': [], |
| 'Problem Requirement': [], |
| 'Problem Analysis': ['Problem Background', 'Problem Requirement'], |
| 'High-Level Modeling': ['Problem Analysis'], |
| 'Task Decomposition': ['High-Level Modeling'], |
| 'Dependency Analysis': ['Task Decomposition'], |
| |
| } |
| return deps |
|
|
| def _update_dependencies_after_decomp(self): |
| """Updates dependencies for task-specific steps after decomposition and dependency analysis.""" |
| if not self.order: |
| print("Warning: Task order not determined. Cannot update task dependencies.") |
| return |
|
|
| num_tasks = len(self.task_descriptions) |
| for i in range(1, num_tasks + 1): |
| task_id = str(i) |
| task_prereqs = [f'Task {dep_id} Subtask Outcome Analysis' for dep_id in self.coordinator.DAG.get(task_id, [])] |
| |
| base_task_prereqs = ['Dependency Analysis'] + task_prereqs |
|
|
| self.dependencies[f'Task {i} Description'] = ['Task Decomposition'] |
| self.dependencies[f'Task {i} Analysis'] = [f'Task {i} Description'] + base_task_prereqs |
| self.dependencies[f'Task {i} Preliminary Formulas'] = [f'Task {i} Analysis'] |
| self.dependencies[f'Task {i} Mathematical Modeling Process'] = [f'Task {i} Preliminary Formulas'] |
| if self.with_code: |
| self.dependencies[f'Task {i} Code'] = [f'Task {i} Mathematical Modeling Process'] |
| self.dependencies[f'Task {i} Solution Interpretation'] = [f'Task {i} Code'] |
| else: |
| |
| self.dependencies[f'Task {i} Solution Interpretation'] = [f'Task {i} Mathematical Modeling Process'] |
| self.dependencies[f'Task {i} Subtask Outcome Analysis'] = [f'Task {i} Solution Interpretation'] |
| self.dependencies[f'Task {i} Charts'] = [f'Task {i} Subtask Outcome Analysis'] |
|
|
|
|
| def _initialize_problem_and_steps(self): |
| """Loads the problem input and sets up the initial state.""" |
| print("Loading problem input...") |
| self.problem_str, self.problem = problem_input(self.problem_path, self.llm) |
| filename = os.path.splitext(os.path.basename(self.problem_path))[0] |
| if '_' in filename: |
| self.problem_year, self.problem_type = filename.split('_')[:2] |
| else: |
| self.problem_type = 'X' |
| self.problem_year = 'XXXX' |
|
|
| self.paper['problem_background'] = self.problem['background'] |
| self.paper['problem_requirement'] = self.problem['problem_requirement'] |
| self.completed_steps.add('Problem Background') |
| self.completed_steps.add('Problem Requirement') |
|
|
| self.with_code = len(self.problem.get('dataset_path', '')) > 0 or len(self.dataset_path) > 0 |
|
|
| if self.with_code and os.path.exists(self.dataset_path): |
| print(f"Copying dataset from {self.dataset_path} to {os.path.join(self.output_path, 'code')}") |
| shutil.copytree(self.dataset_path, os.path.join(self.output_path, 'code'), dirs_exist_ok=True) |
| elif self.with_code: |
| print(f"Warning: Code execution expected, but dataset path '{self.dataset_path}' not found.") |
|
|
|
|
| |
| self.planned_steps = [ |
| 'Problem Background', |
| 'Problem Requirement', |
| 'Problem Analysis', |
| 'High-Level Modeling', |
| 'Task Decomposition', |
| 'Dependency Analysis' |
| ] |
|
|
| def _check_dependencies(self, step_name: str) -> bool: |
| """Checks if all prerequisites for a given step are completed.""" |
| if step_name not in self.dependencies: |
| print(f"Warning: No dependency information defined for step '{step_name}'. Assuming runnable.") |
| return True |
|
|
| prerequisites = self.dependencies.get(step_name, []) |
| for prereq in prerequisites: |
| if prereq not in self.completed_steps: |
| print(f"Dependency Error: Step '{step_name}' requires '{prereq}', which is not completed.") |
| return False |
| return True |
|
|
| def _update_planned_steps_after_decomp(self): |
| """Adds all task-specific steps to the planned steps list.""" |
| if not self.task_descriptions or self.order is None: |
| print("Error: Cannot update planned steps. Task decomposition or dependency analysis incomplete.") |
| return |
|
|
| task_step_templates = [ |
| 'Description', |
| 'Analysis', |
| 'Preliminary Formulas', |
| 'Mathematical Modeling Process', |
| 'Code' if self.with_code else None, |
| 'Solution Interpretation', |
| 'Subtask Outcome Analysis', |
| 'Charts', |
| ] |
| |
| task_step_templates = [t for t in task_step_templates if t] |
|
|
| new_task_steps = [] |
| |
| for task_id_int in self.order: |
| for template in task_step_templates: |
| new_task_steps.append(f'Task {task_id_int} {template}') |
|
|
| |
| dep_analysis_index = self.planned_steps.index('Dependency Analysis') |
| self.planned_steps = self.planned_steps[:dep_analysis_index+1] + new_task_steps |
|
|
| |
| self.paper['tasks'] = [{} for _ in range(len(self.task_descriptions))] |
|
|
|
|
| |
| def get_completed_steps(self) -> set: |
| """Returns the set of names of completed steps.""" |
| return self.completed_steps |
|
|
| def get_planned_steps(self) -> list: |
| """Returns the list of names of planned steps (including completed).""" |
| return self.planned_steps |
|
|
| def get_paper(self) -> dict: |
| """Returns the current state of the generated paper dictionary.""" |
| |
| |
| return self.paper |
|
|
| def save_paper(self, intermediate=False): |
| """Saves the current paper state to files.""" |
| filename = f"{self.name}_intermediate_{datetime.now().strftime('%Y%m%d%H%M%S')}" if intermediate else self.name |
| json_path = os.path.join(self.output_path, 'json', f"{filename}.json") |
| md_path = os.path.join(self.output_path, 'markdown', f"{filename}.md") |
| |
|
|
| write_json_file(json_path, self.paper) |
| markdown_str = json_to_markdown(self.paper) |
| write_text_file(md_path, markdown_str) |
| |
| print(f"Saved paper snapshot to {json_path} and {md_path}") |
|
|
| def save_usage(self): |
| """Saves the LLM usage statistics.""" |
| usage_path = os.path.join(self.output_path, 'usage', f"{self.name}.json") |
| write_json_file(usage_path, self.llm.get_total_usage()) |
| print(f"Saved LLM usage to {usage_path}") |
| print(f"Total Usage: {self.llm.get_total_usage()}") |
|
|
| |
|
|
| def _generate_problem_analysis(self, user_prompt: str = '', round: int = 0): |
| print("Generating: Problem Analysis") |
| self.problem_analysis = self.pa.analysis( |
| self.problem_str, |
| round=round if round > 0 else self.config.get('problem_analysis_round', 0), |
| user_prompt=user_prompt |
| ) |
| self.paper['problem_analysis'] = self.problem_analysis |
| print("Completed: Problem Analysis") |
|
|
| def _generate_high_level_modeling(self, user_prompt: str = '', round: int = 0): |
| print("Generating: High-Level Modeling") |
| |
| self.modeling_solution = self.pm.modeling( |
| self.problem_str, |
| self.problem_analysis, |
| "", |
| round=round if round > 0 else self.config.get('problem_modeling_round', 0), |
| user_prompt=user_prompt |
| ) |
| self.paper['high_level_modeling'] = self.modeling_solution |
| print("Completed: High-Level Modeling") |
|
|
| def _generate_task_decomposition(self, user_prompt: str = ''): |
| print("Generating: Task Decomposition") |
| self.task_descriptions = self.td.decompose_and_refine( |
| self.problem_str, |
| self.problem_analysis, |
| self.modeling_solution, |
| self.problem_type, |
| self.config.get('tasknum', 4), |
| user_prompt=user_prompt |
| ) |
| self.paper['task_decomposition_summary'] = "\n".join([f"Task {i+1}: {desc}" for i, desc in enumerate(self.task_descriptions)]) |
| print(f"Completed: Task Decomposition ({len(self.task_descriptions)} tasks)") |
| |
| |
|
|
| def _generate_dependency_analysis(self): |
| print("Generating: Dependency Analysis") |
| self.order = self.coordinator.analyze_dependencies( |
| self.problem_str, |
| self.problem_analysis, |
| self.modeling_solution, |
| self.task_descriptions, |
| self.with_code |
| ) |
| self.order = [int(i) for i in self.order] |
| self.paper['task_execution_order'] = self.order |
| self.paper['task_dependency_analysis'] = self.coordinator.task_dependency_analysis |
| print(f"Completed: Dependency Analysis. Execution order: {self.order}") |
| |
| self._update_planned_steps_after_decomp() |
| self._update_dependencies_after_decomp() |
| print(f"Updated planned steps: {self.planned_steps}") |
|
|
| def _generate_task_step(self, task_id: int, step_type: str, user_prompt: str = '', round: int = 0): |
| """Handles generation for a specific step within a specific task.""" |
| print(f"Generating: Task {task_id} {step_type}") |
| task_index = task_id - 1 |
|
|
| |
| if task_index >= len(self.paper['tasks']): |
| print(f"Error: Task index {task_index} out of bounds for self.paper['tasks'].") |
| return False |
|
|
| |
| task_description = self.task_descriptions[task_index] |
| |
| current_task_dict = self.paper['tasks'][task_index] |
| task_analysis = current_task_dict.get('task_analysis') |
| task_formulas = current_task_dict.get('preliminary_formulas') |
| task_modeling = current_task_dict.get('mathematical_modeling_process') |
| task_code = current_task_dict.get('task_code') |
| execution_result = current_task_dict.get('execution_result') |
| task_result = current_task_dict.get('solution_interpretation') |
|
|
|
|
| |
| task_dependency_ids = [int(i) for i in self.coordinator.DAG.get(str(task_id), [])] |
| dependency_prompt = "" |
| dependent_file_prompt = "" |
|
|
| if len(task_dependency_ids) > 0: |
| |
| rationale = "" |
| if self.coordinator.task_dependency_analysis and task_index < len(self.coordinator.task_dependency_analysis): |
| rationale = self.coordinator.task_dependency_analysis[task_index] |
| else: |
| print(f"Warning: Could not find dependency rationale for Task {task_id}") |
|
|
| dependency_prompt = f"This task is Task {task_id}, which depends on the following tasks: {task_dependency_ids}. The dependencies for this task are analyzed as follows: {rationale}\n" |
|
|
| for dep_id in task_dependency_ids: |
| dep_task_index = dep_id - 1 |
| if dep_task_index < 0 or dep_task_index >= len(self.paper['tasks']): |
| print(f"Warning: Cannot build dependency prompt. Dependent Task {dep_id} data not found.") |
| continue |
|
|
| dep_task_dict = self.paper['tasks'][dep_task_index] |
| |
| dep_mem_dict = self.coordinator.memory.get(str(dep_id), {}) |
| dep_code_mem_dict = self.coordinator.code_memory.get(str(dep_id), {}) |
|
|
|
|
| dependency_prompt += f"---\n# The Description of Task {dep_id}:\n{dep_task_dict.get('task_description', dep_mem_dict.get('task_description', 'N/A'))}\n" |
| dependency_prompt += f"# The modeling method for Task {dep_id}:\n{dep_task_dict.get('mathematical_modeling_process', dep_mem_dict.get('mathematical_modeling_process', 'N/A'))}\n" |
|
|
| if self.with_code: |
| |
| code_structure_str = json.dumps(dep_task_dict.get('code_structure', dep_code_mem_dict), indent=2) if dep_task_dict.get('code_structure', dep_code_mem_dict) else "{}" |
| dependency_prompt += f"# The structure of code for Task {dep_id}:\n{code_structure_str}\n" |
| dependency_prompt += f"# The result for Task {dep_id}:\n{dep_task_dict.get('solution_interpretation', dep_mem_dict.get('solution_interpretation', 'N/A'))}\n---\n" |
|
|
| dependent_file_prompt += f"# The files generated by code for Task {dep_id}:\n{code_structure_str}\n" |
| else: |
| dependency_prompt += f"# The result for Task {dep_id}:\n{dep_task_dict.get('solution_interpretation', dep_mem_dict.get('solution_interpretation', 'N/A'))}\n---\n" |
|
|
| |
| task_analysis_prompt = dependency_prompt + TASK_ANALYSIS_APPEND_PROMPT if step_type == 'Analysis' else dependency_prompt |
| task_formulas_prompt = dependency_prompt + TASK_FORMULAS_APPEND_PROMPT if step_type == 'Preliminary Formulas' else dependency_prompt |
| task_modeling_prompt = dependency_prompt + TASK_MODELING_APPEND_PROMPT if step_type == 'Mathematical Modeling Process' else dependency_prompt |
|
|
| |
| success = True |
| try: |
| if step_type == 'Description': |
| |
| self.paper['tasks'][task_index]['task_description'] = task_description |
| |
| if str(task_id) not in self.coordinator.memory: self.coordinator.memory[str(task_id)] = {} |
| self.coordinator.memory[str(task_id)]['task_description'] = task_description |
|
|
| elif step_type == 'Analysis': |
| task_analysis = self.task.analysis( |
| task_analysis_prompt, |
| task_description, |
| user_prompt=user_prompt |
| ) |
| self.paper['tasks'][task_index]['task_analysis'] = task_analysis |
| self.coordinator.memory[str(task_id)]['task_analysis'] = task_analysis |
|
|
|
|
| elif step_type == 'Preliminary Formulas': |
| if not task_analysis: raise ValueError(f"Task {task_id} Analysis is missing.") |
| description_and_analysis = f'## Task Description\n{task_description}\n\n## Task Analysis\n{task_analysis}' |
| top_modeling_methods = modeling_methods |
| task_formulas = self.task.formulas( |
| task_formulas_prompt, |
| self.problem.get('data_description', ''), |
| task_description, |
| task_analysis, |
| top_modeling_methods, |
| round=round if round > 0 else self.config.get('task_formulas_round', 0), |
| user_prompt=user_prompt |
| ) |
| self.paper['tasks'][task_index]['preliminary_formulas'] = task_formulas |
| self.coordinator.memory[str(task_id)]['preliminary_formulas'] = task_formulas |
|
|
|
|
| elif step_type == 'Mathematical Modeling Process': |
| if not task_analysis or not task_formulas: raise ValueError(f"Task {task_id} Analysis or Formulas missing.") |
| task_modeling = self.task.modeling( |
| task_modeling_prompt, |
| self.problem.get('data_description', ''), |
| task_description, |
| task_analysis, |
| task_formulas, |
| round=round if round > 0 else self.config.get('task_modeling_round', 0), |
| user_prompt=user_prompt |
| ) |
| self.paper['tasks'][task_index]['mathematical_modeling_process'] = task_modeling |
| self.coordinator.memory[str(task_id)]['mathematical_modeling_process'] = task_modeling |
|
|
|
|
| elif step_type == 'Code' and self.with_code: |
| if not task_analysis or not task_formulas or not task_modeling: |
| raise ValueError(f"Task {task_id} Analysis, Formulas, or Modeling missing for coding.") |
|
|
| code_template_path = os.path.join('../data/actor_data/input/code_template', f'main{task_id}.py') |
| code_template = "" |
| if os.path.exists(code_template_path): |
| with open(code_template_path, 'r') as f: |
| code_template = f.read() |
| else: |
| print(f"Warning: Code template not found at {code_template_path}. Using empty template.") |
|
|
| save_path = os.path.join(self.output_path, 'code', f'main{task_id}.py') |
| work_dir = os.path.join(self.output_path, 'code') |
| script_name = f'main{task_id}.py' |
| dataset_input_path = self.problem.get('dataset_path') or self.dataset_path |
|
|
| task_code, is_pass, execution_result = self.task.coding( |
| dataset_input_path, |
| self.problem.get('data_description', ''), |
| self.problem.get('variable_description', ''), |
| task_description, |
| task_analysis, |
| task_formulas, |
| task_modeling, |
| dependent_file_prompt, |
| code_template, |
| script_name, |
| work_dir, |
| try_num=5, |
| round=round if round > 0 else 1, |
| user_prompt=user_prompt |
| ) |
| code_structure = self.task.extract_code_structure(task_id, task_code, save_path) |
|
|
| |
| self.paper['tasks'][task_index]['task_code'] = '```Python\n' + task_code + '\n```' |
| self.paper['tasks'][task_index]['is_pass'] = is_pass |
| self.paper['tasks'][task_index]['execution_result'] = execution_result |
| self.paper['tasks'][task_index]['code_structure'] = code_structure |
| |
| self.coordinator.code_memory[str(task_id)] = code_structure |
|
|
|
|
| elif step_type == 'Solution Interpretation': |
| if not task_modeling: raise ValueError(f"Task {task_id} Modeling is missing.") |
| if self.with_code and execution_result is None: raise ValueError(f"Task {task_id} Code execution result is missing.") |
|
|
| task_result = self.task.result( |
| task_description, |
| task_analysis, |
| task_formulas, |
| task_modeling, |
| user_prompt=user_prompt, |
| execution_result=execution_result if self.with_code else '' |
| ) |
| self.paper['tasks'][task_index]['solution_interpretation'] = task_result |
| self.coordinator.memory[str(task_id)]['solution_interpretation'] = task_result |
|
|
|
|
| elif step_type == 'Subtask Outcome Analysis': |
| if not task_result: raise ValueError(f"Task {task_id} Solution Interpretation is missing.") |
| task_answer = self.task.answer( |
| task_description, |
| task_analysis, |
| task_formulas, |
| task_modeling, |
| task_result, |
| user_prompt=user_prompt |
| ) |
| self.paper['tasks'][task_index]['subtask_outcome_analysis'] = task_answer |
| self.coordinator.memory[str(task_id)]['subtask_outcome_analysis'] = task_answer |
|
|
|
|
| elif step_type == 'Charts': |
| |
| full_task_dict_str = json.dumps(self.paper['tasks'][task_index], indent=2) |
| charts = self.chart.create_charts( |
| full_task_dict_str, |
| self.config.get('chart_num', 0), |
| user_prompt=user_prompt |
| ) |
| self.paper['tasks'][task_index]['charts'] = charts |
| self.coordinator.memory[str(task_id)]['charts'] = charts |
|
|
| else: |
| print(f"Warning: Unknown step type '{step_type}' for Task {task_id}.") |
| success = False |
|
|
| except Exception as e: |
| print(f"Error generating Task {task_id} {step_type}: {e}") |
| import traceback |
| traceback.print_exc() |
| success = False |
|
|
| if success: |
| print(f"Completed: Task {task_id} {step_type}") |
| return success |
|
|
|
|
| |
|
|
| def generate_step(self, step_name: str, user_prompt: str = '', round: int = 0, force_regenerate: bool = True) -> bool: |
| """ |
| Generates the content for a specific step, checking dependencies first. |
| |
| Args: |
| step_name: The name of the step to generate (e.g., 'Problem Analysis', 'Task 1 Preliminary Formulas'). |
| user_prompt: Optional user guidance to influence the generation. |
| round: Number of improvement rounds to apply (where applicable). |
| force_regenerate: If True, regenerate the step even if it's already completed. |
| |
| Returns: |
| True if the step was generated successfully (or was already complete), False otherwise. |
| """ |
| if step_name in self.completed_steps and not force_regenerate: |
| print(f"Skipping already completed step: '{step_name}'") |
| return True |
|
|
| if step_name in self.completed_steps and force_regenerate: |
| print(f"Regenerating step: '{step_name}'") |
| |
| self.completed_steps.remove(step_name) |
|
|
| if not self._check_dependencies(step_name): |
| print(f"Cannot generate step '{step_name}' due to unmet dependencies.") |
| return False |
|
|
| |
| success = False |
| try: |
| if step_name == 'Problem Analysis': |
| self._generate_problem_analysis(user_prompt, round) |
| success = True |
| elif step_name == 'High-Level Modeling': |
| self._generate_high_level_modeling(user_prompt, round) |
| success = True |
| elif step_name == 'Task Decomposition': |
| self._generate_task_decomposition(user_prompt) |
| success = True |
| elif step_name == 'Dependency Analysis': |
| self._generate_dependency_analysis() |
| success = True |
| elif step_name.startswith('Task '): |
| |
| match = re.match(r"Task (\d+) (.*)", step_name) |
| if match: |
| task_id = int(match.group(1)) |
| step_type = match.group(2) |
| |
| if self.order and task_id in self.order: |
| success = self._generate_task_step(task_id, step_type, user_prompt, round) |
| elif not self.order: |
| print(f"Error: Cannot generate task step '{step_name}'. Task order not determined yet.") |
| success = False |
| else: |
| print(f"Error: Cannot generate task step '{step_name}'. Task ID {task_id} not found in execution order {self.order}.") |
| success = False |
| else: |
| print(f"Error: Could not parse task step name: '{step_name}'") |
| success = False |
| else: |
| |
| if step_name in ['Problem Background', 'Problem Requirement']: |
| print(f"Step '{step_name}' completed during initialization.") |
| success = True |
| else: |
| print(f"Error: Unknown step name: '{step_name}'") |
| success = False |
|
|
| if success: |
| self.completed_steps.add(step_name) |
| |
| |
| except Exception as e: |
| print(f"Critical error during generation of step '{step_name}': {e}") |
| import traceback |
| traceback.print_exc() |
| success = False |
|
|
| return success |
|
|
| def run_sequential(self, force_regenerate_all: bool = False): |
| """ |
| Runs the entire generation process sequentially, step by step. |
| |
| Args: |
| force_regenerate_all: If True, regenerate all steps even if already completed. |
| """ |
| print("Starting sequential generation...") |
| current_step_index = 0 |
| |
| |
| if force_regenerate_all: |
| print("Force regenerating all steps...") |
| self.completed_steps.clear() |
| |
| while current_step_index < len(self.planned_steps): |
| |
| if current_step_index >= len(self.planned_steps): |
| print("Reached end of planned steps.") |
| break |
|
|
| step_name = self.planned_steps[current_step_index] |
|
|
| print(f"\n--- Attempting Step: {step_name} ({current_step_index + 1}/{len(self.planned_steps)}) ---") |
|
|
| if step_name in self.completed_steps: |
| print(f"Skipping already completed step: '{step_name}'") |
| current_step_index += 1 |
| continue |
|
|
| |
| length_before = len(self.planned_steps) |
| success = self.generate_step(step_name, force_regenerate_all) |
| length_after = len(self.planned_steps) |
|
|
| if success: |
| print(f"--- Successfully completed step: '{step_name}' ---") |
| |
| |
| |
| |
| |
| current_step_index += 1 |
| else: |
| print(f"--- Failed to complete step: '{step_name}'. Stopping generation. ---") |
| break |
|
|
| print("\nSequential generation process finished.") |
| self.save_paper() |
| self.save_usage() |
| print(f"Final paper saved for run '{self.name}' in '{self.output_path}'.") |
| print(f"Completed steps: {self.completed_steps}") |
| if current_step_index < len(self.planned_steps): |
| print(f"Next planned step was: {self.planned_steps[current_step_index]}") |
|
|
| def generate_paper(self, project_dir: str): |
| |
| metadata = { |
| "team": "Agent", |
| "year": self.problem_year, |
| "problem_type": self.problem_type |
| } |
| json_file_path = f"{project_dir}/json/{self.problem_year}_{self.problem_type}.json" |
| with open(json_file_path, 'w+') as f: |
| json.dump(self.paper, f, indent=2) |
| code_dir = f'{project_dir}/code' |
| metadata['figures'] = [os.path.join(code_dir, f) for f in os.listdir(code_dir) if f.lower().split('.')[-1] in ['png', 'jpg', 'jpeg']] |
| metadata['codes'] = sorted([os.path.join(code_dir, f) for f in os.listdir(code_dir) if f.lower().split('.')[-1] in ['py']]) |
| generate_paper_from_json(self.llm, self.paper, metadata, os.path.join(project_dir, 'latex'), 'solution') |
|
|
|
|
|
|
| |
|
|
| def create_generator(name): |
| """Helper function to set up configuration and create the agent system.""" |
| config = { |
| 'top_method_num': 6, |
| 'problem_analysis_round': 0, |
| 'problem_modeling_round': 0, |
| 'task_formulas_round': 0, |
| 'tasknum': 4, |
| 'chart_num': 0, |
| 'model_name': 'gpt-4o-mini', |
| "method_name": "MM-Agent-Refactored" |
| } |
|
|
| |
| base_data_path = '../data/actor_data' |
| problem_file = os.path.join(base_data_path, 'input', 'problem', f'{name}.json') |
| dataset_input_path = os.path.join(base_data_path, 'input', 'dataset', name) |
| output_dir = os.path.join(base_data_path, 'exps', config["method_name"]) |
| |
| run_output_path = os.path.join(output_dir, f"{name}_{datetime.now().strftime('%Y%m%d-%H%M%S')}") |
|
|
| if not os.path.exists(problem_file): |
| print(f"Error: Problem file not found at {problem_file}") |
| return None |
|
|
| |
| |
| |
|
|
| multi_agent = ModelingAgentSystem( |
| problem_path=problem_file, |
| config=config, |
| dataset_path=dataset_input_path, |
| output_path=run_output_path, |
| name=name |
| ) |
| return multi_agent |
|
|
|
|
| if __name__ == "__main__": |
| problem_name = "2024_C" |
| agent_system = create_generator(problem_name) |
|
|
| if agent_system: |
| |
| agent_system.run_sequential() |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|