soil_profile / soil_classification.py
Sompote's picture
Upload 17 files
2c200f8 verified
import re
import numpy as np
import streamlit as st
from typing import Dict, List, Any, Tuple, Optional
class SoilClassificationProcessor:
"""
Advanced soil classification processor that handles SS and ST samples
with proper unit conversions and soil parameter calculations
"""
def __init__(self):
# Enhanced unit conversion factors to SI units
self.unit_conversions = {
# Pressure/Stress units to kPa
'psi': 6.895,
'psf': 0.04788,
'kpa': 1.0,
'kn/m2': 1.0,
'kn/m²': 1.0,
'knm2': 1.0,
'mpa': 1000.0,
'pa': 0.001,
'n/m2': 0.001,
'n/m²': 0.001,
'nm2': 0.001,
'ksf': 47.88,
'tsf': 95.76,
'kg/cm2': 98.0,
'kg/cm²': 98.0,
'kgcm2': 98.0,
'ksc': 98.0, # kilograms per square centimeter (same as kg/cm²)
'bar': 100.0,
'atm': 101.325, # atmosphere to kPa
'mmhg': 0.133322, # mmHg to kPa
'inhg': 3.386, # inHg to kPa
# Enhanced tonnes/tons per square meter conversions
't/m2': 9.81, # tonnes per square meter to kPa
't/m²': 9.81, # tonnes per square meter to kPa
'tm2': 9.81, # tm2 variant
'ton/m2': 9.81, # ton per square meter to kPa
'ton/m²': 9.81, # ton per square meter to kPa
'tonm2': 9.81, # tonm2 variant
'tonnes/m2': 9.81, # tonnes per square meter to kPa
'tonnes/m²': 9.81, # tonnes per square meter to kPa
'tonnesm2': 9.81, # tonnesm2 variant
'tonne/m2': 9.81, # tonne per square meter to kPa
'tonne/m²': 9.81, # tonne per square meter to kPa
'tonnem2': 9.81, # tonnem2 variant
# Additional international pressure units
'kgf/cm2': 98.0, # kilogram-force per cm²
'kgf/cm²': 98.0, # kilogram-force per cm²
'kgfcm2': 98.0, # variant without symbols
'lbf/in2': 6.895, # pound-force per square inch (same as psi)
'lbf/ft2': 0.04788, # pound-force per square foot (same as psf)
'lbfin2': 6.895, # variant without symbols
'lbfft2': 0.04788, # variant without symbols
# Length units to meters (enhanced)
'ft': 0.3048,
'feet': 0.3048,
'foot': 0.3048,
"'": 0.3048, # foot symbol
'in': 0.0254,
'inch': 0.0254,
'inches': 0.0254,
'"': 0.0254, # inch symbol
'cm': 0.01,
'mm': 0.001,
'km': 1000.0,
'm': 1.0,
'meter': 1.0,
'metre': 1.0,
'meters': 1.0,
'metres': 1.0,
'yd': 0.9144, # yard to meters
'yard': 0.9144,
'yards': 0.9144,
# Weight/Force units (for completeness)
'n': 1.0, # Newton (SI base)
'kn': 1000.0, # kilonewton to Newton
'kgf': 9.81, # kilogram-force to Newton
'lbf': 4.448, # pound-force to Newton
'lb': 4.448, # pound (assuming force context)
'kg': 9.81, # kilogram (assuming force context, kg*g)
}
# Soil classification criteria
self.sieve_200_threshold = 50.0 # % passing sieve #200 for clay classification
def process_soil_layers(self, layers: List[Dict]) -> List[Dict]:
    """Run every layer through the SS/ST classification pipeline.

    Each input dict is shallow-copied and the copy is passed through, in
    order: sample-type identification, soil-type classification,
    sample-type specific strength processing, SI unit conversion,
    engineering-parameter enrichment and the clay consistency check.

    All steps operate on the copy.  The classifier records its basis and
    any override on the dict it receives, so handing it the copy keeps
    that metadata on the returned layer and leaves the caller's input
    untouched.

    Args:
        layers: Layer dicts as produced upstream.

    Returns:
        A new list holding one processed dict per input layer.
    """
    processed_layers = []
    st.info("🔬 Processing soil layers with SS/ST sample classification...")

    for layer_number, layer in enumerate(layers, start=1):
        processed_layer = layer.copy()

        # Step 1: Identify sample type (SS or ST)
        sample_type = self._identify_sample_type(processed_layer)
        processed_layer['sample_type'] = sample_type

        # Step 2: Classify soil type. The classifier writes
        # classification_basis / classification_override onto its argument,
        # so it must receive the copy rather than the caller's dict.
        soil_type = self._classify_soil_type(processed_layer)
        processed_layer['soil_type'] = soil_type

        # Step 3: Process based on sample type
        if sample_type == 'ST':
            processed_layer = self._process_st_sample(processed_layer)
        elif sample_type == 'SS':
            processed_layer = self._process_ss_sample(processed_layer)
        else:
            # Default processing for unidentified samples
            processed_layer = self._process_default_sample(processed_layer)

        # Step 4: Ensure all units are in SI
        processed_layer = self._convert_to_si_units(processed_layer)

        # Step 5: Validate and add engineering parameters
        processed_layer = self._add_engineering_parameters(processed_layer)

        # Step 6: Check clay consistency (water content vs Su)
        processed_layer = self._check_clay_consistency(processed_layer)

        processed_layers.append(processed_layer)

        # Progress feedback
        st.write(f" ✅ Layer {layer_number}: {sample_type} sample, {soil_type} - {processed_layer.get('strength_parameter', 'N/A')}")

    st.success(f"✅ Processed {len(processed_layers)} soil layers with SS/ST classification")
    return processed_layers
def _identify_sample_type(self, layer: Dict) -> str:
"""
Identify if sample is Split Spoon (SS) or Shelby Tube (ST)
CRITICAL: Look at FIRST COLUMN stratification symbols with ABSOLUTE HIGHEST PRIORITY
"""
description = layer.get('description', '').lower()
# ABSOLUTE HIGHEST PRIORITY: Check for first column stratification symbols
# Patterns for first column recognition: SS-18, ST-5, SS18, ST3, etc.
first_column_patterns = [
# High precision patterns for first column symbols
r'^[^|]*\b(ss[-]?\d+)\b', # SS-18, SS18 at start or before pipe
r'^[^|]*\b(st[-]?\d+)\b', # ST-5, ST5 at start or before pipe
r'^\s*(ss[-]?\d+)', # SS-number at very beginning
r'^\s*(st[-]?\d+)', # ST-number at very beginning
r'\|(.*?)(ss[-]?\d+)', # After pipe separator
r'\|(.*?)(st[-]?\d+)', # After pipe separator
r'\b(ss[-]?\d+)\s*[|:]', # SS-number followed by pipe or colon
r'\b(st[-]?\d+)\s*[|:]', # ST-number followed by pipe or colon
]
for pattern in first_column_patterns:
match = re.search(pattern, description, re.IGNORECASE)
if match:
# Get the SS/ST part (could be in different groups)
matched_groups = [g for g in match.groups() if g and ('ss' in g.lower() or 'st' in g.lower())]
if matched_groups:
matched_text = matched_groups[0].lower().strip()
if matched_text.startswith('ss'):
st.success(f"🎯 FIRST COLUMN DETECTED: {matched_text.upper()} → SS sample (HIGHEST PRIORITY)")
return 'SS'
elif matched_text.startswith('st'):
st.success(f"🎯 FIRST COLUMN DETECTED: {matched_text.upper()} → ST sample (HIGHEST PRIORITY)")
return 'ST'
# FALLBACK: Check for standalone SS/ST symbols (lower priority)
standalone_patterns = [
r'\bss\b(?!\w)', # Just SS (not part of another word)
r'\bst\b(?!\w)' # Just ST (not part of another word)
]
for pattern in standalone_patterns:
match = re.search(pattern, description, re.IGNORECASE)
if match:
matched_text = match.group(0).lower()
if matched_text == 'ss':
st.info(f"📊 Standalone symbol detected: SS → SS sample")
return 'SS'
elif matched_text == 'st':
st.info(f"📊 Standalone symbol detected: ST → ST sample")
return 'ST'
# SECOND: Check for keywords in description
# Keywords for ST samples
st_keywords = ['shelby', 'tube', 'undisturbed', 'ut', 'unconfined', 'uu test', 'ucs']
# Keywords for SS samples
ss_keywords = ['split spoon', 'spt', 'standard penetration', 'disturbed', 'n-value']
# Check for ST indicators
if any(keyword in description for keyword in st_keywords):
return 'ST'
# Check for SS indicators
if any(keyword in description for keyword in ss_keywords):
return 'SS'
# THIRD: Check strength parameter types
# Check if SPT-N value is present (indicates SS)
if layer.get('strength_parameter') == 'SPT-N' or 'spt' in description:
return 'SS'
# Check if Su value is present (could indicate ST)
if layer.get('strength_parameter') == 'Su' or 'su' in description.lower():
return 'ST'
# FOURTH: Default assumption based on available data
if layer.get('strength_value') and layer.get('strength_value') > 50:
return 'SS' # High values typically SPT-N
else:
return 'ST' # Lower values typically Su
def _classify_soil_type(self, layer: Dict) -> str:
"""
Enhanced soil type classification with MANDATORY sieve analysis requirement for sand
CRITICAL: Sand layers MUST have sieve analysis evidence - otherwise assume clay
"""
# Check if soil type is already specified and validate it
existing_type = layer.get('soil_type', '').lower()
if existing_type and existing_type != 'unknown':
# If it's sand/gravel, verify sieve analysis exists
if existing_type in ['sand', 'silt', 'gravel']:
sieve_200_passing = self._extract_sieve_200_data(layer)
if sieve_200_passing is None:
st.warning(f"⚠️ '{existing_type}' classification without sieve analysis data. OVERRIDING to 'clay' per requirements.")
layer['classification_override'] = f"Changed from '{existing_type}' to 'clay' - no sieve analysis data"
return 'clay'
else:
st.success(f"✅ '{existing_type}' classification confirmed with sieve #200: {sieve_200_passing}% passing")
return existing_type
else:
return existing_type
description = layer.get('description', '').lower()
# CRITICAL: Check for sieve analysis data FIRST before any classification
sieve_200_passing = self._extract_sieve_200_data(layer)
if sieve_200_passing is not None:
# Sieve analysis data available - use it for classification
if sieve_200_passing > self.sieve_200_threshold:
classification = 'clay' # Fine-grained soil
st.success(f"✅ Classified as CLAY: {sieve_200_passing}% passing #200 (>50%)")
else:
classification = 'sand' # Coarse-grained soil
st.success(f"✅ Classified as SAND: {sieve_200_passing}% passing #200 (<50%)")
layer['sieve_200_passing'] = sieve_200_passing
layer['classification_basis'] = f"Sieve analysis: {sieve_200_passing}% passing #200"
return classification
# NO SIEVE ANALYSIS DATA - Check for explicit mentions but apply strict rules
potential_classifications = []
if any(clay_word in description for clay_word in ['clay', 'clayey', 'ch', 'cl']):
potential_classifications.append('clay')
if any(sand_word in description for sand_word in ['sand', 'sandy', 'sp', 'sw', 'sm', 'sc']):
potential_classifications.append('sand')
if any(silt_word in description for silt_word in ['silt', 'silty', 'ml', 'mh']):
potential_classifications.append('silt')
if any(gravel_word in description for gravel_word in ['gravel', 'gp', 'gw', 'gm', 'gc']):
potential_classifications.append('gravel')
# ENFORCE MANDATORY RULE: No sand/silt/gravel without sieve analysis
if any(coarse_type in potential_classifications for coarse_type in ['sand', 'silt', 'gravel']):
st.error(f"❌ CRITICAL: Found potential {potential_classifications} classification but NO sieve analysis data!")
st.warning(f"🔧 ENFORCING RULE: Classifying as 'clay' - sand/silt/gravel requires sieve analysis evidence")
layer['classification_override'] = f"Forced clay classification - found {potential_classifications} terms but no sieve data"
layer['sieve_200_passing'] = None
layer['classification_basis'] = "Assumed clay - no sieve analysis data available (mandatory requirement)"
return 'clay'
# Default to clay if only clay terms found or no clear classification
if 'clay' in potential_classifications or not potential_classifications:
st.info(f"💡 Classified as CLAY: {potential_classifications if potential_classifications else 'No explicit soil type found'}")
layer['sieve_200_passing'] = None
layer['classification_basis'] = "Assumed clay - no sieve analysis data available"
return 'clay'
# Final fallback - should not reach here
st.warning(f"⚠️ Unclear classification. Defaulting to 'clay' per mandatory requirements.")
layer['sieve_200_passing'] = None
layer['classification_basis'] = "Default clay classification - unclear soil type and no sieve data"
return 'clay'
def _extract_sieve_200_data(self, layer: Dict) -> Optional[float]:
    """Find the percentage passing sieve #200 (fines content) for a layer.

    The description text is searched first using a list of regex formats
    ("#200: 35%", "35% fines", "0.075 mm 35% passing", ...).  If the text
    yields nothing, the ``sieve_200_passing`` field and then a few
    equivalent field names are consulted.

    A value is only accepted when it parses as a number in 0..100.  An
    out-of-range percentage found in the description ends the search with
    ``None``; an unusable field value is reported and skipped.

    Args:
        layer: Layer dict; reads ``description`` and the fines fields.

    Returns:
        The percentage passing #200, or ``None`` when no usable value
        exists.
    """
    description = layer.get('description') or ''

    patterns = [
        # Standard #200 sieve patterns
        r'#200[:\s]*(\d+(?:\.\d+)?)%',
        r'sieve\s*#?200[:\s]*(\d+(?:\.\d+)?)%',
        r'no\.?\s*200[:\s]*(\d+(?:\.\d+)?)%',
        r'passing\s*#?200[:\s]*(\d+(?:\.\d+)?)%',
        r'(\d+(?:\.\d+)?)%\s*passing\s*#?200',
        # Fines content (equivalent to #200 passing)
        r'fines[:\s]*(\d+(?:\.\d+)?)%',
        r'fine[s]?\s*content[:\s]*(\d+(?:\.\d+)?)%',
        r'(\d+(?:\.\d+)?)%\s*fines',
        # 0.075mm equivalent (same as #200)
        r'0\.075\s*mm[:\s]*(\d+(?:\.\d+)?)%\s*passing',
        r'(\d+(?:\.\d+)?)%\s*passing\s*0\.075\s*mm',
        r'0\.075[:\s]*(\d+(?:\.\d+)?)%',
        # Particle size analysis patterns
        r'particle\s*size[:\s]*(\d+(?:\.\d+)?)%\s*fines',
        r'gradation[:\s]*(\d+(?:\.\d+)?)%\s*passing\s*#?200',
        r'grain\s*size[:\s]*(\d+(?:\.\d+)?)%\s*fines',
        # Sieve analysis results patterns
        r'sieve\s*analysis[:\s].*?(\d+(?:\.\d+)?)%\s*passing\s*#?200',
        r'sieve\s*analysis[:\s].*?#?200[:\s]*(\d+(?:\.\d+)?)%',
        # ASTM/Standard method references
        r'astm\s*d422[:\s].*?(\d+(?:\.\d+)?)%\s*passing\s*#?200',
        r'astm\s*d6913[:\s].*?(\d+(?:\.\d+)?)%\s*passing\s*#?200',
        # Alternative formats
        r'(\d+(?:\.\d+)?)%\s*<\s*0\.075\s*mm',     # Percent less than 0.075mm
        r'minus\s*#?200[:\s]*(\d+(?:\.\d+)?)%',    # Minus #200
        r'(\d+(?:\.\d+)?)%\s*minus\s*#?200',       # Percent minus #200
    ]
    for pattern in patterns:
        match = re.search(pattern, description, re.IGNORECASE)
        if not match:
            continue
        percentage = float(match.group(1))
        # Validate before announcing success, so an impossible value is
        # never reported as "found".
        if 0 <= percentage <= 100:
            st.success(f"✅ Found sieve #200 data: {percentage}% passing from '{match.group(0)}'")
            return percentage
        st.warning(f"⚠️ Invalid percentage ({percentage}%) found. Should be 0-100%.")
        return None

    # Structured fields: the canonical one first, then equivalents.
    field_names = ['sieve_200_passing', 'fines_content', 'percent_fines', 'fine_content', 'passing_200']
    for field_name in field_names:
        raw_value = layer.get(field_name)
        if raw_value is None:
            continue
        try:
            percentage = float(raw_value)
        except (TypeError, ValueError):
            st.warning(f"⚠️ Non-numeric value {raw_value!r} in field '{field_name}' ignored.")
            continue
        # Field values get the same range check as text values.
        if not 0 <= percentage <= 100:
            st.warning(f"⚠️ Invalid percentage ({percentage}%) found. Should be 0-100%.")
            continue
        if field_name == 'sieve_200_passing':
            st.success(f"✅ Found sieve #200 data in layer field: {percentage}% passing")
        else:
            st.success(f"✅ Found sieve #200 equivalent in '{field_name}': {percentage}% passing")
        return percentage

    st.info(f"🔍 No sieve #200 analysis data found in layer description or fields")
    return None
def _process_st_sample(self, layer: Dict) -> Dict:
"""
Process Shelby Tube (ST) sample - use unconfined compression test (Su) values
"""
layer['processing_method'] = 'ST - Unconfined Compression Test'
# Look for Su values in the data
su_value = self._extract_su_value(layer)
if su_value is not None:
layer['strength_parameter'] = 'Su'
layer['strength_value'] = su_value
layer['su_source'] = 'Unconfined Compression Test'
else:
# If no Su value found, check for SPT and convert
spt_value = self._extract_spt_value(layer)
if spt_value is not None:
su_calculated = self._convert_spt_to_su(spt_value)
layer['strength_parameter'] = 'Su'
layer['strength_value'] = su_calculated
layer['su_source'] = f'Calculated from SPT-N={spt_value} (Su=5*N)'
layer['original_spt'] = spt_value
return layer
def _process_ss_sample(self, layer: Dict) -> Dict:
    """Fill in strength data for a Split Spoon (SS) layer from its SPT-N.

    * clay: Su is computed from N and replaces any existing strength
      value; an ``unconfined_su`` entry is moved to
      ``unconfined_su_ignored``.
    * sand / silt: N is converted to a friction angle.
    * anything else: the raw N is kept as the strength value.

    When no N value can be extracted, an error is shown and the layer is
    returned with only ``processing_method`` set.  The dict is modified in
    place and also returned.
    """
    layer['processing_method'] = 'SS - SPT Conversion (Su=5*N)'

    n_raw = self._extract_spt_value(layer)
    soil = layer.get('soil_type', 'clay')

    if n_raw is None:
        st.error("❌ SS Sample: No SPT-N value found in layer data")
        return layer

    if soil == 'clay':
        # SS clay always uses the N-derived Su, never a lab Su.
        su_from_n = self._convert_spt_to_su(n_raw)
        layer.update({
            'strength_parameter': 'Su',
            'strength_value': su_from_n,
            'su_source': f'Calculated from raw N={n_raw} (Su=5*N) - SS Sample',
            'original_spt': n_raw,
        })
        if 'unconfined_su' in layer:
            layer['unconfined_su_ignored'] = layer.pop('unconfined_su')
            st.warning(f"⚠️ SS Sample: Ignored unconfined compression Su, using calculated Su={su_from_n:.0f} kPa from N={n_raw}")
        st.success(f"✅ SS Sample: Su = 5 × {n_raw} = {su_from_n:.0f} kPa")
        return layer

    if soil in ('sand', 'silt'):
        # Granular soils are characterised by a friction angle instead.
        phi = self._convert_spt_to_friction_angle(n_raw)
        layer.update({
            'strength_parameter': 'φ',
            'strength_value': phi,
            'friction_angle': phi,
            'phi_source': f'Calculated from raw N={n_raw} (Peck method) - SS Sample',
            'original_spt': n_raw,
        })
        st.success(f"✅ SS Sample: φ = {phi:.1f}° from N={n_raw}")
        return layer

    # Any other soil type keeps the raw blow count.
    layer.update({
        'strength_parameter': 'SPT-N',
        'strength_value': n_raw,
        'original_spt': n_raw,
    })
    st.info(f"📊 SS Sample: Using raw N={n_raw} for {soil}")
    return layer
def _process_default_sample(self, layer: Dict) -> Dict:
"""
Process sample with unknown type - use available data intelligently
"""
layer['processing_method'] = 'Default - Based on available data'
# Try to identify and process based on existing parameters
existing_param = layer.get('strength_parameter', '').lower()
if 'su' in existing_param:
# Already has Su value
return self._process_st_sample(layer)
elif 'spt' in existing_param or 'n' in existing_param:
# Has SPT value
return self._process_ss_sample(layer)
else:
# Make best guess based on strength value
strength_val = layer.get('strength_value', 0)
if strength_val and strength_val > 50:
# Likely SPT value
layer['strength_parameter'] = 'SPT-N'
return self._process_ss_sample(layer)
else:
# Likely Su value
layer['strength_parameter'] = 'Su'
return self._process_st_sample(layer)
def _extract_su_value(self, layer: Dict) -> Optional[float]:
"""
Enhanced Su (undrained shear strength) extraction with MANDATORY unit conversion checking
CRITICAL: All Su values must be converted to kPa before processing
"""
# Check direct Su field first - but validate units
if layer.get('strength_parameter') == 'Su' and layer.get('strength_value') is not None:
su_value = float(layer['strength_value'])
# Check if this value needs unit conversion (warn if suspiciously low/high)
if su_value < 5:
st.warning(f"⚠️ Su value {su_value} seems low - verify it's in kPa, not MPa or other units")
elif su_value > 2000:
st.warning(f"⚠️ Su value {su_value} seems high - verify it's in kPa, not psi or other units")
return su_value
# Look in description for Su values with enhanced unit detection
description = layer.get('description', '')
# CRITICAL: Enhanced patterns with explicit unit capture for conversion
patterns = [
# Direct Su values with units - CAPTURE UNITS EXPLICITLY
r'su[:\s=]*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)',
r'undrained[:\s]*shear[:\s]*strength[:\s]*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)',
r'shear\s*strength[:\s]*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)',
r'ucs[:\s]*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)',
r'unconfined[:\s]*compression[:\s]*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)',
# Equation-style patterns
r'su\s*=\s*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)',
r'strength\s*=\s*(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²|psi|psf|ksc|kg/cm2|kg/cm²|t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²|mpa)',
# Embedded unit patterns
r'(\d+(?:\.\d+)?)\s*(kpa|kn/m2|kn/m²)\s*(?:su|strength)',
r'(\d+(?:\.\d+)?)\s*(ksc|kg/cm2|kg/cm²)\s*(?:su|strength)',
r'(\d+(?:\.\d+)?)\s*(t/m2|t/m²|ton/m2|ton/m²|tonnes?/m2|tonnes?/m²)\s*(?:su|strength)',
r'(\d+(?:\.\d+)?)\s*(psi|psf)\s*(?:su|strength)',
r'(\d+(?:\.\d+)?)\s*(mpa)\s*(?:su|strength)',
# Common non-SI units that need conversion
r'(\d+(?:\.\d+)?)\s*ksc\b', # ksc without explicit "su"
r'(\d+(?:\.\d+)?)\s*t/m²?\b', # tonnes/m²
r'(\d+(?:\.\d+)?)\s*psi\b', # psi
]
for pattern in patterns:
match = re.search(pattern, description, re.IGNORECASE)
if match:
value = float(match.group(1))
unit = match.group(2).lower() if len(match.groups()) > 1 and match.group(2) else 'kpa'
# CRITICAL: Alert if unit conversion is needed
if unit != 'kpa':
st.warning(f"🔧 UNIT CONVERSION REQUIRED: Found Su = {value} {unit.upper()}")
# Convert to kPa with detailed logging
converted_value = self._convert_pressure_to_kpa(value, unit)
# Store original values for verification
layer['original_su_value'] = value
layer['original_su_unit'] = unit.upper()
layer['converted_su_note'] = f"Converted from {value} {unit.upper()} to {converted_value:.1f} kPa"
# Enhanced validation with context-aware warnings
if converted_value < 1:
st.error(f"❌ Very low Su = {converted_value:.3f} kPa after conversion. Check original value: {value} {unit}")
elif converted_value > 2000:
st.warning(f"⚠️ Very high Su = {converted_value:.0f} kPa after conversion from {value} {unit}. Verify this is correct.")
elif 1 <= converted_value <= 1000:
st.success(f"✅ Su = {converted_value:.1f} kPa (converted from {value} {unit.upper()})")
else:
st.info(f"📊 Su = {converted_value:.1f} kPa (converted from {value} {unit.upper()}) - unusual but accepted")
return converted_value
# Check for unitless Su values (assume kPa but warn)
unitless_patterns = [
r'su[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[a-zA-Z])', # Su value not followed by units
r'shear\s*strength[:\s]*(\d+(?:\.\d+)?)\b(?!\s*[a-zA-Z])',
r'unconfined[:\s]*(\d+(?:\.\d+)?)\b(?!\s*[a-zA-Z])',
]
for pattern in unitless_patterns:
match = re.search(pattern, description, re.IGNORECASE)
if match:
value = float(match.group(1))
st.warning(f"⚠️ Found Su = {value} WITHOUT UNITS! Assuming kPa - please verify.")
layer['assumed_unit_warning'] = f"Assumed {value} is in kPa (no units specified)"
return value
# Check for explicit Su field in layer data
if 'su_value' in layer and layer['su_value'] is not None:
value = float(layer['su_value'])
st.info(f"📊 Using Su = {value:.1f} from field 'su_value' (assumed kPa)")
return value
# Check for other strength-related fields that might contain Su
for field_name in ['undrained_strength', 'unconfined_strength', 'cohesion']:
if field_name in layer and layer[field_name] is not None:
value = float(layer[field_name])
st.info(f"📊 Using Su = {value:.1f} kPa from field '{field_name}' (assumed kPa)")
return value
return None
def _extract_spt_value(self, layer: Dict) -> Optional[float]:
"""
Enhanced SPT-N value extraction for SS samples - USE RAW N VALUE ONLY, NOT N-CORRECTED
Improved pattern matching for better SS layer division
"""
# Check direct SPT field
if layer.get('strength_parameter') == 'SPT-N' and layer.get('strength_value'):
return float(layer['strength_value'])
# Look in description for SPT values - PRIORITIZE RAW N VALUES
description = layer.get('description', '')
# ENHANCED: Look for raw N value patterns with better precision
raw_n_patterns = [
# High priority patterns for raw N values
r'\braw[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Raw N value
r'\bfield[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Field N value
r'\bmeasured[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Measured N value
r'\bactual[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Actual N value
r'\bobserved[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Observed N value
# Standard N patterns NOT followed by correction terms
r'\bn[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))', # N value NOT corrected
r'\bspt[:\s]*n[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))', # SPT-N NOT corrected
r'\bn[-\s]?value[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))', # N-value NOT corrected
r'\bn\s*=\s*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))', # N = value NOT corrected
# Blow count patterns
r'\bblow[s]?[:\s]*count[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))',
r'\bblows[:\s]*per[:\s]*foot[:\s=]*(\d+(?:\.\d+)?)',
r'\bblow[s]?[:\s=]*(\d+(?:\.\d+)?)\b(?!\s*[-]?(?:corr|correct|adj|adjust))',
# SS sample specific patterns
r'\bss[-\s]*\d*[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # SS sample with N
r'\bsplit[:\s]*spoon[:\s]*n[:\s=]*(\d+(?:\.\d+)?)', # Split spoon N
]
# First try to find raw N values with enhanced logging
for i, pattern in enumerate(raw_n_patterns):
match = re.search(pattern, description, re.IGNORECASE)
if match:
n_value = float(match.group(1))
pattern_type = ["Raw N", "Field N", "Measured N", "Actual N", "Observed N",
"Standard N", "SPT-N", "N-value", "N=", "Blow count",
"Blows/ft", "Blows", "SS N", "Split spoon N"][min(i, 13)]
st.success(f"✅ SS Sample: Using {pattern_type} = {n_value} from: '{match.group(0)}'")
# Additional validation for SS samples
if n_value > 100:
st.warning(f"⚠️ Very high N value ({n_value}) detected. Please verify this is correct.")
elif n_value == 0:
st.warning(f"⚠️ Zero N value detected. May indicate very soft soil or measurement issue.")
return n_value
# Enhanced fallback patterns with warnings
fallback_patterns = [
r'\bn[:\s=]*(\d+(?:\.\d+)?)',
r'\bspt[:\s]*(\d+(?:\.\d+)?)',
r'(\d+(?:\.\d+)?)\s*(?:blow|n)',
r'penetration[:\s]*(\d+(?:\.\d+)?)',
r'resistance[:\s]*(\d+(?:\.\d+)?)'
]
for pattern in fallback_patterns:
match = re.search(pattern, description, re.IGNORECASE)
if match:
n_value = float(match.group(1))
# Enhanced warnings for SS samples
warning_indicators = ['corr', 'correct', 'adj', 'adjust', 'modified', 'norm']
has_correction_indicator = any(indicator in description.lower() for indicator in warning_indicators)
if has_correction_indicator:
st.error(f"❌ SS Sample: Found N = {n_value} but description contains correction terms. This may be corrected N, not raw N!")
st.info("💡 For SS samples, use only raw field N values (not corrected). Check original field logs.")
# Still return the value but flag it
layer['n_value_warning'] = f"Potentially corrected N value: {n_value}"
else:
st.info(f"📊 SS Sample: Using N = {n_value} from: '{match.group(0)}' (fallback pattern)")
return n_value
# If no N value found, provide specific guidance for SS samples
st.error(f"❌ SS Sample: No SPT-N value found in layer data")
st.info("💡 SS samples require SPT-N values. Look for: N=X, SPT-N=X, raw N=X, field N=X, or blow count.")
return None
def _convert_spt_to_su(self, spt_n: float) -> float:
"""
Convert SPT-N to undrained shear strength (Su) using Su = 5*N correlation
Enhanced for SS samples with validation
"""
if spt_n <= 0:
st.warning(f"⚠️ Invalid N value ({spt_n}) for Su calculation. Using N=1 as minimum.")
spt_n = 1.0
su_calculated = 5.0 * spt_n
# Add validation and guidance for SS clay samples
if su_calculated < 10:
st.info(f"💡 Very low Su = {su_calculated:.0f} kPa from N={spt_n}. Indicates very soft clay.")
elif su_calculated > 500:
st.warning(f"⚠️ Very high Su = {su_calculated:.0f} kPa from N={spt_n}. Verify N value is raw (not corrected).")
return su_calculated
def _convert_spt_to_friction_angle(self, spt_n: float) -> float:
"""
Enhanced SPT-N to friction angle conversion for sand/silt layers in SS samples
Uses improved Peck method with soil type considerations
"""
if spt_n <= 0:
st.warning(f"⚠️ Invalid N value ({spt_n}) for friction angle calculation. Using N=1 as minimum.")
spt_n = 1.0
# Enhanced Peck correlation with improvements:
# φ = 27.1 + 0.3 * N - 0.00054 * N² (for fine to medium sand)
# Valid for N up to 50, with adjustments for different sand types
n_limited = min(spt_n, 50) # Cap at 50 for correlation validity
# Base Peck correlation
phi = 27.1 + 0.3 * n_limited - 0.00054 * (n_limited ** 2)
# Ensure reasonable minimum
phi_final = max(phi, 28) # Minimum reasonable friction angle for sand
phi_final = min(phi_final, 45) # Maximum reasonable friction angle
# Add validation and guidance for SS sand samples
if phi_final < 30:
st.info(f"💡 Low φ = {phi_final:.1f}° from N={spt_n}. Indicates loose sand or silty sand.")
elif phi_final > 40:
st.info(f"💡 High φ = {phi_final:.1f}° from N={spt_n}. Indicates dense, well-graded sand.")
# Special handling for very low or high N values
if spt_n < 4:
st.warning(f"⚠️ Very low N={spt_n} for sand. May indicate loose sand or silt. Consider checking soil classification.")
elif spt_n > 40:
st.info(f"💡 Very high N={spt_n} for sand. Indicates very dense sand or possible gravel content.")
return phi_final
def _convert_pressure_to_kpa(self, value: float, unit: str) -> float:
"""
Enhanced pressure value conversion to kPa with comprehensive unit support
"""
if not unit or unit.lower() in ['', 'none', 'null']:
return value # Assume already in kPa if no unit specified
# Normalize unit string for better matching
unit_clean = unit.lower().replace('/', '').replace(' ', '').replace('²', '2').replace('³', '3')
# Remove common punctuation and extra characters
unit_clean = unit_clean.replace('.', '').replace('-', '').replace('_', '')
# Handle specific variations that need special processing
special_cases = {
# Tonne/ton variations
'tm2': 9.81, 'tonm2': 9.81, 'tonnesm2': 9.81, 'tonnem2': 9.81,
# kg/cm² variations
'kgcm2': 98.0, 'kgfcm2': 98.0,
# kN/m² variations
'knm2': 1.0,
# Other common variations
'psig': 6.895, # psi gauge
'psia': 6.895, # psi absolute
'psfa': 0.04788, # psf absolute
'torr': 0.133322, # torr (same as mmHg)
}
# Check special cases first
if unit_clean in special_cases:
conversion_factor = special_cases[unit_clean]
else:
# Standard conversion using enhanced dictionary
conversion_factor = self.unit_conversions.get(unit_clean, None)
# If no exact match found, try intelligent partial matching
if conversion_factor is None:
for known_unit, factor in self.unit_conversions.items():
# Try various normalization approaches
known_normalized = known_unit.replace('/', '').replace('²', '2').replace(' ', '')
if known_normalized == unit_clean:
conversion_factor = factor
break
# Check if unit contains the known unit (for compound units)
if known_unit != unit_clean and known_unit in unit_clean:
conversion_factor = factor
break
# Final fallback - assume kPa if still no match found
if conversion_factor is None:
st.warning(f"⚠️ Unknown pressure unit '{unit}'. Assuming kPa - please verify.")
conversion_factor = 1.0
converted_value = value * conversion_factor
# Enhanced logging with validation
if conversion_factor != 1.0:
st.success(f"🔧 Unit conversion: {value} {unit} = {converted_value:.1f} kPa (×{conversion_factor})")
# Add validation warnings for unusual results
if converted_value > 10000:
st.warning(f"⚠️ Very high pressure result ({converted_value:.0f} kPa). Please verify unit conversion.")
elif converted_value < 0.1 and value > 0:
st.warning(f"⚠️ Very low pressure result ({converted_value:.3f} kPa). Please verify unit conversion.")
return converted_value
def _convert_to_si_units(self, layer: Dict) -> Dict:
"""
Convert all measurements to SI units
"""
# Convert depths to meters
for depth_field in ['depth_from', 'depth_to']:
if depth_field in layer:
depth_val, depth_unit = self._extract_value_and_unit(
str(layer[depth_field]), default_unit='m'
)
layer[depth_field] = self._convert_length_to_meters(depth_val, depth_unit)
# Convert strength values to appropriate SI units
if 'strength_value' in layer and 'strength_parameter' in layer:
param = layer['strength_parameter'].lower()
if param == 'su':
# Convert Su to kPa
strength_val, strength_unit = self._extract_value_and_unit(
str(layer['strength_value']), default_unit='kpa'
)
layer['strength_value'] = self._convert_pressure_to_kpa(strength_val, strength_unit)
layer['strength_unit'] = 'kPa'
# Validate Su value against water content if available
validation_result = self._validate_su_with_water_content(layer)
if validation_result.get('needs_unit_check'):
st.warning(f"⚠️ Su-water content validation: {validation_result['message']}")
layer['unit_validation_warning'] = validation_result['message']
if validation_result['recommendations']:
st.info("💡 Recommendations: " + "; ".join(validation_result['recommendations']))
elif param in ['φ', 'phi', 'friction_angle']:
# Friction angle should be in degrees (already SI)
layer['strength_unit'] = 'degrees'
elif param == 'spt-n':
# SPT-N is dimensionless
layer['strength_unit'] = 'blows/30cm'
return layer
def _extract_value_and_unit(self, value_str: str, default_unit: str = '') -> Tuple[float, str]:
"""
Extract numeric value and unit from a string
"""
# Remove extra spaces and convert to lowercase
clean_str = value_str.strip().lower()
# Pattern to match number followed by optional unit
pattern = r'(\d+(?:\.\d+)?)\s*([a-zA-Z/²]+)?'
match = re.search(pattern, clean_str)
if match:
value = float(match.group(1))
unit = match.group(2) if match.group(2) else default_unit
return value, unit
try:
return float(clean_str), default_unit
except ValueError:
return 0.0, default_unit
def _convert_length_to_meters(self, value: float, unit: str) -> float:
"""
Convert length value to meters
"""
unit_clean = unit.lower().replace(' ', '')
conversion_factor = self.unit_conversions.get(unit_clean, 1.0)
return value * conversion_factor
def _detect_t_m2_unit_error(self, layer: Dict) -> Dict:
"""
Detect if LLM failed to convert t/m² units to kPa
This is the most common unit conversion error
"""
result = {"needs_conversion": False, "critical_error": False}
# Only check layers with Su values
if layer.get("strength_parameter") != "Su" or not layer.get("strength_value"):
return result
su = float(layer["strength_value"])
wc = layer.get("water_content", 0)
description = layer.get("description", "")
# Critical detection: Su values that are likely t/m² but not converted
# Typical t/m² values are 1-8, typical kPa values are 10-400 for clay
# Pattern 1: Su 1-8 with reasonable water content (15-50%)
if 1.0 <= su <= 8.0 and 15 <= wc <= 50:
converted_su = su * 9.81
result.update({
"needs_conversion": True,
"critical_error": True,
"original_su": su,
"converted_su": converted_su,
"unit_error": "t/m²",
"message": f"⚠️ CRITICAL: Su={su:.2f} appears to be in t/m² units, should be {converted_su:.1f} kPa",
"correction": f"{su:.2f} t/m² × 9.81 = {converted_su:.1f} kPa"
})
# Pattern 2: Very low Su (<5) with low water content - could be t/m²
elif su < 5.0 and wc > 0 and wc < 25:
converted_su = su * 9.81
result.update({
"needs_conversion": True,
"critical_error": True,
"original_su": su,
"converted_su": converted_su,
"unit_error": "t/m²",
"message": f"⚠️ POSSIBLE: Su={su:.2f} might be in t/m² units, check if should be {converted_su:.1f} kPa",
"correction": f"{su:.2f} t/m² × 9.81 = {converted_su:.1f} kPa"
})
# Pattern 3: Check description for t/m² mentions
if any(unit in description.lower() for unit in ['t/m²', 't/m2', 'ton/m²', 'ton/m2', 'tonnes/m²']):
if su < 10: # If description mentions t/m² but Su is low, likely not converted
converted_su = su * 9.81
result.update({
"needs_conversion": True,
"critical_error": True,
"original_su": su,
"converted_su": converted_su,
"unit_error": "t/m² (found in description)",
"message": f"⚠️ CRITICAL: Description mentions t/m² but Su={su:.2f} appears unconverted, should be {converted_su:.1f} kPa",
"correction": f"{su:.2f} t/m² × 9.81 = {converted_su:.1f} kPa"
})
return result
def _validate_su_with_water_content(self, layer: Dict) -> Dict:
"""
ENHANCED Su-water content validation with comprehensive unit checking
Standard correlations for clay (empirical relationships):
- Very soft clay: Su < 25 kPa, w% > 40%
- Soft clay: Su 25-50 kPa, w% 30-40%
- Medium clay: Su 50-100 kPa, w% 20-30%
- Stiff clay: Su 100-200 kPa, w% 15-25%
- Very stiff clay: Su 200-400 kPa, w% 10-20%
- Hard clay: Su > 400 kPa, w% < 15%
Key unit conversions to check:
- t/m² → kPa: ×9.81 (CRITICAL)
- ksc → kPa: ×98.0
- psi → kPa: ×6.895
- MPa → kPa: ×1000
"""
# Result template: every return path hands back this dict, possibly with
# extra keys merged in by the unit-check steps below.
validation_result = {
'valid': True,
'needs_unit_check': False,
'critical_unit_error': False,
'suggested_conversion': None,
'message': '',
'recommendations': [],
'recheck_image': False
}
su_value = layer.get('strength_value')
water_content = layer.get('water_content')
soil_type = layer.get('soil_type', '')
description = layer.get('description', '')
# Only validate for clay layers with both Su and water content
# Falsy values (None, 0, '') count as "not available" and skip validation.
if soil_type != 'clay' or not su_value or not water_content:
return validation_result
try:
# Either conversion may raise; the except clause at the end of the method
# turns that into an invalid result instead of propagating.
su = float(su_value)
wc = float(water_content)
# STEP 1: Check for t/m² unit errors first (most common issue)
t_m2_check = self._detect_t_m2_unit_error(layer)
if t_m2_check.get('critical_error'):
# NOTE(review): this early return leaves 'valid' True and
# 'needs_unit_check' False; callers that only look at those two keys will
# not see the critical error - confirm this is intended.
validation_result.update({
'critical_unit_error': True,
'needs_conversion': True,
'original_value': t_m2_check['original_su'],
'suggested_value': t_m2_check['converted_su'],
'unit_error_type': t_m2_check['unit_error'],
'suggested_conversion': t_m2_check['correction'],
'message': t_m2_check['message'],
'recheck_image': True,
'reload_picture': True
})
return validation_result
# STEP 2: Check for other unit conversion errors
unit_check_results = self._check_su_unit_conversions(su, wc, description)
if unit_check_results['needs_conversion']:
# The helper's keys are merged into the result as-is.
# NOTE(review): as in STEP 1, 'valid' and 'needs_unit_check' are not
# touched on this return path.
validation_result.update(unit_check_results)
validation_result['critical_unit_error'] = True
validation_result['recheck_image'] = True
return validation_result
# STEP 3: Detailed correlation analysis
# Each finding is collected as text; the list is joined into 'message' later.
inconsistencies = []
correlation_score = self._calculate_correlation_score(su, wc)
# Very specific clay consistency checks
if su < 25 and wc < 30:
inconsistencies.append(f"Very soft clay (Su={su:.0f}kPa) typically has w%>30%, found {wc:.1f}%")
if wc < 20:
validation_result['recheck_image'] = True
inconsistencies.append("VERIFY: Water content seems too low for very soft clay")
if su > 400 and wc > 30:
inconsistencies.append(f"Hard clay (Su={su:.0f}kPa) typically has w%<20%, found {wc:.1f}%")
validation_result['recheck_image'] = True
inconsistencies.append("VERIFY: Water content seems too high for hard clay")
# Medium-range mismatches
if 50 <= su <= 200 and (wc > 45 or wc < 10):
inconsistencies.append(f"Medium-stiff clay (Su={su:.0f}kPa) with unusual w%={wc:.1f}%")
validation_result['recheck_image'] = True
# STEP 4: Empirical correlation bounds (Terzaghi-Peck relationships)
# Su below a fifth of the expected minimum or above five times the expected
# maximum is the only condition in this method that sets 'needs_unit_check'.
expected_su_range = self._get_expected_su_range(wc)
if su < expected_su_range['min'] * 0.2 or su > expected_su_range['max'] * 5:
validation_result['needs_unit_check'] = True
validation_result['recheck_image'] = True
inconsistencies.append(f"Su-w% correlation severely off: Expected {expected_su_range['min']:.0f}-{expected_su_range['max']:.0f}kPa for w%={wc:.1f}%, got {su:.0f}kPa")
# STEP 5: Finalize results
if inconsistencies:
validation_result['valid'] = False
validation_result['message'] = '; '.join(inconsistencies)
# Enhanced recommendations
if validation_result['needs_unit_check']:
validation_result['recommendations'].extend([
"⚠️ CRITICAL: Check Su unit conversion carefully",
"t/m² → kPa: multiply by 9.81",
"ksc → kPa: multiply by 98.0",
"psi → kPa: multiply by 6.895",
"MPa → kPa: multiply by 1000",
"🔍 Re-examine the original image/document"
])
if validation_result['recheck_image']:
validation_result['recommendations'].extend([
"📷 RECHECK IMAGE: Values seem inconsistent",
"🔄 Consider reloading the image",
"📋 Verify both Su and water content readings"
])
else:
# No findings: report the correlation score computed in STEP 3.
validation_result['message'] = f"Su-water content correlation acceptable (score: {correlation_score:.1f})"
except (ValueError, TypeError) as e:
# Non-numeric input (or a comparison on incompatible types) ends up here.
validation_result['valid'] = False
validation_result['message'] = f"Could not validate Su-water content: {str(e)}"
validation_result['recheck_image'] = True
return validation_result
def _check_su_unit_conversions(self, su: float, wc: float, description: str) -> Dict:
"""Check for specific unit conversion errors"""
result = {
'needs_conversion': False,
'suggested_conversion': None,
'critical_unit_error': False,
'message': ''
}
# Check for t/m² that wasn't converted (very common error)
if 2 <= su <= 10 and 15 <= wc <= 40:
suggested_su = su * 9.81
result.update({
'needs_conversion': True,
'suggested_conversion': f"{su} t/m² → {suggested_su:.1f} kPa (×9.81)",
'critical_unit_error': True,
'message': f"CRITICAL: Su={su:.1f} appears to be in t/m² (should be {suggested_su:.1f} kPa)"
})
return result
# Check for ksc that wasn't converted
if 0.5 <= su <= 5 and 15 <= wc <= 50:
suggested_su = su * 98.0
result.update({
'needs_conversion': True,
'suggested_conversion': f"{su} ksc → {suggested_su:.1f} kPa (×98)",
'critical_unit_error': True,
'message': f"CRITICAL: Su={su:.1f} appears to be in ksc (should be {suggested_su:.1f} kPa)"
})
return result
# Check for psi that wasn't converted (high values)
if 50 <= su <= 500 and 10 <= wc <= 35:
suggested_su = su * 6.895
result.update({
'needs_conversion': True,
'suggested_conversion': f"{su} psi → {suggested_su:.1f} kPa (×6.895)",
'critical_unit_error': True,
'message': f"CRITICAL: Su={su:.0f} appears to be in psi (should be {suggested_su:.1f} kPa)"
})
return result
# Check for MPa that wasn't converted (very low values)
if 0.01 <= su <= 0.5 and 10 <= wc <= 40:
suggested_su = su * 1000
result.update({
'needs_conversion': True,
'suggested_conversion': f"{su} MPa → {suggested_su:.1f} kPa (×1000)",
'critical_unit_error': True,
'message': f"CRITICAL: Su={su:.2f} appears to be in MPa (should be {suggested_su:.1f} kPa)"
})
return result
return result
def _get_expected_su_range(self, water_content: float) -> Dict[str, float]:
"""Get expected Su range based on water content (empirical correlations)"""
wc = water_content
# Conservative empirical relationships
if wc >= 50:
return {'min': 5, 'max': 20} # Very soft clay
elif wc >= 40:
return {'min': 10, 'max': 35} # Soft clay
elif wc >= 30:
return {'min': 20, 'max': 60} # Medium clay
elif wc >= 20:
return {'min': 40, 'max': 150} # Stiff clay
elif wc >= 15:
return {'min': 80, 'max': 250} # Very stiff clay
else:
return {'min': 150, 'max': 500} # Hard clay
def _calculate_correlation_score(self, su: float, wc: float) -> float:
"""Calculate correlation score (0-10, higher is better)"""
# Simple scoring based on typical relationships
expected_range = self._get_expected_su_range(wc)
if expected_range['min'] <= su <= expected_range['max']:
return 10.0 # Perfect correlation
elif expected_range['min'] * 0.5 <= su <= expected_range['max'] * 2:
return 7.0 # Good correlation
elif expected_range['min'] * 0.2 <= su <= expected_range['max'] * 5:
return 4.0 # Acceptable correlation
else:
return 1.0 # Poor correlation
def _add_engineering_parameters(self, layer: Dict) -> Dict:
"""
Add additional engineering parameters based on soil properties
"""
soil_type = layer.get('soil_type', '')
# Add typical engineering properties based on soil type and strength
if soil_type == 'clay':
su_value = layer.get('strength_value', 0)
if su_value > 0:
# Estimate consistency based on Su
if su_value < 25:
layer['consistency'] = 'very soft'
elif su_value < 50:
layer['consistency'] = 'soft'
elif su_value < 100:
layer['consistency'] = 'medium'
elif su_value < 200:
layer['consistency'] = 'stiff'
elif su_value < 400:
layer['consistency'] = 'very stiff'
else:
layer['consistency'] = 'hard'
# Estimate unit weight (kN/m³)
layer['unit_weight'] = 16 + su_value / 50 # Empirical correlation
layer['unit_weight_unit'] = 'kN/m³'
elif soil_type in ['sand', 'silt']:
# For sand/silt, use SPT-N or friction angle
if 'original_spt' in layer:
spt_n = layer['original_spt']
# Estimate relative density based on SPT-N
if spt_n < 4:
layer['consistency'] = 'very loose'
elif spt_n < 10:
layer['consistency'] = 'loose'
elif spt_n < 30:
layer['consistency'] = 'medium dense'
elif spt_n < 50:
layer['consistency'] = 'dense'
else:
layer['consistency'] = 'very dense'
# Estimate unit weight (kN/m³)
layer['unit_weight'] = 14 + spt_n / 5 # Empirical correlation
layer['unit_weight_unit'] = 'kN/m³'
return layer
def _check_clay_consistency(self, layer: Dict) -> Dict:
"""
Check consistency between water content and Su for clay soils
"""
soil_type = layer.get('soil_type', '')
if soil_type != 'clay':
return layer
su_value = layer.get('strength_value')
water_content = self._extract_water_content(layer)
if su_value and water_content:
# Perform consistency check
consistency_result = self._validate_clay_water_content_su_relationship(
water_content, su_value
)
layer['water_content'] = water_content
layer['water_content_unit'] = '%'
layer['clay_consistency_check'] = consistency_result
# Add consistency notes
if consistency_result['is_consistent']:
layer['consistency_note'] = f"✅ Water content ({water_content}%) consistent with Su ({su_value} kPa)"
else:
layer['consistency_note'] = f"⚠️ {consistency_result['warning']}"
return layer
def _extract_water_content(self, layer: Dict) -> Optional[float]:
"""
Extract water content from layer data
"""
# Check if water content is directly specified
if 'water_content' in layer:
return float(layer['water_content'])
# Look in description for water content values
description = layer.get('description', '')
patterns = [
r'w[:\s=]*(\d+(?:\.\d+)?)\s*%',
r'water\s*content[:\s]*(\d+(?:\.\d+)?)\s*%',
r'moisture\s*content[:\s]*(\d+(?:\.\d+)?)\s*%',
r'wc[:\s=]*(\d+(?:\.\d+)?)\s*%',
r'(\d+(?:\.\d+)?)\s*%\s*moisture',
r'(\d+(?:\.\d+)?)\s*%\s*water'
]
for pattern in patterns:
match = re.search(pattern, description, re.IGNORECASE)
if match:
return float(match.group(1))
return None
def _validate_clay_water_content_su_relationship(self, water_content: float, su_value: float) -> Dict:
"""
Validate the relationship between water content and undrained shear strength for clay
Enhanced analysis for ST layer soil division based on water content and unconfined test results:
- Higher water content generally corresponds to lower Su
- Different clay types have different relationships
- Consider stress history and plasticity effects
"""
# Enhanced empirical relationships for clay consistency with expanded ranges
consistency_ranges = {
'very_soft': {'w_range': (40, 150), 'su_range': (0, 25), 'description': 'High plasticity, organic clays'},
'soft': {'w_range': (25, 70), 'su_range': (25, 50), 'description': 'Normally consolidated clays'},
'medium': {'w_range': (18, 40), 'su_range': (50, 100), 'description': 'Lightly overconsolidated clays'},
'stiff': {'w_range': (12, 28), 'su_range': (100, 200), 'description': 'Overconsolidated clays'},
'very_stiff': {'w_range': (8, 20), 'su_range': (200, 400), 'description': 'Heavily overconsolidated clays'},
'hard': {'w_range': (5, 15), 'su_range': (400, 1000), 'description': 'Desiccated or cemented clays'}
}
# Determine expected consistency based on Su
su_consistency = None
for consistency, ranges in consistency_ranges.items():
if ranges['su_range'][0] <= su_value <= ranges['su_range'][1]:
su_consistency = consistency
break
# Determine expected consistency based on water content
w_consistency = None
for consistency, ranges in consistency_ranges.items():
if ranges['w_range'][0] <= water_content <= ranges['w_range'][1]:
w_consistency = consistency
break
# Check consistency
result = {
'water_content': water_content,
'su_value': su_value,
'w_consistency': w_consistency,
'su_consistency': su_consistency,
'is_consistent': False,
'warning': '',
'note': ''
}
if su_consistency and w_consistency:
if su_consistency == w_consistency:
result['is_consistent'] = True
result['note'] = f"Water content and Su both indicate {su_consistency.replace('_', ' ')} clay"
else:
result['warning'] = f"Inconsistent: Water content suggests {w_consistency.replace('_', ' ')} clay, but Su suggests {su_consistency.replace('_', ' ')} clay"
elif su_consistency and not w_consistency:
if water_content > 60:
result['warning'] = f"Very high water content ({water_content}%) for Su = {su_value} kPa. Check if clay is highly plastic or organic."
elif water_content < 10:
result['warning'] = f"Very low water content ({water_content}%) for clay. Check if sample was dried or is highly over-consolidated."
else:
result['note'] = f"Water content outside typical ranges but Su indicates {su_consistency.replace('_', ' ')} clay"
elif w_consistency and not su_consistency:
result['warning'] = f"Su value ({su_value} kPa) outside typical ranges for clay with {water_content}% water content"
else:
result['warning'] = f"Both water content ({water_content}%) and Su ({su_value} kPa) outside typical clay ranges"
# Enhanced empirical correlation checks for ST layer division
if water_content and su_value:
# Advanced correlation analysis for ST samples
# Check for high plasticity clay indicators
if water_content > 80:
if su_value < 25:
result['note'] = f"High plasticity clay indicated: w={water_content}%, Su={su_value} kPa. Possible CH or organic clay."
elif su_value > 50:
result['warning'] = f"Inconsistent: Very high water content ({water_content}%) with moderate/high Su ({su_value} kPa). Check sample integrity or clay type."
# Check for low plasticity clay indicators
elif water_content < 15:
if su_value > 200:
result['note'] = f"Low plasticity, overconsolidated clay: w={water_content}%, Su={su_value} kPa. Possible CL or aged clay."
elif su_value < 100:
result['warning'] = f"Low water content ({water_content}%) with low Su ({su_value} kPa). Unusual - check if sample was dried."
# Check stress history indicators
ocr_estimate = self._estimate_overconsolidation_ratio(water_content, su_value)
if ocr_estimate > 1.5:
result['note'] = result.get('note', '') + f" Estimated OCR ≈ {ocr_estimate:.1f} (overconsolidated)"
elif ocr_estimate < 0.8:
result['note'] = result.get('note', '') + f" Estimated OCR ≈ {ocr_estimate:.1f} (possibly underconsolidated)"
# Soil division recommendations for ST samples
result['st_division_recommendation'] = self._recommend_st_layer_division(water_content, su_value)
return result
def _estimate_overconsolidation_ratio(self, water_content: float, su_value: float) -> float:
"""
Estimate overconsolidation ratio (OCR) from water content and Su
Based on empirical correlations for ST samples
"""
# Simplified correlation: OCR ≈ (Su_measured / Su_normally_consolidated)
# For normally consolidated clays: Su ≈ 0.22 * σ'v
# Approximate σ'v from water content using typical correlations
if water_content > 50:
# High water content suggests normally consolidated or slightly overconsolidated
expected_su_nc = max(15, 100 - water_content) # Simplified correlation
else:
# Lower water content suggests overconsolidation
expected_su_nc = max(50, 150 - 2 * water_content)
ocr_estimate = su_value / expected_su_nc if expected_su_nc > 0 else 1.0
return max(0.5, min(ocr_estimate, 10.0)) # Reasonable bounds
def _recommend_st_layer_division(self, water_content: float, su_value: float) -> Dict:
"""
Recommend layer division strategy for ST samples based on water content and Su results
"""
recommendation = {
'division_strategy': 'single_layer',
'reason': 'Uniform properties',
'subdivision_criteria': []
}
# Check for significant property variations that suggest subdivision
if water_content > 60 and su_value > 75:
recommendation['division_strategy'] = 'check_variation'
recommendation['reason'] = 'Conflicting water content and strength - check for property variations'
recommendation['subdivision_criteria'].append('Water content variation > 10%')
recommendation['subdivision_criteria'].append('Su variation > 30%')
elif water_content < 20 and su_value < 80:
recommendation['division_strategy'] = 'check_variation'
recommendation['reason'] = 'Both low water content and Su - check for soil type variations'
recommendation['subdivision_criteria'].append('Plasticity index variations')
recommendation['subdivision_criteria'].append('Sieve analysis variations')
elif abs(water_content - 30) > 20 or su_value > 300:
recommendation['division_strategy'] = 'subdivide_recommended'
recommendation['reason'] = 'Extreme properties suggest heterogeneous layer'
recommendation['subdivision_criteria'].append('Test at multiple depths')
recommendation['subdivision_criteria'].append('Check for interbedded materials')
return recommendation
def get_processing_summary(self, layers: List[Dict]) -> Dict[str, Any]:
    """
    Summarise a list of processed soil layers.

    Counts sample types (ST / SS), soil types (clay; sand and silt are
    counted together), layers whose Su or phi source mentions "Calculated",
    and the outcome of clay consistency checks. ``unit_conversions`` and
    ``processing_notes`` are returned as empty lists.
    """
    st_count = ss_count = 0
    clay_count = sand_count = 0
    su_calc = phi_calc = 0
    checks = consistent = inconsistent = 0

    for entry in layers:
        # Sample types
        kind = entry.get('sample_type', '')
        if kind == 'ST':
            st_count += 1
        elif kind == 'SS':
            ss_count += 1

        # Soil types
        soil = entry.get('soil_type', '')
        if soil == 'clay':
            clay_count += 1
        elif soil in ['sand', 'silt']:
            sand_count += 1

        # Calculated parameters
        if 'su_source' in entry and 'Calculated' in entry['su_source']:
            su_calc += 1
        if 'phi_source' in entry and 'Calculated' in entry['phi_source']:
            phi_calc += 1

        # Clay consistency checks
        if 'clay_consistency_check' not in entry:
            continue
        checks += 1
        if entry['clay_consistency_check'].get('is_consistent', False):
            consistent += 1
        else:
            inconsistent += 1

    return {
        'total_layers': len(layers),
        'st_samples': st_count,
        'ss_samples': ss_count,
        'clay_layers': clay_count,
        'sand_layers': sand_count,
        'su_calculated': su_calc,
        'phi_calculated': phi_calc,
        'clay_consistency_checks': checks,
        'consistent_clays': consistent,
        'inconsistent_clays': inconsistent,
        'unit_conversions': [],
        'processing_notes': []
    }