Spaces:
Sleeping
Sleeping
""" | |
Unified Soil Analysis Workflow using LangGraph | |
Combines LLM classification and SS/ST processing into a single controlled workflow | |
""" | |
import json | |
from typing import Dict, List, Any, Optional, TypedDict, Annotated | |
import streamlit as st | |
from langgraph.graph import StateGraph, START, END | |
from langgraph.graph.message import add_messages | |
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage | |
import openai | |
from soil_classification import SoilClassificationProcessor | |
from soil_calculations import SoilCalculations | |
from config import LLM_PROVIDERS, AVAILABLE_MODELS, get_default_provider_and_model, get_api_key | |
class _SoilAnalysisDiagnostics(TypedDict, total=False):
    """Optional diagnostic keys written by ``_apply_unit_conversions``.

    These keys are only assigned when unit-validation warnings were found,
    so they are declared as non-required (``total=False``).
    NOTE(review): LangGraph derives its state channels from the schema, so
    keys that are not declared are presumably not carried over to later
    nodes - confirm against the LangGraph version in use.
    """
    # Flat list of every unit-validation message (critical, recheck, general).
    unit_validation_warnings: List[str]
    # True when at least one layer was flagged for an image recheck.
    needs_image_recheck: bool
    # True when at least one layer was flagged with a critical unit error.
    has_critical_unit_errors: bool
    # Messages grouped under "critical_unit_errors", "recheck_image"
    # and "general_warnings".
    validation_recommendations: Dict[str, List[str]]


class SoilAnalysisState(_SoilAnalysisDiagnostics):
    """State for the unified soil analysis workflow"""
    # Input data
    text_content: Optional[str]
    image_base64: Optional[str]
    model: str
    api_key: str
    # Processing flags (forwarded to the layer optimizer)
    merge_similar: bool
    split_thick: bool
    # LLM Analysis results
    raw_llm_response: Optional[str]
    llm_extraction_success: bool
    # Error messages accumulated by every step of the workflow
    extraction_errors: List[str]
    # Extraction attempt counter (only read for the progress message)
    retry_count: int
    # Soil data (from LLM)
    project_info: Dict[str, Any]
    raw_soil_layers: List[Dict[str, Any]]
    water_table: Dict[str, Any]
    notes: str
    # Processing results
    processed_layers: List[Dict[str, Any]]
    processing_summary: Dict[str, Any]
    validation_stats: Dict[str, Any]
    optimization_results: Dict[str, Any]
    # Final output
    final_soil_data: Dict[str, Any]
    # Name of the last step outcome; the routing functions branch on it
    workflow_status: str
    workflow_messages: Annotated[List[BaseMessage], add_messages]
class UnifiedSoilWorkflow: | |
""" | |
Unified LangGraph workflow for soil analysis | |
Combines LLM extraction and SS/ST processing into one controlled flow | |
""" | |
def __init__(self):
    """Create the processing helpers and compile the workflow graph."""
    # Processor used for SS/ST processing, unit conversion, re-classification
    # and the final processing summary.
    self.soil_processor = SoilClassificationProcessor()
    # Calculator used to enhance layers with engineering parameters.
    self.soil_calculator = SoilCalculations()
    # Compiled graph returned by _build_workflow().
    self.workflow = self._build_workflow()
def _get_provider_from_model(self, model: str) -> str:
    """Resolve the provider id to use for ``model``.

    A model registered in ``AVAILABLE_MODELS`` with a non-empty
    ``"providers"`` list maps to the first provider in that list. Any other
    model is resolved from its name prefix, with OpenRouter as the catch-all.
    """
    registered = AVAILABLE_MODELS.get(model)
    if registered is not None:
        candidates = registered.get("providers", [])
        if candidates:
            # The first listed provider wins.
            return candidates[0]

    # Prefix-based fallback for unregistered models or empty provider lists.
    if model.startswith("anthropic/"):
        return "anthropic"
    if model.startswith("google/"):
        return "google"
    return "openrouter"  # Default to OpenRouter for other models
def _build_workflow(self) -> StateGraph:
    """Assemble the LangGraph graph for the soil analysis and compile it.

    Flow: START -> validate_inputs -> extract_with_llm -> validate_extraction
    -> SS/ST classification -> unit conversions -> classification validation
    -> parameter calculation -> layer optimization -> finalize_results -> END.
    The two validation nodes branch to ``handle_errors`` (which also ends the
    run) when their routing function returns ``"error"``.
    """
    graph = StateGraph(SoilAnalysisState)

    # Register every node under the name the edges below refer to.
    node_table = (
        ("validate_inputs", self._validate_inputs),
        ("extract_with_llm", self._extract_with_llm),
        ("validate_extraction", self._validate_extraction),
        ("process_ss_st_classification", self._process_ss_st_classification),
        ("apply_unit_conversions", self._apply_unit_conversions),
        ("validate_soil_classification", self._validate_soil_classification),
        ("calculate_parameters", self._calculate_parameters),
        ("optimize_layers", self._optimize_layers),
        ("finalize_results", self._finalize_results),
        ("handle_errors", self._handle_errors),
    )
    for node_name, handler in node_table:
        graph.add_node(node_name, handler)

    graph.add_edge(START, "validate_inputs")

    # Input validation decides between extraction and the error sink.
    graph.add_conditional_edges(
        "validate_inputs",
        self._should_continue_after_validation,
        {"continue": "extract_with_llm", "error": "handle_errors"},
    )
    graph.add_edge("extract_with_llm", "validate_extraction")

    # No retry loop here: a failed extraction goes straight to the error sink.
    graph.add_conditional_edges(
        "validate_extraction",
        self._should_continue_after_extraction,
        {"continue": "process_ss_st_classification", "error": "handle_errors"},
    )

    # The processing stages run strictly one after another.
    stages = (
        "process_ss_st_classification",
        "apply_unit_conversions",
        "validate_soil_classification",
        "calculate_parameters",
        "optimize_layers",
        "finalize_results",
    )
    for upstream, downstream in zip(stages, stages[1:]):
        graph.add_edge(upstream, downstream)

    # Both terminal nodes end the run.
    graph.add_edge("finalize_results", END)
    graph.add_edge("handle_errors", END)
    return graph.compile()
def _validate_inputs(self, state: SoilAnalysisState) -> SoilAnalysisState:
    """Validate input data and configuration.

    Checks that an API key is present, that text or image content was
    supplied, and that the model is a non-empty string. A model outside
    ``AVAILABLE_MODELS`` is accepted as a custom model.

    When the state has no ``"model"`` key the configured default model is
    used and written back to the state, so the later steps that read
    ``state["model"]`` see the same value that was validated here.

    Sets ``workflow_status`` to ``"validated"`` or ``"validation_failed"``;
    on failure the messages are stored in ``extraction_errors``.
    """
    st.info("π Step 1: Validating inputs...")
    errors = []

    # Validate API key
    if not state.get("api_key"):
        errors.append("No API key provided")

    # Validate content: at least one of text / image is required
    if not state.get("text_content") and not state.get("image_base64"):
        errors.append("No text or image content provided")

    # Validate model (allow custom models not in AVAILABLE_MODELS)
    _, default_model = get_default_provider_and_model()
    model = state.get("model", default_model)
    if not model or not isinstance(model, str):
        errors.append(f"Invalid model format: {model}")
    else:
        # Persist the resolved model; without this a missing "model" key
        # passes validation and then fails in the extraction step.
        state["model"] = model
        if model not in AVAILABLE_MODELS:
            # Allow custom models - just log info
            st.info(f"π Using custom model: {model} (not in pre-configured list)")

    if errors:
        state["extraction_errors"] = errors
        state["workflow_status"] = "validation_failed"
        state["workflow_messages"] = [HumanMessage(content=f"Validation errors: {', '.join(errors)}")]
    else:
        state["workflow_status"] = "validated"
        state["workflow_messages"] = [HumanMessage(content="Input validation passed")]
        st.success("β Input validation passed")
    return state
def _extract_with_llm(self, state: SoilAnalysisState) -> SoilAnalysisState:
    """Extract soil data using LLM with enhanced prompts.

    Builds the chat messages (system prompt, optional text, optional image),
    calls the provider resolved from the model name through the OpenAI
    client, and parses the reply with ``_parse_llm_response``.

    On success ``project_info``, ``raw_soil_layers``, ``water_table`` and
    ``notes`` are stored and ``workflow_status`` becomes ``"extracted"``.
    A parse error or an empty layer list gives ``"extraction_failed"``; any
    exception gives ``"extraction_error"``. In both failure cases the
    message is stored in ``extraction_errors``. No exception leaves this
    method.
    """
    max_tokens = 3000  # single source for the request and the progress message
    retry_count = state.get("retry_count", 0)
    st.info(f"π€ Step 2: Extracting soil data with LLM... (attempt {retry_count + 1})")
    try:
        # Determine provider and base URL from model
        provider_id = self._get_provider_from_model(state["model"])
        base_url = LLM_PROVIDERS[provider_id]["base_url"]

        # Initialize OpenAI client with correct provider
        client = openai.OpenAI(
            base_url=base_url,
            api_key=state["api_key"]
        )

        # Enhanced system prompt with all requirements - use safer version for Gemini
        if "gemini" in state["model"].lower():
            system_prompt = self._get_gemini_safe_prompt()
            st.info("π§ Using Gemini-optimized prompt to avoid content filtering")
        else:
            system_prompt = self._get_unified_system_prompt()

        # Build messages
        messages = [{"role": "system", "content": system_prompt}]

        # Add content
        if state.get("text_content"):
            messages.append({
                "role": "user",
                "content": f"Please analyze this soil boring log text:\n\n{state['text_content']}"
            })

        # Add image if supported and available.
        # For custom models, assume image support (user responsibility);
        # for configured models, require an explicit 'supports_images' flag.
        if state["model"] in AVAILABLE_MODELS:
            supports_images = AVAILABLE_MODELS[state["model"]].get('supports_images', False)
        else:
            supports_images = True
        if state.get("image_base64") and supports_images:
            messages.append({
                "role": "user",
                "content": [
                    {"type": "text", "text": "Please analyze this soil boring log image:"},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{state['image_base64']}"}
                    }
                ]
            })

        # Fail fast instead of sending a request with only the system
        # prompt (image skipped as unsupported and no text supplied).
        if len(messages) == 1:
            raise ValueError(
                f"Model '{state['model']}' does not accept images and no text content was provided"
            )

        # Call LLM with detailed error handling
        st.info(f"π Making API call to {state['model']}...")
        st.info(f"π Message count: {len(messages)}, Max tokens: {max_tokens}")
        try:
            response = client.chat.completions.create(
                model=state["model"],
                messages=messages,
                max_tokens=max_tokens,
                temperature=0.1
            )

            # Debug response structure
            st.info(f"π Response received - Choices count: {len(response.choices) if response and response.choices else 0}")

            # Check if response is valid
            if not response or not response.choices:
                raise Exception("No response received from LLM API")
            raw_response = response.choices[0].message.content

            # The exceptions raised below are deliberately routed through
            # the handler of this inner try so the user gets the hints.
            if raw_response is None:
                raise Exception("Response content is None")
            elif not raw_response.strip():
                # Check if it's just whitespace/newlines
                if len(raw_response) > 0:
                    whitespace_chars = [repr(c) for c in raw_response[:10]]
                    raise Exception(f"Response contains only whitespace (length: {len(raw_response)}, chars: {whitespace_chars})")
                else:
                    raise Exception("Completely empty response from LLM API")
            # Check for very short responses that might indicate filtering
            elif len(raw_response.strip()) < 10:
                st.warning(f"β οΈ Very short response ({len(raw_response)} chars): '{raw_response[:50]}'")
                st.info("π‘ This might indicate content filtering. Try a simpler prompt or different model.")

            state["raw_llm_response"] = raw_response
            st.success(f"π₯ Received response: {len(raw_response)} characters")
        except Exception as api_error:
            # Enhanced API error handling
            error_msg = str(api_error)
            st.error(f"β API call failed: {error_msg}")

            # Check if it's a model-specific issue
            if "not a valid model ID" in error_msg:
                st.error(f"π« Model '{state['model']}' is not available on OpenRouter")
                st.info("π‘ Try using a different model like 'anthropic/claude-sonnet-4'")
            elif "rate limit" in error_msg.lower():
                st.error("β° Rate limit exceeded. Please wait and try again.")
            elif "empty" in error_msg.lower() or "none" in error_msg.lower():
                st.error("π Model returned empty response. This might be due to:")
                st.info(" β’ Content filtering by the model")
                st.info(" β’ Model configuration issues")
                st.info(" β’ Input content triggering safety filters")
                st.info("π‘ Try a different model or simpler input text")
            # Re-raise unchanged; the outer handler records the failure.
            raise

        # Parse JSON response with enhanced error handling.
        # An "error" key in the parsed result signals a parse failure.
        soil_data = self._parse_llm_response(raw_response)
        if "error" in soil_data:
            state["llm_extraction_success"] = False
            state["extraction_errors"] = [soil_data["error"]]
            state["workflow_status"] = "extraction_failed"
            st.error(f"β JSON parsing failed: {soil_data['error']}")
        else:
            # Validate that we have basic required data
            layers = soil_data.get("soil_layers", [])
            if not layers:
                state["llm_extraction_success"] = False
                state["extraction_errors"] = ["No soil layers found in LLM response"]
                state["workflow_status"] = "extraction_failed"
                st.error("β No soil layers found in LLM response")
            else:
                state["llm_extraction_success"] = True
                state["project_info"] = soil_data.get("project_info", {})
                state["raw_soil_layers"] = layers
                state["water_table"] = soil_data.get("water_table", {})
                state["notes"] = soil_data.get("notes", "")
                state["workflow_status"] = "extracted"
                st.success(f"β LLM extraction completed: {len(layers)} layers found")
    except Exception as e:
        state["llm_extraction_success"] = False
        state["extraction_errors"] = [str(e)]
        state["workflow_status"] = "extraction_error"
        st.error(f"β LLM extraction failed: {str(e)}")

    state["workflow_messages"] = state.get("workflow_messages", []) + [
        AIMessage(content=f"LLM extraction: {'success' if state['llm_extraction_success'] else 'failed'}")
    ]
    return state
def _validate_extraction(self, state: SoilAnalysisState) -> SoilAnalysisState:
    """Validate LLM extraction results.

    Returns the state untouched when the extraction step already failed.
    Otherwise every extracted layer must be a dict with ``depth_from``,
    ``depth_to`` and ``soil_type`` keys. Problems are stored in
    ``extraction_errors`` and ``workflow_status`` is set to
    ``"extraction_failed"``; a clean result sets ``"extraction_validated"``.
    """
    st.info("π Step 3: Validating extraction results...")
    if not state["llm_extraction_success"]:
        return state

    validation_errors = []

    # Check for required data
    if not state["raw_soil_layers"]:
        validation_errors.append("No soil layers extracted")

    # Validate layer structure
    for i, layer in enumerate(state["raw_soil_layers"]):
        if not isinstance(layer, dict):
            # The layers come from parsed LLM JSON; a string or number entry
            # would otherwise raise or be checked as a substring.
            validation_errors.append(
                f"Layer {i+1}: Invalid layer structure (expected an object, got {type(layer).__name__})"
            )
            continue
        if "depth_from" not in layer or "depth_to" not in layer:
            validation_errors.append(f"Layer {i+1}: Missing depth information")
        if "soil_type" not in layer:
            validation_errors.append(f"Layer {i+1}: Missing soil type")

    if validation_errors:
        state["extraction_errors"] = validation_errors
        state["workflow_status"] = "extraction_failed"  # Use consistent status name
        st.warning(f"β οΈ Validation issues found: {len(validation_errors)} errors")
    else:
        state["workflow_status"] = "extraction_validated"
        st.success("β Extraction validation passed")
    return state
def _process_ss_st_classification(self, state: SoilAnalysisState) -> SoilAnalysisState:
    """Run the SS/ST sample classification over the raw LLM layers.

    Stores the processor output in ``processed_layers`` and marks the state
    ``"ss_st_processed"``. Any exception is appended to ``extraction_errors``
    and the state is marked ``"ss_st_error"``.
    """
    st.info("π§ͺ Step 4: Processing SS/ST sample classification...")
    try:
        classified = self.soil_processor.process_soil_layers(state["raw_soil_layers"])
        state.update(processed_layers=classified, workflow_status="ss_st_processed")
        st.success(f"β SS/ST processing completed: {len(classified)} layers processed")
    except Exception as e:
        earlier_errors = state.get("extraction_errors", [])
        state["extraction_errors"] = earlier_errors + [f"SS/ST processing error: {str(e)}"]
        state["workflow_status"] = "ss_st_error"
        st.error(f"β SS/ST processing failed: {str(e)}")
    return state
def _apply_unit_conversions(self, state: SoilAnalysisState) -> SoilAnalysisState:
    """Apply unit conversions to all measurements.

    Every processed layer is passed through the processor's
    ``_convert_to_si_units``. Layers that come back with a
    ``unit_validation_warning`` are then sorted into three lists:

    * critical unit errors (``critical_unit_error`` flag),
    * image recheck recommendations (``recheck_image`` flag),
    * general warnings (everything that is not a recheck).

    When the processor has no ``_validate_su_with_water_content`` method the
    warning is kept as a general warning instead of being dropped, so the
    "all correlations look reasonable" message is only shown when there
    really were no warnings.

    Warnings are stored under ``unit_validation_warnings``,
    ``needs_image_recheck``, ``has_critical_unit_errors`` and
    ``validation_recommendations``. Any exception is appended to
    ``extraction_errors`` and the state is marked ``"conversion_error"``.
    """
    st.info("π§ Step 5: Applying unit conversions...")
    try:
        converted_layers = [
            self.soil_processor._convert_to_si_units(layer)
            for layer in state["processed_layers"]
        ]
        state["processed_layers"] = converted_layers
        state["workflow_status"] = "units_converted"

        # Track different types of validation issues
        unit_errors = []
        recheck_needed = []
        critical_errors = []

        # Optional detailed validator; not every processor provides it.
        detailed_validator = getattr(self.soil_processor, '_validate_su_with_water_content', None)

        for layer in converted_layers:
            validation_warning = layer.get('unit_validation_warning', '')
            if not validation_warning:
                continue
            layer_id = layer.get('layer_id', '?')

            if detailed_validator is None:
                # No detailed validation available: keep the plain warning.
                unit_errors.append(f"Layer {layer_id}: {validation_warning}")
                continue

            detailed_validation = detailed_validator(layer)
            if detailed_validation.get('critical_unit_error'):
                critical_errors.append(f"Layer {layer_id}: {detailed_validation.get('suggested_conversion', 'Unit error')}")
            # Check if this layer needs image recheck
            if detailed_validation.get('recheck_image'):
                recheck_needed.append(f"Layer {layer_id}: {validation_warning}")
            else:
                unit_errors.append(f"Layer {layer_id}: {validation_warning}")

        # Display different types of issues with appropriate severity
        if critical_errors:
            st.error("π¨ CRITICAL UNIT CONVERSION ERRORS DETECTED:")
            for error in critical_errors:
                st.error(f" β’ {error}")
            st.error("β οΈ These values appear to be in wrong units - conversion may be needed!")
        if recheck_needed:
            st.warning("π· IMAGE RECHECK RECOMMENDED:")
            for recheck in recheck_needed:
                st.warning(f" β’ {recheck}")
            st.info("π‘ Su-water content values seem inconsistent - consider reloading the image")
        if unit_errors:
            st.warning("β οΈ Su-water content validation issues:")
            for error in unit_errors:
                st.info(f" β’ {error}")

        # Store all warnings for later reference
        all_warnings = critical_errors + recheck_needed + unit_errors
        if all_warnings:
            state["unit_validation_warnings"] = all_warnings
            state["needs_image_recheck"] = len(recheck_needed) > 0
            state["has_critical_unit_errors"] = len(critical_errors) > 0
            # Add to final results for user action
            state["validation_recommendations"] = {
                "critical_unit_errors": critical_errors,
                "recheck_image": recheck_needed,
                "general_warnings": unit_errors
            }
        else:
            st.success("β Unit conversions applied - all Su-water content correlations look reasonable")
    except Exception as e:
        state["extraction_errors"] = state.get("extraction_errors", []) + [f"Unit conversion error: {str(e)}"]
        state["workflow_status"] = "conversion_error"
        st.error(f"β Unit conversion failed: {str(e)}")
    return state
def _validate_soil_classification(self, state: SoilAnalysisState) -> SoilAnalysisState:
    """Re-classify every processed layer and report changed soil types.

    Each layer is copied, classified again with the processor's
    ``_classify_soil_type`` and stored with the new ``soil_type``. Layers
    whose type differs from the incoming one are listed to the user. Any
    exception is appended to ``extraction_errors`` and the state is marked
    ``"classification_error"``.
    """
    st.info("π― Step 6: Validating soil classification...")
    try:
        reclassified = []
        change_notes = []
        for layer in state["processed_layers"]:
            # Work on a copy so the incoming layer keeps its original type
            # for the comparison below.
            candidate = layer.copy()
            soil_type = self.soil_processor._classify_soil_type(candidate)
            candidate["soil_type"] = soil_type
            if soil_type != layer.get("soil_type"):
                change_notes.append(
                    f"Layer {layer.get('layer_id', '?')}: Changed from '{layer.get('soil_type')}' to '{soil_type}'"
                )
            reclassified.append(candidate)

        state.update(processed_layers=reclassified, workflow_status="classification_validated")

        if not change_notes:
            st.success("β Soil classification validation passed")
        else:
            st.warning(f"β οΈ Classification changes: {len(change_notes)} layers updated")
            for warning in change_notes:
                st.info(f" β’ {warning}")
    except Exception as e:
        previous = state.get("extraction_errors", [])
        state["extraction_errors"] = previous + [f"Classification validation error: {str(e)}"]
        state["workflow_status"] = "classification_error"
        st.error(f"β Classification validation failed: {str(e)}")
    return state
def _calculate_parameters(self, state: SoilAnalysisState) -> SoilAnalysisState:
    """Enhance the processed layers with engineering parameters.

    The calculator's ``enhance_soil_layers`` runs first, then the result is
    post-processed by ``_process_multiple_su_values``. Any exception is
    appended to ``extraction_errors`` and the state is marked
    ``"calculation_error"``.
    """
    st.info("π Step 7: Calculating engineering parameters...")
    try:
        # Calculator first, then the multi-Su post-processing on its output.
        with_parameters = self._process_multiple_su_values(
            self.soil_calculator.enhance_soil_layers(state["processed_layers"])
        )
        state.update(processed_layers=with_parameters, workflow_status="parameters_calculated")
        st.success("β Engineering parameters calculated")
    except Exception as e:
        recorded = state.get("extraction_errors", [])
        state["extraction_errors"] = recorded + [f"Parameter calculation error: {str(e)}"]
        state["workflow_status"] = "calculation_error"
        st.error(f"β Parameter calculation failed: {str(e)}")
    return state
def _process_multiple_su_values(self, layers: List[Dict[str, Any]]) -> List[Dict[str, Any]]: | |
"""Process layers that may have multiple Su values and decide on subdivision""" | |
enhanced_layers = [] | |
for layer in layers: | |
# Check if layer description mentions multiple Su values | |
description = layer.get('description', '').lower() | |
# Look for patterns indicating multiple Su values | |
import re | |
# Pattern to find multiple Su values in description | |
su_pattern = r'su[=\s]*(\d+(?:\.\d+)?)\s*(?:kpa|kPa|t/mΒ²|ksc|psi)' | |
su_values = re.findall(su_pattern, description) | |
# Pattern to find Su ranges | |
range_pattern = r'su\s*(?:ranges?|from)\s*(\d+(?:\.\d+)?)\s*(?:-|to)\s*(\d+(?:\.\d+)?)\s*(?:kpa|kPa)' | |
range_match = re.search(range_pattern, description) | |
# Pattern to find averaged Su values | |
avg_pattern = r'su\s*(?:averaged|average|mean)\s*(?:from)?\s*(?:\d+\s*measurements?)?\s*[:\s]*(\d+(?:\.\d+)?)' | |
avg_match = re.search(avg_pattern, description) | |
if len(su_values) > 1: | |
# Multiple Su values found - decide on subdivision or averaging | |
su_nums = [float(val) for val in su_values] | |
# Check variation | |
min_su = min(su_nums) | |
max_su = max(su_nums) | |
avg_su = sum(su_nums) / len(su_nums) | |
variation = (max_su - min_su) / avg_su if avg_su > 0 else 0 | |
if variation > 0.5 or max_su / min_su > 2.0: | |
# High variation - suggest layer subdivision | |
layer['subdivision_suggested'] = True | |
layer['su_variation_high'] = True | |
layer['su_values_found'] = su_nums | |
layer['su_variation_ratio'] = max_su / min_su if min_su > 0 else 0 | |
layer['subdivision_reason'] = f"High Su variation: {min_su:.1f}-{max_su:.1f} kPa (ratio: {max_su/min_su:.1f}x)" | |
# Update description to highlight the issue | |
layer['description'] += f" [SUBDIVISION RECOMMENDED: Su varies {min_su:.1f}-{max_su:.1f} kPa]" | |
st.warning(f"π Layer {layer.get('layer_id', '?')}: High Su variation detected - subdivision recommended") | |
else: | |
# Low variation - use average | |
layer['su_averaged'] = True | |
layer['su_values_found'] = su_nums | |
layer['su_average_used'] = avg_su | |
layer['strength_value'] = avg_su | |
layer['description'] += f" [Su averaged from {len(su_nums)} values: {', '.join([f'{v:.1f}' for v in su_nums])} kPa β {avg_su:.1f} kPa]" | |
st.info(f"π Layer {layer.get('layer_id', '?')}: Averaged {len(su_nums)} Su values: {avg_su:.1f} kPa") | |
elif range_match: | |
# Su range found | |
min_su = float(range_match.group(1)) | |
max_su = float(range_match.group(2)) | |
avg_su = (min_su + max_su) / 2 | |
layer['su_range_found'] = True | |
layer['su_range'] = [min_su, max_su] | |
layer['su_range_average'] = avg_su | |
layer['strength_value'] = avg_su | |
layer['description'] += f" [Su range {min_su:.1f}-{max_su:.1f} kPa, using average {avg_su:.1f} kPa]" | |
st.info(f"π Layer {layer.get('layer_id', '?')}: Su range processed, using average {avg_su:.1f} kPa") | |
elif avg_match: | |
# Averaged Su value already mentioned | |
avg_su = float(avg_match.group(1)) | |
layer['su_pre_averaged'] = True | |
layer['su_average_value'] = avg_su | |
layer['strength_value'] = avg_su | |
# Add metadata for tracking | |
layer['su_processing_applied'] = True | |
enhanced_layers.append(layer) | |
return enhanced_layers | |
def _optimize_layers(self, state: SoilAnalysisState) -> SoilAnalysisState:
    """Check layer continuity, collect statistics and optimize the layering.

    Uses ``SoilLayerAnalyzer`` (imported on demand). The statistics go to
    ``validation_stats``, the optimizer output to ``optimization_results``,
    and ``processed_layers`` is replaced by the ``"optimized_layers"`` entry
    of that output (falling back to the continuity-checked layers). Any
    exception is appended to ``extraction_errors`` and the state is marked
    ``"optimization_error"``.
    """
    st.info("βοΈ Step 8: Optimizing layer division...")
    try:
        from soil_analyzer import SoilLayerAnalyzer

        layer_analyzer = SoilLayerAnalyzer()

        # Continuity check first; its output feeds both later calls.
        continuous_layers = layer_analyzer.validate_layer_continuity(state["processed_layers"])
        state["validation_stats"] = layer_analyzer.calculate_layer_statistics(continuous_layers)

        # The user flags default to enabled when absent from the state.
        outcome = layer_analyzer.optimize_layer_division(
            continuous_layers,
            merge_similar=state.get("merge_similar", True),
            split_thick=state.get("split_thick", True),
        )
        state["optimization_results"] = outcome
        state["processed_layers"] = outcome.get("optimized_layers", continuous_layers)
        state["workflow_status"] = "optimized"
        st.success("β Layer optimization completed")
    except Exception as e:
        known_errors = state.get("extraction_errors", [])
        state["extraction_errors"] = known_errors + [f"Optimization error: {str(e)}"]
        state["workflow_status"] = "optimization_error"
        st.error(f"β Layer optimization failed: {str(e)}")
    return state
def _finalize_results(self, state: SoilAnalysisState) -> SoilAnalysisState:
    """Package the processed data into ``final_soil_data``.

    Requests a processing summary from the soil processor, bundles it with
    the project info, layers, water table, notes, statistics and
    optimization results, and adds a ``workflow_metadata`` section. On
    success the state is marked ``"completed"``. Any exception is appended
    to ``extraction_errors`` and the state is marked ``"finalization_error"``.
    """
    st.info("π¦ Step 9: Finalizing results...")
    try:
        summary = self.soil_processor.get_processing_summary(state["processed_layers"])
        state["processing_summary"] = summary

        # Payload sections first, in the same order the state is read.
        package = {
            "project_info": state["project_info"],
            "soil_layers": state["processed_layers"],
            "water_table": state["water_table"],
            "notes": state["notes"],
            "processing_summary": summary,
            "validation_stats": state.get("validation_stats", {}),
            "optimization_results": state.get("optimization_results", {}),
        }
        # Metadata about this run is attached last.
        package["workflow_metadata"] = {
            "model_used": state["model"],
            "processing_steps": 9,
            "total_layers": len(state["processed_layers"]),
            "ss_samples": summary.get("ss_samples", 0),
            "st_samples": summary.get("st_samples", 0),
        }

        state["final_soil_data"] = package
        state["workflow_status"] = "completed"
        st.success("π Unified soil analysis workflow completed successfully!")
    except Exception as e:
        logged = state.get("extraction_errors", [])
        state["extraction_errors"] = logged + [f"Finalization error: {str(e)}"]
        state["workflow_status"] = "finalization_error"
        st.error(f"β Result finalization failed: {str(e)}")
    return state
def _handle_errors(self, state: SoilAnalysisState) -> SoilAnalysisState:
    """Report the collected errors and build the failure payload.

    Shows every entry of ``extraction_errors`` to the user, marks the state
    ``"failed"`` and stores an error dict, including the raw LLM response
    when one was recorded, in ``final_soil_data``.
    """
    st.error("β Workflow encountered errors")
    collected = state.get("extraction_errors", [])
    for error in collected:
        st.error(f" β’ {error}")

    failure_payload = {
        "error": "Workflow failed",
        "errors": collected,
        "raw_response": state.get("raw_llm_response", ""),
    }
    state.update(workflow_status="failed", final_soil_data=failure_payload)
    return state
# Conditional routing functions | |
def _should_continue_after_validation(self, state: SoilAnalysisState) -> str: | |
"""Determine next step after input validation""" | |
if state["workflow_status"] == "validated": | |
return "continue" | |
else: | |
return "error" | |
def _should_continue_after_extraction(self, state: SoilAnalysisState) -> str:
    """Route after extraction validation (single pass, no retry loop).

    Returns ``"continue"`` when ``workflow_status`` is
    ``"extraction_validated"``. Any other status, including a missing one
    (reported as ``"unknown"``), is shown to the user and yields ``"error"``.
    """
    workflow_status = state.get("workflow_status", "unknown")
    if workflow_status != "extraction_validated":
        st.error(f"β Extraction validation failed with status: {workflow_status}")
        return "error"
    st.info("β Proceeding to SS/ST classification...")
    return "continue"
def _get_gemini_safe_prompt(self) -> str: | |
"""Get a simplified, safer prompt for Gemini models to avoid content filtering""" | |
return """You are a geotechnical engineer analyzing soil data. | |
Extract information from soil boring logs and return ONLY valid JSON. | |
Required JSON format: | |
{ | |
"project_info": { | |
"project_name": "string", | |
"boring_id": "string", | |
"location": "string", | |
"date": "string", | |
"depth_total": 10.0 | |
}, | |
"soil_layers": [ | |
{ | |
"layer_id": 1, | |
"depth_from": 0.0, | |
"depth_to": 2.0, | |
"soil_type": "clay", | |
"description": "description text", | |
"sample_type": "SS", | |
"strength_parameter": "SPT-N", | |
"strength_value": 15, | |
"water_content": 25, | |
"color": "brown", | |
"consistency": "soft" | |
} | |
], | |
"water_table": {"depth": 3.0, "date_encountered": "2024-01-01"}, | |
"notes": "Additional notes" | |
} | |
Key rules: | |
1. Look for SS-* or ST-* sample identifiers in first column | |
2. SS samples use SPT-N values, ST samples use Su values | |
3. **CRITICAL - READ COLUMN HEADERS FOR UNITS**: | |
Look at table headers to identify Su units: | |
- If header shows "Su t/mΒ²" or "Su (t/mΒ²)" β Units are t/mΒ² | |
- If header shows "Su kPa" or "Su (kPa)" β Units are kPa | |
- If header shows "Su ksc" or "Su (ksc)" β Units are ksc | |
4. **CAREFULLY convert Su units to kPa BASED ON HEADER**: | |
- t/mΒ² β kPa: multiply by 9.81 (CRITICAL - MOST COMMON ERROR) | |
- ksc/kg/cmΒ² β kPa: multiply by 98.0 | |
- psi β kPa: multiply by 6.895 | |
- MPa β kPa: multiply by 1000 | |
- kPa β kPa: no conversion (use directly) | |
5. Extract water content when available | |
6. Check Su-water content correlation (soft clay: Su<50kPa, w%>30%) | |
7. Group similar layers (maximum 7 layers total) | |
8. Return ONLY the JSON object, no explanatory text | |
9. Start response with { and end with }""" | |
def _get_unified_system_prompt(self) -> str: | |
"""Get the comprehensive system prompt for unified processing""" | |
return """You are an expert geotechnical engineer specializing in soil boring log interpretation. | |
IMPORTANT: You must respond with ONLY valid JSON data. Do not include any text before or after the JSON. | |
SAMPLE TYPE IDENTIFICATION (CRITICAL - FOLLOW EXACT ORDER): | |
**STEP 1 - FIRST COLUMN STRATIFICATION SYMBOLS (ABSOLUTE HIGHEST PRIORITY):** | |
ALWAYS look at the FIRST COLUMN of each layer for stratification symbols: | |
- **SS-1, SS-2, SS-18, SS18, SS-5** β SS (Split Spoon) sample | |
- **ST-1, ST-2, ST-5, ST5, ST-12** β ST (Shelby Tube) sample | |
- **SS1, SS2, SS3** (without dash) β SS sample | |
- **ST1, ST2, ST3** (without dash) β ST sample | |
- **Look for pattern: [SS|ST][-]?[0-9]+** in first column | |
**EXAMPLES of First Column Recognition:** | |
``` | |
SS-18 | Brown clay, N=8 β sample_type="SS" (SS-18 in first column) | |
ST-5 | Gray clay, Su=45 kPa β sample_type="ST" (ST-5 in first column) | |
SS12 | Sandy clay, SPT test β sample_type="SS" (SS12 in first column) | |
ST3 | Soft clay, unconfined β sample_type="ST" (ST3 in first column) | |
``` | |
**STEP 2 - If NO first column symbols, then check description keywords:** | |
- SS indicators: "split spoon", "SPT", "standard penetration", "disturbed" | |
- ST indicators: "shelby", "tube", "undisturbed", "UT", "unconfined compression" | |
**STEP 3 - If still unclear, use strength parameter type:** | |
- SPT-N values present β likely SS sample | |
- Su values from unconfined test β likely ST sample | |
CRITICAL SOIL CLASSIFICATION RULES (MANDATORY): | |
**SAND LAYER CLASSIFICATION REQUIREMENTS:** | |
1. **Sand layers MUST have sieve analysis evidence** - Look for: | |
- "Sieve #200: X% passing" or "#200 passing: X%" | |
- "Fines content: X%" (same as sieve #200) | |
- "Particle size analysis" or "gradation test" | |
- "% passing 0.075mm" (equivalent to #200 sieve) | |
2. **Classification Rules**: | |
- Sieve #200 >50% passing β CLAY (fine-grained) | |
- Sieve #200 <50% passing β SAND/GRAVEL (coarse-grained) | |
3. **NO SIEVE ANALYSIS = ASSUME CLAY (MANDATORY)**: | |
- If no sieve analysis data found β ALWAYS classify as CLAY | |
- Include note: "Assumed clay - no sieve analysis data available" | |
- Set sieve_200_passing: null (not a number) | |
**CRITICAL**: Never classify as sand/silt without explicit sieve analysis evidence | |
**CRITICAL**: Always look for sieve #200 data before classifying as sand | |
CRITICAL SS/ST SAMPLE RULES (MUST FOLLOW): | |
FOR SS (Split Spoon) SAMPLES: | |
1. ALWAYS use RAW N-VALUE (not N-corrected, N-correction, or adjusted N) | |
2. Look for: "N = 15", "SPT-N = 8", "raw N = 20", "field N = 12" | |
3. IGNORE: "N-corrected = 25", "N-correction = 18", "adjusted N = 30" | |
4. For clay: Use SPT-N parameter (will be converted to Su using Su=5*N) | |
5. For sand/silt: Use SPT-N parameter (will be converted to friction angle) | |
6. NEVER use unconfined compression Su values for SS samples - ONLY use N values | |
FOR ST (Shelby Tube) SAMPLES: | |
1. ALWAYS USE DIRECT Su values from unconfined compression test | |
2. If ST sample has Su value (e.g., "Su = 25 kPa"), use that EXACT value | |
3. NEVER convert SPT-N to Su for ST samples when direct Su is available | |
4. Priority: Direct Su measurement > any other value | |
CRITICAL SU VALUE EXTRACTION - MULTIPLE VALUES PER LAYER: | |
**EXTRACT ALL SU VALUES IN COLUMN (CRITICAL ENHANCEMENT):** | |
**STEP 1 - SCAN ENTIRE SU COLUMN FOR EACH LAYER:** | |
1. Look for ALL Su values that fall within each layer's depth range | |
2. Extract EVERY Su value found in the Su column for that depth interval | |
3. Record ALL values with their exact depths if specified | |
4. Note: A single layer may have multiple Su measurements at different depths | |
**STEP 2 - HANDLE MULTIPLE SU VALUES PER LAYER:** | |
For layers with multiple Su values, you have several options: | |
Option A - **LAYER SUBDIVISION (PREFERRED for significant variation):** | |
- If Su values vary by >50% or have >2x ratio β Split into sublayers | |
- Example: Layer 2.0-6.0m has Su values [25, 45, 80] kPa | |
- Split into: Layer 2.0-3.5m (Su=25kPa), Layer 3.5-5.0m (Su=45kPa), Layer 5.0-6.0m (Su=80kPa) | |
Option B - **AVERAGE SU VALUES (for similar values):** | |
- If Su values are within Β±30% of mean β Use average | |
- Example: Layer 1.0-3.0m has Su values [35, 40, 38] kPa β Use Su=37.7kPa | |
- Include note: "Su averaged from 3 measurements: 35, 40, 38 kPa" | |
Option C - **REPRESENTATIVE VALUE (for clusters):** | |
- If multiple similar values with one outlier β Use cluster average | |
- Example: Su values [25, 28, 26, 45] β Use 26.3kPa (ignore outlier 45) | |
**STEP 3 - DOCUMENT ALL VALUES FOUND:** | |
Always include in description: | |
- "Su values found: 25, 35, 42 kPa (averaged to 34 kPa)" | |
- "Multiple Su measurements: 30, 28, 32 kPa at depths 2.1, 2.5, 2.8m" | |
- "Su ranges from 40-60 kPa, used average 50 kPa" | |
CRITICAL UNIT CONVERSION REQUIREMENTS (MUST APPLY): | |
**MANDATORY SU UNIT CONVERSION - READ COLUMN HEADERS FIRST:** | |
**STEP 1 - IDENTIFY UNITS FROM TABLE HEADERS (CRITICAL):** | |
ALWAYS look at the column headers to identify Su units: | |
- "Su t/mΒ²" or "Su (t/mΒ²)" in header β Values are in t/mΒ² | |
- "Su kPa" or "Su (kPa)" in header β Values are in kPa | |
- "Su ksc" or "Su (ksc)" in header β Values are in ksc | |
- "Su psi" or "Su (psi)" in header β Values are in psi | |
- Just "Su" with units below β Look at unit row (e.g., "t/mΒ²") | |
**STEP 2 - CONVERT TO kPa BASED ON IDENTIFIED UNITS:** | |
When extracting Su values from images or text, you MUST convert to kPa BEFORE using the value: | |
1. **ksc or kg/cmΒ²**: Su_kPa = Su_ksc Γ 98.0 | |
Example: "Su = 2.5 ksc" β strength_value: 245 (not 2.5) | |
2. **t/mΒ² (tonnes/mΒ²)**: Su_kPa = Su_tonnes Γ 9.81 | |
Example: "Su = 3.0 t/mΒ²" β strength_value: 29.43 (not 3.0) | |
**CRITICAL**: This is the MOST COMMON unit in boring logs! | |
3. **psi**: Su_kPa = Su_psi Γ 6.895 | |
Example: "Su = 50 psi" β strength_value: 344.75 (not 50) | |
4. **psf**: Su_kPa = Su_psf Γ 0.048 | |
Example: "Su = 1000 psf" β strength_value: 48 (not 1000) | |
5. **kPa**: Use directly (no conversion needed) | |
Example: "Su = 75 kPa" β strength_value: 75 | |
6. **MPa**: Su_kPa = Su_MPa Γ 1000 | |
Example: "Su = 0.1 MPa" β strength_value: 100 (not 0.1) | |
**CRITICAL EXAMPLES FROM BORING LOGS:** | |
- Table header shows "Su t/mΒ²", value 1.41 β strength_value: 13.83 (1.41 Γ 9.81) | |
- Table header shows "Su t/mΒ²", value 2.41 β strength_value: 23.64 (2.41 Γ 9.81) | |
- Table header shows "Su kPa", value 75 β strength_value: 75 (no conversion) | |
**IMPORTANT**: Always include original unit in description for verification | |
**SPT-N values**: Keep as-is (no unit conversion needed) | |
CRITICAL SU-WATER CONTENT VALIDATION (MANDATORY): | |
**EXTRACT WATER CONTENT WHEN AVAILABLE:** | |
Always extract water content (w%) when mentioned in the description: | |
- \"water content = 25%\" β water_content: 25 | |
- \"w = 30%\" β water_content: 30 | |
- \"moisture content 35%\" β water_content: 35 | |
**VALIDATE SU-WATER CONTENT CORRELATION:** | |
For clay layers, Su and water content should correlate reasonably: | |
- Very soft clay: Su < 25 kPa, w% > 40% | |
- Soft clay: Su 25-50 kPa, w% 30-40% | |
- Medium clay: Su 50-100 kPa, w% 20-30% | |
- Stiff clay: Su 100-200 kPa, w% 15-25% | |
- Very stiff clay: Su 200-400 kPa, w% 10-20% | |
- Hard clay: Su > 400 kPa, w% < 15% | |
**CRITICAL UNIT CHECK SCENARIOS:** | |
- If Su > 1000 kPa with w% > 20%: CHECK if Su is in wrong units (psi, psf?) | |
- If Su < 5 kPa with w% < 15%: CHECK if Su is in wrong units (MPa, bar?) | |
- If correlation seems very off: VERIFY unit conversion was applied correctly | |
CRITICAL OUTPUT FORMAT (MANDATORY): | |
You MUST respond with ONLY a valid JSON object. Do not include: | |
- Explanatory text before or after the JSON | |
- Markdown formatting (```json ```) | |
- Comments or notes | |
- Multiple JSON objects | |
Start your response directly with { and end with } | |
EXAMPLE CORRECT RESPONSE FORMAT: | |
{ | |
"project_info": { | |
"project_name": "Sample Project", | |
"boring_id": "BH-01", | |
"location": "Sample Location", | |
"date": "2024-06-25", | |
"depth_total": 10.0 | |
}, | |
"soil_layers": [ | |
{ | |
"layer_id": 1, | |
"depth_from": 0.0, | |
"depth_to": 2.0, | |
"soil_type": "clay", | |
"description": "Brown clay, soft, SS-1 sample", | |
"sample_type": "SS", | |
"strength_parameter": "SPT-N", | |
"strength_value": 4, | |
"water_content": 35, | |
"color": "brown", | |
"consistency": "soft" | |
} | |
], | |
"water_table": {"depth": 3.0, "date_encountered": "2024-06-25"}, | |
"notes": "Standard soil boring analysis" | |
} | |
LAYER GROUPING REQUIREMENTS: | |
1. MAXIMUM 7 LAYERS TOTAL - Group similar adjacent layers to achieve this limit | |
2. CLAY AND SAND MUST BE SEPARATE - Never combine clay layers with sand layers | |
3. Group adjacent layers with similar properties (same soil type and similar consistency) | |
4. Prioritize engineering significance over minor variations | |
Analyze the provided soil boring log and extract the following information in this exact JSON format: | |
{ | |
"project_info": { | |
"project_name": "string", | |
"boring_id": "string", | |
"location": "string", | |
"date": "string", | |
"depth_total": 10.0 | |
}, | |
"soil_layers": [ | |
{ | |
"layer_id": 1, | |
"depth_from": 0.0, | |
"depth_to": 2.5, | |
"soil_type": "clay", | |
"description": "Brown silty clay, ST sample, Su = 25 kPa", | |
"sample_type": "ST", | |
"strength_parameter": "Su", | |
"strength_value": 25, | |
"sieve_200_passing": 65, | |
"water_content": 35.5, | |
"color": "brown", | |
"moisture": "moist", | |
"consistency": "soft", | |
"su_source": "Unconfined Compression Test" | |
} | |
], | |
"water_table": { | |
"depth": 3.0, | |
"date_encountered": "2024-01-01" | |
}, | |
"notes": "Additional observations" | |
} | |
**CRITICAL EXAMPLES - MULTIPLE SU VALUES PER LAYER:** | |
**EXAMPLE 1 - Multiple Su Values (SUBDIVISION CASE):** | |
Layer depth 2.0-6.0m with Su column showing: | |
- "Su at 2.5m = 25 kPa" | |
- "Su at 4.0m = 45 kPa" | |
- "Su at 5.5m = 80 kPa" | |
PROCESSING: High variation (25-80 kPa, ratio 3.2x) β SUBDIVISION RECOMMENDED | |
β Include ALL values in description: "Multiple Su values: 25, 45, 80 kPa [SUBDIVISION RECOMMENDED: High variation]" | |
β Use representative value (middle): strength_value=45 | |
β Add metadata: subdivision_suggested=true, su_variation_high=true | |
**EXAMPLE 2 - Multiple Similar Su Values (AVERAGING CASE):** | |
Layer depth 1.0-3.0m with Su column showing: | |
- "Su = 35 kPa" | |
- "Su = 40 kPa" | |
- "Su = 38 kPa" | |
PROCESSING: Low variation (Β±7% from mean) β USE AVERAGE | |
β Description: "Su averaged from 3 measurements: 35, 40, 38 kPa β 37.7 kPa" | |
β Use: strength_value=37.7 | |
**EXAMPLE 3 - Su Range Detection:** | |
Layer with Su column: "Su ranges 40-60 kPa" | |
β Description: "Su range 40-60 kPa, using average 50 kPa" | |
β Use: strength_value=50 | |
EXAMPLES OF CORRECT FIRST COLUMN SYMBOL RECOGNITION: | |
**SS SAMPLE EXAMPLES (First Column Priority):** | |
1. "SS-18 | Clay layer, N = 8, Su = 45 kPa from unconfined test" | |
β First column: SS-18 β sample_type="SS" (HIGHEST PRIORITY) | |
β Use: strength_parameter="SPT-N", strength_value=8 | |
β IGNORE the Su=45 kPa value for SS samples | |
2. "SS18 | Soft clay, field N = 6, N-corrected = 10" | |
β First column: SS18 β sample_type="SS" (HIGHEST PRIORITY) | |
β Use: strength_parameter="SPT-N", strength_value=6 (raw N) | |
β IGNORE N-corrected value | |
3. "SS-5 | Brown clay, split spoon test, N=12" | |
β First column: SS-5 β sample_type="SS" (HIGHEST PRIORITY) | |
β Use: strength_parameter="SPT-N", strength_value=12 | |
**ST SAMPLE EXAMPLES (First Column Priority):** | |
1. "ST-5 | Stiff clay, Su = 85 kPa from unconfined compression" | |
β First column: ST-5 β sample_type="ST" (HIGHEST PRIORITY) | |
β Use: strength_parameter="Su", strength_value=85 | |
2. "ST-12 | Medium clay, Su = 2.5 ksc from unconfined test" | |
β First column: ST-12 β sample_type="ST" (HIGHEST PRIORITY) | |
β Convert: 2.5 Γ 98 = 245 kPa | |
β Use: strength_parameter="Su", strength_value=245 | |
3. "ST3 | Clay, unconfined strength = 3.0 t/mΒ²" | |
β First column: ST3 β sample_type="ST" (HIGHEST PRIORITY) | |
β Convert: 3.0 Γ 9.81 = 29.43 kPa | |
β Use: strength_parameter="Su", strength_value=29.43 | |
4. "ST-8 | Gray clay, shelby tube, Su = 120 kPa" | |
β First column: ST-8 β sample_type="ST" (HIGHEST PRIORITY) | |
β Use: strength_parameter="Su", strength_value=120 | |
5. "ST-10 | Gray clay, depth 3.0-6.0m, Su values: 35, 42, 39 kPa" | |
β First column: ST-10 β sample_type="ST" (HIGHEST PRIORITY) | |
β Multiple values detected: variation <30% β Use average | |
β Use: strength_parameter="Su", strength_value=38.7 | |
β Description: "Gray clay, shelby tube, Su averaged from 3 measurements: 35, 42, 39 kPa β 38.7 kPa" | |
6. "ST-15 | Stiff clay, Su measurements: 45, 85, 120 kPa at different depths" | |
β First column: ST-15 β sample_type="ST" (HIGHEST PRIORITY) | |
β High variation detected: ratio 2.7x β SUBDIVISION RECOMMENDED | |
β Use: strength_parameter="Su", strength_value=85 (middle value) | |
β Description: "Stiff clay, multiple Su values: 45, 85, 120 kPa [SUBDIVISION RECOMMENDED: High variation]" | |
**SOIL CLASSIFICATION EXAMPLES:** | |
1. "Brown silty clay, no sieve analysis data" | |
β soil_type="clay", sieve_200_passing=null | |
β Note: "Assumed clay - no sieve analysis data available" | |
2. "Sandy clay, sieve #200: 75% passing" | |
β soil_type="clay", sieve_200_passing=75 | |
β Classification: Clay (>50% passing) | |
3. "Medium sand, gradation test shows 25% passing #200" | |
β soil_type="sand", sieve_200_passing=25 | |
β Classification: Sand (<50% passing) | |
4. "Dense sand layer" (NO sieve data mentioned) | |
β soil_type="clay", sieve_200_passing=null | |
β Note: "Assumed clay - no sieve analysis data available" | |
β NEVER classify as sand without sieve data | |
TECHNICAL RULES: | |
1. All numeric values must be numbers, not strings | |
2. For soil_type, use basic terms: "clay", "sand", "silt", "gravel" - do NOT include consistency | |
3. Include sample_type field: "SS" (Split Spoon) or "ST" (Shelby Tube) | |
4. Include sieve_200_passing field when available (percentage passing sieve #200) | |
5. Include water_content field when available (percentage water content for clay consistency checks) | |
6. Include su_source field: "Unconfined Compression Test" for direct measurements, or "Calculated from SPT-N" for conversions | |
7. Strength parameters: | |
- SS samples: ALWAYS use "SPT-N" with RAW N-value (will be converted based on soil type) | |
- ST samples with clay: Use "Su" with DIRECT value in kPa from unconfined compression test | |
- For sand/gravel: Always use "SPT-N" with N-value | |
- NEVER use Su for SS samples, NEVER calculate Su from SPT-N for ST samples that have direct Su | |
8. Put consistency separately in "consistency" field: "soft", "medium", "stiff", "loose", "dense", etc. | |
9. Ensure continuous depths (no gaps or overlaps) | |
10. All depths in meters, strength values as numbers | |
11. Return ONLY the JSON object, no additional text""" | |
def _parse_llm_response(self, response: str) -> Dict[str, Any]: | |
"""Parse LLM JSON response with enhanced error handling""" | |
# First check if response is empty or None | |
if not response or not response.strip(): | |
return {"error": "Empty response from LLM", "raw_response": response or ""} | |
try: | |
# Clean response | |
json_str = response.strip() | |
# Log raw response for debugging (first 500 chars) | |
st.info(f"π Raw LLM response preview: {json_str[:500]}{'...' if len(json_str) > 500 else ''}") | |
# Remove markdown code blocks if present | |
if "```json" in json_str: | |
json_start = json_str.find("```json") + 7 | |
json_end = json_str.find("```", json_start) | |
if json_end == -1: | |
json_end = len(json_str) | |
json_str = json_str[json_start:json_end].strip() | |
st.info("π§ Extracted JSON from markdown code block") | |
elif "```" in json_str: | |
json_start = json_str.find("```") + 3 | |
json_end = json_str.rfind("```") | |
if json_end > json_start: | |
json_str = json_str[json_start:json_end].strip() | |
st.info("π§ Extracted content from code block") | |
# Handle cases where LLM includes explanatory text before/after JSON | |
# Look for JSON object boundaries more aggressively | |
brace_start = json_str.find("{") | |
brace_end = json_str.rfind("}") | |
if brace_start != -1 and brace_end != -1 and brace_end > brace_start: | |
json_str = json_str[brace_start:brace_end + 1] | |
st.info(f"π§ Extracted JSON object: {len(json_str)} characters") | |
elif not json_str.startswith("{"): | |
# No JSON found | |
return { | |
"error": f"No JSON object found in response. Response appears to be: {json_str[:200]}", | |
"raw_response": response | |
} | |
# Try to parse JSON | |
result = json.loads(json_str) | |
# Validate structure | |
if not isinstance(result, dict): | |
return {"error": f"Expected JSON object, got {type(result)}", "raw_response": response} | |
if "soil_layers" not in result: | |
result["soil_layers"] = [] | |
st.warning("β οΈ No 'soil_layers' found in response, using empty list") | |
if "project_info" not in result: | |
result["project_info"] = {} | |
st.warning("β οΈ No 'project_info' found in response, using empty dict") | |
st.success(f"β JSON parsed successfully: {len(result.get('soil_layers', []))} layers found") | |
return result | |
except json.JSONDecodeError as e: | |
error_msg = f"JSON parsing failed: {str(e)}" | |
st.error(f"β {error_msg}") | |
st.error(f"π Problematic content: {json_str[:300] if 'json_str' in locals() else 'N/A'}") | |
return {"error": error_msg, "raw_response": response} | |
except Exception as e: | |
error_msg = f"Response parsing failed: {str(e)}" | |
st.error(f"β {error_msg}") | |
return {"error": error_msg, "raw_response": response} | |
def get_workflow_visualization(self) -> str:
    """Return a Markdown overview of the workflow.

    The text is a constant: a title, the nine numbered workflow steps with
    a one-line description each, and a bullet list of key features.

    Returns:
        The Markdown string. No instance state is read.
    """
    # NOTE(review): the emoji were unreadable in the reviewed copy; the
    # ones below are best-effort replacements - confirm against the
    # intended design. Arrows, bullets and the phi symbol are restored
    # from their mis-encoded forms.
    return """
🔄 **Unified Soil Analysis Workflow** 🔄
**Step 1** 🔍 **Validate Inputs** → Check API key, content, model
**Step 2** 🤖 **Extract with LLM** → Use enhanced prompts for SS/ST classification
**Step 3** ✅ **Validate Extraction** → Check layer structure and data quality
**Step 4** 🧪 **Process SS/ST Classification** → Apply sample-specific processing
**Step 5** 🔧 **Apply Unit Conversions** → Convert all values to SI units (kPa)
**Step 6** 🎯 **Validate Soil Classification** → Enforce sieve analysis requirements
**Step 7** 📊 **Calculate Parameters** → Compute Su, φ, and other properties
**Step 8** ⚙️ **Optimize Layers** → Group and validate layer continuity
**Step 9** 📦 **Finalize Results** → Package complete analysis results
**Key Features:**
• **Unified Processing**: Single workflow handles all steps
• **SS/ST Classification**: Automatic sample type identification
• **Unit Conversion**: All Su values converted to kPa from images/text
• **Sieve Analysis Enforcement**: Sand layers require #200 sieve data
• **Error Handling**: Comprehensive validation and recovery
• **State Management**: Complete workflow state tracking
"""
def analyze_soil_boring_log(self,
                            text_content: Optional[str] = None,
                            image_base64: Optional[str] = None,
                            model: Optional[str] = None,
                            api_key: Optional[str] = None,
                            merge_similar: bool = True,
                            split_thick: bool = True) -> Dict[str, Any]:
    """Run the unified soil analysis workflow on one boring log.

    Builds the initial ``SoilAnalysisState``, invokes the compiled
    workflow with an explicit recursion limit, and reports the outcome
    through Streamlit status messages.

    Args:
        text_content: Extracted text from the document, if any.
        image_base64: Base64 encoded image of the log, if any.
        model: LLM model to use. When omitted, the second element of
            ``get_default_provider_and_model()`` is used.
        api_key: API key handed to the workflow; stored as an empty
            string when omitted. (The previous docstring called this an
            OpenRouter key - TODO confirm which providers apply.)
        merge_similar: Whether to merge similar layers.
        split_thick: Whether to split thick layers.

    Returns:
        The workflow's ``final_soil_data`` dict, whether or not the
        workflow reached the ``"completed"`` status. If invoking the
        workflow raises, a dict with ``error`` and
        ``workflow_status == "execution_failed"`` is returned instead.
    """
    # NOTE(review): the emoji in the status messages were unreadable in the
    # reviewed copy; the ones used here are best-effort replacements.

    # Initialize state
    initial_state = SoilAnalysisState(
        text_content=text_content,
        image_base64=image_base64,
        model=model or get_default_provider_and_model()[1],
        api_key=api_key or "",
        merge_similar=merge_similar,
        split_thick=split_thick,
        llm_extraction_success=False,
        extraction_errors=[],
        retry_count=0,  # Initialize retry counter
        project_info={},
        raw_soil_layers=[],
        processed_layers=[],
        water_table={},
        notes="",
        processing_summary={},
        validation_stats={},
        optimization_results={},
        final_soil_data={},
        workflow_status="initializing",
        workflow_messages=[]
    )

    # Run workflow
    st.info("🚀 Starting unified soil analysis workflow...")
    try:
        # Execute the workflow with recursion limit protection
        final_state = self.workflow.invoke(
            initial_state,
            config={"recursion_limit": 50}  # Set explicit recursion limit
        )

        # Both outcomes hand back final_soil_data; only the message differs.
        if final_state["workflow_status"] == "completed":
            st.success("🎉 Unified workflow completed successfully!")
        else:
            st.error(f"❌ Workflow failed with status: {final_state['workflow_status']}")
        return final_state["final_soil_data"]

    except Exception as e:
        # UI boundary: convert any failure into a message plus an error
        # dict so the caller never sees a raw exception.
        error_msg = str(e)
        if "recursion limit" in error_msg.lower():
            st.error("❌ Workflow execution failed: Recursion limit reached. This may indicate a configuration issue with the model or workflow logic.")
            st.info("💡 Try using a different model or check your input data format.")
        else:
            st.error(f"❌ Workflow execution failed: {error_msg}")
        return {
            "error": f"Workflow execution failed: {error_msg}",
            "workflow_status": "execution_failed"
        }