# Spaces: Sleeping -- non-code page text captured with this source; kept as a comment.
import re
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import streamlit as st
class SoilCalculations:
    """Helper calculations for soil boring-log data.

    Groups four concerns used when post-processing extracted boring logs:

    * strength correlations (Su from SPT-N, friction angle from SPT-N),
    * consistency / relative-density labelling,
    * unit standardisation of free text (ft, psf, psi, ksc, t/m² -> m, kPa),
    * enrichment and validation of ``soil_layers`` dictionaries.

    Methods that report progress do so through Streamlit (``st.*``).
    """

    _FEET_TO_METERS = 0.3048

    # A number that does not start in the middle of another number
    # (so ".5 ft" or "1,000 ft" are left alone instead of being half-converted).
    _NUMBER = r"(?<![\d.,])(\d+(?:\.\d+)?)"
    # "ft" must end at a word boundary so "ft2" / "ft²" (areas) are not lengths.
    _FEET_UNIT = r"(?:feet|ft\b|')"

    _FEET_RANGE_RE = re.compile(
        _NUMBER + r"\s*-\s*(\d+(?:\.\d+)?)\s*" + _FEET_UNIT, re.IGNORECASE
    )

    # (pattern, multiplier, target unit), applied in this order.
    # The trailing (?!\w) rejects cubic units such as lb/ft3, kg/cm3 or t/m3,
    # which are unit weights rather than pressures.
    _UNIT_RULES = (
        (re.compile(_NUMBER + r"\s*" + _FEET_UNIT, re.IGNORECASE),
         _FEET_TO_METERS, "m"),
        (re.compile(_NUMBER + r"\s*(?:psf\b|lbs?/ft(?:²|2)?(?!\w))", re.IGNORECASE),
         0.047880259, "kPa"),
        (re.compile(_NUMBER + r"\s*(?:psi\b|lbs?/in(?:²|2)?(?!\w))", re.IGNORECASE),
         6.89476, "kPa"),
        (re.compile(_NUMBER + r"\s*(?:ksc\b|kg/cm(?:²|2)?(?!\w))", re.IGNORECASE),
         98.0, "kPa"),
        (re.compile(_NUMBER + r"\s*(?:tonnes?|ton|t)/m(?:²|2)?(?!\w)", re.IGNORECASE),
         9.81, "kPa"),
    )

    # Upper bounds (exclusive) for each label; values above the last bound get
    # the "top" label passed to _band().
    _SU_BANDS = ((25, "very soft"), (50, "soft"), (100, "medium"),
                 (200, "stiff"), (400, "very stiff"))
    _N_COHESIVE_BANDS = ((2, "very soft"), (4, "soft"), (8, "medium"),
                         (15, "stiff"), (30, "very stiff"))
    _N_GRANULAR_BANDS = ((4, "very loose"), (10, "loose"),
                         (30, "medium dense"), (50, "dense"))

    # Longest terms first so "very stiff" is removed as a whole, not as "stiff".
    _CONSISTENCY_TERMS_RE = re.compile(
        r"\b(?:medium dense|very stiff|very loose|very dense|very soft|"
        r"medium|stiff|loose|dense|soft|hard)\b"
    )

    def __init__(self):
        # Peck correlation coefficients for friction angle calculation
        self.peck_coefficients = {
            "fine_sand": {"a": 27.1, "b": 0.3},
            "medium_sand": {"a": 27.1, "b": 0.3},
            "coarse_sand": {"a": 27.1, "b": 0.3},
            "silty_sand": {"a": 25.4, "b": 0.3},
            "clayey_sand": {"a": 25.4, "b": 0.3},
        }

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _band(value: float, bands, top_label: str) -> str:
        """Return the label of the first band whose upper bound exceeds *value*."""
        for upper_bound, label in bands:
            if value < upper_bound:
                return label
        return top_label

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for real int/float values (bool and None excluded)."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @classmethod
    def _base_soil_type(cls, soil_type: str) -> str:
        """Reduce a lower-cased soil description to its basic type.

        The first of clay / sand / silt / gravel found as a substring wins;
        otherwise the description is returned with consistency and density
        terms removed and whitespace collapsed.
        """
        for base in ("clay", "sand", "silt", "gravel"):
            if base in soil_type:
                return base
        stripped = cls._CONSISTENCY_TERMS_RE.sub(" ", soil_type)
        return " ".join(stripped.split())

    # ------------------------------------------------------------------
    # Strength correlations
    # ------------------------------------------------------------------
    def calculate_su_from_n(self, n_value: float,
                            correlation_factor: float = 5.0) -> Optional[float]:
        """Calculate undrained shear strength from an SPT-N value for clay.

        Su = correlation_factor * N (typically 5-7 for clay).

        Returns None when *n_value* is None or not positive.
        """
        if n_value is None or n_value <= 0:
            return None
        return correlation_factor * n_value

    def calculate_friction_angle_peck(self, n_value: float,
                                      sand_type: str = "medium_sand",
                                      effective_stress: float = 100.0) -> Optional[float]:
        """Calculate a friction angle (degrees) from SPT-N.

        φ = a + b * log10(N_corrected), with ``a``/``b`` taken from
        ``self.peck_coefficients`` (unknown *sand_type* falls back to
        "medium_sand"). N is scaled by sqrt(effective_stress / 100), capped at
        50 and floored at 1 before the logarithm; the result is capped at 45.

        Returns None when *n_value* is None or not positive. A missing
        *effective_stress* means "no correction"; a negative one is clamped to
        zero (a negative base would otherwise yield a complex number).

        NOTE(review): the commonly published overburden factor is the inverse
        ratio, sqrt(100 / effective_stress), and the published Peck/Wolff fit
        using 27.1 and 0.3 is polynomial in N rather than log10. Both are left
        as written so reported values do not shift -- confirm with the
        geotechnical owner.
        """
        if n_value is None or n_value <= 0:
            return None

        if effective_stress is None:
            stress = 100.0
        else:
            stress = max(float(effective_stress), 0.0)

        # Apply overburden correction (simplified), capped at a reasonable value
        n60_corrected = min(n_value * (stress / 100.0) ** 0.5, 50)

        coeffs = self.peck_coefficients.get(
            sand_type, self.peck_coefficients["medium_sand"]
        )
        friction_angle = coeffs["a"] + coeffs["b"] * np.log10(max(n60_corrected, 1))
        # Plain float keeps the value friendly to formatting and serialisation.
        return float(min(friction_angle, 45))

    # ------------------------------------------------------------------
    # Classification
    # ------------------------------------------------------------------
    def classify_soil_consistency(self, soil_type: str,
                                  n_value: Optional[float] = None,
                                  su_value: Optional[float] = None) -> str:
        """Classify consistency (clay/silt) or relative density (sand/gravel).

        Clay/silt descriptions use *su_value* when given, otherwise *n_value*.
        Sand/gravel descriptions use *n_value*. Returns "unknown" when the
        soil type is not recognised or the needed value is missing.

        NOTE(review): matching is by substring with clay/silt checked first, so
        compound descriptions such as "silty sand" are labelled on the
        cohesive scale -- confirm this is intended.
        """
        description = (soil_type or "").lower()

        if "clay" in description or "silt" in description:
            if su_value is not None:
                return self._band(su_value, self._SU_BANDS, "hard")
            if n_value is not None:
                return self._band(n_value, self._N_COHESIVE_BANDS, "hard")
        elif "sand" in description or "gravel" in description:
            if n_value is not None:
                return self._band(n_value, self._N_GRANULAR_BANDS, "very dense")

        return "unknown"

    # ------------------------------------------------------------------
    # Text pre-processing
    # ------------------------------------------------------------------
    def standardize_units(self, text: str) -> Tuple[str, Dict[str, str]]:
        """Standardize units in soil boring log text before LLM processing.

        Converts feet to metres and psf / psi / ksc (kg/cm²) / t/m² to kPa,
        formatting every converted value with one decimal place.

        Returns:
            (standardized_text, unit_conversions), where ``unit_conversions``
            maps each original matched snippet (e.g. ``"10 ft"``) to its
            replacement (e.g. ``"3.0m"``).
        """
        if not text:
            return text or "", {}

        unit_conversions: Dict[str, str] = {}

        def remember(match, replacement):
            unit_conversions[match.group(0)] = replacement
            return replacement

        def convert_range(match):
            start_m = float(match.group(1)) * self._FEET_TO_METERS
            end_m = float(match.group(2)) * self._FEET_TO_METERS
            return remember(match, f"{start_m:.1f}-{end_m:.1f}m")

        # Depth ranges go first: once the single-value pass has consumed the
        # trailing "ft", "10-20ft" can no longer be recognised as a range.
        standardized_text = self._FEET_RANGE_RE.sub(convert_range, text)

        for pattern, factor, unit in self._UNIT_RULES:
            def convert_single(match, factor=factor, unit=unit):
                converted = float(match.group(1)) * factor
                return remember(match, f"{converted:.1f}{unit}")

            # Substituting at the match position (rather than str.replace on a
            # rebuilt literal) converts every spelling the pattern accepts and
            # cannot touch the tail of a longer number.
            standardized_text = pattern.sub(convert_single, standardized_text)

        return standardized_text, unit_conversions

    # ------------------------------------------------------------------
    # Layer enrichment
    # ------------------------------------------------------------------
    def enhance_soil_layers(self, soil_layers: List[Dict]) -> List[Dict]:
        """Enhance soil layers with calculated parameters.

        Works on shallow copies of the input dictionaries. For each layer:

        * SS sample in clay: Su is always derived from N (Su = 5*N) and replaces
          the layer's strength parameter/value.
        * ST sample with Su: the direct Su is kept and its source recorded.
        * Otherwise: clay with only N gets ``calculated_su``; a direct Su is
          kept and assumed to come from an ST sample.
        * Descriptions containing "sand" with an N value get ``friction_angle``.
        * ``consistency`` is updated and ``soil_type`` reduced to a basic type.

        Layers whose N value is not positive are passed through without the
        derived Su / friction angle instead of failing while formatting them.
        """
        enhanced_layers = []

        for layer in soil_layers or []:
            enhanced_layer = layer.copy()

            # Extract values
            strength_parameter = layer.get("strength_parameter")
            strength_value = layer.get("strength_value")
            n_value = strength_value if strength_parameter == "SPT-N" else None
            su_value = strength_value if strength_parameter == "Su" else None
            soil_type = (layer.get("soil_type") or "").lower()
            depth_from = layer.get("depth_from") or 0
            depth_to = layer.get("depth_to") or 0
            sample_type = layer.get("sample_type", "")
            su_source = layer.get("su_source", "")
            layer_id = enhanced_layer.get('layer_id', 'Unknown')

            # CRITICAL RULE: For SS samples, ALWAYS use Su=5*N calculation,
            # IGNORE unconfined compression Su
            if sample_type == "SS" and "clay" in soil_type:
                calculated_su = self.calculate_su_from_n(n_value)
                if calculated_su is not None:
                    enhanced_layer["strength_parameter"] = "Su"
                    enhanced_layer["strength_value"] = calculated_su
                    enhanced_layer["su_source"] = f"SS Sample: Calculated from raw N={n_value} (Su=5*N)"
                    enhanced_layer["original_spt"] = n_value

                    # Override any existing Su values for SS samples.
                    # NOTE(review): n_value and su_value are both read from the
                    # single "strength_value" field, so su_value is always None
                    # here and this warning cannot fire as written.
                    if su_value is not None and su_value != calculated_su:
                        enhanced_layer["ignored_unconfined_su"] = su_value
                        st.warning(f"⚠️ SS Sample Layer {layer_id}: Ignored unconfined Su={su_value:.0f}, using calculated Su={calculated_su:.0f} kPa from N={n_value}")

                    st.success(f"✅ SS Sample Layer {layer_id}: Su = 5 × {n_value} = {calculated_su:.0f} kPa")
                else:
                    st.error(f"❌ SS Sample Layer {layer_id}: No N-value found for Su calculation")

            # For ST samples, preserve direct Su measurements
            elif sample_type == "ST" and su_value is not None:
                enhanced_layer["su_source"] = su_source or "ST Sample: Direct measurement from Unconfined Compression Test"
                st.success(f"✅ ST Sample Layer {layer_id}: Using direct Su={su_value:.0f} kPa")

            # For other cases (no sample type specified), infer from the data
            else:
                if n_value is not None and su_value is None and "clay" in soil_type:
                    # Only calculate Su from N-value if no direct Su available
                    # (likely SS sample)
                    calculated_su = self.calculate_su_from_n(n_value)
                    if calculated_su is not None:
                        enhanced_layer["calculated_su"] = calculated_su
                        enhanced_layer["su_source"] = f"Calculated from N={n_value} (Su=5*N) - assumed SS sample"
                        st.info(f"🔬 Layer {layer_id}: Calculated Su={calculated_su:.0f} kPa from N={n_value} (assumed SS)")
                elif su_value is not None:
                    # Preserve direct Su values (likely ST sample)
                    enhanced_layer["su_source"] = su_source or "Direct measurement - assumed ST sample"
                    st.success(f"✅ Layer {layer_id}: Using direct Su={su_value:.0f} kPa (assumed ST)")

            # Handle sand/silt friction angle calculation
            if "sand" in soil_type and n_value is not None:
                mid_depth = (depth_from + depth_to) / 2
                effective_stress = 20 * mid_depth  # Approximate effective stress (kPa)

                sand_type_classification = "medium_sand"
                if "fine" in soil_type:
                    sand_type_classification = "fine_sand"
                elif "coarse" in soil_type:
                    sand_type_classification = "coarse_sand"
                elif "silt" in soil_type:
                    sand_type_classification = "silty_sand"
                elif "clay" in soil_type:
                    # The coefficient table defines this entry; select it.
                    sand_type_classification = "clayey_sand"

                friction_angle = self.calculate_friction_angle_peck(
                    n_value, sand_type_classification, effective_stress
                )

                if friction_angle is not None:
                    enhanced_layer["friction_angle"] = friction_angle
                    enhanced_layer["friction_angle_source"] = f"Peck method from raw N={n_value}"

                    if sample_type == "SS":
                        st.success(f"✅ SS Sample Layer {layer_id}: φ = {friction_angle:.1f}° from N={n_value}")
                    else:
                        st.info(f"📐 Layer {layer_id}: φ = {friction_angle:.1f}° from N={n_value}")

            # Update consistency classification
            consistency = self.classify_soil_consistency(soil_type, n_value, su_value)
            if consistency != "unknown":
                enhanced_layer["consistency"] = consistency

            # Keep soil_type as basic type (clay, sand, silt, gravel)
            enhanced_layer["soil_type"] = self._base_soil_type(soil_type)

            enhanced_layers.append(enhanced_layer)

        return enhanced_layers

    # ------------------------------------------------------------------
    # Validation
    # ------------------------------------------------------------------
    def validate_soil_classification(self, soil_data: Dict) -> Dict:
        """Validate and improve soil classification.

        Replaces ``soil_data["soil_layers"]`` with corrected copies of the
        layers and returns the same *soil_data* object:

        * clay reported with SPT-N gets an additional ``calculated_su``;
        * sand reported with Su has its parameter relabelled to SPT-N;
        * a numeric depth range with ``depth_from >= depth_to`` is given a
          default 1 m thickness.

        Layers with missing or non-numeric depths are left untouched.
        """
        if "soil_layers" not in soil_data:
            return soil_data

        validated_layers = []

        for layer in soil_data["soil_layers"] or []:
            validated_layer = layer.copy()

            # Check consistency between soil type and strength parameters
            soil_type = (layer.get("soil_type") or "").lower()
            strength_param = layer.get("strength_parameter", "")
            strength_value = layer.get("strength_value")

            # Fix parameter mismatches
            if "clay" in soil_type and strength_param == "SPT-N" and strength_value:
                # Clay should use Su, but if only N is available, calculate Su
                calculated_su = self.calculate_su_from_n(strength_value)
                if calculated_su is not None:
                    validated_layer["calculated_su"] = calculated_su
                    validated_layer["su_source"] = f"Calculated from N={strength_value}"
            elif "sand" in soil_type and strength_param == "Su":
                # Sand should not have Su parameter.
                # NOTE(review): only the label changes; strength_value still
                # holds the number that was reported as Su -- confirm intended.
                validated_layer["strength_parameter"] = "SPT-N"
                validated_layer["parameter_note"] = "Corrected from Su to SPT-N for sand"

            # Validate depth ranges (only when both ends are real numbers)
            depth_from = validated_layer.get("depth_from")
            depth_to = validated_layer.get("depth_to")
            if (self._is_number(depth_from) and self._is_number(depth_to)
                    and depth_from >= depth_to):
                validated_layer["depth_to"] = depth_from + 1.0  # Default 1m thickness
                validated_layer["depth_note"] = "Corrected invalid depth range"

            validated_layers.append(validated_layer)

        soil_data["soil_layers"] = validated_layers
        return soil_data

    # ------------------------------------------------------------------
    # SS/ST pipeline
    # ------------------------------------------------------------------
    def process_with_ss_st_classification(self, soil_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process soil data with SS/ST sample classification.

        Delegates to ``soil_classification.SoilClassificationProcessor``
        (imported lazily), stores the processed layers and a
        ``processing_summary`` on *soil_data*, and renders the summary as
        Streamlit metrics. If the processor cannot be imported, or any step
        fails, a message is shown and *soil_data* is returned as it stands at
        that point.
        """
        try:
            from soil_classification import SoilClassificationProcessor

            if "soil_layers" not in soil_data:
                return soil_data

            # Initialize the enhanced processor
            processor = SoilClassificationProcessor()

            # Process layers with SS/ST classification
            enhanced_layers = processor.process_soil_layers(soil_data["soil_layers"])
            soil_data["soil_layers"] = enhanced_layers

            # Add processing summary
            processing_summary = processor.get_processing_summary(enhanced_layers)
            soil_data["processing_summary"] = processing_summary

            # Display processing summary
            st.subheader("📊 SS/ST Processing Summary")
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                st.metric("Total Layers", processing_summary['total_layers'])
                st.metric("ST Samples", processing_summary['st_samples'])
            with col2:
                st.metric("SS Samples", processing_summary['ss_samples'])
                st.metric("Clay Layers", processing_summary['clay_layers'])
            with col3:
                st.metric("Sand/Silt Layers", processing_summary['sand_layers'])
                st.metric("Su Calculated", processing_summary['su_calculated'])
            with col4:
                st.metric("φ Calculated", processing_summary['phi_calculated'])

            return soil_data

        except ImportError as e:
            st.warning(f"⚠️ Enhanced SS/ST classification not available: {e}")
            return soil_data
        except Exception as e:
            # UI boundary: report the failure instead of crashing the app.
            st.error(f"❌ Error in SS/ST processing: {e}")
            return soil_data