""" Unified Soil Analysis Workflow using LangGraph Combines LLM classification and SS/ST processing into a single controlled workflow """ import json from typing import Dict, List, Any, Optional, TypedDict, Annotated import streamlit as st from langgraph.graph import StateGraph, START, END from langgraph.graph.message import add_messages from langchain_core.messages import BaseMessage, HumanMessage, AIMessage import openai from soil_classification import SoilClassificationProcessor from soil_calculations import SoilCalculations from config import LLM_PROVIDERS, AVAILABLE_MODELS, get_default_provider_and_model, get_api_key class SoilAnalysisState(TypedDict): """State for the unified soil analysis workflow""" # Input data text_content: Optional[str] image_base64: Optional[str] model: str api_key: str # Processing flags merge_similar: bool split_thick: bool # LLM Analysis results raw_llm_response: Optional[str] llm_extraction_success: bool extraction_errors: List[str] retry_count: int # Add retry counter # Soil data (from LLM) project_info: Dict[str, Any] raw_soil_layers: List[Dict[str, Any]] water_table: Dict[str, Any] notes: str # Processing results processed_layers: List[Dict[str, Any]] processing_summary: Dict[str, Any] validation_stats: Dict[str, Any] optimization_results: Dict[str, Any] # Final output final_soil_data: Dict[str, Any] workflow_status: str workflow_messages: Annotated[List[BaseMessage], add_messages] class UnifiedSoilWorkflow: """ Unified LangGraph workflow for soil analysis Combines LLM extraction and SS/ST processing into one controlled flow """ def __init__(self): self.soil_processor = SoilClassificationProcessor() self.soil_calculator = SoilCalculations() self.workflow = self._build_workflow() def _get_provider_from_model(self, model: str) -> str: """Determine provider from model name""" for model_id, model_info in AVAILABLE_MODELS.items(): if model_id == model: # Return the first provider that supports this model providers = model_info.get("providers", []) if providers: return providers[0] # Default fallback logic based on model prefix if model.startswith("anthropic/"): return "anthropic" elif model.startswith("google/"): return "google" else: return "openrouter" # Default to OpenRouter for other models def _build_workflow(self) -> StateGraph: """Build the unified LangGraph workflow""" # Create workflow graph workflow = StateGraph(SoilAnalysisState) # Add nodes workflow.add_node("validate_inputs", self._validate_inputs) workflow.add_node("extract_with_llm", self._extract_with_llm) workflow.add_node("validate_extraction", self._validate_extraction) workflow.add_node("process_ss_st_classification", self._process_ss_st_classification) workflow.add_node("apply_unit_conversions", self._apply_unit_conversions) workflow.add_node("validate_soil_classification", self._validate_soil_classification) workflow.add_node("calculate_parameters", self._calculate_parameters) workflow.add_node("optimize_layers", self._optimize_layers) workflow.add_node("finalize_results", self._finalize_results) workflow.add_node("handle_errors", self._handle_errors) # Define workflow edges workflow.add_edge(START, "validate_inputs") # Conditional routing based on validation workflow.add_conditional_edges( "validate_inputs", self._should_continue_after_validation, { "continue": "extract_with_llm", "error": "handle_errors" } ) workflow.add_edge("extract_with_llm", "validate_extraction") # Simplified routing - no retry loop to prevent recursion workflow.add_conditional_edges( "validate_extraction", 
self._should_continue_after_extraction, { "continue": "process_ss_st_classification", "error": "handle_errors" } ) workflow.add_edge("process_ss_st_classification", "apply_unit_conversions") workflow.add_edge("apply_unit_conversions", "validate_soil_classification") workflow.add_edge("validate_soil_classification", "calculate_parameters") workflow.add_edge("calculate_parameters", "optimize_layers") workflow.add_edge("finalize_results", END) workflow.add_edge("optimize_layers", "finalize_results") workflow.add_edge("handle_errors", END) return workflow.compile() def _validate_inputs(self, state: SoilAnalysisState) -> SoilAnalysisState: """Validate input data and configuration""" st.info("๐Ÿ” Step 1: Validating inputs...") errors = [] # Validate API key if not state.get("api_key"): errors.append("No API key provided") # Validate content if not state.get("text_content") and not state.get("image_base64"): errors.append("No text or image content provided") # Validate model (allow custom models not in AVAILABLE_MODELS) _, default_model = get_default_provider_and_model() model = state.get("model", default_model) if not model or not isinstance(model, str): errors.append(f"Invalid model format: {model}") elif model not in AVAILABLE_MODELS: # Allow custom models - just log info st.info(f"๐Ÿ“‹ Using custom model: {model} (not in pre-configured list)") if errors: state["extraction_errors"] = errors state["workflow_status"] = "validation_failed" state["workflow_messages"] = [HumanMessage(content=f"Validation errors: {', '.join(errors)}")] else: state["workflow_status"] = "validated" state["workflow_messages"] = [HumanMessage(content="Input validation passed")] st.success("โœ… Input validation passed") return state def _extract_with_llm(self, state: SoilAnalysisState) -> SoilAnalysisState: """Extract soil data using LLM with enhanced prompts""" retry_count = state.get("retry_count", 0) st.info(f"๐Ÿค– Step 2: Extracting soil data with LLM... 
(attempt {retry_count + 1})") try: # Determine provider and base URL from model provider_id = self._get_provider_from_model(state["model"]) base_url = LLM_PROVIDERS[provider_id]["base_url"] # Initialize OpenAI client with correct provider client = openai.OpenAI( base_url=base_url, api_key=state["api_key"] ) # Enhanced system prompt with all requirements - use safer version for Gemini if "gemini" in state["model"].lower(): system_prompt = self._get_gemini_safe_prompt() st.info("๐Ÿ”ง Using Gemini-optimized prompt to avoid content filtering") else: system_prompt = self._get_unified_system_prompt() # Build messages messages = [{"role": "system", "content": system_prompt}] # Add content if state.get("text_content"): messages.append({ "role": "user", "content": f"Please analyze this soil boring log text:\n\n{state['text_content']}" }) # Add image if supported and available model_info = AVAILABLE_MODELS.get(state["model"], {}) # For custom models, assume image support (user responsibility) supports_images = model_info.get('supports_images', True) if state["model"] not in AVAILABLE_MODELS else model_info.get('supports_images', False) if state.get("image_base64") and supports_images: messages.append({ "role": "user", "content": [ {"type": "text", "text": "Please analyze this soil boring log image:"}, { "type": "image_url", "image_url": {"url": f"data:image/png;base64,{state['image_base64']}"} } ] }) # Call LLM with detailed error handling st.info(f"๐Ÿ”— Making API call to {state['model']}...") st.info(f"๐Ÿ“ Message count: {len(messages)}, Max tokens: 3000") try: response = client.chat.completions.create( model=state["model"], messages=messages, max_tokens=3000, temperature=0.1 ) # Debug response structure st.info(f"๐Ÿ” Response received - Choices count: {len(response.choices) if response and response.choices else 0}") # Check if response is valid if not response or not response.choices: raise Exception("No response received from LLM API") raw_response = response.choices[0].message.content # Debug response content if raw_response is None: raise Exception("Response content is None") elif not raw_response.strip(): # Check if it's just whitespace/newlines if len(raw_response) > 0: whitespace_chars = [repr(c) for c in raw_response[:10]] raise Exception(f"Response contains only whitespace (length: {len(raw_response)}, chars: {whitespace_chars})") else: raise Exception("Completely empty response from LLM API") # Check for very short responses that might indicate filtering elif len(raw_response.strip()) < 10: st.warning(f"โš ๏ธ Very short response ({len(raw_response)} chars): '{raw_response[:50]}'") st.info("๐Ÿ’ก This might indicate content filtering. Try a simpler prompt or different model.") state["raw_llm_response"] = raw_response st.success(f"๐Ÿ“ฅ Received response: {len(raw_response)} characters") except Exception as api_error: # Enhanced API error handling error_msg = str(api_error) st.error(f"โŒ API call failed: {error_msg}") # Check if it's a model-specific issue if "not a valid model ID" in error_msg: st.error(f"๐Ÿšซ Model '{state['model']}' is not available on OpenRouter") st.info("๐Ÿ’ก Try using a different model like 'anthropic/claude-sonnet-4'") elif "rate limit" in error_msg.lower(): st.error("โฐ Rate limit exceeded. Please wait and try again.") elif "empty" in error_msg.lower() or "none" in error_msg.lower(): st.error("๐Ÿ“ญ Model returned empty response. 
This might be due to:") st.info(" โ€ข Content filtering by the model") st.info(" โ€ข Model configuration issues") st.info(" โ€ข Input content triggering safety filters") st.info("๐Ÿ’ก Try a different model or simpler input text") raise api_error # Parse JSON response with enhanced error handling soil_data = self._parse_llm_response(raw_response) if "error" in soil_data: state["llm_extraction_success"] = False state["extraction_errors"] = [soil_data["error"]] state["workflow_status"] = "extraction_failed" st.error(f"โŒ JSON parsing failed: {soil_data['error']}") else: # Validate that we have basic required data layers = soil_data.get("soil_layers", []) if not layers: state["llm_extraction_success"] = False state["extraction_errors"] = ["No soil layers found in LLM response"] state["workflow_status"] = "extraction_failed" st.error("โŒ No soil layers found in LLM response") else: state["llm_extraction_success"] = True state["project_info"] = soil_data.get("project_info", {}) state["raw_soil_layers"] = layers state["water_table"] = soil_data.get("water_table", {}) state["notes"] = soil_data.get("notes", "") state["workflow_status"] = "extracted" st.success(f"โœ… LLM extraction completed: {len(layers)} layers found") except Exception as e: state["llm_extraction_success"] = False state["extraction_errors"] = [str(e)] state["workflow_status"] = "extraction_error" st.error(f"โŒ LLM extraction failed: {str(e)}") state["workflow_messages"] = state.get("workflow_messages", []) + [ AIMessage(content=f"LLM extraction: {'success' if state['llm_extraction_success'] else 'failed'}") ] return state def _validate_extraction(self, state: SoilAnalysisState) -> SoilAnalysisState: """Validate LLM extraction results""" st.info("๐Ÿ” Step 3: Validating extraction results...") if not state["llm_extraction_success"]: return state validation_errors = [] # Check for required data if not state["raw_soil_layers"]: validation_errors.append("No soil layers extracted") # Validate layer structure for i, layer in enumerate(state["raw_soil_layers"]): if "depth_from" not in layer or "depth_to" not in layer: validation_errors.append(f"Layer {i+1}: Missing depth information") if "soil_type" not in layer: validation_errors.append(f"Layer {i+1}: Missing soil type") if validation_errors: state["extraction_errors"] = validation_errors state["workflow_status"] = "extraction_failed" # Use consistent status name st.warning(f"โš ๏ธ Validation issues found: {len(validation_errors)} errors") else: state["workflow_status"] = "extraction_validated" st.success("โœ… Extraction validation passed") return state def _process_ss_st_classification(self, state: SoilAnalysisState) -> SoilAnalysisState: """Process SS/ST sample classification""" st.info("๐Ÿงช Step 4: Processing SS/ST sample classification...") try: processed_layers = self.soil_processor.process_soil_layers(state["raw_soil_layers"]) state["processed_layers"] = processed_layers state["workflow_status"] = "ss_st_processed" st.success(f"โœ… SS/ST processing completed: {len(processed_layers)} layers processed") except Exception as e: state["extraction_errors"] = state.get("extraction_errors", []) + [f"SS/ST processing error: {str(e)}"] state["workflow_status"] = "ss_st_error" st.error(f"โŒ SS/ST processing failed: {str(e)}") return state def _apply_unit_conversions(self, state: SoilAnalysisState) -> SoilAnalysisState: """Apply unit conversions to all measurements""" st.info("๐Ÿ”ง Step 5: Applying unit conversions...") try: converted_layers = [] unit_warnings = [] for layer in 
state["processed_layers"]: converted_layer = self.soil_processor._convert_to_si_units(layer) converted_layers.append(converted_layer) # Collect unit validation warnings if converted_layer.get('unit_validation_warning'): unit_warnings.append(f"Layer {layer.get('layer_id', '?')}: {converted_layer['unit_validation_warning']}") state["processed_layers"] = converted_layers state["workflow_status"] = "units_converted" # Track different types of validation issues unit_errors = [] recheck_needed = [] critical_errors = [] for layer in converted_layers: validation_warning = layer.get('unit_validation_warning', '') if validation_warning: layer_id = layer.get('layer_id', '?') # Check if this layer needs image recheck if hasattr(self.soil_processor, '_validate_su_with_water_content'): detailed_validation = self.soil_processor._validate_su_with_water_content(layer) if detailed_validation.get('critical_unit_error'): critical_errors.append(f"Layer {layer_id}: {detailed_validation.get('suggested_conversion', 'Unit error')}") if detailed_validation.get('recheck_image'): recheck_needed.append(f"Layer {layer_id}: {validation_warning}") else: unit_errors.append(f"Layer {layer_id}: {validation_warning}") # Display different types of issues with appropriate severity if critical_errors: st.error("๐Ÿšจ CRITICAL UNIT CONVERSION ERRORS DETECTED:") for error in critical_errors: st.error(f" โ€ข {error}") st.error("โš ๏ธ These values appear to be in wrong units - conversion may be needed!") if recheck_needed: st.warning("๐Ÿ“ท IMAGE RECHECK RECOMMENDED:") for recheck in recheck_needed: st.warning(f" โ€ข {recheck}") st.info("๐Ÿ’ก Su-water content values seem inconsistent - consider reloading the image") if unit_errors: st.warning("โš ๏ธ Su-water content validation issues:") for error in unit_errors: st.info(f" โ€ข {error}") # Store all warnings for later reference all_warnings = critical_errors + recheck_needed + unit_errors if all_warnings: state["unit_validation_warnings"] = all_warnings state["needs_image_recheck"] = len(recheck_needed) > 0 state["has_critical_unit_errors"] = len(critical_errors) > 0 # Add to final results for user action state["validation_recommendations"] = { "critical_unit_errors": critical_errors, "recheck_image": recheck_needed, "general_warnings": unit_errors } else: st.success("โœ… Unit conversions applied - all Su-water content correlations look reasonable") except Exception as e: state["extraction_errors"] = state.get("extraction_errors", []) + [f"Unit conversion error: {str(e)}"] state["workflow_status"] = "conversion_error" st.error(f"โŒ Unit conversion failed: {str(e)}") return state def _validate_soil_classification(self, state: SoilAnalysisState) -> SoilAnalysisState: """Validate soil classification with sieve analysis requirements""" st.info("๐ŸŽฏ Step 6: Validating soil classification...") try: validated_layers = [] classification_warnings = [] for layer in state["processed_layers"]: # Apply enhanced soil classification validation validated_layer = layer.copy() # Re-classify with strict sieve analysis requirements soil_type = self.soil_processor._classify_soil_type(validated_layer) validated_layer["soil_type"] = soil_type # Track classification changes if layer.get("soil_type") != soil_type: classification_warnings.append( f"Layer {layer.get('layer_id', '?')}: Changed from '{layer.get('soil_type')}' to '{soil_type}'" ) validated_layers.append(validated_layer) state["processed_layers"] = validated_layers state["workflow_status"] = "classification_validated" if 
    def _validate_soil_classification(self, state: SoilAnalysisState) -> SoilAnalysisState:
        """Validate soil classification with sieve analysis requirements"""
        st.info("🎯 Step 6: Validating soil classification...")

        try:
            validated_layers = []
            classification_warnings = []

            for layer in state["processed_layers"]:
                # Apply enhanced soil classification validation
                validated_layer = layer.copy()

                # Re-classify with strict sieve analysis requirements
                soil_type = self.soil_processor._classify_soil_type(validated_layer)
                validated_layer["soil_type"] = soil_type

                # Track classification changes
                if layer.get("soil_type") != soil_type:
                    classification_warnings.append(
                        f"Layer {layer.get('layer_id', '?')}: Changed from '{layer.get('soil_type')}' to '{soil_type}'"
                    )

                validated_layers.append(validated_layer)

            state["processed_layers"] = validated_layers
            state["workflow_status"] = "classification_validated"

            if classification_warnings:
                st.warning(f"⚠️ Classification changes: {len(classification_warnings)} layers updated")
                for warning in classification_warnings:
                    st.info(f"  • {warning}")
            else:
                st.success("✅ Soil classification validation passed")

        except Exception as e:
            state["extraction_errors"] = state.get("extraction_errors", []) + [f"Classification validation error: {str(e)}"]
            state["workflow_status"] = "classification_error"
            st.error(f"❌ Classification validation failed: {str(e)}")

        return state

    def _calculate_parameters(self, state: SoilAnalysisState) -> SoilAnalysisState:
        """Calculate engineering parameters (Su, φ, etc.)"""
        st.info("📊 Step 7: Calculating engineering parameters...")

        try:
            enhanced_layers = self.soil_calculator.enhance_soil_layers(state["processed_layers"])

            # Enhanced post-processing for multiple Su values
            enhanced_layers = self._process_multiple_su_values(enhanced_layers)

            state["processed_layers"] = enhanced_layers
            state["workflow_status"] = "parameters_calculated"
            st.success("✅ Engineering parameters calculated")
        except Exception as e:
            state["extraction_errors"] = state.get("extraction_errors", []) + [f"Parameter calculation error: {str(e)}"]
            state["workflow_status"] = "calculation_error"
            st.error(f"❌ Parameter calculation failed: {str(e)}")

        return state

    def _process_multiple_su_values(self, layers: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process layers that may have multiple Su values and decide on subdivision"""
        import re

        enhanced_layers = []

        for layer in layers:
            # Check whether the layer description mentions multiple Su values
            description = layer.get('description', '').lower()

            # Pattern to find multiple Su values in the description
            su_pattern = r'su[=\s]*(\d+(?:\.\d+)?)\s*(?:kpa|kPa|t/m²|ksc|psi)'
            su_values = re.findall(su_pattern, description)

            # Pattern to find Su ranges
            range_pattern = r'su\s*(?:ranges?|from)\s*(\d+(?:\.\d+)?)\s*(?:-|to)\s*(\d+(?:\.\d+)?)\s*(?:kpa|kPa)'
            range_match = re.search(range_pattern, description)

            # Pattern to find already-averaged Su values
            avg_pattern = r'su\s*(?:averaged|average|mean)\s*(?:from)?\s*(?:\d+\s*measurements?)?\s*[:\s]*(\d+(?:\.\d+)?)'
            avg_match = re.search(avg_pattern, description)

            if len(su_values) > 1:
                # Multiple Su values found - decide between subdivision and averaging
                su_nums = [float(val) for val in su_values]

                # Check variation
                min_su = min(su_nums)
                max_su = max(su_nums)
                avg_su = sum(su_nums) / len(su_nums)
                variation = (max_su - min_su) / avg_su if avg_su > 0 else 0

                if variation > 0.5 or max_su / min_su > 2.0:
                    # High variation - suggest layer subdivision
                    layer['subdivision_suggested'] = True
                    layer['su_variation_high'] = True
                    layer['su_values_found'] = su_nums
                    layer['su_variation_ratio'] = max_su / min_su if min_su > 0 else 0
                    layer['subdivision_reason'] = f"High Su variation: {min_su:.1f}-{max_su:.1f} kPa (ratio: {max_su/min_su:.1f}x)"

                    # Update the description to highlight the issue
                    layer['description'] += f" [SUBDIVISION RECOMMENDED: Su varies {min_su:.1f}-{max_su:.1f} kPa]"
                    st.warning(f"🔄 Layer {layer.get('layer_id', '?')}: High Su variation detected - subdivision recommended")
                else:
                    # Low variation - use the average
                    layer['su_averaged'] = True
                    layer['su_values_found'] = su_nums
                    layer['su_average_used'] = avg_su
                    layer['strength_value'] = avg_su
                    layer['description'] += f" [Su averaged from {len(su_nums)} values: {', '.join([f'{v:.1f}' for v in su_nums])} kPa → {avg_su:.1f} kPa]"
                    st.info(f"📊 Layer {layer.get('layer_id', '?')}: Averaged {len(su_nums)} Su values: {avg_su:.1f} kPa")

            elif range_match:
                # Su range found
                min_su = float(range_match.group(1))
                max_su = float(range_match.group(2))
                avg_su = (min_su + max_su) / 2

                layer['su_range_found'] = True
                layer['su_range'] = [min_su, max_su]
                layer['su_range_average'] = avg_su
                layer['strength_value'] = avg_su
                layer['description'] += f" [Su range {min_su:.1f}-{max_su:.1f} kPa, using average {avg_su:.1f} kPa]"
                st.info(f"📊 Layer {layer.get('layer_id', '?')}: Su range processed, using average {avg_su:.1f} kPa")

            elif avg_match:
                # An averaged Su value is already mentioned
                avg_su = float(avg_match.group(1))
                layer['su_pre_averaged'] = True
                layer['su_average_value'] = avg_su
                layer['strength_value'] = avg_su

            # Add metadata for tracking
            layer['su_processing_applied'] = True
            enhanced_layers.append(layer)

        return enhanced_layers
"error": "Workflow failed", "errors": errors, "raw_response": state.get("raw_llm_response", "") } return state # Conditional routing functions def _should_continue_after_validation(self, state: SoilAnalysisState) -> str: """Determine next step after input validation""" if state["workflow_status"] == "validated": return "continue" else: return "error" def _should_continue_after_extraction(self, state: SoilAnalysisState) -> str: """Determine next step after LLM extraction - simplified without retry loops""" workflow_status = state.get("workflow_status", "unknown") if workflow_status == "extraction_validated": st.info("โœ… Proceeding to SS/ST classification...") return "continue" else: st.error(f"โŒ Extraction validation failed with status: {workflow_status}") return "error" def _get_gemini_safe_prompt(self) -> str: """Get a simplified, safer prompt for Gemini models to avoid content filtering""" return """You are a geotechnical engineer analyzing soil data. Extract information from soil boring logs and return ONLY valid JSON. Required JSON format: { "project_info": { "project_name": "string", "boring_id": "string", "location": "string", "date": "string", "depth_total": 10.0 }, "soil_layers": [ { "layer_id": 1, "depth_from": 0.0, "depth_to": 2.0, "soil_type": "clay", "description": "description text", "sample_type": "SS", "strength_parameter": "SPT-N", "strength_value": 15, "water_content": 25, "color": "brown", "consistency": "soft" } ], "water_table": {"depth": 3.0, "date_encountered": "2024-01-01"}, "notes": "Additional notes" } Key rules: 1. Look for SS-* or ST-* sample identifiers in first column 2. SS samples use SPT-N values, ST samples use Su values 3. **CRITICAL - READ COLUMN HEADERS FOR UNITS**: Look at table headers to identify Su units: - If header shows "Su t/mยฒ" or "Su (t/mยฒ)" โ†’ Units are t/mยฒ - If header shows "Su kPa" or "Su (kPa)" โ†’ Units are kPa - If header shows "Su ksc" or "Su (ksc)" โ†’ Units are ksc 4. **CAREFULLY convert Su units to kPa BASED ON HEADER**: - t/mยฒ โ†’ kPa: multiply by 9.81 (CRITICAL - MOST COMMON ERROR) - ksc/kg/cmยฒ โ†’ kPa: multiply by 98.0 - psi โ†’ kPa: multiply by 6.895 - MPa โ†’ kPa: multiply by 1000 - kPa โ†’ kPa: no conversion (use directly) 5. Extract water content when available 6. Check Su-water content correlation (soft clay: Su<50kPa, w%>30%) 7. Group similar layers (maximum 7 layers total) 8. Return ONLY the JSON object, no explanatory text 9. Start response with { and end with }""" def _get_unified_system_prompt(self) -> str: """Get the comprehensive system prompt for unified processing""" return """You are an expert geotechnical engineer specializing in soil boring log interpretation. IMPORTANT: You must respond with ONLY valid JSON data. Do not include any text before or after the JSON. 
    def _get_unified_system_prompt(self) -> str:
        """Get the comprehensive system prompt for unified processing"""
        return """You are an expert geotechnical engineer specializing in soil boring log interpretation.

IMPORTANT: You must respond with ONLY valid JSON data. Do not include any text before or after the JSON.

SAMPLE TYPE IDENTIFICATION (CRITICAL - FOLLOW EXACT ORDER):

**STEP 1 - FIRST COLUMN STRATIFICATION SYMBOLS (ABSOLUTE HIGHEST PRIORITY):**
ALWAYS look at the FIRST COLUMN of each layer for stratification symbols:
- **SS-1, SS-2, SS-18, SS18, SS-5** → SS (Split Spoon) sample
- **ST-1, ST-2, ST-5, ST5, ST-12** → ST (Shelby Tube) sample
- **SS1, SS2, SS3** (without dash) → SS sample
- **ST1, ST2, ST3** (without dash) → ST sample
- **Look for pattern: [SS|ST][-]?[0-9]+** in first column

**EXAMPLES of First Column Recognition:**
```
SS-18 | Brown clay, N=8        → sample_type="SS" (SS-18 in first column)
ST-5  | Gray clay, Su=45 kPa   → sample_type="ST" (ST-5 in first column)
SS12  | Sandy clay, SPT test   → sample_type="SS" (SS12 in first column)
ST3   | Soft clay, unconfined  → sample_type="ST" (ST3 in first column)
```

**STEP 2 - If NO first column symbols, then check description keywords:**
- SS indicators: "split spoon", "SPT", "standard penetration", "disturbed"
- ST indicators: "shelby", "tube", "undisturbed", "UT", "unconfined compression"

**STEP 3 - If still unclear, use strength parameter type:**
- SPT-N values present → likely SS sample
- Su values from unconfined test → likely ST sample

CRITICAL SOIL CLASSIFICATION RULES (MANDATORY):

**SAND LAYER CLASSIFICATION REQUIREMENTS:**
1. **Sand layers MUST have sieve analysis evidence** - Look for:
   - "Sieve #200: X% passing" or "#200 passing: X%"
   - "Fines content: X%" (same as sieve #200)
   - "Particle size analysis" or "gradation test"
   - "% passing 0.075mm" (equivalent to #200 sieve)
2. **Classification Rules**:
   - Sieve #200 >50% passing → CLAY (fine-grained)
   - Sieve #200 <50% passing → SAND/GRAVEL (coarse-grained)
3. **NO SIEVE ANALYSIS = ASSUME CLAY (MANDATORY)**:
   - If no sieve analysis data found → ALWAYS classify as CLAY
   - Include note: "Assumed clay - no sieve analysis data available"
   - Set sieve_200_passing: null (not a number)

**CRITICAL**: Never classify as sand/silt without explicit sieve analysis evidence
**CRITICAL**: Always look for sieve #200 data before classifying as sand

CRITICAL SS/ST SAMPLE RULES (MUST FOLLOW):

FOR SS (Split Spoon) SAMPLES:
1. ALWAYS use RAW N-VALUE (not N-corrected, N-correction, or adjusted N)
2. Look for: "N = 15", "SPT-N = 8", "raw N = 20", "field N = 12"
3. IGNORE: "N-corrected = 25", "N-correction = 18", "adjusted N = 30"
4. For clay: Use SPT-N parameter (will be converted to Su using Su=5*N)
5. For sand/silt: Use SPT-N parameter (will be converted to friction angle)
6. NEVER use unconfined compression Su values for SS samples - ONLY use N values

FOR ST (Shelby Tube) SAMPLES:
1. ALWAYS USE DIRECT Su values from unconfined compression test
2. If ST sample has Su value (e.g., "Su = 25 kPa"), use that EXACT value
3. NEVER convert SPT-N to Su for ST samples when direct Su is available
4. Priority: Direct Su measurement > any other value

CRITICAL SU VALUE EXTRACTION - MULTIPLE VALUES PER LAYER:

**EXTRACT ALL SU VALUES IN COLUMN (CRITICAL ENHANCEMENT):**

**STEP 1 - SCAN ENTIRE SU COLUMN FOR EACH LAYER:**
1. Look for ALL Su values that fall within each layer's depth range
2. Extract EVERY Su value found in the Su column for that depth interval
3. Record ALL values with their exact depths if specified
4. Note: A single layer may have multiple Su measurements at different depths

**STEP 2 - HANDLE MULTIPLE SU VALUES PER LAYER:**
For layers with multiple Su values, you have several options:

Option A - **LAYER SUBDIVISION (PREFERRED for significant variation):**
- If Su values vary by >50% or have >2x ratio → Split into sublayers
- Example: Layer 2.0-6.0m has Su values [25, 45, 80] kPa
  - Split into: Layer 2.0-3.5m (Su=25kPa), Layer 3.5-5.0m (Su=45kPa), Layer 5.0-6.0m (Su=80kPa)

Option B - **AVERAGE SU VALUES (for similar values):**
- If Su values are within ±30% of mean → Use average
- Example: Layer 1.0-3.0m has Su values [35, 40, 38] kPa → Use Su=37.7kPa
- Include note: "Su averaged from 3 measurements: 35, 40, 38 kPa"

Option C - **REPRESENTATIVE VALUE (for clusters):**
- If multiple similar values with one outlier → Use cluster average
- Example: Su values [25, 28, 26, 45] → Use 26.3kPa (ignore outlier 45)

**STEP 3 - DOCUMENT ALL VALUES FOUND:**
Always include in description:
- "Su values found: 25, 35, 42 kPa (averaged to 34 kPa)"
- "Multiple Su measurements: 30, 28, 32 kPa at depths 2.1, 2.5, 2.8m"
- "Su ranges from 40-60 kPa, used average 50 kPa"

CRITICAL UNIT CONVERSION REQUIREMENTS (MUST APPLY):

**MANDATORY SU UNIT CONVERSION - READ COLUMN HEADERS FIRST:**

**STEP 1 - IDENTIFY UNITS FROM TABLE HEADERS (CRITICAL):**
ALWAYS look at the column headers to identify Su units:
- "Su t/m²" or "Su (t/m²)" in header → Values are in t/m²
- "Su kPa" or "Su (kPa)" in header → Values are in kPa
- "Su ksc" or "Su (ksc)" in header → Values are in ksc
- "Su psi" or "Su (psi)" in header → Values are in psi
- Just "Su" with units below → Look at unit row (e.g., "t/m²")

**STEP 2 - CONVERT TO kPa BASED ON IDENTIFIED UNITS:**
When extracting Su values from images or text, you MUST convert to kPa BEFORE using the value:
1. **ksc or kg/cm²**: Su_kPa = Su_ksc × 98.0
   Example: "Su = 2.5 ksc" → strength_value: 245 (not 2.5)
2. **t/m² (tonnes/m²)**: Su_kPa = Su_tonnes × 9.81
   Example: "Su = 3.0 t/m²" → strength_value: 29.43 (not 3.0)
   **CRITICAL**: This is the MOST COMMON unit in boring logs!
3. **psi**: Su_kPa = Su_psi × 6.895
   Example: "Su = 50 psi" → strength_value: 344.75 (not 50)
4. **psf**: Su_kPa = Su_psf × 0.048
   Example: "Su = 1000 psf" → strength_value: 48 (not 1000)
5. **kPa**: Use directly (no conversion needed)
   Example: "Su = 75 kPa" → strength_value: 75
6. **MPa**: Su_kPa = Su_MPa × 1000
   Example: "Su = 0.1 MPa" → strength_value: 100 (not 0.1)

**CRITICAL EXAMPLES FROM BORING LOGS:**
- Table header shows "Su t/m²", value 1.41 → strength_value: 13.83 (1.41 × 9.81)
- Table header shows "Su t/m²", value 2.41 → strength_value: 23.64 (2.41 × 9.81)
- Table header shows "Su kPa", value 75 → strength_value: 75 (no conversion)

**IMPORTANT**: Always include the original unit in the description for verification

**SPT-N values**: Keep as-is (no unit conversion needed)

CRITICAL SU-WATER CONTENT VALIDATION (MANDATORY):

**EXTRACT WATER CONTENT WHEN AVAILABLE:**
Always extract water content (w%) when mentioned in the description:
- "water content = 25%" → water_content: 25
- "w = 30%" → water_content: 30
- "moisture content 35%" → water_content: 35

**VALIDATE SU-WATER CONTENT CORRELATION:**
For clay layers, Su and water content should correlate reasonably:
- Very soft clay: Su < 25 kPa, w% > 40%
- Soft clay: Su 25-50 kPa, w% 30-40%
- Medium clay: Su 50-100 kPa, w% 20-30%
- Stiff clay: Su 100-200 kPa, w% 15-25%
- Very stiff clay: Su 200-400 kPa, w% 10-20%
- Hard clay: Su > 400 kPa, w% < 15%

**CRITICAL UNIT CHECK SCENARIOS:**
- If Su > 1000 kPa with w% > 20%: CHECK if Su is in wrong units (psi, psf?)
- If Su < 5 kPa with w% < 15%: CHECK if Su is in wrong units (MPa, bar?)
- If the correlation seems very off: VERIFY unit conversion was applied correctly

CRITICAL OUTPUT FORMAT (MANDATORY):
You MUST respond with ONLY a valid JSON object. Do not include:
- Explanatory text before or after the JSON
- Markdown formatting (```json ```)
- Comments or notes
- Multiple JSON objects

Start your response directly with { and end with }

EXAMPLE CORRECT RESPONSE FORMAT:
{
  "project_info": {
    "project_name": "Sample Project",
    "boring_id": "BH-01",
    "location": "Sample Location",
    "date": "2024-06-25",
    "depth_total": 10.0
  },
  "soil_layers": [
    {
      "layer_id": 1,
      "depth_from": 0.0,
      "depth_to": 2.0,
      "soil_type": "clay",
      "description": "Brown clay, soft, SS-1 sample",
      "sample_type": "SS",
      "strength_parameter": "SPT-N",
      "strength_value": 4,
      "water_content": 35,
      "color": "brown",
      "consistency": "soft"
    }
  ],
  "water_table": {"depth": 3.0, "date_encountered": "2024-06-25"},
  "notes": "Standard soil boring analysis"
}

LAYER GROUPING REQUIREMENTS:
1. MAXIMUM 7 LAYERS TOTAL - Group similar adjacent layers to achieve this limit
2. CLAY AND SAND MUST BE SEPARATE - Never combine clay layers with sand layers
3. Group adjacent layers with similar properties (same soil type and similar consistency)
4. Prioritize engineering significance over minor variations

Analyze the provided soil boring log and extract the following information in this exact JSON format:

{
  "project_info": {
    "project_name": "string",
    "boring_id": "string",
    "location": "string",
    "date": "string",
    "depth_total": 10.0
  },
  "soil_layers": [
    {
      "layer_id": 1,
      "depth_from": 0.0,
      "depth_to": 2.5,
      "soil_type": "clay",
      "description": "Brown silty clay, ST sample, Su = 25 kPa",
      "sample_type": "ST",
      "strength_parameter": "Su",
      "strength_value": 25,
      "sieve_200_passing": 65,
      "water_content": 35.5,
      "color": "brown",
      "moisture": "moist",
      "consistency": "soft",
      "su_source": "Unconfined Compression Test"
    }
  ],
  "water_table": {
    "depth": 3.0,
    "date_encountered": "2024-01-01"
  },
  "notes": "Additional observations"
}

**CRITICAL EXAMPLES - MULTIPLE SU VALUES PER LAYER:**

**EXAMPLE 1 - Multiple Su Values (SUBDIVISION CASE):**
Layer depth 2.0-6.0m with Su column showing:
- "Su at 2.5m = 25 kPa"
- "Su at 4.0m = 45 kPa"
- "Su at 5.5m = 80 kPa"
PROCESSING: High variation (25-80 kPa, ratio 3.2x) → SUBDIVISION RECOMMENDED
→ Include ALL values in description: "Multiple Su values: 25, 45, 80 kPa [SUBDIVISION RECOMMENDED: High variation]"
→ Use representative value (middle): strength_value=45
→ Add metadata: subdivision_suggested=true, su_variation_high=true

**EXAMPLE 2 - Multiple Similar Su Values (AVERAGING CASE):**
Layer depth 1.0-3.0m with Su column showing:
- "Su = 35 kPa"
- "Su = 40 kPa"
- "Su = 38 kPa"
PROCESSING: Low variation (±7% from mean) → USE AVERAGE
→ Description: "Su averaged from 3 measurements: 35, 40, 38 kPa → 37.7 kPa"
→ Use: strength_value=37.7

**EXAMPLE 3 - Su Range Detection:**
Layer with Su column: "Su ranges 40-60 kPa"
→ Description: "Su range 40-60 kPa, using average 50 kPa"
→ Use: strength_value=50

EXAMPLES OF CORRECT FIRST COLUMN SYMBOL RECOGNITION:

**SS SAMPLE EXAMPLES (First Column Priority):**
1. "SS-18 | Clay layer, N = 8, Su = 45 kPa from unconfined test"
   → First column: SS-18 → sample_type="SS" (HIGHEST PRIORITY)
   → Use: strength_parameter="SPT-N", strength_value=8
   → IGNORE the Su=45 kPa value for SS samples
2. "SS18 | Soft clay, field N = 6, N-corrected = 10"
   → First column: SS18 → sample_type="SS" (HIGHEST PRIORITY)
   → Use: strength_parameter="SPT-N", strength_value=6 (raw N)
   → IGNORE N-corrected value
3. "SS-5 | Brown clay, split spoon test, N=12"
   → First column: SS-5 → sample_type="SS" (HIGHEST PRIORITY)
   → Use: strength_parameter="SPT-N", strength_value=12

**ST SAMPLE EXAMPLES (First Column Priority):**
1. "ST-5 | Stiff clay, Su = 85 kPa from unconfined compression"
   → First column: ST-5 → sample_type="ST" (HIGHEST PRIORITY)
   → Use: strength_parameter="Su", strength_value=85
2. "ST-12 | Medium clay, Su = 2.5 ksc from unconfined test"
   → First column: ST-12 → sample_type="ST" (HIGHEST PRIORITY)
   → Convert: 2.5 × 98 = 245 kPa
   → Use: strength_parameter="Su", strength_value=245
3. "ST3 | Clay, unconfined strength = 3.0 t/m²"
   → First column: ST3 → sample_type="ST" (HIGHEST PRIORITY)
   → Convert: 3.0 × 9.81 = 29.43 kPa
   → Use: strength_parameter="Su", strength_value=29.43
4. "ST-8 | Gray clay, shelby tube, Su = 120 kPa"
   → First column: ST-8 → sample_type="ST" (HIGHEST PRIORITY)
   → Use: strength_parameter="Su", strength_value=120
5. "ST-10 | Gray clay, depth 3.0-6.0m, Su values: 35, 42, 39 kPa"
   → First column: ST-10 → sample_type="ST" (HIGHEST PRIORITY)
   → Multiple values detected: variation <30% → Use average
   → Use: strength_parameter="Su", strength_value=38.7
   → Description: "Gray clay, shelby tube, Su averaged from 3 measurements: 35, 42, 39 kPa → 38.7 kPa"
6. "ST-15 | Stiff clay, Su measurements: 45, 85, 120 kPa at different depths"
   → First column: ST-15 → sample_type="ST" (HIGHEST PRIORITY)
   → High variation detected: ratio 2.7x → SUBDIVISION RECOMMENDED
   → Use: strength_parameter="Su", strength_value=85 (middle value)
   → Description: "Stiff clay, multiple Su values: 45, 85, 120 kPa [SUBDIVISION RECOMMENDED: High variation]"

**SOIL CLASSIFICATION EXAMPLES:**
1. "Brown silty clay, no sieve analysis data"
   → soil_type="clay", sieve_200_passing=null
   → Note: "Assumed clay - no sieve analysis data available"
2. "Sandy clay, sieve #200: 75% passing"
   → soil_type="clay", sieve_200_passing=75
   → Classification: Clay (>50% passing)
3. "Medium sand, gradation test shows 25% passing #200"
   → soil_type="sand", sieve_200_passing=25
   → Classification: Sand (<50% passing)
4. "Dense sand layer" (NO sieve data mentioned)
   → soil_type="clay", sieve_200_passing=null
   → Note: "Assumed clay - no sieve analysis data available"
   → NEVER classify as sand without sieve data

TECHNICAL RULES:
1. All numeric values must be numbers, not strings
2. For soil_type, use basic terms: "clay", "sand", "silt", "gravel" - do NOT include consistency
3. Include sample_type field: "SS" (Split Spoon) or "ST" (Shelby Tube)
4. Include sieve_200_passing field when available (percentage passing sieve #200)
5. Include water_content field when available (percentage water content for clay consistency checks)
6. Include su_source field: "Unconfined Compression Test" for direct measurements, or "Calculated from SPT-N" for conversions
7. Strength parameters:
   - SS samples: ALWAYS use "SPT-N" with RAW N-value (will be converted based on soil type)
   - ST samples with clay: Use "Su" with DIRECT value in kPa from unconfined compression test
   - For sand/gravel: Always use "SPT-N" with N-value
   - NEVER use Su for SS samples, NEVER calculate Su from SPT-N for ST samples that have direct Su
8. Put consistency separately in the "consistency" field: "soft", "medium", "stiff", "loose", "dense", etc.
9. Ensure continuous depths (no gaps or overlaps)
10. All depths in meters, strength values as numbers
11. Return ONLY the JSON object, no additional text"""
if len(json_str) > 500 else ''}") # Remove markdown code blocks if present if "```json" in json_str: json_start = json_str.find("```json") + 7 json_end = json_str.find("```", json_start) if json_end == -1: json_end = len(json_str) json_str = json_str[json_start:json_end].strip() st.info("๐Ÿ”ง Extracted JSON from markdown code block") elif "```" in json_str: json_start = json_str.find("```") + 3 json_end = json_str.rfind("```") if json_end > json_start: json_str = json_str[json_start:json_end].strip() st.info("๐Ÿ”ง Extracted content from code block") # Handle cases where LLM includes explanatory text before/after JSON # Look for JSON object boundaries more aggressively brace_start = json_str.find("{") brace_end = json_str.rfind("}") if brace_start != -1 and brace_end != -1 and brace_end > brace_start: json_str = json_str[brace_start:brace_end + 1] st.info(f"๐Ÿ”ง Extracted JSON object: {len(json_str)} characters") elif not json_str.startswith("{"): # No JSON found return { "error": f"No JSON object found in response. Response appears to be: {json_str[:200]}", "raw_response": response } # Try to parse JSON result = json.loads(json_str) # Validate structure if not isinstance(result, dict): return {"error": f"Expected JSON object, got {type(result)}", "raw_response": response} if "soil_layers" not in result: result["soil_layers"] = [] st.warning("โš ๏ธ No 'soil_layers' found in response, using empty list") if "project_info" not in result: result["project_info"] = {} st.warning("โš ๏ธ No 'project_info' found in response, using empty dict") st.success(f"โœ… JSON parsed successfully: {len(result.get('soil_layers', []))} layers found") return result except json.JSONDecodeError as e: error_msg = f"JSON parsing failed: {str(e)}" st.error(f"โŒ {error_msg}") st.error(f"๐Ÿ“ Problematic content: {json_str[:300] if 'json_str' in locals() else 'N/A'}") return {"error": error_msg, "raw_response": response} except Exception as e: error_msg = f"Response parsing failed: {str(e)}" st.error(f"โŒ {error_msg}") return {"error": error_msg, "raw_response": response} def get_workflow_visualization(self) -> str: """Get a visual representation of the workflow steps""" return """ ๐Ÿš€ **Unified Soil Analysis Workflow** ๐Ÿš€ **Step 1** ๐Ÿ” **Validate Inputs** โ†’ Check API key, content, model **Step 2** ๐Ÿค– **Extract with LLM** โ†’ Use enhanced prompts for SS/ST classification **Step 3** โœ… **Validate Extraction** โ†’ Check layer structure and data quality **Step 4** ๐Ÿงช **Process SS/ST Classification** โ†’ Apply sample-specific processing **Step 5** ๐Ÿ”ง **Apply Unit Conversions** โ†’ Convert all values to SI units (kPa) **Step 6** ๐ŸŽฏ **Validate Soil Classification** โ†’ Enforce sieve analysis requirements **Step 7** ๐Ÿ“Š **Calculate Parameters** โ†’ Compute Su, ฯ†, and other properties **Step 8** โš™๏ธ **Optimize Layers** โ†’ Group and validate layer continuity **Step 9** ๐Ÿ“ฆ **Finalize Results** โ†’ Package complete analysis results **Key Features:** โ€ข **Unified Processing**: Single workflow handles all steps โ€ข **SS/ST Classification**: Automatic sample type identification โ€ข **Unit Conversion**: All Su values converted to kPa from images/text โ€ข **Sieve Analysis Enforcement**: Sand layers require #200 sieve data โ€ข **Error Handling**: Comprehensive validation and recovery โ€ข **State Management**: Complete workflow state tracking """ def analyze_soil_boring_log(self, text_content: Optional[str] = None, image_base64: Optional[str] = None, model: str = None, api_key: str = None, merge_similar: bool 
    def analyze_soil_boring_log(self,
                                text_content: Optional[str] = None,
                                image_base64: Optional[str] = None,
                                model: Optional[str] = None,
                                api_key: Optional[str] = None,
                                merge_similar: bool = True,
                                split_thick: bool = True) -> Dict[str, Any]:
        """
        Run the unified soil analysis workflow.

        Args:
            text_content: Extracted text from the document
            image_base64: Base64-encoded image
            model: LLM model to use
            api_key: API key for the selected provider
            merge_similar: Whether to merge similar layers
            split_thick: Whether to split thick layers

        Returns:
            Complete soil analysis results
        """
        # Initialize state
        initial_state = SoilAnalysisState(
            text_content=text_content,
            image_base64=image_base64,
            model=model or get_default_provider_and_model()[1],
            api_key=api_key or "",
            merge_similar=merge_similar,
            split_thick=split_thick,
            raw_llm_response=None,
            llm_extraction_success=False,
            extraction_errors=[],
            retry_count=0,  # Initialize the retry counter
            project_info={},
            raw_soil_layers=[],
            processed_layers=[],
            water_table={},
            notes="",
            processing_summary={},
            validation_stats={},
            optimization_results={},
            final_soil_data={},
            workflow_status="initializing",
            workflow_messages=[]
        )

        # Run the workflow
        st.info("🚀 Starting unified soil analysis workflow...")

        try:
            # Execute the workflow with an explicit recursion limit as protection
            final_state = self.workflow.invoke(
                initial_state,
                config={"recursion_limit": 50}
            )

            # Return results
            if final_state["workflow_status"] == "completed":
                st.success("🎉 Unified workflow completed successfully!")
                return final_state["final_soil_data"]
            else:
                st.error(f"❌ Workflow failed with status: {final_state['workflow_status']}")
                return final_state["final_soil_data"]

        except Exception as e:
            error_msg = str(e)
            if "recursion limit" in error_msg.lower():
                st.error("❌ Workflow execution failed: Recursion limit reached. This may indicate a configuration issue with the model or workflow logic.")
                st.info("💡 Try using a different model or check your input data format.")
            else:
                st.error(f"❌ Workflow execution failed: {error_msg}")
            return {
                "error": f"Workflow execution failed: {error_msg}",
                "workflow_status": "execution_failed"
            }
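
# --- Usage sketch (assumption: not part of the original module) --------------
# A minimal, hypothetical driver showing how the workflow is intended to be
# invoked. The model ID, API key, and boring-log text are placeholders; in the
# Streamlit app these come from the UI and config.py. Note that the st.* status
# calls expect a Streamlit session; outside one they fall back to bare-mode
# logging.
if __name__ == "__main__":
    workflow = UnifiedSoilWorkflow()
    results = workflow.analyze_soil_boring_log(
        text_content=(
            "SS-1 | 0.0-2.0m | Brown clay, N = 8\n"
            "ST-2 | 2.0-4.5m | Gray clay, Su = 2.5 ksc"
        ),
        model="anthropic/claude-sonnet-4",  # placeholder model ID
        api_key="YOUR_API_KEY",             # placeholder key
        merge_similar=True,
        split_thick=True,
    )
    print(json.dumps(results, indent=2, default=str))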