soil_profile / langgraph_agent.py
Sompote's picture
Upload 17 files
2c200f8 verified
from langgraph.graph import StateGraph, END
from langchain.schema import BaseMessage, HumanMessage, AIMessage
from typing import TypedDict, List, Dict, Any
import json
from llm_client import LLMClient
from soil_analyzer import SoilLayerAnalyzer
class AgentState(TypedDict):
messages: List[BaseMessage]
soil_data: Dict[str, Any]
analysis_results: Dict[str, Any]
user_feedback: str
current_task: str
iteration_count: int
text_content: str
image_base64: str
class SoilAnalysisAgent:
def __init__(self):
# Initialize with None client - will be set when needed
self.llm_client = None
self.soil_analyzer = SoilLayerAnalyzer()
self.graph = self._create_graph()
def _create_graph(self):
"""Create the LangGraph workflow"""
workflow = StateGraph(AgentState)
# Add nodes
workflow.add_node("analyze_document", self._analyze_document)
workflow.add_node("validate_layers", self._validate_layers)
workflow.add_node("optimize_layers", self._optimize_layers)
workflow.add_node("generate_insights", self._generate_insights)
workflow.add_node("handle_feedback", self._handle_feedback)
# Add edges
workflow.add_edge("analyze_document", "validate_layers")
workflow.add_edge("validate_layers", "optimize_layers")
workflow.add_edge("optimize_layers", "generate_insights")
workflow.add_conditional_edges(
"generate_insights",
self._should_handle_feedback,
{
"feedback": "handle_feedback",
"end": END
}
)
workflow.add_edge("handle_feedback", "validate_layers")
# Set entry point
workflow.set_entry_point("analyze_document")
return workflow.compile()
def _analyze_document(self, state: AgentState) -> AgentState:
"""Analyze the soil boring log document"""
# Extract document content from state
document_content = state.get("text_content")
image_content = state.get("image_base64")
# Analyze using LLM
soil_data = self.llm_client.analyze_soil_boring_log(
text_content=document_content,
image_base64=image_content
)
state["soil_data"] = soil_data
state["current_task"] = "document_analysis"
state["messages"].append(AIMessage(content="Document analysis completed"))
return state
def _validate_layers(self, state: AgentState) -> AgentState:
"""Validate soil layer continuity and consistency"""
soil_data = state["soil_data"]
if "soil_layers" in soil_data:
# Validate layer continuity
validated_layers = self.soil_analyzer.validate_layer_continuity(
soil_data["soil_layers"]
)
soil_data["soil_layers"] = validated_layers
# Calculate statistics
stats = self.soil_analyzer.calculate_layer_statistics(validated_layers)
state["analysis_results"] = {"validation_stats": stats}
state["current_task"] = "layer_validation"
state["messages"].append(AIMessage(content="Layer validation completed"))
return state
def _optimize_layers(self, state: AgentState) -> AgentState:
"""Optimize layer division by merging/splitting as needed"""
soil_data = state["soil_data"]
if "soil_layers" in soil_data:
optimization_results = self.soil_analyzer.optimize_layer_division(
soil_data["soil_layers"]
)
state["analysis_results"]["optimization"] = optimization_results
state["current_task"] = "layer_optimization"
state["messages"].append(AIMessage(content="Layer optimization completed"))
return state
def _generate_insights(self, state: AgentState) -> AgentState:
"""Generate insights and recommendations"""
soil_data = state["soil_data"]
analysis_results = state["analysis_results"]
# Generate insights using LLM
insights_prompt = f"""
Based on the soil boring log analysis, provide geotechnical insights and recommendations:
Soil Data: {json.dumps(soil_data, indent=2)}
Analysis Results: {json.dumps(analysis_results, indent=2)}
Please provide:
1. Key geotechnical findings
2. Foundation recommendations
3. Construction considerations
4. Potential risks or concerns
5. Recommended additional testing
"""
try:
response = self.llm_client.client.chat.completions.create(
model=self.llm_client.model,
messages=[{"role": "user", "content": insights_prompt}],
max_tokens=1000,
temperature=0.3
)
insights = response.choices[0].message.content
state["analysis_results"]["insights"] = insights
except Exception as e:
state["analysis_results"]["insights"] = f"Error generating insights: {str(e)}"
state["current_task"] = "insight_generation"
state["messages"].append(AIMessage(content="Insights generation completed"))
return state
def _handle_feedback(self, state: AgentState) -> AgentState:
"""Handle user feedback and refine analysis"""
user_feedback = state.get("user_feedback", "")
soil_data = state["soil_data"]
if user_feedback:
# Refine soil layers based on feedback
refined_data = self.llm_client.refine_soil_layers(soil_data, user_feedback)
if "error" not in refined_data:
state["soil_data"] = refined_data
state["current_task"] = "feedback_handling"
state["iteration_count"] = state.get("iteration_count", 0) + 1
state["messages"].append(AIMessage(content=f"Feedback processed (iteration {state['iteration_count']})"))
return state
def _should_handle_feedback(self, state: AgentState) -> str:
"""Determine if feedback should be handled"""
if state.get("user_feedback") and state.get("iteration_count", 0) < 3:
return "feedback"
return "end"
def run_analysis(self, text_content=None, image_base64=None, user_feedback=None):
"""Run the complete soil analysis workflow"""
# Prepare initial state - store content in state instead of message
initial_message = HumanMessage(content="Starting soil boring log analysis")
initial_state = {
"messages": [initial_message],
"soil_data": {},
"analysis_results": {},
"user_feedback": user_feedback or "",
"current_task": "initialization",
"iteration_count": 0,
"text_content": text_content,
"image_base64": image_base64
}
# Run the graph
result = self.graph.invoke(initial_state)
return {
"soil_data": result["soil_data"],
"analysis_results": result["analysis_results"],
"messages": result["messages"],
"current_task": result["current_task"],
"iteration_count": result["iteration_count"]
}
def process_feedback(self, current_state, feedback):
"""Process user feedback and continue analysis"""
current_state["user_feedback"] = feedback
# Continue from feedback handling
result = self.graph.invoke(current_state, {"recursion_limit": 10})
return {
"soil_data": result["soil_data"],
"analysis_results": result["analysis_results"],
"messages": result["messages"],
"current_task": result["current_task"],
"iteration_count": result["iteration_count"]
}