Spaces:
Running
Running
import re | |
import numpy as np | |
import streamlit as st | |
from typing import Dict, List, Any, Tuple, Optional | |
class SoilClassificationProcessor: | |
""" | |
Advanced soil classification processor that handles SS and ST samples | |
with proper unit conversions and soil parameter calculations | |
""" | |
def __init__(self): | |
# Enhanced unit conversion factors to SI units | |
self.unit_conversions = { | |
# Pressure/Stress units to kPa | |
'psi': 6.895, | |
'psf': 0.04788, | |
'kpa': 1.0, | |
'kn/m2': 1.0, | |
'kn/m²': 1.0, | |
'knm2': 1.0, | |
'mpa': 1000.0, | |
'pa': 0.001, | |
'n/m2': 0.001, | |
'n/m²': 0.001, | |
'nm2': 0.001, | |
'ksf': 47.88, | |
'tsf': 95.76, | |
'kg/cm2': 98.0, | |
'kg/cm²': 98.0, | |
'kgcm2': 98.0, | |
'ksc': 98.0, # kilograms per square centimeter (same as kg/cm²) | |
'bar': 100.0, | |
'atm': 101.325, # atmosphere to kPa | |
'mmhg': 0.133322, # mmHg to kPa | |
'inhg': 3.386, # inHg to kPa | |
# Enhanced tonnes/tons per square meter conversions | |
't/m2': 9.81, # tonnes per square meter to kPa | |
't/m²': 9.81, # tonnes per square meter to kPa | |
'tm2': 9.81, # tm2 variant | |
'ton/m2': 9.81, # ton per square meter to kPa | |
'ton/m²': 9.81, # ton per square meter to kPa | |
'tonm2': 9.81, # tonm2 variant | |
'tonnes/m2': 9.81, # tonnes per square meter to kPa | |
'tonnes/m²': 9.81, # tonnes per square meter to kPa | |
'tonnesm2': 9.81, # tonnesm2 variant | |
'tonne/m2': 9.81, # tonne per square meter to kPa | |
'tonne/m²': 9.81, # tonne per square meter to kPa | |
'tonnem2': 9.81, # tonnem2 variant | |
# Additional international pressure units | |
'kgf/cm2': 98.0, # kilogram-force per cm² | |
'kgf/cm²': 98.0, # kilogram-force per cm² | |
'kgfcm2': 98.0, # variant without symbols | |
'lbf/in2': 6.895, # pound-force per square inch (same as psi) | |
'lbf/ft2': 0.04788, # pound-force per square foot (same as psf) | |
'lbfin2': 6.895, # variant without symbols | |
'lbfft2': 0.04788, # variant without symbols | |
# Length units to meters (enhanced) | |
'ft': 0.3048, | |
'feet': 0.3048, | |
'foot': 0.3048, | |
"'": 0.3048, # foot symbol | |
'in': 0.0254, | |
'inch': 0.0254, | |
'inches': 0.0254, | |
'"': 0.0254, # inch symbol | |
'cm': 0.01, | |
'mm': 0.001, | |
'km': 1000.0, | |
'm': 1.0, | |
'meter': 1.0, | |
'metre': 1.0, | |
'meters': 1.0, | |
'metres': 1.0, | |
'yd': 0.9144, # yard to meters | |
'yard': 0.9144, | |
'yards': 0.9144, | |
# Weight/Force units (for completeness) | |
'n': 1.0, # Newton (SI base) | |
'kn': 1000.0, # kilonewton to Newton | |
'kgf': 9.81, # kilogram-force to Newton | |
'lbf': 4.448, # pound-force to Newton | |
'lb': 4.448, # pound (assuming force context) | |
'kg': 9.81, # kilogram (assuming force context, kg*g) | |
} | |
# Soil classification criteria | |
self.sieve_200_threshold = 50.0 # % passing sieve #200 for clay classification | |
def process_soil_layers(self, layers: List[Dict]) -> List[Dict]: | |
""" | |
Process soil layers with SS/ST sample classification and parameter calculation | |
""" | |
processed_layers = [] | |
st.info("🔬 Processing soil layers with SS/ST sample classification...") | |
for i, layer in enumerate(layers): | |
processed_layer = layer.copy() | |
# Step 1: Identify sample type (SS or ST) | |
sample_type = self._identify_sample_type(layer) | |
processed_layer['sample_type'] = sample_type | |
# Step 2: Classify soil type if not already classified | |
soil_type = self._classify_soil_type(layer) | |
processed_layer['soil_type'] = soil_type | |
# Step 3: Process based on sample type | |
if sample_type == 'ST': | |
processed_layer = self._process_st_sample(processed_layer) | |
elif sample_type == 'SS': | |
processed_layer = self._process_ss_sample(processed_layer) | |
else: | |
# Default processing for unidentified samples | |
processed_layer = self._process_default_sample(processed_layer) | |
# Step 4: Ensure all units are in SI | |
processed_layer = self._convert_to_si_units(processed_layer) | |
# Step 5: Validate and add engineering parameters | |
processed_layer = self._add_engineering_parameters(processed_layer) | |
# Step 6: Check clay consistency (water content vs Su) | |
processed_layer = self._check_clay_consistency(processed_layer) | |
processed_layers.append(processed_layer) | |
# Progress feedback | |
st.write(f" ✅ Layer {i+1}: {sample_type} sample, {soil_type} - {processed_layer.get('strength_parameter', 'N/A')}") | |
st.success(f"✅ Processed {len(processed_layers)} soil layers with SS/ST classification") | |
return processed_layers | |
def _identify_sample_type(self, layer: Dict) -> str: | |
""" | |
Identify if sample is Split Spoon (SS) or Shelby Tube (ST) | |
CRITICAL: Look at FIRST COLUMN stratification symbols with ABSOLUTE HIGHEST PRIORITY | |
""" | |
description = layer.get('description', '').lower() | |
# ABSOLUTE HIGHEST PRIORITY: Check for first column stratification symbols | |
# Patterns for first column recognition: SS-18, ST-5, SS18, ST3, etc. | |
first_column_patterns = [ | |
# High precision patterns for first column symbols | |
r'^[^|]*\b(ss[-]?\d+)\b', # SS-18, SS18 at start or before pipe | |
r'^[^|]*\b(st[-]?\d+)\b', # ST-5, ST5 at start or before pipe | |
r'^\s*(ss[-]?\d+)', # SS-number at very beginning | |
r'^\s*(st[-]?\d+)', # ST-number at very beginning | |
r'\|(.*?)(ss[-]?\d+)', # After pipe separator | |
r'\|(.*?)(st[-]?\d+)', # After pipe separator | |
r'\b(ss[-]?\d+)\s*[|:]', # SS-number followed by pipe or colon | |
r'\b(st[-]?\d+)\s*[|:]', # ST-number followed by pipe or colon | |
] | |
for pattern in first_column_patterns: | |
match = re.search(pattern, description, re.IGNORECASE) | |
if match: | |
# Get the SS/ST part (could be in different groups) | |
matched_groups = [g for g in match.groups() if g and ('ss' in g.lower() or 'st' in g.lower())] | |
if matched_groups: | |
matched_text = matched_groups[0].lower().strip() | |
if matched_text.startswith('ss'): | |
st.success(f"🎯 FIRST COLUMN DETECTED: {matched_text.upper()} → SS sample (HIGHEST PRIORITY)") | |
return 'SS' | |
elif matched_text.startswith('st'): | |
st.success(f"🎯 FIRST COLUMN DETECTED: {matched_text.upper()} → ST sample (HIGHEST PRIORITY)") | |
return 'ST' | |
# FALLBACK: Check for standalone SS/ST symbols (lower priority) | |
standalone_patterns = [ | |
r'\bss\b(?!\w)', # Just SS (not part of another word) | |
r'\bst\b(?!\w)' # Just ST (not part of another word) | |
] | |
for pattern in standalone_patterns: | |
match = re.search(pattern, description, re.IGNORECASE) | |
if match: | |
matched_text = match.group(0).lower() | |
if matched_text == 'ss': | |
st.info(f"📊 Standalone symbol detected: SS → SS sample") | |
return 'SS' | |
elif matched_text == 'st': | |
st.info(f"📊 Standalone symbol detected: ST → ST sample") | |
return 'ST' | |
# SECOND: Check for keywords in description | |
# Keywords for ST samples | |
st_keywords = ['shelby', 'tube', 'undisturbed', 'ut', 'unconfined', 'uu test', 'ucs'] | |
# Keywords for SS samples | |
ss_keywords = ['split spoon', 'spt', 'standard penetration', 'disturbed', 'n-value'] | |
# Check for ST indicators | |
if any(keyword in description for keyword in st_keywords): | |
return 'ST' | |
# Check for SS indicators | |
if any(keyword in description for keyword in ss_keywords): | |
return 'SS' | |
# THIRD: Check strength parameter types | |
# Check if SPT-N value is present (indicates SS) | |
if layer.get('strength_parameter') == 'SPT-N' or 'spt' in description: | |
return 'SS' | |
# Check if Su value is present (could indicate ST) | |
if layer.get('strength_parameter') == 'Su' or 'su' in description.lower(): | |
return 'ST' | |
# FOURTH: Default assumption based on available data | |
if layer.get('strength_value') and layer.get('strength_value') > 50: | |
return 'SS' # High values typically SPT-N | |
else: | |
return 'ST' # Lower values typically Su | |
def _classify_soil_type(self, layer: Dict) -> str: | |
""" | |
Enhanced soil type classification with MANDATORY sieve analysis requirement for sand | |
CRITICAL: Sand layers MUST have sieve analysis evidence - otherwise assume clay | |
""" | |
# Check if soil type is already specified and validate it | |
existing_type = layer.get('soil_type', '').lower() | |
if existing_type and existing_type != 'unknown': | |
# If it's sand/gravel, verify sieve analysis exists | |
if existing_type in ['sand', 'silt', 'gravel']: | |
sieve_200_passing = self._extract_sieve_200_data(layer) | |
if sieve_200_passing is None: | |
st.warning(f"⚠️ '{existing_type}' classification without sieve analysis data. OVERRIDING to 'clay' per requirements.") | |
layer['classification_override'] = f"Changed from '{existing_type}' to 'clay' - no sieve analysis data" | |
return 'clay' | |
else: | |
st.success(f"✅ '{existing_type}' classification confirmed with sieve #200: {sieve_200_passing}% passing") | |
return existing_type | |
else: | |
return existing_type | |
description = layer.get('description', '').lower() | |
# CRITICAL: Check for sieve analysis data FIRST before any classification | |
sieve_200_passing = self._extract_sieve_200_data(layer) | |
if sieve_200_passing is not None: | |
# Sieve analysis data available - use it for classification | |
if sieve_200_passing > self.sieve_200_threshold: | |
classification = 'clay' # Fine-grained soil | |
st.success(f"✅ Classified as CLAY: {sieve_200_passing}% passing #200 (>50%)") | |
else: | |
classification = 'sand' # Coarse-grained soil | |
st.success(f"✅ Classified as SAND: {sieve_200_passing}% passing #200 (<50%)") | |
layer['sieve_200_passing'] = sieve_200_passing | |
layer['classification_basis'] = f"Sieve analysis: {sieve_200_passing}% passing #200" | |
return classification | |
# NO SIEVE ANALYSIS DATA - Check for explicit mentions but apply strict rules | |
potential_classifications = [] | |
if any(clay_word in description for clay_word in ['clay', 'clayey', 'ch', 'cl']): | |
potential_classifications.append('clay') | |
if any(sand_word in description for sand_word in ['sand', 'sandy', 'sp', 'sw', 'sm', 'sc']): | |
potential_classifications.append('sand') | |
if any(silt_word in description for silt_word in ['silt', 'silty', 'ml', 'mh']): | |
potential_classifications.append('silt') | |
if any(gravel_word in description for gravel_word in ['gravel', 'gp', 'gw', 'gm', 'gc']): | |
potential_classifications.append('gravel') | |
# ENFORCE MANDATORY RULE: No sand/silt/gravel without sieve analysis | |
if any(coarse_type in potential_classifications for coarse_type in ['sand', 'silt', 'gravel']): | |
st.error(f"❌ CRITICAL: Found potential {potential_classifications} classification but NO sieve analysis data!") | |
st.warning(f"🔧 ENFORCING RULE: Classifying as 'clay' - sand/silt/gravel requires sieve analysis evidence") | |
layer['classification_override'] = f"Forced clay classification - found {potential_classifications} terms but no sieve data" | |
layer['sieve_200_passing'] = None | |
layer['classification_basis'] = "Assumed clay - no sieve analysis data available (mandatory requirement)" | |
return 'clay' | |
# Default to clay if only clay terms found or no clear classification | |
if 'clay' in potential_classifications or not potential_classifications: | |
st.info(f"💡 Classified as CLAY: {potential_classifications if potential_classifications else 'No explicit soil type found'}") | |
layer['sieve_200_passing'] = None | |
layer['classification_basis'] = "Assumed clay - no sieve analysis data available" | |
return 'clay' | |
# Final fallback - should not reach here | |
st.warning(f"⚠️ Unclear classification. Defaulting to 'clay' per mandatory requirements.") | |
layer['sieve_200_passing'] = None | |
layer['classification_basis'] = "Default clay classification - unclear soil type and no sieve data" | |
return 'clay' | |
def _extract_sieve_200_data(self, layer: Dict) -> Optional[float]: | |
""" | |
Enhanced sieve #200 passing percentage extraction with comprehensive pattern recognition | |
""" | |
description = layer.get('description', '') | |
# Enhanced patterns to catch all possible sieve analysis formats | |
patterns = [ | |
# Standard #200 sieve patterns | |
r'#200[:\s]*(\d+(?:\.\d+)?)%', | |
r'sieve\s*#?200[:\s]*(\d+(?:\.\d+)?)%', | |
r'no\.?\s*200[:\s]*(\d+(?:\.\d+)?)%', | |
r'passing\s*#?200[:\s]*(\d+(?:\.\d+)?)%', | |
r'(\d+(?:\.\d+)?)%\s*passing\s*#?200', | |
# Fines content (equivalent to #200 passing) | |
r'fines[:\s]*(\d+(?:\.\d+)?)%', | |
r'fine[s]?\s*content[:\s]*(\d+(?:\.\d+)?)%', | |
r'(\d+(?:\.\d+)?)%\s*fines', | |
# 0.075mm equivalent (same as #200) | |
r'0\.075\s*mm[:\s]*(\d+(?:\.\d+)?)%\s*passing', | |
r'(\d+(?:\.\d+)?)%\s*passing\s*0\.075\s*mm', | |
r'0\.075[:\s]*(\d+(?:\.\d+)?)%', | |
# Particle size analysis patterns | |
r'particle\s*size[:\s]*(\d+(?:\.\d+)?)%\s*fines', | |
r'gradation[:\s]*(\d+(?:\.\d+)?)%\s*passing\s*#?200', | |
r'grain\s*size[:\s]*(\d+(?:\.\d+)?)%\s*fines', | |
# Sieve analysis results patterns | |
r'sieve\s*analysis[:\s].*?(\d+(?:\.\d+)?)%\s*passing\s*#?200', | |
r'sieve\s*analysis[:\s].*?#?200[:\s]*(\d+(?:\.\d+)?)%', | |
# ASTM/Standard method references | |
r'astm\s*d422[:\s].*?(\d+(?:\.\d+)?)%\s*passing\s*#?200', | |
r'astm\s*d6913[:\s].*?(\d+(?:\.\d+)?)%\s*passing\s*#?200', | |
# Alternative formats | |
r'(\d+(?:\.\d+)?)%\s*<\s*0\.075\s*mm', # Percent less than 0.075mm | |
r'minus\s*#?200[:\s]*(\d+(?:\.\d+)?)%', # Minus #200 | |
r'(\d+(?:\.\d+)?)%\s*minus\s*#?200', # Percent minus #200 | |
] | |
for pattern in patterns: | |
match = re.search(pattern, description, re.IGNORECASE) | |
if match: | |
percentage = float(match.group(1)) | |
st.success(f"✅ Found sieve #200 data: {percentage}% passing from '{match.group(0)}'") | |
# Validate percentage range | |
if 0 <= percentage <= 100: | |
return percentage | |
else: | |
st.warning(f"⚠️ Invalid percentage ({percentage}%) found. Should be 0-100%.") | |
return None | |
# Check if explicitly mentioned in layer data | |
if 'sieve_200_passing' in layer and layer['sieve_200_passing'] is not None: | |
percentage = float(layer['sieve_200_passing']) | |
st.success(f"✅ Found sieve #200 data in layer field: {percentage}% passing") | |
return percentage | |
# Check for related field names | |
for field_name in ['fines_content', 'percent_fines', 'fine_content', 'passing_200']: | |
if field_name in layer and layer[field_name] is not None: | |
percentage = float(layer[field_name]) | |
st.success(f"✅ Found sieve #200 equivalent in '{field_name}': {percentage}% passing") | |
return percentage | |
# Log that no sieve analysis was found | |
st.info(f"🔍 No sieve #200 analysis data found in layer description or fields") | |
return None | |
def _process_st_sample(self, layer: Dict) -> Dict: | |
""" | |
Process Shelby Tube (ST) sample - use unconfined compression test (Su) values | |
""" | |
layer['processing_method'] = 'ST - Unconfined Compression Test' | |
# Look for Su values in the data | |
su_value = self._extract_su_value(layer) | |
if su_value is not None: | |
layer['strength_parameter'] = 'Su' | |
layer['strength_value'] = su_value | |
layer['su_source'] = 'Unconfined Compression Test' | |
else: | |
# If no Su value found, check for SPT and convert | |
spt_value = self._extract_spt_value(layer) | |
if spt_value is not None: | |
su_calculated = self._convert_spt_to_su(spt_value) | |
layer['strength_parameter'] = 'Su' | |
layer['strength_value'] = su_calculated | |
layer['su_source'] = f'Calculated from SPT-N={spt_value} (Su=5*N)' | |
layer['original_spt'] = spt_value | |
return layer | |
def _process_ss_sample(self, layer: Dict) -> Dict: | |
""" | |
Process Split Spoon (SS) sample - ALWAYS use SPT values and convert to Su using Su=5*N | |
FOR SS SAMPLES: IGNORE any unconfined compression test Su values, ONLY use calculated Su=5*N | |
""" | |
layer['processing_method'] = 'SS - SPT Conversion (Su=5*N)' | |
# CRITICAL: For SS samples, extract the raw SPT-N value and calculate Su from it | |
spt_value = self._extract_spt_value(layer) | |
soil_type = layer.get('soil_type', 'clay') | |
if spt_value is not None: | |
if soil_type == 'clay': | |
# MANDATORY: Convert SPT to undrained shear strength using Su = 5*N | |
# IGNORE any existing Su values from unconfined compression tests | |
calculated_su = self._convert_spt_to_su(spt_value) | |
# Override any existing Su values for SS samples | |
layer['strength_parameter'] = 'Su' | |
layer['strength_value'] = calculated_su | |
layer['su_source'] = f'Calculated from raw N={spt_value} (Su=5*N) - SS Sample' | |
layer['original_spt'] = spt_value | |
# Clear any conflicting unconfined compression data for SS samples | |
if 'unconfined_su' in layer: | |
layer['unconfined_su_ignored'] = layer.pop('unconfined_su') | |
st.warning(f"⚠️ SS Sample: Ignored unconfined compression Su, using calculated Su={calculated_su:.0f} kPa from N={spt_value}") | |
st.success(f"✅ SS Sample: Su = 5 × {spt_value} = {calculated_su:.0f} kPa") | |
elif soil_type in ['sand', 'silt']: | |
# Convert SPT to friction angle for granular soils | |
phi_value = self._convert_spt_to_friction_angle(spt_value) | |
layer['strength_parameter'] = 'φ' | |
layer['strength_value'] = phi_value | |
layer['friction_angle'] = phi_value | |
layer['phi_source'] = f'Calculated from raw N={spt_value} (Peck method) - SS Sample' | |
layer['original_spt'] = spt_value | |
st.success(f"✅ SS Sample: φ = {phi_value:.1f}° from N={spt_value}") | |
else: | |
# Keep SPT value for other soil types | |
layer['strength_parameter'] = 'SPT-N' | |
layer['strength_value'] = spt_value | |
layer['original_spt'] = spt_value | |
st.info(f"📊 SS Sample: Using raw N={spt_value} for {soil_type}") | |
else: | |
st.error(f"❌ SS Sample: No SPT-N value found in layer data") | |
return layer | |
def _process_default_sample(self, layer: Dict) -> Dict: | |
""" | |
Process sample with unknown type - use available data intelligently | |
""" | |
layer['processing_method'] = 'Default - Based on available data' | |
# Try to identify and process based on existing parameters | |
existing_param = layer.get('strength_parameter', '').lower() | |
if 'su' in existing_param: | |
# Already has Su value | |
return self._process_st_sample(layer) | |
elif 'spt' in existing_param or 'n' in existing_param: | |
# Has SPT value | |
return self._process_ss_sample(layer) | |
else: | |
# Make best guess based on strength value | |
strength_val = layer.get('strength_value', 0) | |
if strength_val and strength_val > 50: | |
# Likely SPT value | |
layer['strength_parameter'] = 'SPT-N' | |
return self._process_ss_sample(layer) | |
else: | |
# Likely Su value | |
layer['strength_parameter'] = 'Su' | |
return self._process_st_sample(layer) | |
def _extract_su_value(self, layer: Dict) -> Optional[float]: | |
""" | |
Enhanced Su (undrained shear strength) extraction with MANDATORY unit conversion checking | |
CRITICAL: All Su values must be converted to kPa before processing | |
""" | |
# Check direct Su field first - but validate units | |
if layer.get('strength_parameter') == 'Su' and layer.get('strength_value') is not None: | |
su_value = float(layer['strength_value']) | |
# Check if this value needs unit conversion (warn if suspiciously low/high) | |
if su_value < 5: | |
st.warning(f"⚠️ Su value {su_value} seems low - verify it's in kPa, not MPa or other units") | |
elif su_value > 2000: | |
st.warning(f"⚠️ Su value {su_value} seems high - verify it's in kPa, not psi or other units") | |
return su_value | |
# Look in description for Su values with enhanced unit detection | |
description = layer.get('description', '') | |
# CRITICAL: Enhanced patterns with explicit unit capture for conversion | |
patterns = [ | |
# Direct Su values with units - CAPTURE UNITS EXPLICITLY | |
r'su[:\s=]*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)', | |
r'undrained[:\s]*shear[:\s]*strength[:\s]*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)', | |
r'shear\s*strength[:\s]*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)', | |
r'ucs[:\s]*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)', | |
r'unconfined[:\s]*compression[:\s]*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)', | |
# Equation-style patterns | |
r'su\s*=\s*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)', | |
r'strength\s*=\s*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)', | |
# Embedded unit patterns | |
r'(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²)\s*(?:su|strength)', | |
r'(\d+(?:\.\d+)?)\s*(ksc|kg/cm2|kg/cm²)\s*(?:su|strength)', | |
r'(\d+(?:\.\d+)?)\s*(t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²)\s*(?:su|strength)', | |
r'(\d+(?:\.\d+)?)\s*(psi|psf)\s*(?:su|strength)', | |
r'(\d+(?:\.\d+)?)\s*(mpa)\s*(?:su|strength)', | |
# Common non-SI units that need conversion | |
r'(\d+(?:\.\d+)?)\s*ksc\b', # ksc without explicit "su" | |
r'(\d+(?:\.\d+)?)\s*t/m²?\b', # tonnes/m² | |
r'(\d+(?:\.\d+)?)\s*psi\b', # psi | |
] | |
for pattern in patterns: | |
match = re.search(pattern, description, re.IGNORECASE) | |
if match: | |
value = float(match.group(1)) | |
unit = match.group(2).lower() if len(match.groups()) > 1 and match.group(2) else 'kpa' | |
# CRITICAL: Alert if unit conversion is needed | |
if unit != 'kpa': | |
st.warning(f"🔧 UNIT CONVERSION REQUIRED: Found Su = {value} {unit.upper()}") | |
# Convert to kPa with detailed logging | |
converted_value = self._convert_pressure_to_kpa(value, unit) | |
# Store original values for verification | |
layer['original_su_value'] = value | |
layer['original_su_unit'] = unit.upper() | |
layer['converted_su_note'] = f"Converted from {value} {unit.upper()} to {converted_value:.1f} kPa" | |
# Enhanced validation with context-aware warnings | |
if converted_value < 1: | |
st.error(f"❌ Very low Su = {converted_value:.3f} kPa after conversion. Check original value: {value} {unit}") | |
elif converted_value > 2000: | |
st.warning(f"⚠️ Very high Su = {converted_value:.0f} kPa after conversion from {value} {unit}. Verify this is correct.") | |
elif 1 <= converted_value <= 1000: | |
st.success(f"✅ Su = {converted_value:.1f} kPa (converted from {value} {unit.upper()})") | |
else: | |
st.info(f"📊 Su = {converted_value:.1f} kPa (converted from {value} {unit.upper()}) - unusual but accepted") | |
return converted_value | |
# Check for unitless Su values (assume kPa but warn) | |
unitless_patterns = [ | |
r'su[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[a-zA-Z])', # Su value not followed by units | |
r'shear\s*strength[:\s]*(\d+(?:\.\d+)?)\b(?!\s*[a-zA-Z])', | |
r'unconfined[:\s]*(\d+(?:\.\d+)?)\b(?!\s*[a-zA-Z])', | |
] | |
for pattern in unitless_patterns: | |
match = re.search(pattern, description, re.IGNORECASE) | |
if match: | |
value = float(match.group(1)) | |
st.warning(f"⚠️ Found Su = {value} WITHOUT UNITS! Assuming kPa - please verify.") | |
layer['assumed_unit_warning'] = f"Assumed {value} is in kPa (no units specified)" | |
return value | |
# Check for explicit Su field in layer data | |
if 'su_value' in layer and layer['su_value'] is not None: | |
value = float(layer['su_value']) | |
st.info(f"📊 Using Su = {value:.1f} from field 'su_value' (assumed kPa)") | |
return value | |
# Check for other strength-related fields that might contain Su | |
for field_name in ['undrained_strength', 'unconfined_strength', 'cohesion']: | |
if field_name in layer and layer[field_name] is not None: | |
value = float(layer[field_name]) | |
st.info(f"📊 Using Su = {value:.1f} kPa from field '{field_name}' (assumed kPa)") | |
return value | |
return None | |
def _extract_spt_value(self, layer: Dict) -> Optional[float]: | |
""" | |
Enhanced SPT-N value extraction for SS samples - USE RAW N VALUE ONLY, NOT N-CORRECTED | |
Improved pattern matching for better SS layer division | |
""" | |
# Check direct SPT field | |
if layer.get('strength_parameter') == 'SPT-N' and layer.get('strength_value'): | |
return float(layer['strength_value']) | |
# Look in description for SPT values - PRIORITIZE RAW N VALUES | |
description = layer.get('description', '') | |
# ENHANCED: Look for raw N value patterns with better precision | |
raw_n_patterns = [ | |
# High priority patterns for raw N values | |
r'\braw[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Raw N value | |
r'\bfield[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Field N value | |
r'\bmeasured[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Measured N value | |
r'\bactual[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Actual N value | |
r'\bobserved[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Observed N value | |
# Standard N patterns NOT followed by correction terms | |
r'\bn[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))', # N value NOT corrected | |
r'\bspt[:\s]*n[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))', # SPT-N NOT corrected | |
r'\bn[-\s]?value[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))', # N-value NOT corrected | |
r'\bn\s*=\s*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))', # N = value NOT corrected | |
# Blow count patterns | |
r'\bblow[s]?[:\s]*count[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))', | |
r'\bblows[:\s]*per[:\s]*foot[:\s=]*(\d+(?:\.\d+)?)', | |
r'\bblow[s]?[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))', | |
# SS sample specific patterns | |
r'\bss[-\s]*\d*[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # SS sample with N | |
r'\bsplit[:\s]*spoon[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Split spoon N | |
] | |
# First try to find raw N values with enhanced logging | |
for i, pattern in enumerate(raw_n_patterns): | |
match = re.search(pattern, description, re.IGNORECASE) | |
if match: | |
n_value = float(match.group(1)) | |
pattern_type = ["Raw N", "Field N", "Measured N", "Actual N", "Observed N", | |
"Standard N", "SPT-N", "N-value", "N=", "Blow count", | |
"Blows/ft", "Blows", "SS N", "Split spoon N"][min(i, 13)] | |
st.success(f"✅ SS Sample: Using {pattern_type} = {n_value} from: '{match.group(0)}'") | |
# Additional validation for SS samples | |
if n_value > 100: | |
st.warning(f"⚠️ Very high N value ({n_value}) detected. Please verify this is correct.") | |
elif n_value == 0: | |
st.warning(f"⚠️ Zero N value detected. May indicate very soft soil or measurement issue.") | |
return n_value | |
# Enhanced fallback patterns with warnings | |
fallback_patterns = [ | |
r'\bn[:\s=]*(\d+(?:\.\d+)?)', | |
r'\bspt[:\s]*(\d+(?:\.\d+)?)', | |
r'(\d+(?:\.\d+)?)\s*(?:blow|n)', | |
r'penetration[:\s]*(\d+(?:\.\d+)?)', | |
r'resistance[:\s]*(\d+(?:\.\d+)?)' | |
] | |
for pattern in fallback_patterns: | |
match = re.search(pattern, description, re.IGNORECASE) | |
if match: | |
n_value = float(match.group(1)) | |
# Enhanced warnings for SS samples | |
warning_indicators = ['corr', 'correct', 'adj', 'adjust', 'modified', 'norm'] | |
has_correction_indicator = any(indicator in description.lower() for indicator in warning_indicators) | |
if has_correction_indicator: | |
st.error(f"❌ SS Sample: Found N = {n_value} but description contains correction terms. This may be corrected N, not raw N!") | |
st.info("💡 For SS samples, use only raw field N values (not corrected). Check original field logs.") | |
# Still return the value but flag it | |
layer['n_value_warning'] = f"Potentially corrected N value: {n_value}" | |
else: | |
st.info(f"📊 SS Sample: Using N = {n_value} from: '{match.group(0)}' (fallback pattern)") | |
return n_value | |
# If no N value found, provide specific guidance for SS samples | |
st.error(f"❌ SS Sample: No SPT-N value found in layer data") | |
st.info("💡 SS samples require SPT-N values. Look for: N=X, SPT-N=X, raw N=X, field N=X, or blow count.") | |
return None | |
def _convert_spt_to_su(self, spt_n: float) -> float: | |
""" | |
Convert SPT-N to undrained shear strength (Su) using Su = 5*N correlation | |
Enhanced for SS samples with validation | |
""" | |
if spt_n <= 0: | |
st.warning(f"⚠️ Invalid N value ({spt_n}) for Su calculation. Using N=1 as minimum.") | |
spt_n = 1.0 | |
su_calculated = 5.0 * spt_n | |
# Add validation and guidance for SS clay samples | |
if su_calculated < 10: | |
st.info(f"💡 Very low Su = {su_calculated:.0f} kPa from N={spt_n}. Indicates very soft clay.") | |
elif su_calculated > 500: | |
st.warning(f"⚠️ Very high Su = {su_calculated:.0f} kPa from N={spt_n}. Verify N value is raw (not corrected).") | |
return su_calculated | |
def _convert_spt_to_friction_angle(self, spt_n: float) -> float: | |
""" | |
Enhanced SPT-N to friction angle conversion for sand/silt layers in SS samples | |
Uses improved Peck method with soil type considerations | |
""" | |
if spt_n <= 0: | |
st.warning(f"⚠️ Invalid N value ({spt_n}) for friction angle calculation. Using N=1 as minimum.") | |
spt_n = 1.0 | |
# Enhanced Peck correlation with improvements: | |
# φ = 27.1 + 0.3 * N - 0.00054 * N² (for fine to medium sand) | |
# Valid for N up to 50, with adjustments for different sand types | |
n_limited = min(spt_n, 50) # Cap at 50 for correlation validity | |
# Base Peck correlation | |
phi = 27.1 + 0.3 * n_limited - 0.00054 * (n_limited ** 2) | |
# Ensure reasonable minimum | |
phi_final = max(phi, 28) # Minimum reasonable friction angle for sand | |
phi_final = min(phi_final, 45) # Maximum reasonable friction angle | |
# Add validation and guidance for SS sand samples | |
if phi_final < 30: | |
st.info(f"💡 Low φ = {phi_final:.1f}° from N={spt_n}. Indicates loose sand or silty sand.") | |
elif phi_final > 40: | |
st.info(f"💡 High φ = {phi_final:.1f}° from N={spt_n}. Indicates dense, well-graded sand.") | |
# Special handling for very low or high N values | |
if spt_n < 4: | |
st.warning(f"⚠️ Very low N={spt_n} for sand. May indicate loose sand or silt. Consider checking soil classification.") | |
elif spt_n > 40: | |
st.info(f"💡 Very high N={spt_n} for sand. Indicates very dense sand or possible gravel content.") | |
return phi_final | |
def _convert_pressure_to_kpa(self, value: float, unit: str) -> float: | |
""" | |
Enhanced pressure value conversion to kPa with comprehensive unit support | |
""" | |
if not unit or unit.lower() in ['', 'none', 'null']: | |
return value # Assume already in kPa if no unit specified | |
# Normalize unit string for better matching | |
unit_clean = unit.lower().replace('/', '').replace(' ', '').replace('²', '2').replace('³', '3') | |
# Remove common punctuation and extra characters | |
unit_clean = unit_clean.replace('.', '').replace('-', '').replace('_', '') | |
# Handle specific variations that need special processing | |
special_cases = { | |
# Tonne/ton variations | |
'tm2': 9.81, 'tonm2': 9.81, 'tonnesm2': 9.81, 'tonnem2': 9.81, | |
# kg/cm² variations | |
'kgcm2': 98.0, 'kgfcm2': 98.0, | |
# kN/m² variations | |
'knm2': 1.0, | |
# Other common variations | |
'psig': 6.895, # psi gauge | |
'psia': 6.895, # psi absolute | |
'psfa': 0.04788, # psf absolute | |
'torr': 0.133322, # torr (same as mmHg) | |
} | |
# Check special cases first | |
if unit_clean in special_cases: | |
conversion_factor = special_cases[unit_clean] | |
else: | |
# Standard conversion using enhanced dictionary | |
conversion_factor = self.unit_conversions.get(unit_clean, None) | |
# If no exact match found, try intelligent partial matching | |
if conversion_factor is None: | |
for known_unit, factor in self.unit_conversions.items(): | |
# Try various normalization approaches | |
known_normalized = known_unit.replace('/', '').replace('²', '2').replace(' ', '') | |
if known_normalized == unit_clean: | |
conversion_factor = factor | |
break | |
# Check if unit contains the known unit (for compound units) | |
if known_unit != unit_clean and known_unit in unit_clean: | |
conversion_factor = factor | |
break | |
# Final fallback - assume kPa if still no match found | |
if conversion_factor is None: | |
st.warning(f"⚠️ Unknown pressure unit '{unit}'. Assuming kPa - please verify.") | |
conversion_factor = 1.0 | |
converted_value = value * conversion_factor | |
# Enhanced logging with validation | |
if conversion_factor != 1.0: | |
st.success(f"🔧 Unit conversion: {value} {unit} = {converted_value:.1f} kPa (×{conversion_factor})") | |
# Add validation warnings for unusual results | |
if converted_value > 10000: | |
st.warning(f"⚠️ Very high pressure result ({converted_value:.0f} kPa). Please verify unit conversion.") | |
elif converted_value < 0.1 and value > 0: | |
st.warning(f"⚠️ Very low pressure result ({converted_value:.3f} kPa). Please verify unit conversion.") | |
return converted_value | |
def _convert_to_si_units(self, layer: Dict) -> Dict: | |
""" | |
Convert all measurements to SI units | |
""" | |
# Convert depths to meters | |
for depth_field in ['depth_from', 'depth_to']: | |
if depth_field in layer: | |
depth_val, depth_unit = self._extract_value_and_unit( | |
str(layer[depth_field]), default_unit='m' | |
) | |
layer[depth_field] = self._convert_length_to_meters(depth_val, depth_unit) | |
# Convert strength values to appropriate SI units | |
if 'strength_value' in layer and 'strength_parameter' in layer: | |
param = layer['strength_parameter'].lower() | |
if param == 'su': | |
# Convert Su to kPa | |
strength_val, strength_unit = self._extract_value_and_unit( | |
str(layer['strength_value']), default_unit='kpa' | |
) | |
layer['strength_value'] = self._convert_pressure_to_kpa(strength_val, strength_unit) | |
layer['strength_unit'] = 'kPa' | |
# Validate Su value against water content if available | |
validation_result = self._validate_su_with_water_content(layer) | |
if validation_result.get('needs_unit_check'): | |
st.warning(f"⚠️ Su-water content validation: {validation_result['message']}") | |
layer['unit_validation_warning'] = validation_result['message'] | |
if validation_result['recommendations']: | |
st.info("💡 Recommendations: " + "; ".join(validation_result['recommendations'])) | |
elif param in ['φ', 'phi', 'friction_angle']: | |
# Friction angle should be in degrees (already SI) | |
layer['strength_unit'] = 'degrees' | |
elif param == 'spt-n': | |
# SPT-N is dimensionless | |
layer['strength_unit'] = 'blows/30cm' | |
return layer | |
def _extract_value_and_unit(self, value_str: str, default_unit: str = '') -> Tuple[float, str]: | |
""" | |
Extract numeric value and unit from a string | |
""" | |
# Remove extra spaces and convert to lowercase | |
clean_str = value_str.strip().lower() | |
# Pattern to match number followed by optional unit | |
pattern = r'(\d+(?:\.\d+)?)\s*([a-zA-Z/²]+)?' | |
match = re.search(pattern, clean_str) | |
if match: | |
value = float(match.group(1)) | |
unit = match.group(2) if match.group(2) else default_unit | |
return value, unit | |
try: | |
return float(clean_str), default_unit | |
except ValueError: | |
return 0.0, default_unit | |
def _convert_length_to_meters(self, value: float, unit: str) -> float: | |
""" | |
Convert length value to meters | |
""" | |
unit_clean = unit.lower().replace(' ', '') | |
conversion_factor = self.unit_conversions.get(unit_clean, 1.0) | |
return value * conversion_factor | |
def _detect_t_m2_unit_error(self, layer: Dict) -> Dict: | |
""" | |
Detect if LLM failed to convert t/m² units to kPa | |
This is the most common unit conversion error | |
""" | |
result = {"needs_conversion": False, "critical_error": False} | |
# Only check layers with Su values | |
if layer.get("strength_parameter") != "Su" or not layer.get("strength_value"): | |
return result | |
su = float(layer["strength_value"]) | |
wc = layer.get("water_content", 0) | |
description = layer.get("description", "") | |
# Critical detection: Su values that are likely t/m² but not converted | |
# Typical t/m² values are 1-8, typical kPa values are 10-400 for clay | |
# Pattern 1: Su 1-8 with reasonable water content (15-50%) | |
if 1.0 <= su <= 8.0 and 15 <= wc <= 50: | |
converted_su = su * 9.81 | |
result.update({ | |
"needs_conversion": True, | |
"critical_error": True, | |
"original_su": su, | |
"converted_su": converted_su, | |
"unit_error": "t/m²", | |
"message": f"⚠️ CRITICAL: Su={su:.2f} appears to be in t/m² units, should be {converted_su:.1f} kPa", | |
"correction": f"{su:.2f} t/m² × 9.81 = {converted_su:.1f} kPa" | |
}) | |
# Pattern 2: Very low Su (<5) with low water content - could be t/m² | |
elif su < 5.0 and wc > 0 and wc < 25: | |
converted_su = su * 9.81 | |
result.update({ | |
"needs_conversion": True, | |
"critical_error": True, | |
"original_su": su, | |
"converted_su": converted_su, | |
"unit_error": "t/m²", | |
"message": f"⚠️ POSSIBLE: Su={su:.2f} might be in t/m² units, check if should be {converted_su:.1f} kPa", | |
"correction": f"{su:.2f} t/m² × 9.81 = {converted_su:.1f} kPa" | |
}) | |
# Pattern 3: Check description for t/m² mentions | |
if any(unit in description.lower() for unit in ['t/m²', 't/m2', 'ton/m²', 'ton/m2', 'tonnes/m²']): | |
if su < 10: # If description mentions t/m² but Su is low, likely not converted | |
converted_su = su * 9.81 | |
result.update({ | |
"needs_conversion": True, | |
"critical_error": True, | |
"original_su": su, | |
"converted_su": converted_su, | |
"unit_error": "t/m² (found in description)", | |
"message": f"⚠️ CRITICAL: Description mentions t/m² but Su={su:.2f} appears unconverted, should be {converted_su:.1f} kPa", | |
"correction": f"{su:.2f} t/m² × 9.81 = {converted_su:.1f} kPa" | |
}) | |
return result | |
def _validate_su_with_water_content(self, layer: Dict) -> Dict: | |
""" | |
ENHANCED Su-water content validation with comprehensive unit checking | |
Standard correlations for clay (empirical relationships): | |
- Very soft clay: Su < 25 kPa, w% > 40% | |
- Soft clay: Su 25-50 kPa, w% 30-40% | |
- Medium clay: Su 50-100 kPa, w% 20-30% | |
- Stiff clay: Su 100-200 kPa, w% 15-25% | |
- Very stiff clay: Su 200-400 kPa, w% 10-20% | |
- Hard clay: Su > 400 kPa, w% < 15% | |
Key unit conversions to check: | |
- t/m² → kPa: ×9.81 (CRITICAL) | |
- ksc → kPa: ×98.0 | |
- psi → kPa: ×6.895 | |
- MPa → kPa: ×1000 | |
""" | |
validation_result = { | |
'valid': True, | |
'needs_unit_check': False, | |
'critical_unit_error': False, | |
'suggested_conversion': None, | |
'message': '', | |
'recommendations': [], | |
'recheck_image': False | |
} | |
su_value = layer.get('strength_value') | |
water_content = layer.get('water_content') | |
soil_type = layer.get('soil_type', '') | |
description = layer.get('description', '') | |
# Only validate for clay layers with both Su and water content | |
if soil_type != 'clay' or not su_value or not water_content: | |
return validation_result | |
try: | |
su = float(su_value) | |
wc = float(water_content) | |
# STEP 1: Check for t/m² unit errors first (most common issue) | |
t_m2_check = self._detect_t_m2_unit_error(layer) | |
if t_m2_check.get('critical_error'): | |
validation_result.update({ | |
'critical_unit_error': True, | |
'needs_conversion': True, | |
'original_value': t_m2_check['original_su'], | |
'suggested_value': t_m2_check['converted_su'], | |
'unit_error_type': t_m2_check['unit_error'], | |
'suggested_conversion': t_m2_check['correction'], | |
'message': t_m2_check['message'], | |
'recheck_image': True, | |
'reload_picture': True | |
}) | |
return validation_result | |
# STEP 2: Check for other unit conversion errors | |
unit_check_results = self._check_su_unit_conversions(su, wc, description) | |
if unit_check_results['needs_conversion']: | |
validation_result.update(unit_check_results) | |
validation_result['critical_unit_error'] = True | |
validation_result['recheck_image'] = True | |
return validation_result | |
# STEP 3: Detailed correlation analysis | |
inconsistencies = [] | |
correlation_score = self._calculate_correlation_score(su, wc) | |
# Very specific clay consistency checks | |
if su < 25 and wc < 30: | |
inconsistencies.append(f"Very soft clay (Su={su:.0f}kPa) typically has w%>30%, found {wc:.1f}%") | |
if wc < 20: | |
validation_result['recheck_image'] = True | |
inconsistencies.append("VERIFY: Water content seems too low for very soft clay") | |
if su > 400 and wc > 30: | |
inconsistencies.append(f"Hard clay (Su={su:.0f}kPa) typically has w%<20%, found {wc:.1f}%") | |
validation_result['recheck_image'] = True | |
inconsistencies.append("VERIFY: Water content seems too high for hard clay") | |
# Medium-range mismatches | |
if 50 <= su <= 200 and (wc > 45 or wc < 10): | |
inconsistencies.append(f"Medium-stiff clay (Su={su:.0f}kPa) with unusual w%={wc:.1f}%") | |
validation_result['recheck_image'] = True | |
# STEP 4: Empirical correlation bounds (Terzaghi-Peck relationships) | |
expected_su_range = self._get_expected_su_range(wc) | |
if su < expected_su_range['min'] * 0.2 or su > expected_su_range['max'] * 5: | |
validation_result['needs_unit_check'] = True | |
validation_result['recheck_image'] = True | |
inconsistencies.append(f"Su-w% correlation severely off: Expected {expected_su_range['min']:.0f}-{expected_su_range['max']:.0f}kPa for w%={wc:.1f}%, got {su:.0f}kPa") | |
# STEP 4: Finalize results | |
if inconsistencies: | |
validation_result['valid'] = False | |
validation_result['message'] = '; '.join(inconsistencies) | |
# Enhanced recommendations | |
if validation_result['needs_unit_check']: | |
validation_result['recommendations'].extend([ | |
"⚠️ CRITICAL: Check Su unit conversion carefully", | |
"t/m² → kPa: multiply by 9.81", | |
"ksc → kPa: multiply by 98.0", | |
"psi → kPa: multiply by 6.895", | |
"MPa → kPa: multiply by 1000", | |
"🔍 Re-examine the original image/document" | |
]) | |
if validation_result['recheck_image']: | |
validation_result['recommendations'].extend([ | |
"📷 RECHECK IMAGE: Values seem inconsistent", | |
"🔄 Consider reloading the image", | |
"📋 Verify both Su and water content readings" | |
]) | |
else: | |
validation_result['message'] = f"Su-water content correlation acceptable (score: {correlation_score:.1f})" | |
except (ValueError, TypeError) as e: | |
validation_result['valid'] = False | |
validation_result['message'] = f"Could not validate Su-water content: {str(e)}" | |
validation_result['recheck_image'] = True | |
return validation_result | |
def _check_su_unit_conversions(self, su: float, wc: float, description: str) -> Dict: | |
"""Check for specific unit conversion errors""" | |
result = { | |
'needs_conversion': False, | |
'suggested_conversion': None, | |
'critical_unit_error': False, | |
'message': '' | |
} | |
# Check for t/m² that wasn't converted (very common error) | |
if 2 <= su <= 10 and 15 <= wc <= 40: | |
suggested_su = su * 9.81 | |
result.update({ | |
'needs_conversion': True, | |
'suggested_conversion': f"{su} t/m² → {suggested_su:.1f} kPa (×9.81)", | |
'critical_unit_error': True, | |
'message': f"CRITICAL: Su={su:.1f} appears to be in t/m² (should be {suggested_su:.1f} kPa)" | |
}) | |
return result | |
# Check for ksc that wasn't converted | |
if 0.5 <= su <= 5 and 15 <= wc <= 50: | |
suggested_su = su * 98.0 | |
result.update({ | |
'needs_conversion': True, | |
'suggested_conversion': f"{su} ksc → {suggested_su:.1f} kPa (×98)", | |
'critical_unit_error': True, | |
'message': f"CRITICAL: Su={su:.1f} appears to be in ksc (should be {suggested_su:.1f} kPa)" | |
}) | |
return result | |
# Check for psi that wasn't converted (high values) | |
if 50 <= su <= 500 and 10 <= wc <= 35: | |
suggested_su = su * 6.895 | |
result.update({ | |
'needs_conversion': True, | |
'suggested_conversion': f"{su} psi → {suggested_su:.1f} kPa (×6.895)", | |
'critical_unit_error': True, | |
'message': f"CRITICAL: Su={su:.0f} appears to be in psi (should be {suggested_su:.1f} kPa)" | |
}) | |
return result | |
# Check for MPa that wasn't converted (very low values) | |
if 0.01 <= su <= 0.5 and 10 <= wc <= 40: | |
suggested_su = su * 1000 | |
result.update({ | |
'needs_conversion': True, | |
'suggested_conversion': f"{su} MPa → {suggested_su:.1f} kPa (×1000)", | |
'critical_unit_error': True, | |
'message': f"CRITICAL: Su={su:.2f} appears to be in MPa (should be {suggested_su:.1f} kPa)" | |
}) | |
return result | |
return result | |
def _get_expected_su_range(self, water_content: float) -> Dict[str, float]: | |
"""Get expected Su range based on water content (empirical correlations)""" | |
wc = water_content | |
# Conservative empirical relationships | |
if wc >= 50: | |
return {'min': 5, 'max': 20} # Very soft clay | |
elif wc >= 40: | |
return {'min': 10, 'max': 35} # Soft clay | |
elif wc >= 30: | |
return {'min': 20, 'max': 60} # Medium clay | |
elif wc >= 20: | |
return {'min': 40, 'max': 150} # Stiff clay | |
elif wc >= 15: | |
return {'min': 80, 'max': 250} # Very stiff clay | |
else: | |
return {'min': 150, 'max': 500} # Hard clay | |
def _calculate_correlation_score(self, su: float, wc: float) -> float: | |
"""Calculate correlation score (0-10, higher is better)""" | |
# Simple scoring based on typical relationships | |
expected_range = self._get_expected_su_range(wc) | |
if expected_range['min'] <= su <= expected_range['max']: | |
return 10.0 # Perfect correlation | |
elif expected_range['min'] * 0.5 <= su <= expected_range['max'] * 2: | |
return 7.0 # Good correlation | |
elif expected_range['min'] * 0.2 <= su <= expected_range['max'] * 5: | |
return 4.0 # Acceptable correlation | |
else: | |
return 1.0 # Poor correlation | |
def _add_engineering_parameters(self, layer: Dict) -> Dict: | |
""" | |
Add additional engineering parameters based on soil properties | |
""" | |
soil_type = layer.get('soil_type', '') | |
# Add typical engineering properties based on soil type and strength | |
if soil_type == 'clay': | |
su_value = layer.get('strength_value', 0) | |
if su_value > 0: | |
# Estimate consistency based on Su | |
if su_value < 25: | |
layer['consistency'] = 'very soft' | |
elif su_value < 50: | |
layer['consistency'] = 'soft' | |
elif su_value < 100: | |
layer['consistency'] = 'medium' | |
elif su_value < 200: | |
layer['consistency'] = 'stiff' | |
elif su_value < 400: | |
layer['consistency'] = 'very stiff' | |
else: | |
layer['consistency'] = 'hard' | |
# Estimate unit weight (kN/m³) | |
layer['unit_weight'] = 16 + su_value / 50 # Empirical correlation | |
layer['unit_weight_unit'] = 'kN/m³' | |
elif soil_type in ['sand', 'silt']: | |
# For sand/silt, use SPT-N or friction angle | |
if 'original_spt' in layer: | |
spt_n = layer['original_spt'] | |
# Estimate relative density based on SPT-N | |
if spt_n < 4: | |
layer['consistency'] = 'very loose' | |
elif spt_n < 10: | |
layer['consistency'] = 'loose' | |
elif spt_n < 30: | |
layer['consistency'] = 'medium dense' | |
elif spt_n < 50: | |
layer['consistency'] = 'dense' | |
else: | |
layer['consistency'] = 'very dense' | |
# Estimate unit weight (kN/m³) | |
layer['unit_weight'] = 14 + spt_n / 5 # Empirical correlation | |
layer['unit_weight_unit'] = 'kN/m³' | |
return layer | |
def _check_clay_consistency(self, layer: Dict) -> Dict: | |
""" | |
Check consistency between water content and Su for clay soils | |
""" | |
soil_type = layer.get('soil_type', '') | |
if soil_type != 'clay': | |
return layer | |
su_value = layer.get('strength_value') | |
water_content = self._extract_water_content(layer) | |
if su_value and water_content: | |
# Perform consistency check | |
consistency_result = self._validate_clay_water_content_su_relationship( | |
water_content, su_value | |
) | |
layer['water_content'] = water_content | |
layer['water_content_unit'] = '%' | |
layer['clay_consistency_check'] = consistency_result | |
# Add consistency notes | |
if consistency_result['is_consistent']: | |
layer['consistency_note'] = f"✅ Water content ({water_content}%) consistent with Su ({su_value} kPa)" | |
else: | |
layer['consistency_note'] = f"⚠️ {consistency_result['warning']}" | |
return layer | |
def _extract_water_content(self, layer: Dict) -> Optional[float]: | |
""" | |
Extract water content from layer data | |
""" | |
# Check if water content is directly specified | |
if 'water_content' in layer: | |
return float(layer['water_content']) | |
# Look in description for water content values | |
description = layer.get('description', '') | |
patterns = [ | |
r'w[:\s=]*(\d+(?:\.\d+)?)\s*%', | |
r'water\s*content[:\s]*(\d+(?:\.\d+)?)\s*%', | |
r'moisture\s*content[:\s]*(\d+(?:\.\d+)?)\s*%', | |
r'wc[:\s=]*(\d+(?:\.\d+)?)\s*%', | |
r'(\d+(?:\.\d+)?)\s*%\s*moisture', | |
r'(\d+(?:\.\d+)?)\s*%\s*water' | |
] | |
for pattern in patterns: | |
match = re.search(pattern, description, re.IGNORECASE) | |
if match: | |
return float(match.group(1)) | |
return None | |
def _validate_clay_water_content_su_relationship(self, water_content: float, su_value: float) -> Dict: | |
""" | |
Validate the relationship between water content and undrained shear strength for clay | |
Enhanced analysis for ST layer soil division based on water content and unconfined test results: | |
- Higher water content generally corresponds to lower Su | |
- Different clay types have different relationships | |
- Consider stress history and plasticity effects | |
""" | |
# Enhanced empirical relationships for clay consistency with expanded ranges | |
consistency_ranges = { | |
'very_soft': {'w_range': (40, 150), 'su_range': (0, 25), 'description': 'High plasticity, organic clays'}, | |
'soft': {'w_range': (25, 70), 'su_range': (25, 50), 'description': 'Normally consolidated clays'}, | |
'medium': {'w_range': (18, 40), 'su_range': (50, 100), 'description': 'Lightly overconsolidated clays'}, | |
'stiff': {'w_range': (12, 28), 'su_range': (100, 200), 'description': 'Overconsolidated clays'}, | |
'very_stiff': {'w_range': (8, 20), 'su_range': (200, 400), 'description': 'Heavily overconsolidated clays'}, | |
'hard': {'w_range': (5, 15), 'su_range': (400, 1000), 'description': 'Desiccated or cemented clays'} | |
} | |
# Determine expected consistency based on Su | |
su_consistency = None | |
for consistency, ranges in consistency_ranges.items(): | |
if ranges['su_range'][0] <= su_value <= ranges['su_range'][1]: | |
su_consistency = consistency | |
break | |
# Determine expected consistency based on water content | |
w_consistency = None | |
for consistency, ranges in consistency_ranges.items(): | |
if ranges['w_range'][0] <= water_content <= ranges['w_range'][1]: | |
w_consistency = consistency | |
break | |
# Check consistency | |
result = { | |
'water_content': water_content, | |
'su_value': su_value, | |
'w_consistency': w_consistency, | |
'su_consistency': su_consistency, | |
'is_consistent': False, | |
'warning': '', | |
'note': '' | |
} | |
if su_consistency and w_consistency: | |
if su_consistency == w_consistency: | |
result['is_consistent'] = True | |
result['note'] = f"Water content and Su both indicate {su_consistency.replace('_', ' ')} clay" | |
else: | |
result['warning'] = f"Inconsistent: Water content suggests {w_consistency.replace('_', ' ')} clay, but Su suggests {su_consistency.replace('_', ' ')} clay" | |
elif su_consistency and not w_consistency: | |
if water_content > 60: | |
result['warning'] = f"Very high water content ({water_content}%) for Su = {su_value} kPa. Check if clay is highly plastic or organic." | |
elif water_content < 10: | |
result['warning'] = f"Very low water content ({water_content}%) for clay. Check if sample was dried or is highly over-consolidated." | |
else: | |
result['note'] = f"Water content outside typical ranges but Su indicates {su_consistency.replace('_', ' ')} clay" | |
elif w_consistency and not su_consistency: | |
result['warning'] = f"Su value ({su_value} kPa) outside typical ranges for clay with {water_content}% water content" | |
else: | |
result['warning'] = f"Both water content ({water_content}%) and Su ({su_value} kPa) outside typical clay ranges" | |
# Enhanced empirical correlation checks for ST layer division | |
if water_content and su_value: | |
# Advanced correlation analysis for ST samples | |
# Check for high plasticity clay indicators | |
if water_content > 80: | |
if su_value < 25: | |
result['note'] = f"High plasticity clay indicated: w={water_content}%, Su={su_value} kPa. Possible CH or organic clay." | |
elif su_value > 50: | |
result['warning'] = f"Inconsistent: Very high water content ({water_content}%) with moderate/high Su ({su_value} kPa). Check sample integrity or clay type." | |
# Check for low plasticity clay indicators | |
elif water_content < 15: | |
if su_value > 200: | |
result['note'] = f"Low plasticity, overconsolidated clay: w={water_content}%, Su={su_value} kPa. Possible CL or aged clay." | |
elif su_value < 100: | |
result['warning'] = f"Low water content ({water_content}%) with low Su ({su_value} kPa). Unusual - check if sample was dried." | |
# Check stress history indicators | |
ocr_estimate = self._estimate_overconsolidation_ratio(water_content, su_value) | |
if ocr_estimate > 1.5: | |
result['note'] = result.get('note', '') + f" Estimated OCR ≈ {ocr_estimate:.1f} (overconsolidated)" | |
elif ocr_estimate < 0.8: | |
result['note'] = result.get('note', '') + f" Estimated OCR ≈ {ocr_estimate:.1f} (possibly underconsolidated)" | |
# Soil division recommendations for ST samples | |
result['st_division_recommendation'] = self._recommend_st_layer_division(water_content, su_value) | |
return result | |
def _estimate_overconsolidation_ratio(self, water_content: float, su_value: float) -> float: | |
""" | |
Estimate overconsolidation ratio (OCR) from water content and Su | |
Based on empirical correlations for ST samples | |
""" | |
# Simplified correlation: OCR ≈ (Su_measured / Su_normally_consolidated) | |
# For normally consolidated clays: Su ≈ 0.22 * σ'v | |
# Approximate σ'v from water content using typical correlations | |
if water_content > 50: | |
# High water content suggests normally consolidated or slightly overconsolidated | |
expected_su_nc = max(15, 100 - water_content) # Simplified correlation | |
else: | |
# Lower water content suggests overconsolidation | |
expected_su_nc = max(50, 150 - 2 * water_content) | |
ocr_estimate = su_value / expected_su_nc if expected_su_nc > 0 else 1.0 | |
return max(0.5, min(ocr_estimate, 10.0)) # Reasonable bounds | |
def _recommend_st_layer_division(self, water_content: float, su_value: float) -> Dict: | |
""" | |
Recommend layer division strategy for ST samples based on water content and Su results | |
""" | |
recommendation = { | |
'division_strategy': 'single_layer', | |
'reason': 'Uniform properties', | |
'subdivision_criteria': [] | |
} | |
# Check for significant property variations that suggest subdivision | |
if water_content > 60 and su_value > 75: | |
recommendation['division_strategy'] = 'check_variation' | |
recommendation['reason'] = 'Conflicting water content and strength - check for property variations' | |
recommendation['subdivision_criteria'].append('Water content variation > 10%') | |
recommendation['subdivision_criteria'].append('Su variation > 30%') | |
elif water_content < 20 and su_value < 80: | |
recommendation['division_strategy'] = 'check_variation' | |
recommendation['reason'] = 'Both low water content and Su - check for soil type variations' | |
recommendation['subdivision_criteria'].append('Plasticity index variations') | |
recommendation['subdivision_criteria'].append('Sieve analysis variations') | |
elif abs(water_content - 30) > 20 or su_value > 300: | |
recommendation['division_strategy'] = 'subdivide_recommended' | |
recommendation['reason'] = 'Extreme properties suggest heterogeneous layer' | |
recommendation['subdivision_criteria'].append('Test at multiple depths') | |
recommendation['subdivision_criteria'].append('Check for interbedded materials') | |
return recommendation | |
def get_processing_summary(self, layers: List[Dict]) -> Dict[str, Any]: | |
""" | |
Generate a summary of the soil layer processing | |
""" | |
summary = { | |
'total_layers': len(layers), | |
'st_samples': 0, | |
'ss_samples': 0, | |
'clay_layers': 0, | |
'sand_layers': 0, | |
'su_calculated': 0, | |
'phi_calculated': 0, | |
'clay_consistency_checks': 0, | |
'consistent_clays': 0, | |
'inconsistent_clays': 0, | |
'unit_conversions': [], | |
'processing_notes': [] | |
} | |
for layer in layers: | |
# Count sample types | |
sample_type = layer.get('sample_type', '') | |
if sample_type == 'ST': | |
summary['st_samples'] += 1 | |
elif sample_type == 'SS': | |
summary['ss_samples'] += 1 | |
# Count soil types | |
soil_type = layer.get('soil_type', '') | |
if soil_type == 'clay': | |
summary['clay_layers'] += 1 | |
elif soil_type in ['sand', 'silt']: | |
summary['sand_layers'] += 1 | |
# Count calculated parameters | |
if 'su_source' in layer and 'Calculated' in layer['su_source']: | |
summary['su_calculated'] += 1 | |
if 'phi_source' in layer and 'Calculated' in layer['phi_source']: | |
summary['phi_calculated'] += 1 | |
# Count clay consistency checks | |
if 'clay_consistency_check' in layer: | |
summary['clay_consistency_checks'] += 1 | |
consistency_result = layer['clay_consistency_check'] | |
if consistency_result.get('is_consistent', False): | |
summary['consistent_clays'] += 1 | |
else: | |
summary['inconsistent_clays'] += 1 | |
return summary |