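"""LangGraph-based agent that orchestrates soil boring log analysis.

The workflow analyzes a boring log document (text and/or image), validates and
optimizes the extracted soil layers, generates geotechnical insights, and can
refine the result in response to user feedback.
"""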
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from typing import TypedDict, List, Dict, Any
import json
from llm_client import LLMClient
from soil_analyzer import SoilLayerAnalyzer
class AgentState(TypedDict):
    messages: List[BaseMessage]
    soil_data: Dict[str, Any]
    analysis_results: Dict[str, Any]
    user_feedback: str
    current_task: str
    iteration_count: int
    text_content: str
    image_base64: str
class SoilAnalysisAgent:
    def __init__(self):
        # Initialize with None client - it must be assigned before running an analysis
        self.llm_client = None
        self.soil_analyzer = SoilLayerAnalyzer()
        self.graph = self._create_graph()
    def _create_graph(self):
        """Create the LangGraph workflow"""
        workflow = StateGraph(AgentState)

        # Add nodes
        workflow.add_node("analyze_document", self._analyze_document)
        workflow.add_node("validate_layers", self._validate_layers)
        workflow.add_node("optimize_layers", self._optimize_layers)
        workflow.add_node("generate_insights", self._generate_insights)
        workflow.add_node("handle_feedback", self._handle_feedback)

        # Add edges
        workflow.add_edge("analyze_document", "validate_layers")
        workflow.add_edge("validate_layers", "optimize_layers")
        workflow.add_edge("optimize_layers", "generate_insights")
        workflow.add_conditional_edges(
            "generate_insights",
            self._should_handle_feedback,
            {
                "feedback": "handle_feedback",
                "end": END
            }
        )
        workflow.add_edge("handle_feedback", "validate_layers")

        # Set entry point
        workflow.set_entry_point("analyze_document")

        return workflow.compile()
    def _analyze_document(self, state: AgentState) -> AgentState:
        """Analyze the soil boring log document"""
        if self.llm_client is None:
            raise RuntimeError(
                "llm_client has not been set; assign an LLMClient instance "
                "to the agent before running the analysis"
            )

        # Extract document content from state
        document_content = state.get("text_content")
        image_content = state.get("image_base64")

        # Analyze using the LLM
        soil_data = self.llm_client.analyze_soil_boring_log(
            text_content=document_content,
            image_base64=image_content
        )

        state["soil_data"] = soil_data
        state["current_task"] = "document_analysis"
        state["messages"].append(AIMessage(content="Document analysis completed"))
        return state
    def _validate_layers(self, state: AgentState) -> AgentState:
        """Validate soil layer continuity and consistency"""
        soil_data = state["soil_data"]

        if "soil_layers" in soil_data:
            # Validate layer continuity
            validated_layers = self.soil_analyzer.validate_layer_continuity(
                soil_data["soil_layers"]
            )
            soil_data["soil_layers"] = validated_layers

            # Calculate statistics
            stats = self.soil_analyzer.calculate_layer_statistics(validated_layers)
            state["analysis_results"] = {"validation_stats": stats}

        state["current_task"] = "layer_validation"
        state["messages"].append(AIMessage(content="Layer validation completed"))
        return state
    def _optimize_layers(self, state: AgentState) -> AgentState:
        """Optimize layer division by merging/splitting as needed"""
        soil_data = state["soil_data"]

        if "soil_layers" in soil_data:
            optimization_results = self.soil_analyzer.optimize_layer_division(
                soil_data["soil_layers"]
            )
            state["analysis_results"]["optimization"] = optimization_results

        state["current_task"] = "layer_optimization"
        state["messages"].append(AIMessage(content="Layer optimization completed"))
        return state
    def _generate_insights(self, state: AgentState) -> AgentState:
        """Generate insights and recommendations"""
        soil_data = state["soil_data"]
        analysis_results = state["analysis_results"]

        # Generate insights using the LLM
        insights_prompt = f"""
Based on the soil boring log analysis, provide geotechnical insights and recommendations:

Soil Data: {json.dumps(soil_data, indent=2)}

Analysis Results: {json.dumps(analysis_results, indent=2)}

Please provide:
1. Key geotechnical findings
2. Foundation recommendations
3. Construction considerations
4. Potential risks or concerns
5. Recommended additional testing
"""

        try:
            response = self.llm_client.client.chat.completions.create(
                model=self.llm_client.model,
                messages=[{"role": "user", "content": insights_prompt}],
                max_tokens=1000,
                temperature=0.3
            )
            insights = response.choices[0].message.content
            state["analysis_results"]["insights"] = insights
        except Exception as e:
            state["analysis_results"]["insights"] = f"Error generating insights: {str(e)}"

        state["current_task"] = "insight_generation"
        state["messages"].append(AIMessage(content="Insights generation completed"))
        return state
    def _handle_feedback(self, state: AgentState) -> AgentState:
        """Handle user feedback and refine analysis"""
        user_feedback = state.get("user_feedback", "")
        soil_data = state["soil_data"]

        if user_feedback:
            # Refine soil layers based on feedback
            refined_data = self.llm_client.refine_soil_layers(soil_data, user_feedback)
            if "error" not in refined_data:
                state["soil_data"] = refined_data

        state["current_task"] = "feedback_handling"
        state["iteration_count"] = state.get("iteration_count", 0) + 1
        state["messages"].append(AIMessage(content=f"Feedback processed (iteration {state['iteration_count']})"))
        return state
    def _should_handle_feedback(self, state: AgentState) -> str:
        """Determine if feedback should be handled"""
        if state.get("user_feedback") and state.get("iteration_count", 0) < 3:
            return "feedback"
        return "end"
    def run_analysis(self, text_content=None, image_base64=None, user_feedback=None):
        """Run the complete soil analysis workflow"""
        # Prepare the initial state - document content is carried in the state
        # rather than in the message history
        initial_message = HumanMessage(content="Starting soil boring log analysis")
        initial_state = {
            "messages": [initial_message],
            "soil_data": {},
            "analysis_results": {},
            "user_feedback": user_feedback or "",
            "current_task": "initialization",
            "iteration_count": 0,
            "text_content": text_content,
            "image_base64": image_base64
        }

        # Run the graph
        result = self.graph.invoke(initial_state)

        return {
            "soil_data": result["soil_data"],
            "analysis_results": result["analysis_results"],
            "messages": result["messages"],
            "current_task": result["current_task"],
            "iteration_count": result["iteration_count"]
        }
    def process_feedback(self, current_state, feedback):
        """Process user feedback and continue the analysis"""
        current_state["user_feedback"] = feedback

        # Re-run the workflow with the feedback attached; the conditional edge
        # after generate_insights routes execution into handle_feedback
        result = self.graph.invoke(current_state, {"recursion_limit": 10})

        return {
            "soil_data": result["soil_data"],
            "analysis_results": result["analysis_results"],
            "messages": result["messages"],
            "current_task": result["current_task"],
            "iteration_count": result["iteration_count"]
        }
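

# Example usage - a minimal sketch of how this agent is expected to be driven.
# It assumes LLMClient() can be constructed without arguments and that a
# plain-text boring log exists at "boring_log.txt"; both are illustrative
# placeholders, not part of this module.
if __name__ == "__main__":
    agent = SoilAnalysisAgent()
    agent.llm_client = LLMClient()  # assumed no-argument constructor; adapt to your setup

    with open("boring_log.txt", "r", encoding="utf-8") as f:
        text = f.read()

    result = agent.run_analysis(text_content=text)
    print(json.dumps(result["soil_data"], indent=2))
    print(result["analysis_results"].get("insights", ""))

    # Refine the analysis with user feedback; the returned dict does not carry
    # the original document, so re-attach it before re-running the workflow.
    result["text_content"] = text
    feedback = "Merge the two clay layers below 5 m into a single layer"
    refined = agent.process_feedback(result, feedback)
    print(json.dumps(refined["soil_data"], indent=2))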