soil_profile / soil_analyzer.py
Sompote's picture
Upload 17 files
2c200f8 verified
import numpy as np
from typing import List, Dict, Any
import streamlit as st
from nearest_neighbor_grouping import NearestNeighborGrouping
class SoilLayerAnalyzer:
def __init__(self):
self.consistency_mapping = {
"soft": 1, "loose": 1,
"medium": 2, "medium dense": 2,
"stiff": 3, "dense": 3,
"very stiff": 4, "very dense": 4,
"hard": 5
}
self.nn_grouping = NearestNeighborGrouping()
def validate_layer_continuity(self, layers: List[Dict]) -> List[Dict]:
"""Validate and fix layer depth continuity"""
if not layers:
return layers
# Sort layers by depth_from
sorted_layers = sorted(layers, key=lambda x: x.get("depth_from", 0))
validated_layers = []
for i, layer in enumerate(sorted_layers):
if i == 0:
# First layer starts from 0
layer["depth_from"] = 0
else:
# Each layer starts where previous ends
layer["depth_from"] = validated_layers[-1]["depth_to"]
validated_layers.append(layer)
return validated_layers
def identify_similar_layers(self, layers: List[Dict], similarity_threshold: float = 0.8) -> List[List[int]]:
"""Identify layers that could potentially be grouped together"""
similar_groups = []
for i, layer1 in enumerate(layers):
for j, layer2 in enumerate(layers[i+1:], i+1):
similarity_score = self._calculate_layer_similarity(layer1, layer2)
if similarity_score >= similarity_threshold:
# Check if either layer is already in a group
group_found = False
for group in similar_groups:
if i in group:
if j not in group:
group.append(j)
group_found = True
break
elif j in group:
if i not in group:
group.append(i)
group_found = True
break
if not group_found:
similar_groups.append([i, j])
return similar_groups
def _calculate_layer_similarity(self, layer1: Dict, layer2: Dict) -> float:
"""Calculate similarity score between two layers"""
score = 0.0
total_weight = 0.0
# Soil type similarity (weight: 0.4)
if layer1.get("soil_type", "").lower() == layer2.get("soil_type", "").lower():
score += 0.4
total_weight += 0.4
# Strength parameter similarity (weight: 0.3)
strength1 = layer1.get("strength_value")
strength2 = layer2.get("strength_value")
if strength1 is not None and strength2 is not None:
if abs(strength1 - strength2) / max(strength1, strength2) < 0.3:
score += 0.3
total_weight += 0.3
# Consistency similarity (weight: 0.2)
consistency1 = self._extract_consistency(layer1.get("soil_type", ""))
consistency2 = self._extract_consistency(layer2.get("soil_type", ""))
if consistency1 == consistency2:
score += 0.2
total_weight += 0.2
# Color similarity (weight: 0.1)
color1 = layer1.get("color") or ""
color2 = layer2.get("color") or ""
if color1.lower() == color2.lower():
score += 0.1
total_weight += 0.1
return score / total_weight if total_weight > 0 else 0.0
def _extract_consistency(self, soil_type: str) -> str:
"""Extract consistency from soil type description"""
soil_type_lower = soil_type.lower()
for consistency in self.consistency_mapping.keys():
if consistency in soil_type_lower:
return consistency
return ""
def suggest_layer_merging(self, layers: List[Dict]) -> Dict[str, Any]:
"""Suggest which layers could be merged"""
similar_groups = self.identify_similar_layers(layers)
suggestions = []
for group in similar_groups:
if len(group) >= 2:
group_layers = [layers[i] for i in group]
# Check if layers are adjacent or close
depths = [(layer["depth_from"], layer["depth_to"]) for layer in group_layers]
depths.sort()
# Check for adjacency
is_adjacent = True
for i in range(len(depths) - 1):
if abs(depths[i][1] - depths[i+1][0]) > 0.5: # 0.5m tolerance
is_adjacent = False
break
if is_adjacent:
suggestions.append({
"layer_indices": group,
"reason": "Similar soil properties and adjacent depths",
"merged_layer": self._create_merged_layer(group_layers)
})
return {"suggestions": suggestions}
def _create_merged_layer(self, layers: List[Dict]) -> Dict:
"""Create a merged layer from multiple similar layers"""
if not layers:
return {}
merged = {
"layer_id": f"merged_{layers[0]['layer_id']}_{layers[-1]['layer_id']}",
"depth_from": min(layer["depth_from"] for layer in layers),
"depth_to": max(layer["depth_to"] for layer in layers),
"soil_type": layers[0]["soil_type"], # Use first layer's type
"description": f"Merged layer: {', '.join([layer.get('description', '') for layer in layers])}",
"strength_parameter": layers[0].get("strength_parameter", ""),
"strength_value": np.mean([layer.get("strength_value", 0) for layer in layers if layer.get("strength_value") is not None]),
"color": layers[0].get("color", ""),
"moisture": layers[0].get("moisture", ""),
"consistency": layers[0].get("consistency", "")
}
return merged
def suggest_layer_splitting(self, layers: List[Dict]) -> Dict[str, Any]:
"""Suggest which layers should be split based on thickness and variability"""
suggestions = []
for i, layer in enumerate(layers):
thickness = layer["depth_to"] - layer["depth_from"]
# Suggest splitting very thick layers (>5m)
if thickness > 5.0:
suggested_splits = int(thickness / 2.5) # Split into ~2.5m sublayers
suggestions.append({
"layer_index": i,
"reason": f"Layer is very thick ({thickness:.1f}m) - consider splitting into {suggested_splits} sublayers",
"suggested_depths": np.linspace(layer["depth_from"], layer["depth_to"], suggested_splits + 1).tolist()
})
# Check for significant strength variation indication
description = layer.get("description", "").lower()
if any(word in description for word in ["varying", "variable", "interbedded", "alternating"]):
suggestions.append({
"layer_index": i,
"reason": "Description indicates variable conditions - consider splitting based on detailed log",
"suggested_depths": [layer["depth_from"], (layer["depth_from"] + layer["depth_to"])/2, layer["depth_to"]]
})
return {"suggestions": suggestions}
def optimize_layer_division(self, layers: List[Dict], merge_similar=True, split_thick=True) -> Dict[str, Any]:
"""Optimize layer division by merging similar layers and splitting thick ones"""
optimized_layers = layers.copy()
changes_made = []
# Traditional merge suggestions
merge_suggestions = {"suggestions": []}
if merge_similar:
merge_suggestions = self.suggest_layer_merging(optimized_layers)
for suggestion in merge_suggestions["suggestions"]:
changes_made.append(f"Merged layers {suggestion['layer_indices']}: {suggestion['reason']}")
# Nearest neighbor analysis
nn_analysis = self.analyze_nearest_neighbors(optimized_layers)
# Split suggestions
split_suggestions = {"suggestions": []}
if split_thick:
split_suggestions = self.suggest_layer_splitting(optimized_layers)
for suggestion in split_suggestions["suggestions"]:
changes_made.append(f"Suggested splitting layer {suggestion['layer_index']}: {suggestion['reason']}")
return {
"optimized_layers": optimized_layers,
"changes_made": changes_made,
"merge_suggestions": merge_suggestions,
"split_suggestions": split_suggestions,
"nearest_neighbor_analysis": nn_analysis
}
def analyze_nearest_neighbors(self, layers: List[Dict], k: int = 3, similarity_threshold: float = 0.55) -> Dict[str, Any]:
"""Perform nearest neighbor analysis on soil layers"""
if len(layers) < 2:
return {"message": "Insufficient layers for neighbor analysis"}
try:
# Get nearest neighbor analysis
nn_suggestions = self.nn_grouping.suggest_layer_merging(layers, similarity_threshold)
# Get detailed neighbor report
neighbor_report = self.nn_grouping.get_layer_neighbors_report(layers, k)
return {
"neighbor_groups": nn_suggestions.get("groups", []),
"merge_recommendations": nn_suggestions.get("recommendations", []),
"cluster_labels": nn_suggestions.get("cluster_labels", []),
"neighbor_report": neighbor_report,
"analysis_parameters": {
"similarity_threshold": similarity_threshold,
"k_neighbors": k,
"total_layers": len(layers)
}
}
except Exception as e:
st.error(f"Error in nearest neighbor analysis: {str(e)}")
return {"error": str(e)}
def get_grouping_summary(self, layers: List[Dict]) -> Dict[str, Any]:
"""Get a comprehensive summary of layer grouping analysis"""
nn_analysis = self.analyze_nearest_neighbors(layers)
if "error" in nn_analysis:
return nn_analysis
summary = {
"total_layers": len(layers),
"identified_groups": len(nn_analysis.get("neighbor_groups", [])),
"merge_recommendations": len(nn_analysis.get("merge_recommendations", [])),
"group_details": []
}
# Add details for each group
for i, group in enumerate(nn_analysis.get("neighbor_groups", [])):
group_detail = {
"group_id": group.get("group_id", i+1),
"layers_in_group": group.get("group_size", 0),
"depth_range": f"{group.get('depth_range', {}).get('min', 0):.1f}-{group.get('depth_range', {}).get('max', 0):.1f}m",
"total_thickness": group.get('depth_range', {}).get('total_thickness', 0),
"dominant_soil_type": max(group.get('soil_types', {}).items(), key=lambda x: x[1])[0] if group.get('soil_types') else "unknown",
"layer_ids": group.get("layer_ids", [])
}
summary["group_details"].append(group_detail)
return summary
def calculate_layer_statistics(self, layers: List[Dict]) -> Dict[str, Any]:
"""Calculate statistics for the soil profile"""
if not layers:
return {}
total_depth = max(layer["depth_to"] for layer in layers)
layer_count = len(layers)
# Soil type distribution
soil_types = {}
for layer in layers:
soil_type = layer.get("soil_type", "unknown")
thickness = layer["depth_to"] - layer["depth_from"]
if soil_type in soil_types:
soil_types[soil_type] += thickness
else:
soil_types[soil_type] = thickness
# Convert to percentages
soil_type_percentages = {k: (v/total_depth)*100 for k, v in soil_types.items()}
# Average layer thickness
thicknesses = [layer["depth_to"] - layer["depth_from"] for layer in layers]
avg_thickness = np.mean(thicknesses)
return {
"total_depth": total_depth,
"layer_count": layer_count,
"average_layer_thickness": avg_thickness,
"soil_type_distribution": soil_type_percentages,
"thickest_layer": max(thicknesses),
"thinnest_layer": min(thicknesses)
}