from langgraph.graph import StateGraph, END from langchain.schema import BaseMessage, HumanMessage, AIMessage from typing import TypedDict, List, Dict, Any import json from llm_client import LLMClient from soil_analyzer import SoilLayerAnalyzer class AgentState(TypedDict): messages: List[BaseMessage] soil_data: Dict[str, Any] analysis_results: Dict[str, Any] user_feedback: str current_task: str iteration_count: int text_content: str image_base64: str class SoilAnalysisAgent: def __init__(self): # Initialize with None client - will be set when needed self.llm_client = None self.soil_analyzer = SoilLayerAnalyzer() self.graph = self._create_graph() def _create_graph(self): """Create the LangGraph workflow""" workflow = StateGraph(AgentState) # Add nodes workflow.add_node("analyze_document", self._analyze_document) workflow.add_node("validate_layers", self._validate_layers) workflow.add_node("optimize_layers", self._optimize_layers) workflow.add_node("generate_insights", self._generate_insights) workflow.add_node("handle_feedback", self._handle_feedback) # Add edges workflow.add_edge("analyze_document", "validate_layers") workflow.add_edge("validate_layers", "optimize_layers") workflow.add_edge("optimize_layers", "generate_insights") workflow.add_conditional_edges( "generate_insights", self._should_handle_feedback, { "feedback": "handle_feedback", "end": END } ) workflow.add_edge("handle_feedback", "validate_layers") # Set entry point workflow.set_entry_point("analyze_document") return workflow.compile() def _analyze_document(self, state: AgentState) -> AgentState: """Analyze the soil boring log document""" # Extract document content from state document_content = state.get("text_content") image_content = state.get("image_base64") # Analyze using LLM soil_data = self.llm_client.analyze_soil_boring_log( text_content=document_content, image_base64=image_content ) state["soil_data"] = soil_data state["current_task"] = "document_analysis" state["messages"].append(AIMessage(content="Document analysis completed")) return state def _validate_layers(self, state: AgentState) -> AgentState: """Validate soil layer continuity and consistency""" soil_data = state["soil_data"] if "soil_layers" in soil_data: # Validate layer continuity validated_layers = self.soil_analyzer.validate_layer_continuity( soil_data["soil_layers"] ) soil_data["soil_layers"] = validated_layers # Calculate statistics stats = self.soil_analyzer.calculate_layer_statistics(validated_layers) state["analysis_results"] = {"validation_stats": stats} state["current_task"] = "layer_validation" state["messages"].append(AIMessage(content="Layer validation completed")) return state def _optimize_layers(self, state: AgentState) -> AgentState: """Optimize layer division by merging/splitting as needed""" soil_data = state["soil_data"] if "soil_layers" in soil_data: optimization_results = self.soil_analyzer.optimize_layer_division( soil_data["soil_layers"] ) state["analysis_results"]["optimization"] = optimization_results state["current_task"] = "layer_optimization" state["messages"].append(AIMessage(content="Layer optimization completed")) return state def _generate_insights(self, state: AgentState) -> AgentState: """Generate insights and recommendations""" soil_data = state["soil_data"] analysis_results = state["analysis_results"] # Generate insights using LLM insights_prompt = f""" Based on the soil boring log analysis, provide geotechnical insights and recommendations: Soil Data: {json.dumps(soil_data, indent=2)} Analysis Results: {json.dumps(analysis_results, indent=2)} Please provide: 1. Key geotechnical findings 2. Foundation recommendations 3. Construction considerations 4. Potential risks or concerns 5. Recommended additional testing """ try: response = self.llm_client.client.chat.completions.create( model=self.llm_client.model, messages=[{"role": "user", "content": insights_prompt}], max_tokens=1000, temperature=0.3 ) insights = response.choices[0].message.content state["analysis_results"]["insights"] = insights except Exception as e: state["analysis_results"]["insights"] = f"Error generating insights: {str(e)}" state["current_task"] = "insight_generation" state["messages"].append(AIMessage(content="Insights generation completed")) return state def _handle_feedback(self, state: AgentState) -> AgentState: """Handle user feedback and refine analysis""" user_feedback = state.get("user_feedback", "") soil_data = state["soil_data"] if user_feedback: # Refine soil layers based on feedback refined_data = self.llm_client.refine_soil_layers(soil_data, user_feedback) if "error" not in refined_data: state["soil_data"] = refined_data state["current_task"] = "feedback_handling" state["iteration_count"] = state.get("iteration_count", 0) + 1 state["messages"].append(AIMessage(content=f"Feedback processed (iteration {state['iteration_count']})")) return state def _should_handle_feedback(self, state: AgentState) -> str: """Determine if feedback should be handled""" if state.get("user_feedback") and state.get("iteration_count", 0) < 3: return "feedback" return "end" def run_analysis(self, text_content=None, image_base64=None, user_feedback=None): """Run the complete soil analysis workflow""" # Prepare initial state - store content in state instead of message initial_message = HumanMessage(content="Starting soil boring log analysis") initial_state = { "messages": [initial_message], "soil_data": {}, "analysis_results": {}, "user_feedback": user_feedback or "", "current_task": "initialization", "iteration_count": 0, "text_content": text_content, "image_base64": image_base64 } # Run the graph result = self.graph.invoke(initial_state) return { "soil_data": result["soil_data"], "analysis_results": result["analysis_results"], "messages": result["messages"], "current_task": result["current_task"], "iteration_count": result["iteration_count"] } def process_feedback(self, current_state, feedback): """Process user feedback and continue analysis""" current_state["user_feedback"] = feedback # Continue from feedback handling result = self.graph.invoke(current_state, {"recursion_limit": 10}) return { "soil_data": result["soil_data"], "analysis_results": result["analysis_results"], "messages": result["messages"], "current_task": result["current_task"], "iteration_count": result["iteration_count"] }