import numpy as np import re import streamlit as st from typing import Dict, List, Any, Tuple class SoilCalculations: def __init__(self): # Peck correlation coefficients for friction angle calculation self.peck_coefficients = { "fine_sand": {"a": 27.1, "b": 0.3}, "medium_sand": {"a": 27.1, "b": 0.3}, "coarse_sand": {"a": 27.1, "b": 0.3}, "silty_sand": {"a": 25.4, "b": 0.3}, "clayey_sand": {"a": 25.4, "b": 0.3} } def calculate_su_from_n(self, n_value: float, correlation_factor: float = 5.0) -> float: """Calculate undrained shear strength from SPT-N value for clay Su = correlation_factor * N (typically 5-7 for clay)""" if n_value is None or n_value <= 0: return None return correlation_factor * n_value def calculate_friction_angle_peck(self, n_value: float, sand_type: str = "medium_sand", effective_stress: float = 100.0) -> float: """Calculate friction angle using Peck correlation φ = a + b * log10(N60) where N60 is corrected SPT value""" if n_value is None or n_value <= 0: return None # Apply overburden correction (simplified) n60_corrected = n_value * (effective_stress / 100.0) ** 0.5 n60_corrected = min(n60_corrected, 50) # Cap at reasonable value coeffs = self.peck_coefficients.get(sand_type, self.peck_coefficients["medium_sand"]) friction_angle = coeffs["a"] + coeffs["b"] * np.log10(max(n60_corrected, 1)) return min(friction_angle, 45) # Cap at reasonable maximum def classify_soil_consistency(self, soil_type: str, n_value: float = None, su_value: float = None) -> str: """Classify soil consistency based on strength parameters""" if "clay" in soil_type.lower() or "silt" in soil_type.lower(): # Use Su for clay classification if su_value is not None: if su_value < 25: return "very soft" elif su_value < 50: return "soft" elif su_value < 100: return "medium" elif su_value < 200: return "stiff" elif su_value < 400: return "very stiff" else: return "hard" # Use N-value for clay if Su not available elif n_value is not None: if n_value < 2: return "very soft" elif n_value < 4: return "soft" elif n_value < 8: return "medium" elif n_value < 15: return "stiff" elif n_value < 30: return "very stiff" else: return "hard" elif "sand" in soil_type.lower() or "gravel" in soil_type.lower(): # Use N-value for sand classification if n_value is not None: if n_value < 4: return "very loose" elif n_value < 10: return "loose" elif n_value < 30: return "medium dense" elif n_value < 50: return "dense" else: return "very dense" return "unknown" def standardize_units(self, text: str) -> Tuple[str, Dict[str, str]]: """Standardize units in soil boring log text before LLM processing""" unit_conversions = {} standardized_text = text # Convert feet to meters feet_pattern = r'(\d+(?:\.\d+)?)\s*(?:ft|feet|\')' feet_matches = re.findall(feet_pattern, standardized_text, re.IGNORECASE) for match in feet_matches: feet_value = float(match) meters_value = feet_value * 0.3048 old_text = f"{match}ft" new_text = f"{meters_value:.1f}m" standardized_text = standardized_text.replace(old_text, new_text) unit_conversions[old_text] = new_text # Convert psf to kPa psf_pattern = r'(\d+(?:\.\d+)?)\s*(?:psf|lbs?/ft²?)' psf_matches = re.findall(psf_pattern, standardized_text, re.IGNORECASE) for match in psf_matches: psf_value = float(match) kpa_value = psf_value * 0.047880259 old_text = f"{match}psf" new_text = f"{kpa_value:.1f}kPa" standardized_text = standardized_text.replace(old_text, new_text) unit_conversions[old_text] = new_text # Convert psi to kPa psi_pattern = r'(\d+(?:\.\d+)?)\s*(?:psi|lbs?/in²?)' psi_matches = re.findall(psi_pattern, standardized_text, re.IGNORECASE) for match in psi_matches: psi_value = float(match) kpa_value = psi_value * 6.89476 old_text = f"{match}psi" new_text = f"{kpa_value:.1f}kPa" standardized_text = standardized_text.replace(old_text, new_text) unit_conversions[old_text] = new_text # Convert ksc (kg/cm²) to kPa ksc_pattern = r'(\d+(?:\.\d+)?)\s*(?:ksc|kg/cm²?|kg/cm2)' ksc_matches = re.findall(ksc_pattern, standardized_text, re.IGNORECASE) for match in ksc_matches: ksc_value = float(match) kpa_value = ksc_value * 98.0 old_text = f"{match}ksc" new_text = f"{kpa_value:.1f}kPa" standardized_text = standardized_text.replace(old_text, new_text) unit_conversions[old_text] = new_text # Convert t/m² (tonnes per square meter) to kPa tonnes_pattern = r'(\d+(?:\.\d+)?)\s*(?:t/m²?|ton/m²?|tonnes?/m²?|tonne/m²?)' tonnes_matches = re.findall(tonnes_pattern, standardized_text, re.IGNORECASE) for match in tonnes_matches: tonnes_value = float(match) kpa_value = tonnes_value * 9.81 old_text = f"{match}t/m²" new_text = f"{kpa_value:.1f}kPa" standardized_text = standardized_text.replace(old_text, new_text) unit_conversions[old_text] = new_text # Standardize depth notation depth_pattern = r'(\d+(?:\.\d+)?)\s*-\s*(\d+(?:\.\d+)?)\s*(?:ft|feet|\')' standardized_text = re.sub(depth_pattern, lambda m: f"{float(m.group(1))*0.3048:.1f}-{float(m.group(2))*0.3048:.1f}m", standardized_text, flags=re.IGNORECASE) return standardized_text, unit_conversions def enhance_soil_layers(self, soil_layers: List[Dict]) -> List[Dict]: """Enhance soil layers with calculated parameters""" enhanced_layers = [] for layer in soil_layers: enhanced_layer = layer.copy() # Extract values n_value = layer.get("strength_value") if layer.get("strength_parameter") == "SPT-N" else None su_value = layer.get("strength_value") if layer.get("strength_parameter") == "Su" else None soil_type = layer.get("soil_type", "").lower() depth_from = layer.get("depth_from", 0) depth_to = layer.get("depth_to", 0) sample_type = layer.get("sample_type", "") su_source = layer.get("su_source", "") # CRITICAL RULE: For SS samples, ALWAYS use Su=5*N calculation, IGNORE unconfined compression Su if sample_type == "SS" and "clay" in soil_type: # For SS samples, we MUST use N-value to calculate Su, regardless of any other Su data if n_value is not None: calculated_su = self.calculate_su_from_n(n_value) enhanced_layer["strength_parameter"] = "Su" enhanced_layer["strength_value"] = calculated_su enhanced_layer["su_source"] = f"SS Sample: Calculated from raw N={n_value} (Su=5*N)" enhanced_layer["original_spt"] = n_value # Override any existing Su values for SS samples if su_value is not None and su_value != calculated_su: enhanced_layer["ignored_unconfined_su"] = su_value st.warning(f"⚠️ SS Sample Layer {enhanced_layer.get('layer_id', 'Unknown')}: Ignored unconfined Su={su_value:.0f}, using calculated Su={calculated_su:.0f} kPa from N={n_value}") st.success(f"✅ SS Sample Layer {enhanced_layer.get('layer_id', 'Unknown')}: Su = 5 × {n_value} = {calculated_su:.0f} kPa") else: st.error(f"❌ SS Sample Layer {enhanced_layer.get('layer_id', 'Unknown')}: No N-value found for Su calculation") # For ST samples, preserve direct Su measurements elif sample_type == "ST" and su_value is not None: enhanced_layer["su_source"] = su_source or "ST Sample: Direct measurement from Unconfined Compression Test" st.success(f"✅ ST Sample Layer {enhanced_layer.get('layer_id', 'Unknown')}: Using direct Su={su_value:.0f} kPa") # For other cases (no sample type specified), use previous logic but prioritize sample identification else: # Try to identify sample type from available data if n_value is not None and su_value is None and "clay" in soil_type: # Only calculate Su from N-value if no direct Su available (likely SS sample) calculated_su = self.calculate_su_from_n(n_value) enhanced_layer["calculated_su"] = calculated_su enhanced_layer["su_source"] = f"Calculated from N={n_value} (Su=5*N) - assumed SS sample" st.info(f"🔬 Layer {enhanced_layer.get('layer_id', 'Unknown')}: Calculated Su={calculated_su:.0f} kPa from N={n_value} (assumed SS)") elif su_value is not None: # Preserve direct Su values (likely ST sample) enhanced_layer["su_source"] = su_source or "Direct measurement - assumed ST sample" st.success(f"✅ Layer {enhanced_layer.get('layer_id', 'Unknown')}: Using direct Su={su_value:.0f} kPa (assumed ST)") # Handle sand/silt friction angle calculation if "sand" in soil_type and n_value is not None: # Calculate friction angle for sand mid_depth = (depth_from + depth_to) / 2 effective_stress = 20 * mid_depth # Approximate effective stress (kPa) sand_type_classification = "medium_sand" if "fine" in soil_type: sand_type_classification = "fine_sand" elif "coarse" in soil_type: sand_type_classification = "coarse_sand" elif "silt" in soil_type: sand_type_classification = "silty_sand" friction_angle = self.calculate_friction_angle_peck( n_value, sand_type_classification, effective_stress ) enhanced_layer["friction_angle"] = friction_angle enhanced_layer["friction_angle_source"] = f"Peck method from raw N={n_value}" if sample_type == "SS": st.success(f"✅ SS Sample Layer {enhanced_layer.get('layer_id', 'Unknown')}: φ = {friction_angle:.1f}° from N={n_value}") else: st.info(f"📊 Layer {enhanced_layer.get('layer_id', 'Unknown')}: φ = {friction_angle:.1f}° from N={n_value}") # Update consistency classification consistency = self.classify_soil_consistency(soil_type, n_value, su_value) if consistency != "unknown": enhanced_layer["consistency"] = consistency # Keep soil_type as basic type (clay, sand, silt) base_soil = "clay" if "clay" in soil_type else \ "sand" if "sand" in soil_type else \ "silt" if "silt" in soil_type else \ "gravel" if "gravel" in soil_type else soil_type # Remove any existing consistency terms from soil_type for consistency_term in ["very soft", "soft", "medium", "stiff", "very stiff", "hard", "very loose", "loose", "medium dense", "dense", "very dense"]: base_soil = base_soil.replace(consistency_term, "").strip() enhanced_layer["soil_type"] = base_soil enhanced_layers.append(enhanced_layer) return enhanced_layers def validate_soil_classification(self, soil_data: Dict) -> Dict: """Validate and improve soil classification""" if "soil_layers" not in soil_data: return soil_data layers = soil_data["soil_layers"] validated_layers = [] for layer in layers: validated_layer = layer.copy() # Check consistency between soil type and strength parameters soil_type = layer.get("soil_type", "").lower() strength_param = layer.get("strength_parameter", "") strength_value = layer.get("strength_value") # Fix parameter mismatches if "clay" in soil_type and strength_param == "SPT-N" and strength_value: # Clay should use Su, but if only N is available, calculate Su calculated_su = self.calculate_su_from_n(strength_value) validated_layer["calculated_su"] = calculated_su validated_layer["su_source"] = f"Calculated from N={strength_value}" elif "sand" in soil_type and strength_param == "Su": # Sand should not have Su parameter validated_layer["strength_parameter"] = "SPT-N" validated_layer["parameter_note"] = "Corrected from Su to SPT-N for sand" # Validate depth ranges if validated_layer.get("depth_from") >= validated_layer.get("depth_to"): # Fix invalid depth ranges depth_from = validated_layer.get("depth_from", 0) validated_layer["depth_to"] = depth_from + 1.0 # Default 1m thickness validated_layer["depth_note"] = "Corrected invalid depth range" validated_layers.append(validated_layer) soil_data["soil_layers"] = validated_layers return soil_data def process_with_ss_st_classification(self, soil_data: Dict[str, Any]) -> Dict[str, Any]: """ Process soil data with SS/ST sample classification """ try: from soil_classification import SoilClassificationProcessor if "soil_layers" not in soil_data: return soil_data # Initialize the enhanced processor processor = SoilClassificationProcessor() # Process layers with SS/ST classification enhanced_layers = processor.process_soil_layers(soil_data["soil_layers"]) # Update soil data soil_data["soil_layers"] = enhanced_layers # Add processing summary processing_summary = processor.get_processing_summary(enhanced_layers) soil_data["processing_summary"] = processing_summary # Display processing summary st.subheader("📊 SS/ST Processing Summary") col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Total Layers", processing_summary['total_layers']) st.metric("ST Samples", processing_summary['st_samples']) with col2: st.metric("SS Samples", processing_summary['ss_samples']) st.metric("Clay Layers", processing_summary['clay_layers']) with col3: st.metric("Sand/Silt Layers", processing_summary['sand_layers']) st.metric("Su Calculated", processing_summary['su_calculated']) with col4: st.metric("φ Calculated", processing_summary['phi_calculated']) return soil_data except ImportError as e: st.warning(f"⚠️ Enhanced SS/ST classification not available: {str(e)}") return soil_data except Exception as e: st.error(f"❌ Error in SS/ST processing: {str(e)}") return soil_data