import re import numpy as np import streamlit as st from typing import Dict, List, Any, Tuple, Optional class SoilClassificationProcessor: """ Advanced soil classification processor that handles SS and ST samples with proper unit conversions and soil parameter calculations """ def __init__(self): # Enhanced unit conversion factors to SI units self.unit_conversions = { # Pressure/Stress units to kPa 'psi': 6.895, 'psf': 0.04788, 'kpa': 1.0, 'kn/m2': 1.0, 'kn/m²': 1.0, 'knm2': 1.0, 'mpa': 1000.0, 'pa': 0.001, 'n/m2': 0.001, 'n/m²': 0.001, 'nm2': 0.001, 'ksf': 47.88, 'tsf': 95.76, 'kg/cm2': 98.0, 'kg/cm²': 98.0, 'kgcm2': 98.0, 'ksc': 98.0, # kilograms per square centimeter (same as kg/cm²) 'bar': 100.0, 'atm': 101.325, # atmosphere to kPa 'mmhg': 0.133322, # mmHg to kPa 'inhg': 3.386, # inHg to kPa # Enhanced tonnes/tons per square meter conversions 't/m2': 9.81, # tonnes per square meter to kPa 't/m²': 9.81, # tonnes per square meter to kPa 'tm2': 9.81, # tm2 variant 'ton/m2': 9.81, # ton per square meter to kPa 'ton/m²': 9.81, # ton per square meter to kPa 'tonm2': 9.81, # tonm2 variant 'tonnes/m2': 9.81, # tonnes per square meter to kPa 'tonnes/m²': 9.81, # tonnes per square meter to kPa 'tonnesm2': 9.81, # tonnesm2 variant 'tonne/m2': 9.81, # tonne per square meter to kPa 'tonne/m²': 9.81, # tonne per square meter to kPa 'tonnem2': 9.81, # tonnem2 variant # Additional international pressure units 'kgf/cm2': 98.0, # kilogram-force per cm² 'kgf/cm²': 98.0, # kilogram-force per cm² 'kgfcm2': 98.0, # variant without symbols 'lbf/in2': 6.895, # pound-force per square inch (same as psi) 'lbf/ft2': 0.04788, # pound-force per square foot (same as psf) 'lbfin2': 6.895, # variant without symbols 'lbfft2': 0.04788, # variant without symbols # Length units to meters (enhanced) 'ft': 0.3048, 'feet': 0.3048, 'foot': 0.3048, "'": 0.3048, # foot symbol 'in': 0.0254, 'inch': 0.0254, 'inches': 0.0254, '"': 0.0254, # inch symbol 'cm': 0.01, 'mm': 0.001, 'km': 1000.0, 'm': 1.0, 'meter': 1.0, 'metre': 1.0, 'meters': 1.0, 'metres': 1.0, 'yd': 0.9144, # yard to meters 'yard': 0.9144, 'yards': 0.9144, # Weight/Force units (for completeness) 'n': 1.0, # Newton (SI base) 'kn': 1000.0, # kilonewton to Newton 'kgf': 9.81, # kilogram-force to Newton 'lbf': 4.448, # pound-force to Newton 'lb': 4.448, # pound (assuming force context) 'kg': 9.81, # kilogram (assuming force context, kg*g) } # Soil classification criteria self.sieve_200_threshold = 50.0 # % passing sieve #200 for clay classification def process_soil_layers(self, layers: List[Dict]) -> List[Dict]: """ Process soil layers with SS/ST sample classification and parameter calculation """ processed_layers = [] st.info("🔬 Processing soil layers with SS/ST sample classification...") for i, layer in enumerate(layers): processed_layer = layer.copy() # Step 1: Identify sample type (SS or ST) sample_type = self._identify_sample_type(layer) processed_layer['sample_type'] = sample_type # Step 2: Classify soil type if not already classified soil_type = self._classify_soil_type(layer) processed_layer['soil_type'] = soil_type # Step 3: Process based on sample type if sample_type == 'ST': processed_layer = self._process_st_sample(processed_layer) elif sample_type == 'SS': processed_layer = self._process_ss_sample(processed_layer) else: # Default processing for unidentified samples processed_layer = self._process_default_sample(processed_layer) # Step 4: Ensure all units are in SI processed_layer = self._convert_to_si_units(processed_layer) # Step 5: Validate and add engineering parameters processed_layer = self._add_engineering_parameters(processed_layer) # Step 6: Check clay consistency (water content vs Su) processed_layer = self._check_clay_consistency(processed_layer) processed_layers.append(processed_layer) # Progress feedback st.write(f" ✅ Layer {i+1}: {sample_type} sample, {soil_type} - {processed_layer.get('strength_parameter', 'N/A')}") st.success(f"✅ Processed {len(processed_layers)} soil layers with SS/ST classification") return processed_layers def _identify_sample_type(self, layer: Dict) -> str: """ Identify if sample is Split Spoon (SS) or Shelby Tube (ST) CRITICAL: Look at FIRST COLUMN stratification symbols with ABSOLUTE HIGHEST PRIORITY """ description = layer.get('description', '').lower() # ABSOLUTE HIGHEST PRIORITY: Check for first column stratification symbols # Patterns for first column recognition: SS-18, ST-5, SS18, ST3, etc. first_column_patterns = [ # High precision patterns for first column symbols r'^[^|]*\b(ss[-]?\d+)\b', # SS-18, SS18 at start or before pipe r'^[^|]*\b(st[-]?\d+)\b', # ST-5, ST5 at start or before pipe r'^\s*(ss[-]?\d+)', # SS-number at very beginning r'^\s*(st[-]?\d+)', # ST-number at very beginning r'\|(.*?)(ss[-]?\d+)', # After pipe separator r'\|(.*?)(st[-]?\d+)', # After pipe separator r'\b(ss[-]?\d+)\s*[|:]', # SS-number followed by pipe or colon r'\b(st[-]?\d+)\s*[|:]', # ST-number followed by pipe or colon ] for pattern in first_column_patterns: match = re.search(pattern, description, re.IGNORECASE) if match: # Get the SS/ST part (could be in different groups) matched_groups = [g for g in match.groups() if g and ('ss' in g.lower() or 'st' in g.lower())] if matched_groups: matched_text = matched_groups[0].lower().strip() if matched_text.startswith('ss'): st.success(f"🎯 FIRST COLUMN DETECTED: {matched_text.upper()} → SS sample (HIGHEST PRIORITY)") return 'SS' elif matched_text.startswith('st'): st.success(f"🎯 FIRST COLUMN DETECTED: {matched_text.upper()} → ST sample (HIGHEST PRIORITY)") return 'ST' # FALLBACK: Check for standalone SS/ST symbols (lower priority) standalone_patterns = [ r'\bss\b(?!\w)', # Just SS (not part of another word) r'\bst\b(?!\w)' # Just ST (not part of another word) ] for pattern in standalone_patterns: match = re.search(pattern, description, re.IGNORECASE) if match: matched_text = match.group(0).lower() if matched_text == 'ss': st.info(f"📊 Standalone symbol detected: SS → SS sample") return 'SS' elif matched_text == 'st': st.info(f"📊 Standalone symbol detected: ST → ST sample") return 'ST' # SECOND: Check for keywords in description # Keywords for ST samples st_keywords = ['shelby', 'tube', 'undisturbed', 'ut', 'unconfined', 'uu test', 'ucs'] # Keywords for SS samples ss_keywords = ['split spoon', 'spt', 'standard penetration', 'disturbed', 'n-value'] # Check for ST indicators if any(keyword in description for keyword in st_keywords): return 'ST' # Check for SS indicators if any(keyword in description for keyword in ss_keywords): return 'SS' # THIRD: Check strength parameter types # Check if SPT-N value is present (indicates SS) if layer.get('strength_parameter') == 'SPT-N' or 'spt' in description: return 'SS' # Check if Su value is present (could indicate ST) if layer.get('strength_parameter') == 'Su' or 'su' in description.lower(): return 'ST' # FOURTH: Default assumption based on available data if layer.get('strength_value') and layer.get('strength_value') > 50: return 'SS' # High values typically SPT-N else: return 'ST' # Lower values typically Su def _classify_soil_type(self, layer: Dict) -> str: """ Enhanced soil type classification with MANDATORY sieve analysis requirement for sand CRITICAL: Sand layers MUST have sieve analysis evidence - otherwise assume clay """ # Check if soil type is already specified and validate it existing_type = layer.get('soil_type', '').lower() if existing_type and existing_type != 'unknown': # If it's sand/gravel, verify sieve analysis exists if existing_type in ['sand', 'silt', 'gravel']: sieve_200_passing = self._extract_sieve_200_data(layer) if sieve_200_passing is None: st.warning(f"⚠️ '{existing_type}' classification without sieve analysis data. OVERRIDING to 'clay' per requirements.") layer['classification_override'] = f"Changed from '{existing_type}' to 'clay' - no sieve analysis data" return 'clay' else: st.success(f"✅ '{existing_type}' classification confirmed with sieve #200: {sieve_200_passing}% passing") return existing_type else: return existing_type description = layer.get('description', '').lower() # CRITICAL: Check for sieve analysis data FIRST before any classification sieve_200_passing = self._extract_sieve_200_data(layer) if sieve_200_passing is not None: # Sieve analysis data available - use it for classification if sieve_200_passing > self.sieve_200_threshold: classification = 'clay' # Fine-grained soil st.success(f"✅ Classified as CLAY: {sieve_200_passing}% passing #200 (>50%)") else: classification = 'sand' # Coarse-grained soil st.success(f"✅ Classified as SAND: {sieve_200_passing}% passing #200 (<50%)") layer['sieve_200_passing'] = sieve_200_passing layer['classification_basis'] = f"Sieve analysis: {sieve_200_passing}% passing #200" return classification # NO SIEVE ANALYSIS DATA - Check for explicit mentions but apply strict rules potential_classifications = [] if any(clay_word in description for clay_word in ['clay', 'clayey', 'ch', 'cl']): potential_classifications.append('clay') if any(sand_word in description for sand_word in ['sand', 'sandy', 'sp', 'sw', 'sm', 'sc']): potential_classifications.append('sand') if any(silt_word in description for silt_word in ['silt', 'silty', 'ml', 'mh']): potential_classifications.append('silt') if any(gravel_word in description for gravel_word in ['gravel', 'gp', 'gw', 'gm', 'gc']): potential_classifications.append('gravel') # ENFORCE MANDATORY RULE: No sand/silt/gravel without sieve analysis if any(coarse_type in potential_classifications for coarse_type in ['sand', 'silt', 'gravel']): st.error(f"❌ CRITICAL: Found potential {potential_classifications} classification but NO sieve analysis data!") st.warning(f"🔧 ENFORCING RULE: Classifying as 'clay' - sand/silt/gravel requires sieve analysis evidence") layer['classification_override'] = f"Forced clay classification - found {potential_classifications} terms but no sieve data" layer['sieve_200_passing'] = None layer['classification_basis'] = "Assumed clay - no sieve analysis data available (mandatory requirement)" return 'clay' # Default to clay if only clay terms found or no clear classification if 'clay' in potential_classifications or not potential_classifications: st.info(f"💡 Classified as CLAY: {potential_classifications if potential_classifications else 'No explicit soil type found'}") layer['sieve_200_passing'] = None layer['classification_basis'] = "Assumed clay - no sieve analysis data available" return 'clay' # Final fallback - should not reach here st.warning(f"⚠️ Unclear classification. Defaulting to 'clay' per mandatory requirements.") layer['sieve_200_passing'] = None layer['classification_basis'] = "Default clay classification - unclear soil type and no sieve data" return 'clay' def _extract_sieve_200_data(self, layer: Dict) -> Optional[float]: """ Enhanced sieve #200 passing percentage extraction with comprehensive pattern recognition """ description = layer.get('description', '') # Enhanced patterns to catch all possible sieve analysis formats patterns = [ # Standard #200 sieve patterns r'#200[:\s]*(\d+(?:\.\d+)?)%', r'sieve\s*#?200[:\s]*(\d+(?:\.\d+)?)%', r'no\.?\s*200[:\s]*(\d+(?:\.\d+)?)%', r'passing\s*#?200[:\s]*(\d+(?:\.\d+)?)%', r'(\d+(?:\.\d+)?)%\s*passing\s*#?200', # Fines content (equivalent to #200 passing) r'fines[:\s]*(\d+(?:\.\d+)?)%', r'fine[s]?\s*content[:\s]*(\d+(?:\.\d+)?)%', r'(\d+(?:\.\d+)?)%\s*fines', # 0.075mm equivalent (same as #200) r'0\.075\s*mm[:\s]*(\d+(?:\.\d+)?)%\s*passing', r'(\d+(?:\.\d+)?)%\s*passing\s*0\.075\s*mm', r'0\.075[:\s]*(\d+(?:\.\d+)?)%', # Particle size analysis patterns r'particle\s*size[:\s]*(\d+(?:\.\d+)?)%\s*fines', r'gradation[:\s]*(\d+(?:\.\d+)?)%\s*passing\s*#?200', r'grain\s*size[:\s]*(\d+(?:\.\d+)?)%\s*fines', # Sieve analysis results patterns r'sieve\s*analysis[:\s].*?(\d+(?:\.\d+)?)%\s*passing\s*#?200', r'sieve\s*analysis[:\s].*?#?200[:\s]*(\d+(?:\.\d+)?)%', # ASTM/Standard method references r'astm\s*d422[:\s].*?(\d+(?:\.\d+)?)%\s*passing\s*#?200', r'astm\s*d6913[:\s].*?(\d+(?:\.\d+)?)%\s*passing\s*#?200', # Alternative formats r'(\d+(?:\.\d+)?)%\s*<\s*0\.075\s*mm', # Percent less than 0.075mm r'minus\s*#?200[:\s]*(\d+(?:\.\d+)?)%', # Minus #200 r'(\d+(?:\.\d+)?)%\s*minus\s*#?200', # Percent minus #200 ] for pattern in patterns: match = re.search(pattern, description, re.IGNORECASE) if match: percentage = float(match.group(1)) st.success(f"✅ Found sieve #200 data: {percentage}% passing from '{match.group(0)}'") # Validate percentage range if 0 <= percentage <= 100: return percentage else: st.warning(f"⚠️ Invalid percentage ({percentage}%) found. Should be 0-100%.") return None # Check if explicitly mentioned in layer data if 'sieve_200_passing' in layer and layer['sieve_200_passing'] is not None: percentage = float(layer['sieve_200_passing']) st.success(f"✅ Found sieve #200 data in layer field: {percentage}% passing") return percentage # Check for related field names for field_name in ['fines_content', 'percent_fines', 'fine_content', 'passing_200']: if field_name in layer and layer[field_name] is not None: percentage = float(layer[field_name]) st.success(f"✅ Found sieve #200 equivalent in '{field_name}': {percentage}% passing") return percentage # Log that no sieve analysis was found st.info(f"🔍 No sieve #200 analysis data found in layer description or fields") return None def _process_st_sample(self, layer: Dict) -> Dict: """ Process Shelby Tube (ST) sample - use unconfined compression test (Su) values """ layer['processing_method'] = 'ST - Unconfined Compression Test' # Look for Su values in the data su_value = self._extract_su_value(layer) if su_value is not None: layer['strength_parameter'] = 'Su' layer['strength_value'] = su_value layer['su_source'] = 'Unconfined Compression Test' else: # If no Su value found, check for SPT and convert spt_value = self._extract_spt_value(layer) if spt_value is not None: su_calculated = self._convert_spt_to_su(spt_value) layer['strength_parameter'] = 'Su' layer['strength_value'] = su_calculated layer['su_source'] = f'Calculated from SPT-N={spt_value} (Su=5*N)' layer['original_spt'] = spt_value return layer def _process_ss_sample(self, layer: Dict) -> Dict: """ Process Split Spoon (SS) sample - ALWAYS use SPT values and convert to Su using Su=5*N FOR SS SAMPLES: IGNORE any unconfined compression test Su values, ONLY use calculated Su=5*N """ layer['processing_method'] = 'SS - SPT Conversion (Su=5*N)' # CRITICAL: For SS samples, extract the raw SPT-N value and calculate Su from it spt_value = self._extract_spt_value(layer) soil_type = layer.get('soil_type', 'clay') if spt_value is not None: if soil_type == 'clay': # MANDATORY: Convert SPT to undrained shear strength using Su = 5*N # IGNORE any existing Su values from unconfined compression tests calculated_su = self._convert_spt_to_su(spt_value) # Override any existing Su values for SS samples layer['strength_parameter'] = 'Su' layer['strength_value'] = calculated_su layer['su_source'] = f'Calculated from raw N={spt_value} (Su=5*N) - SS Sample' layer['original_spt'] = spt_value # Clear any conflicting unconfined compression data for SS samples if 'unconfined_su' in layer: layer['unconfined_su_ignored'] = layer.pop('unconfined_su') st.warning(f"⚠️ SS Sample: Ignored unconfined compression Su, using calculated Su={calculated_su:.0f} kPa from N={spt_value}") st.success(f"✅ SS Sample: Su = 5 × {spt_value} = {calculated_su:.0f} kPa") elif soil_type in ['sand', 'silt']: # Convert SPT to friction angle for granular soils phi_value = self._convert_spt_to_friction_angle(spt_value) layer['strength_parameter'] = 'φ' layer['strength_value'] = phi_value layer['friction_angle'] = phi_value layer['phi_source'] = f'Calculated from raw N={spt_value} (Peck method) - SS Sample' layer['original_spt'] = spt_value st.success(f"✅ SS Sample: φ = {phi_value:.1f}° from N={spt_value}") else: # Keep SPT value for other soil types layer['strength_parameter'] = 'SPT-N' layer['strength_value'] = spt_value layer['original_spt'] = spt_value st.info(f"📊 SS Sample: Using raw N={spt_value} for {soil_type}") else: st.error(f"❌ SS Sample: No SPT-N value found in layer data") return layer def _process_default_sample(self, layer: Dict) -> Dict: """ Process sample with unknown type - use available data intelligently """ layer['processing_method'] = 'Default - Based on available data' # Try to identify and process based on existing parameters existing_param = layer.get('strength_parameter', '').lower() if 'su' in existing_param: # Already has Su value return self._process_st_sample(layer) elif 'spt' in existing_param or 'n' in existing_param: # Has SPT value return self._process_ss_sample(layer) else: # Make best guess based on strength value strength_val = layer.get('strength_value', 0) if strength_val and strength_val > 50: # Likely SPT value layer['strength_parameter'] = 'SPT-N' return self._process_ss_sample(layer) else: # Likely Su value layer['strength_parameter'] = 'Su' return self._process_st_sample(layer) def _extract_su_value(self, layer: Dict) -> Optional[float]: """ Enhanced Su (undrained shear strength) extraction with MANDATORY unit conversion checking CRITICAL: All Su values must be converted to kPa before processing """ # Check direct Su field first - but validate units if layer.get('strength_parameter') == 'Su' and layer.get('strength_value') is not None: su_value = float(layer['strength_value']) # Check if this value needs unit conversion (warn if suspiciously low/high) if su_value < 5: st.warning(f"⚠️ Su value {su_value} seems low - verify it's in kPa, not MPa or other units") elif su_value > 2000: st.warning(f"⚠️ Su value {su_value} seems high - verify it's in kPa, not psi or other units") return su_value # Look in description for Su values with enhanced unit detection description = layer.get('description', '') # CRITICAL: Enhanced patterns with explicit unit capture for conversion patterns = [ # Direct Su values with units - CAPTURE UNITS EXPLICITLY r'su[:\s=]*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)', r'undrained[:\s]*shear[:\s]*strength[:\s]*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)', r'shear\s*strength[:\s]*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)', r'ucs[:\s]*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)', r'unconfined[:\s]*compression[:\s]*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)', # Equation-style patterns r'su\s*=\s*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)', r'strength\s*=\s*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)', # Embedded unit patterns r'(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²)\s*(?:su|strength)', r'(\d+(?:\.\d+)?)\s*(ksc|kg/cm2|kg/cm²)\s*(?:su|strength)', r'(\d+(?:\.\d+)?)\s*(t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²)\s*(?:su|strength)', r'(\d+(?:\.\d+)?)\s*(psi|psf)\s*(?:su|strength)', r'(\d+(?:\.\d+)?)\s*(mpa)\s*(?:su|strength)', # Common non-SI units that need conversion r'(\d+(?:\.\d+)?)\s*ksc\b', # ksc without explicit "su" r'(\d+(?:\.\d+)?)\s*t/m²?\b', # tonnes/m² r'(\d+(?:\.\d+)?)\s*psi\b', # psi ] for pattern in patterns: match = re.search(pattern, description, re.IGNORECASE) if match: value = float(match.group(1)) unit = match.group(2).lower() if len(match.groups()) > 1 and match.group(2) else 'kpa' # CRITICAL: Alert if unit conversion is needed if unit != 'kpa': st.warning(f"🔧 UNIT CONVERSION REQUIRED: Found Su = {value} {unit.upper()}") # Convert to kPa with detailed logging converted_value = self._convert_pressure_to_kpa(value, unit) # Store original values for verification layer['original_su_value'] = value layer['original_su_unit'] = unit.upper() layer['converted_su_note'] = f"Converted from {value} {unit.upper()} to {converted_value:.1f} kPa" # Enhanced validation with context-aware warnings if converted_value < 1: st.error(f"❌ Very low Su = {converted_value:.3f} kPa after conversion. Check original value: {value} {unit}") elif converted_value > 2000: st.warning(f"⚠️ Very high Su = {converted_value:.0f} kPa after conversion from {value} {unit}. Verify this is correct.") elif 1 <= converted_value <= 1000: st.success(f"✅ Su = {converted_value:.1f} kPa (converted from {value} {unit.upper()})") else: st.info(f"📊 Su = {converted_value:.1f} kPa (converted from {value} {unit.upper()}) - unusual but accepted") return converted_value # Check for unitless Su values (assume kPa but warn) unitless_patterns = [ r'su[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[a-zA-Z])', # Su value not followed by units r'shear\s*strength[:\s]*(\d+(?:\.\d+)?)\b(?!\s*[a-zA-Z])', r'unconfined[:\s]*(\d+(?:\.\d+)?)\b(?!\s*[a-zA-Z])', ] for pattern in unitless_patterns: match = re.search(pattern, description, re.IGNORECASE) if match: value = float(match.group(1)) st.warning(f"⚠️ Found Su = {value} WITHOUT UNITS! Assuming kPa - please verify.") layer['assumed_unit_warning'] = f"Assumed {value} is in kPa (no units specified)" return value # Check for explicit Su field in layer data if 'su_value' in layer and layer['su_value'] is not None: value = float(layer['su_value']) st.info(f"📊 Using Su = {value:.1f} from field 'su_value' (assumed kPa)") return value # Check for other strength-related fields that might contain Su for field_name in ['undrained_strength', 'unconfined_strength', 'cohesion']: if field_name in layer and layer[field_name] is not None: value = float(layer[field_name]) st.info(f"📊 Using Su = {value:.1f} kPa from field '{field_name}' (assumed kPa)") return value return None def _extract_spt_value(self, layer: Dict) -> Optional[float]: """ Enhanced SPT-N value extraction for SS samples - USE RAW N VALUE ONLY, NOT N-CORRECTED Improved pattern matching for better SS layer division """ # Check direct SPT field if layer.get('strength_parameter') == 'SPT-N' and layer.get('strength_value'): return float(layer['strength_value']) # Look in description for SPT values - PRIORITIZE RAW N VALUES description = layer.get('description', '') # ENHANCED: Look for raw N value patterns with better precision raw_n_patterns = [ # High priority patterns for raw N values r'\braw[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Raw N value r'\bfield[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Field N value r'\bmeasured[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Measured N value r'\bactual[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Actual N value r'\bobserved[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Observed N value # Standard N patterns NOT followed by correction terms r'\bn[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))', # N value NOT corrected r'\bspt[:\s]*n[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))', # SPT-N NOT corrected r'\bn[-\s]?value[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))', # N-value NOT corrected r'\bn\s*=\s*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))', # N = value NOT corrected # Blow count patterns r'\bblow[s]?[:\s]*count[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))', r'\bblows[:\s]*per[:\s]*foot[:\s=]*(\d+(?:\.\d+)?)', r'\bblow[s]?[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))', # SS sample specific patterns r'\bss[-\s]*\d*[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # SS sample with N r'\bsplit[:\s]*spoon[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Split spoon N ] # First try to find raw N values with enhanced logging for i, pattern in enumerate(raw_n_patterns): match = re.search(pattern, description, re.IGNORECASE) if match: n_value = float(match.group(1)) pattern_type = ["Raw N", "Field N", "Measured N", "Actual N", "Observed N", "Standard N", "SPT-N", "N-value", "N=", "Blow count", "Blows/ft", "Blows", "SS N", "Split spoon N"][min(i, 13)] st.success(f"✅ SS Sample: Using {pattern_type} = {n_value} from: '{match.group(0)}'") # Additional validation for SS samples if n_value > 100: st.warning(f"⚠️ Very high N value ({n_value}) detected. Please verify this is correct.") elif n_value == 0: st.warning(f"⚠️ Zero N value detected. May indicate very soft soil or measurement issue.") return n_value # Enhanced fallback patterns with warnings fallback_patterns = [ r'\bn[:\s=]*(\d+(?:\.\d+)?)', r'\bspt[:\s]*(\d+(?:\.\d+)?)', r'(\d+(?:\.\d+)?)\s*(?:blow|n)', r'penetration[:\s]*(\d+(?:\.\d+)?)', r'resistance[:\s]*(\d+(?:\.\d+)?)' ] for pattern in fallback_patterns: match = re.search(pattern, description, re.IGNORECASE) if match: n_value = float(match.group(1)) # Enhanced warnings for SS samples warning_indicators = ['corr', 'correct', 'adj', 'adjust', 'modified', 'norm'] has_correction_indicator = any(indicator in description.lower() for indicator in warning_indicators) if has_correction_indicator: st.error(f"❌ SS Sample: Found N = {n_value} but description contains correction terms. This may be corrected N, not raw N!") st.info("💡 For SS samples, use only raw field N values (not corrected). Check original field logs.") # Still return the value but flag it layer['n_value_warning'] = f"Potentially corrected N value: {n_value}" else: st.info(f"📊 SS Sample: Using N = {n_value} from: '{match.group(0)}' (fallback pattern)") return n_value # If no N value found, provide specific guidance for SS samples st.error(f"❌ SS Sample: No SPT-N value found in layer data") st.info("💡 SS samples require SPT-N values. Look for: N=X, SPT-N=X, raw N=X, field N=X, or blow count.") return None def _convert_spt_to_su(self, spt_n: float) -> float: """ Convert SPT-N to undrained shear strength (Su) using Su = 5*N correlation Enhanced for SS samples with validation """ if spt_n <= 0: st.warning(f"⚠️ Invalid N value ({spt_n}) for Su calculation. Using N=1 as minimum.") spt_n = 1.0 su_calculated = 5.0 * spt_n # Add validation and guidance for SS clay samples if su_calculated < 10: st.info(f"💡 Very low Su = {su_calculated:.0f} kPa from N={spt_n}. Indicates very soft clay.") elif su_calculated > 500: st.warning(f"⚠️ Very high Su = {su_calculated:.0f} kPa from N={spt_n}. Verify N value is raw (not corrected).") return su_calculated def _convert_spt_to_friction_angle(self, spt_n: float) -> float: """ Enhanced SPT-N to friction angle conversion for sand/silt layers in SS samples Uses improved Peck method with soil type considerations """ if spt_n <= 0: st.warning(f"⚠️ Invalid N value ({spt_n}) for friction angle calculation. Using N=1 as minimum.") spt_n = 1.0 # Enhanced Peck correlation with improvements: # φ = 27.1 + 0.3 * N - 0.00054 * N² (for fine to medium sand) # Valid for N up to 50, with adjustments for different sand types n_limited = min(spt_n, 50) # Cap at 50 for correlation validity # Base Peck correlation phi = 27.1 + 0.3 * n_limited - 0.00054 * (n_limited ** 2) # Ensure reasonable minimum phi_final = max(phi, 28) # Minimum reasonable friction angle for sand phi_final = min(phi_final, 45) # Maximum reasonable friction angle # Add validation and guidance for SS sand samples if phi_final < 30: st.info(f"💡 Low φ = {phi_final:.1f}° from N={spt_n}. Indicates loose sand or silty sand.") elif phi_final > 40: st.info(f"💡 High φ = {phi_final:.1f}° from N={spt_n}. Indicates dense, well-graded sand.") # Special handling for very low or high N values if spt_n < 4: st.warning(f"⚠️ Very low N={spt_n} for sand. May indicate loose sand or silt. Consider checking soil classification.") elif spt_n > 40: st.info(f"💡 Very high N={spt_n} for sand. Indicates very dense sand or possible gravel content.") return phi_final def _convert_pressure_to_kpa(self, value: float, unit: str) -> float: """ Enhanced pressure value conversion to kPa with comprehensive unit support """ if not unit or unit.lower() in ['', 'none', 'null']: return value # Assume already in kPa if no unit specified # Normalize unit string for better matching unit_clean = unit.lower().replace('/', '').replace(' ', '').replace('²', '2').replace('³', '3') # Remove common punctuation and extra characters unit_clean = unit_clean.replace('.', '').replace('-', '').replace('_', '') # Handle specific variations that need special processing special_cases = { # Tonne/ton variations 'tm2': 9.81, 'tonm2': 9.81, 'tonnesm2': 9.81, 'tonnem2': 9.81, # kg/cm² variations 'kgcm2': 98.0, 'kgfcm2': 98.0, # kN/m² variations 'knm2': 1.0, # Other common variations 'psig': 6.895, # psi gauge 'psia': 6.895, # psi absolute 'psfa': 0.04788, # psf absolute 'torr': 0.133322, # torr (same as mmHg) } # Check special cases first if unit_clean in special_cases: conversion_factor = special_cases[unit_clean] else: # Standard conversion using enhanced dictionary conversion_factor = self.unit_conversions.get(unit_clean, None) # If no exact match found, try intelligent partial matching if conversion_factor is None: for known_unit, factor in self.unit_conversions.items(): # Try various normalization approaches known_normalized = known_unit.replace('/', '').replace('²', '2').replace(' ', '') if known_normalized == unit_clean: conversion_factor = factor break # Check if unit contains the known unit (for compound units) if known_unit != unit_clean and known_unit in unit_clean: conversion_factor = factor break # Final fallback - assume kPa if still no match found if conversion_factor is None: st.warning(f"⚠️ Unknown pressure unit '{unit}'. Assuming kPa - please verify.") conversion_factor = 1.0 converted_value = value * conversion_factor # Enhanced logging with validation if conversion_factor != 1.0: st.success(f"🔧 Unit conversion: {value} {unit} = {converted_value:.1f} kPa (×{conversion_factor})") # Add validation warnings for unusual results if converted_value > 10000: st.warning(f"⚠️ Very high pressure result ({converted_value:.0f} kPa). Please verify unit conversion.") elif converted_value < 0.1 and value > 0: st.warning(f"⚠️ Very low pressure result ({converted_value:.3f} kPa). Please verify unit conversion.") return converted_value def _convert_to_si_units(self, layer: Dict) -> Dict: """ Convert all measurements to SI units """ # Convert depths to meters for depth_field in ['depth_from', 'depth_to']: if depth_field in layer: depth_val, depth_unit = self._extract_value_and_unit( str(layer[depth_field]), default_unit='m' ) layer[depth_field] = self._convert_length_to_meters(depth_val, depth_unit) # Convert strength values to appropriate SI units if 'strength_value' in layer and 'strength_parameter' in layer: param = layer['strength_parameter'].lower() if param == 'su': # Convert Su to kPa strength_val, strength_unit = self._extract_value_and_unit( str(layer['strength_value']), default_unit='kpa' ) layer['strength_value'] = self._convert_pressure_to_kpa(strength_val, strength_unit) layer['strength_unit'] = 'kPa' # Validate Su value against water content if available validation_result = self._validate_su_with_water_content(layer) if validation_result.get('needs_unit_check'): st.warning(f"⚠️ Su-water content validation: {validation_result['message']}") layer['unit_validation_warning'] = validation_result['message'] if validation_result['recommendations']: st.info("💡 Recommendations: " + "; ".join(validation_result['recommendations'])) elif param in ['φ', 'phi', 'friction_angle']: # Friction angle should be in degrees (already SI) layer['strength_unit'] = 'degrees' elif param == 'spt-n': # SPT-N is dimensionless layer['strength_unit'] = 'blows/30cm' return layer def _extract_value_and_unit(self, value_str: str, default_unit: str = '') -> Tuple[float, str]: """ Extract numeric value and unit from a string """ # Remove extra spaces and convert to lowercase clean_str = value_str.strip().lower() # Pattern to match number followed by optional unit pattern = r'(\d+(?:\.\d+)?)\s*([a-zA-Z/²]+)?' match = re.search(pattern, clean_str) if match: value = float(match.group(1)) unit = match.group(2) if match.group(2) else default_unit return value, unit try: return float(clean_str), default_unit except ValueError: return 0.0, default_unit def _convert_length_to_meters(self, value: float, unit: str) -> float: """ Convert length value to meters """ unit_clean = unit.lower().replace(' ', '') conversion_factor = self.unit_conversions.get(unit_clean, 1.0) return value * conversion_factor def _detect_t_m2_unit_error(self, layer: Dict) -> Dict: """ Detect if LLM failed to convert t/m² units to kPa This is the most common unit conversion error """ result = {"needs_conversion": False, "critical_error": False} # Only check layers with Su values if layer.get("strength_parameter") != "Su" or not layer.get("strength_value"): return result su = float(layer["strength_value"]) wc = layer.get("water_content", 0) description = layer.get("description", "") # Critical detection: Su values that are likely t/m² but not converted # Typical t/m² values are 1-8, typical kPa values are 10-400 for clay # Pattern 1: Su 1-8 with reasonable water content (15-50%) if 1.0 <= su <= 8.0 and 15 <= wc <= 50: converted_su = su * 9.81 result.update({ "needs_conversion": True, "critical_error": True, "original_su": su, "converted_su": converted_su, "unit_error": "t/m²", "message": f"⚠️ CRITICAL: Su={su:.2f} appears to be in t/m² units, should be {converted_su:.1f} kPa", "correction": f"{su:.2f} t/m² × 9.81 = {converted_su:.1f} kPa" }) # Pattern 2: Very low Su (<5) with low water content - could be t/m² elif su < 5.0 and wc > 0 and wc < 25: converted_su = su * 9.81 result.update({ "needs_conversion": True, "critical_error": True, "original_su": su, "converted_su": converted_su, "unit_error": "t/m²", "message": f"⚠️ POSSIBLE: Su={su:.2f} might be in t/m² units, check if should be {converted_su:.1f} kPa", "correction": f"{su:.2f} t/m² × 9.81 = {converted_su:.1f} kPa" }) # Pattern 3: Check description for t/m² mentions if any(unit in description.lower() for unit in ['t/m²', 't/m2', 'ton/m²', 'ton/m2', 'tonnes/m²']): if su < 10: # If description mentions t/m² but Su is low, likely not converted converted_su = su * 9.81 result.update({ "needs_conversion": True, "critical_error": True, "original_su": su, "converted_su": converted_su, "unit_error": "t/m² (found in description)", "message": f"⚠️ CRITICAL: Description mentions t/m² but Su={su:.2f} appears unconverted, should be {converted_su:.1f} kPa", "correction": f"{su:.2f} t/m² × 9.81 = {converted_su:.1f} kPa" }) return result def _validate_su_with_water_content(self, layer: Dict) -> Dict: """ ENHANCED Su-water content validation with comprehensive unit checking Standard correlations for clay (empirical relationships): - Very soft clay: Su < 25 kPa, w% > 40% - Soft clay: Su 25-50 kPa, w% 30-40% - Medium clay: Su 50-100 kPa, w% 20-30% - Stiff clay: Su 100-200 kPa, w% 15-25% - Very stiff clay: Su 200-400 kPa, w% 10-20% - Hard clay: Su > 400 kPa, w% < 15% Key unit conversions to check: - t/m² → kPa: ×9.81 (CRITICAL) - ksc → kPa: ×98.0 - psi → kPa: ×6.895 - MPa → kPa: ×1000 """ validation_result = { 'valid': True, 'needs_unit_check': False, 'critical_unit_error': False, 'suggested_conversion': None, 'message': '', 'recommendations': [], 'recheck_image': False } su_value = layer.get('strength_value') water_content = layer.get('water_content') soil_type = layer.get('soil_type', '') description = layer.get('description', '') # Only validate for clay layers with both Su and water content if soil_type != 'clay' or not su_value or not water_content: return validation_result try: su = float(su_value) wc = float(water_content) # STEP 1: Check for t/m² unit errors first (most common issue) t_m2_check = self._detect_t_m2_unit_error(layer) if t_m2_check.get('critical_error'): validation_result.update({ 'critical_unit_error': True, 'needs_conversion': True, 'original_value': t_m2_check['original_su'], 'suggested_value': t_m2_check['converted_su'], 'unit_error_type': t_m2_check['unit_error'], 'suggested_conversion': t_m2_check['correction'], 'message': t_m2_check['message'], 'recheck_image': True, 'reload_picture': True }) return validation_result # STEP 2: Check for other unit conversion errors unit_check_results = self._check_su_unit_conversions(su, wc, description) if unit_check_results['needs_conversion']: validation_result.update(unit_check_results) validation_result['critical_unit_error'] = True validation_result['recheck_image'] = True return validation_result # STEP 3: Detailed correlation analysis inconsistencies = [] correlation_score = self._calculate_correlation_score(su, wc) # Very specific clay consistency checks if su < 25 and wc < 30: inconsistencies.append(f"Very soft clay (Su={su:.0f}kPa) typically has w%>30%, found {wc:.1f}%") if wc < 20: validation_result['recheck_image'] = True inconsistencies.append("VERIFY: Water content seems too low for very soft clay") if su > 400 and wc > 30: inconsistencies.append(f"Hard clay (Su={su:.0f}kPa) typically has w%<20%, found {wc:.1f}%") validation_result['recheck_image'] = True inconsistencies.append("VERIFY: Water content seems too high for hard clay") # Medium-range mismatches if 50 <= su <= 200 and (wc > 45 or wc < 10): inconsistencies.append(f"Medium-stiff clay (Su={su:.0f}kPa) with unusual w%={wc:.1f}%") validation_result['recheck_image'] = True # STEP 4: Empirical correlation bounds (Terzaghi-Peck relationships) expected_su_range = self._get_expected_su_range(wc) if su < expected_su_range['min'] * 0.2 or su > expected_su_range['max'] * 5: validation_result['needs_unit_check'] = True validation_result['recheck_image'] = True inconsistencies.append(f"Su-w% correlation severely off: Expected {expected_su_range['min']:.0f}-{expected_su_range['max']:.0f}kPa for w%={wc:.1f}%, got {su:.0f}kPa") # STEP 4: Finalize results if inconsistencies: validation_result['valid'] = False validation_result['message'] = '; '.join(inconsistencies) # Enhanced recommendations if validation_result['needs_unit_check']: validation_result['recommendations'].extend([ "⚠️ CRITICAL: Check Su unit conversion carefully", "t/m² → kPa: multiply by 9.81", "ksc → kPa: multiply by 98.0", "psi → kPa: multiply by 6.895", "MPa → kPa: multiply by 1000", "🔍 Re-examine the original image/document" ]) if validation_result['recheck_image']: validation_result['recommendations'].extend([ "📷 RECHECK IMAGE: Values seem inconsistent", "🔄 Consider reloading the image", "📋 Verify both Su and water content readings" ]) else: validation_result['message'] = f"Su-water content correlation acceptable (score: {correlation_score:.1f})" except (ValueError, TypeError) as e: validation_result['valid'] = False validation_result['message'] = f"Could not validate Su-water content: {str(e)}" validation_result['recheck_image'] = True return validation_result def _check_su_unit_conversions(self, su: float, wc: float, description: str) -> Dict: """Check for specific unit conversion errors""" result = { 'needs_conversion': False, 'suggested_conversion': None, 'critical_unit_error': False, 'message': '' } # Check for t/m² that wasn't converted (very common error) if 2 <= su <= 10 and 15 <= wc <= 40: suggested_su = su * 9.81 result.update({ 'needs_conversion': True, 'suggested_conversion': f"{su} t/m² → {suggested_su:.1f} kPa (×9.81)", 'critical_unit_error': True, 'message': f"CRITICAL: Su={su:.1f} appears to be in t/m² (should be {suggested_su:.1f} kPa)" }) return result # Check for ksc that wasn't converted if 0.5 <= su <= 5 and 15 <= wc <= 50: suggested_su = su * 98.0 result.update({ 'needs_conversion': True, 'suggested_conversion': f"{su} ksc → {suggested_su:.1f} kPa (×98)", 'critical_unit_error': True, 'message': f"CRITICAL: Su={su:.1f} appears to be in ksc (should be {suggested_su:.1f} kPa)" }) return result # Check for psi that wasn't converted (high values) if 50 <= su <= 500 and 10 <= wc <= 35: suggested_su = su * 6.895 result.update({ 'needs_conversion': True, 'suggested_conversion': f"{su} psi → {suggested_su:.1f} kPa (×6.895)", 'critical_unit_error': True, 'message': f"CRITICAL: Su={su:.0f} appears to be in psi (should be {suggested_su:.1f} kPa)" }) return result # Check for MPa that wasn't converted (very low values) if 0.01 <= su <= 0.5 and 10 <= wc <= 40: suggested_su = su * 1000 result.update({ 'needs_conversion': True, 'suggested_conversion': f"{su} MPa → {suggested_su:.1f} kPa (×1000)", 'critical_unit_error': True, 'message': f"CRITICAL: Su={su:.2f} appears to be in MPa (should be {suggested_su:.1f} kPa)" }) return result return result def _get_expected_su_range(self, water_content: float) -> Dict[str, float]: """Get expected Su range based on water content (empirical correlations)""" wc = water_content # Conservative empirical relationships if wc >= 50: return {'min': 5, 'max': 20} # Very soft clay elif wc >= 40: return {'min': 10, 'max': 35} # Soft clay elif wc >= 30: return {'min': 20, 'max': 60} # Medium clay elif wc >= 20: return {'min': 40, 'max': 150} # Stiff clay elif wc >= 15: return {'min': 80, 'max': 250} # Very stiff clay else: return {'min': 150, 'max': 500} # Hard clay def _calculate_correlation_score(self, su: float, wc: float) -> float: """Calculate correlation score (0-10, higher is better)""" # Simple scoring based on typical relationships expected_range = self._get_expected_su_range(wc) if expected_range['min'] <= su <= expected_range['max']: return 10.0 # Perfect correlation elif expected_range['min'] * 0.5 <= su <= expected_range['max'] * 2: return 7.0 # Good correlation elif expected_range['min'] * 0.2 <= su <= expected_range['max'] * 5: return 4.0 # Acceptable correlation else: return 1.0 # Poor correlation def _add_engineering_parameters(self, layer: Dict) -> Dict: """ Add additional engineering parameters based on soil properties """ soil_type = layer.get('soil_type', '') # Add typical engineering properties based on soil type and strength if soil_type == 'clay': su_value = layer.get('strength_value', 0) if su_value > 0: # Estimate consistency based on Su if su_value < 25: layer['consistency'] = 'very soft' elif su_value < 50: layer['consistency'] = 'soft' elif su_value < 100: layer['consistency'] = 'medium' elif su_value < 200: layer['consistency'] = 'stiff' elif su_value < 400: layer['consistency'] = 'very stiff' else: layer['consistency'] = 'hard' # Estimate unit weight (kN/m³) layer['unit_weight'] = 16 + su_value / 50 # Empirical correlation layer['unit_weight_unit'] = 'kN/m³' elif soil_type in ['sand', 'silt']: # For sand/silt, use SPT-N or friction angle if 'original_spt' in layer: spt_n = layer['original_spt'] # Estimate relative density based on SPT-N if spt_n < 4: layer['consistency'] = 'very loose' elif spt_n < 10: layer['consistency'] = 'loose' elif spt_n < 30: layer['consistency'] = 'medium dense' elif spt_n < 50: layer['consistency'] = 'dense' else: layer['consistency'] = 'very dense' # Estimate unit weight (kN/m³) layer['unit_weight'] = 14 + spt_n / 5 # Empirical correlation layer['unit_weight_unit'] = 'kN/m³' return layer def _check_clay_consistency(self, layer: Dict) -> Dict: """ Check consistency between water content and Su for clay soils """ soil_type = layer.get('soil_type', '') if soil_type != 'clay': return layer su_value = layer.get('strength_value') water_content = self._extract_water_content(layer) if su_value and water_content: # Perform consistency check consistency_result = self._validate_clay_water_content_su_relationship( water_content, su_value ) layer['water_content'] = water_content layer['water_content_unit'] = '%' layer['clay_consistency_check'] = consistency_result # Add consistency notes if consistency_result['is_consistent']: layer['consistency_note'] = f"✅ Water content ({water_content}%) consistent with Su ({su_value} kPa)" else: layer['consistency_note'] = f"⚠️ {consistency_result['warning']}" return layer def _extract_water_content(self, layer: Dict) -> Optional[float]: """ Extract water content from layer data """ # Check if water content is directly specified if 'water_content' in layer: return float(layer['water_content']) # Look in description for water content values description = layer.get('description', '') patterns = [ r'w[:\s=]*(\d+(?:\.\d+)?)\s*%', r'water\s*content[:\s]*(\d+(?:\.\d+)?)\s*%', r'moisture\s*content[:\s]*(\d+(?:\.\d+)?)\s*%', r'wc[:\s=]*(\d+(?:\.\d+)?)\s*%', r'(\d+(?:\.\d+)?)\s*%\s*moisture', r'(\d+(?:\.\d+)?)\s*%\s*water' ] for pattern in patterns: match = re.search(pattern, description, re.IGNORECASE) if match: return float(match.group(1)) return None def _validate_clay_water_content_su_relationship(self, water_content: float, su_value: float) -> Dict: """ Validate the relationship between water content and undrained shear strength for clay Enhanced analysis for ST layer soil division based on water content and unconfined test results: - Higher water content generally corresponds to lower Su - Different clay types have different relationships - Consider stress history and plasticity effects """ # Enhanced empirical relationships for clay consistency with expanded ranges consistency_ranges = { 'very_soft': {'w_range': (40, 150), 'su_range': (0, 25), 'description': 'High plasticity, organic clays'}, 'soft': {'w_range': (25, 70), 'su_range': (25, 50), 'description': 'Normally consolidated clays'}, 'medium': {'w_range': (18, 40), 'su_range': (50, 100), 'description': 'Lightly overconsolidated clays'}, 'stiff': {'w_range': (12, 28), 'su_range': (100, 200), 'description': 'Overconsolidated clays'}, 'very_stiff': {'w_range': (8, 20), 'su_range': (200, 400), 'description': 'Heavily overconsolidated clays'}, 'hard': {'w_range': (5, 15), 'su_range': (400, 1000), 'description': 'Desiccated or cemented clays'} } # Determine expected consistency based on Su su_consistency = None for consistency, ranges in consistency_ranges.items(): if ranges['su_range'][0] <= su_value <= ranges['su_range'][1]: su_consistency = consistency break # Determine expected consistency based on water content w_consistency = None for consistency, ranges in consistency_ranges.items(): if ranges['w_range'][0] <= water_content <= ranges['w_range'][1]: w_consistency = consistency break # Check consistency result = { 'water_content': water_content, 'su_value': su_value, 'w_consistency': w_consistency, 'su_consistency': su_consistency, 'is_consistent': False, 'warning': '', 'note': '' } if su_consistency and w_consistency: if su_consistency == w_consistency: result['is_consistent'] = True result['note'] = f"Water content and Su both indicate {su_consistency.replace('_', ' ')} clay" else: result['warning'] = f"Inconsistent: Water content suggests {w_consistency.replace('_', ' ')} clay, but Su suggests {su_consistency.replace('_', ' ')} clay" elif su_consistency and not w_consistency: if water_content > 60: result['warning'] = f"Very high water content ({water_content}%) for Su = {su_value} kPa. Check if clay is highly plastic or organic." elif water_content < 10: result['warning'] = f"Very low water content ({water_content}%) for clay. Check if sample was dried or is highly over-consolidated." else: result['note'] = f"Water content outside typical ranges but Su indicates {su_consistency.replace('_', ' ')} clay" elif w_consistency and not su_consistency: result['warning'] = f"Su value ({su_value} kPa) outside typical ranges for clay with {water_content}% water content" else: result['warning'] = f"Both water content ({water_content}%) and Su ({su_value} kPa) outside typical clay ranges" # Enhanced empirical correlation checks for ST layer division if water_content and su_value: # Advanced correlation analysis for ST samples # Check for high plasticity clay indicators if water_content > 80: if su_value < 25: result['note'] = f"High plasticity clay indicated: w={water_content}%, Su={su_value} kPa. Possible CH or organic clay." elif su_value > 50: result['warning'] = f"Inconsistent: Very high water content ({water_content}%) with moderate/high Su ({su_value} kPa). Check sample integrity or clay type." # Check for low plasticity clay indicators elif water_content < 15: if su_value > 200: result['note'] = f"Low plasticity, overconsolidated clay: w={water_content}%, Su={su_value} kPa. Possible CL or aged clay." elif su_value < 100: result['warning'] = f"Low water content ({water_content}%) with low Su ({su_value} kPa). Unusual - check if sample was dried." # Check stress history indicators ocr_estimate = self._estimate_overconsolidation_ratio(water_content, su_value) if ocr_estimate > 1.5: result['note'] = result.get('note', '') + f" Estimated OCR ≈ {ocr_estimate:.1f} (overconsolidated)" elif ocr_estimate < 0.8: result['note'] = result.get('note', '') + f" Estimated OCR ≈ {ocr_estimate:.1f} (possibly underconsolidated)" # Soil division recommendations for ST samples result['st_division_recommendation'] = self._recommend_st_layer_division(water_content, su_value) return result def _estimate_overconsolidation_ratio(self, water_content: float, su_value: float) -> float: """ Estimate overconsolidation ratio (OCR) from water content and Su Based on empirical correlations for ST samples """ # Simplified correlation: OCR ≈ (Su_measured / Su_normally_consolidated) # For normally consolidated clays: Su ≈ 0.22 * σ'v # Approximate σ'v from water content using typical correlations if water_content > 50: # High water content suggests normally consolidated or slightly overconsolidated expected_su_nc = max(15, 100 - water_content) # Simplified correlation else: # Lower water content suggests overconsolidation expected_su_nc = max(50, 150 - 2 * water_content) ocr_estimate = su_value / expected_su_nc if expected_su_nc > 0 else 1.0 return max(0.5, min(ocr_estimate, 10.0)) # Reasonable bounds def _recommend_st_layer_division(self, water_content: float, su_value: float) -> Dict: """ Recommend layer division strategy for ST samples based on water content and Su results """ recommendation = { 'division_strategy': 'single_layer', 'reason': 'Uniform properties', 'subdivision_criteria': [] } # Check for significant property variations that suggest subdivision if water_content > 60 and su_value > 75: recommendation['division_strategy'] = 'check_variation' recommendation['reason'] = 'Conflicting water content and strength - check for property variations' recommendation['subdivision_criteria'].append('Water content variation > 10%') recommendation['subdivision_criteria'].append('Su variation > 30%') elif water_content < 20 and su_value < 80: recommendation['division_strategy'] = 'check_variation' recommendation['reason'] = 'Both low water content and Su - check for soil type variations' recommendation['subdivision_criteria'].append('Plasticity index variations') recommendation['subdivision_criteria'].append('Sieve analysis variations') elif abs(water_content - 30) > 20 or su_value > 300: recommendation['division_strategy'] = 'subdivide_recommended' recommendation['reason'] = 'Extreme properties suggest heterogeneous layer' recommendation['subdivision_criteria'].append('Test at multiple depths') recommendation['subdivision_criteria'].append('Check for interbedded materials') return recommendation def get_processing_summary(self, layers: List[Dict]) -> Dict[str, Any]: """ Generate a summary of the soil layer processing """ summary = { 'total_layers': len(layers), 'st_samples': 0, 'ss_samples': 0, 'clay_layers': 0, 'sand_layers': 0, 'su_calculated': 0, 'phi_calculated': 0, 'clay_consistency_checks': 0, 'consistent_clays': 0, 'inconsistent_clays': 0, 'unit_conversions': [], 'processing_notes': [] } for layer in layers: # Count sample types sample_type = layer.get('sample_type', '') if sample_type == 'ST': summary['st_samples'] += 1 elif sample_type == 'SS': summary['ss_samples'] += 1 # Count soil types soil_type = layer.get('soil_type', '') if soil_type == 'clay': summary['clay_layers'] += 1 elif soil_type in ['sand', 'silt']: summary['sand_layers'] += 1 # Count calculated parameters if 'su_source' in layer and 'Calculated' in layer['su_source']: summary['su_calculated'] += 1 if 'phi_source' in layer and 'Calculated' in layer['phi_source']: summary['phi_calculated'] += 1 # Count clay consistency checks if 'clay_consistency_check' in layer: summary['clay_consistency_checks'] += 1 consistency_result = layer['clay_consistency_check'] if consistency_result.get('is_consistent', False): summary['consistent_clays'] += 1 else: summary['inconsistent_clays'] += 1 return summary