# NOTE(review): the three lines below are extraction residue (a captured
# hosting-status banner, not program text); commented out so the module parses.
# Spaces:
# Sleeping
# Sleeping
from typing import Any, Dict, List

import numpy as np
import streamlit as st

from nearest_neighbor_grouping import NearestNeighborGrouping
class SoilLayerAnalyzer:
    """Analyze, validate and optimize borehole soil-layer profiles.

    Provides depth-continuity validation, pairwise layer similarity scoring,
    merge/split suggestions and profile statistics.  Nearest-neighbor grouping
    is delegated to ``NearestNeighborGrouping``; Streamlit (``st``) is only
    used to surface errors from that analysis in the UI.
    """

    def __init__(self):
        # Ordinal scale for consistency (cohesive soils) and density
        # (granular soils) terms found in soil descriptions; terms of
        # equivalent rank share a value.
        self.consistency_mapping = {
            "soft": 1, "loose": 1,
            "medium": 2, "medium dense": 2,
            "stiff": 3, "dense": 3,
            "very stiff": 4, "very dense": 4,
            "hard": 5
        }
        self.nn_grouping = NearestNeighborGrouping()

    def validate_layer_continuity(self, layers: List[Dict]) -> List[Dict]:
        """Validate and fix layer depth continuity.

        Layers are sorted by ``depth_from``; the first layer is forced to
        start at 0 and each subsequent layer starts where the previous one
        ends.  Input dicts are shallow-copied so the caller's data is not
        mutated (``sorted`` preserves references to the original dicts).
        """
        if not layers:
            return layers
        sorted_layers = sorted(layers, key=lambda x: x.get("depth_from", 0))
        validated_layers: List[Dict] = []
        for i, layer in enumerate(sorted_layers):
            layer = dict(layer)  # copy: don't mutate the caller's dict
            if i == 0:
                # First layer starts from ground surface
                layer["depth_from"] = 0
            else:
                # Each layer starts where the previous one ends
                layer["depth_from"] = validated_layers[-1]["depth_to"]
            validated_layers.append(layer)
        return validated_layers

    def identify_similar_layers(self, layers: List[Dict], similarity_threshold: float = 0.8) -> List[List[int]]:
        """Identify groups of layer indices with pairwise similarity above threshold.

        A layer index never appears in more than one group: when a similar
        pair bridges two existing groups, the groups are merged (the naive
        pairwise scan could otherwise place the same index in several
        overlapping groups, producing conflicting merge suggestions).
        """
        similar_groups: List[List[int]] = []
        for i, layer1 in enumerate(layers):
            for j, layer2 in enumerate(layers[i + 1:], i + 1):
                if self._calculate_layer_similarity(layer1, layer2) < similarity_threshold:
                    continue
                group_i = next((g for g in similar_groups if i in g), None)
                group_j = next((g for g in similar_groups if j in g), None)
                if group_i is None and group_j is None:
                    similar_groups.append([i, j])
                elif group_i is group_j:
                    pass  # pair already grouped together
                elif group_i is None:
                    group_j.append(i)
                elif group_j is None:
                    group_i.append(j)
                else:
                    # Similar pair spans two groups: merge j's group into i's
                    group_i.extend(group_j)
                    similar_groups.remove(group_j)
        return similar_groups

    def _calculate_layer_similarity(self, layer1: Dict, layer2: Dict) -> float:
        """Calculate a weighted similarity score in [0, 1] between two layers.

        Each attribute contributes its weight to the denominator whenever the
        comparison is *possible* (not only when it matches), so the returned
        ratio reflects the fraction of comparable attributes that agree.
        Strength is only compared when both layers carry a value.
        """
        score = 0.0
        total_weight = 0.0
        # Soil type similarity (weight: 0.4) - always comparable
        total_weight += 0.4
        if layer1.get("soil_type", "").lower() == layer2.get("soil_type", "").lower():
            score += 0.4
        # Strength parameter similarity (weight: 0.3) - needs both values
        strength1 = layer1.get("strength_value")
        strength2 = layer2.get("strength_value")
        if strength1 is not None and strength2 is not None:
            total_weight += 0.3
            denom = max(abs(strength1), abs(strength2))
            # denom == 0 means both strengths are exactly 0 -> identical
            # (also guards the ZeroDivisionError of a raw ratio)
            if denom == 0 or abs(strength1 - strength2) / denom < 0.3:
                score += 0.3
        # Consistency similarity (weight: 0.2)
        total_weight += 0.2
        consistency1 = self._extract_consistency(layer1.get("soil_type", ""))
        consistency2 = self._extract_consistency(layer2.get("soil_type", ""))
        if consistency1 == consistency2:
            score += 0.2
        # Color similarity (weight: 0.1)
        total_weight += 0.1
        color1 = layer1.get("color") or ""
        color2 = layer2.get("color") or ""
        if color1.lower() == color2.lower():
            score += 0.1
        return score / total_weight if total_weight > 0 else 0.0

    def _extract_consistency(self, soil_type: str) -> str:
        """Extract the consistency/density term from a soil description.

        Longer terms are checked first so that "very stiff" is not shadowed
        by "stiff" and "medium dense" is not shadowed by "medium".  Returns
        "" when no known term is present.
        """
        soil_type_lower = soil_type.lower()
        for consistency in sorted(self.consistency_mapping, key=len, reverse=True):
            if consistency in soil_type_lower:
                return consistency
        return ""

    def suggest_layer_merging(self, layers: List[Dict]) -> Dict[str, Any]:
        """Suggest which similar, depth-adjacent layers could be merged.

        Returns ``{"suggestions": [...]}`` where each suggestion carries the
        member indices, a reason and a precomputed merged layer.
        """
        suggestions = []
        for group in self.identify_similar_layers(layers):
            if len(group) < 2:
                continue
            group_layers = [layers[i] for i in group]
            depths = sorted((layer["depth_from"], layer["depth_to"]) for layer in group_layers)
            # Adjacent = each layer starts within 0.5 m of the previous end
            is_adjacent = all(
                abs(depths[i][1] - depths[i + 1][0]) <= 0.5
                for i in range(len(depths) - 1)
            )
            if is_adjacent:
                suggestions.append({
                    "layer_indices": group,
                    "reason": "Similar soil properties and adjacent depths",
                    "merged_layer": self._create_merged_layer(group_layers)
                })
        return {"suggestions": suggestions}

    def _create_merged_layer(self, layers: List[Dict]) -> Dict:
        """Create a single merged layer spanning multiple similar layers.

        Depth range is the union of member ranges; soil type, parameter,
        color, moisture and consistency are taken from the first member;
        strength is the mean of the members that have a value, or ``None``
        when none do (``np.mean([])`` would warn and yield nan).
        """
        if not layers:
            return {}
        strengths = [layer["strength_value"] for layer in layers
                     if layer.get("strength_value") is not None]
        merged = {
            "layer_id": f"merged_{layers[0]['layer_id']}_{layers[-1]['layer_id']}",
            "depth_from": min(layer["depth_from"] for layer in layers),
            "depth_to": max(layer["depth_to"] for layer in layers),
            "soil_type": layers[0]["soil_type"],  # representative: first layer's type
            "description": f"Merged layer: {', '.join([layer.get('description', '') for layer in layers])}",
            "strength_parameter": layers[0].get("strength_parameter", ""),
            "strength_value": float(np.mean(strengths)) if strengths else None,
            "color": layers[0].get("color", ""),
            "moisture": layers[0].get("moisture", ""),
            "consistency": layers[0].get("consistency", "")
        }
        return merged

    def suggest_layer_splitting(self, layers: List[Dict]) -> Dict[str, Any]:
        """Suggest layers to split based on thickness and described variability.

        Very thick layers (>5 m) get evenly spaced split depths targeting
        ~2.5 m sublayers; layers whose description hints at variable
        conditions get a simple midpoint split suggestion.
        """
        suggestions = []
        for i, layer in enumerate(layers):
            thickness = layer["depth_to"] - layer["depth_from"]
            if thickness > 5.0:
                suggested_splits = int(thickness / 2.5)  # aim for ~2.5 m sublayers
                suggestions.append({
                    "layer_index": i,
                    "reason": f"Layer is very thick ({thickness:.1f}m) - consider splitting into {suggested_splits} sublayers",
                    "suggested_depths": np.linspace(layer["depth_from"], layer["depth_to"], suggested_splits + 1).tolist()
                })
            # Keywords indicating heterogeneous conditions within the layer
            description = layer.get("description", "").lower()
            if any(word in description for word in ("varying", "variable", "interbedded", "alternating")):
                suggestions.append({
                    "layer_index": i,
                    "reason": "Description indicates variable conditions - consider splitting based on detailed log",
                    "suggested_depths": [layer["depth_from"], (layer["depth_from"] + layer["depth_to"]) / 2, layer["depth_to"]]
                })
        return {"suggestions": suggestions}

    def optimize_layer_division(self, layers: List[Dict], merge_similar=True, split_thick=True) -> Dict[str, Any]:
        """Collect merge/split suggestions and neighbor analysis for a profile.

        NOTE: suggestions are reported only — the returned
        ``optimized_layers`` is an unmodified copy of the input; no merge or
        split is actually applied (hence "Suggested merging", not "Merged").
        """
        optimized_layers = layers.copy()
        changes_made = []
        # Traditional similarity-based merge suggestions
        merge_suggestions = {"suggestions": []}
        if merge_similar:
            merge_suggestions = self.suggest_layer_merging(optimized_layers)
            for suggestion in merge_suggestions["suggestions"]:
                changes_made.append(
                    f"Suggested merging layers {suggestion['layer_indices']}: {suggestion['reason']}")
        # Nearest neighbor analysis (reported alongside the suggestions)
        nn_analysis = self.analyze_nearest_neighbors(optimized_layers)
        # Thickness/variability-based split suggestions
        split_suggestions = {"suggestions": []}
        if split_thick:
            split_suggestions = self.suggest_layer_splitting(optimized_layers)
            for suggestion in split_suggestions["suggestions"]:
                changes_made.append(
                    f"Suggested splitting layer {suggestion['layer_index']}: {suggestion['reason']}")
        return {
            "optimized_layers": optimized_layers,
            "changes_made": changes_made,
            "merge_suggestions": merge_suggestions,
            "split_suggestions": split_suggestions,
            "nearest_neighbor_analysis": nn_analysis
        }

    def analyze_nearest_neighbors(self, layers: List[Dict], k: int = 3, similarity_threshold: float = 0.55) -> Dict[str, Any]:
        """Perform nearest-neighbor analysis on soil layers.

        Delegates to ``self.nn_grouping``; returns its groups, merge
        recommendations, cluster labels and a detailed neighbor report.
        Failures are reported to the Streamlit UI and returned as
        ``{"error": ...}`` rather than raised, so callers keep running.
        """
        if len(layers) < 2:
            return {"message": "Insufficient layers for neighbor analysis"}
        try:
            nn_suggestions = self.nn_grouping.suggest_layer_merging(layers, similarity_threshold)
            neighbor_report = self.nn_grouping.get_layer_neighbors_report(layers, k)
            return {
                "neighbor_groups": nn_suggestions.get("groups", []),
                "merge_recommendations": nn_suggestions.get("recommendations", []),
                "cluster_labels": nn_suggestions.get("cluster_labels", []),
                "neighbor_report": neighbor_report,
                "analysis_parameters": {
                    "similarity_threshold": similarity_threshold,
                    "k_neighbors": k,
                    "total_layers": len(layers)
                }
            }
        except Exception as e:
            st.error(f"Error in nearest neighbor analysis: {str(e)}")
            return {"error": str(e)}

    def get_grouping_summary(self, layers: List[Dict]) -> Dict[str, Any]:
        """Get a comprehensive summary of layer grouping analysis.

        Propagates an ``{"error": ...}`` result from the neighbor analysis
        unchanged; otherwise returns counts plus per-group details.
        """
        nn_analysis = self.analyze_nearest_neighbors(layers)
        if "error" in nn_analysis:
            return nn_analysis
        summary = {
            "total_layers": len(layers),
            "identified_groups": len(nn_analysis.get("neighbor_groups", [])),
            "merge_recommendations": len(nn_analysis.get("merge_recommendations", [])),
            "group_details": []
        }
        for i, group in enumerate(nn_analysis.get("neighbor_groups", [])):
            # Dominant soil type = the one with the largest recorded share
            group_detail = {
                "group_id": group.get("group_id", i + 1),
                "layers_in_group": group.get("group_size", 0),
                "depth_range": f"{group.get('depth_range', {}).get('min', 0):.1f}-{group.get('depth_range', {}).get('max', 0):.1f}m",
                "total_thickness": group.get('depth_range', {}).get('total_thickness', 0),
                "dominant_soil_type": max(group.get('soil_types', {}).items(), key=lambda x: x[1])[0] if group.get('soil_types') else "unknown",
                "layer_ids": group.get("layer_ids", [])
            }
            summary["group_details"].append(group_detail)
        return summary

    def calculate_layer_statistics(self, layers: List[Dict]) -> Dict[str, Any]:
        """Calculate summary statistics for the soil profile.

        Returns total depth, layer count, average/extreme thicknesses and
        the per-soil-type thickness distribution in percent.  Returns an
        empty dict for an empty profile.
        """
        if not layers:
            return {}
        total_depth = max(layer["depth_to"] for layer in layers)
        layer_count = len(layers)
        # Thickness contributed by each soil type
        soil_types: Dict[str, float] = {}
        for layer in layers:
            soil_type = layer.get("soil_type", "unknown")
            thickness = layer["depth_to"] - layer["depth_from"]
            soil_types[soil_type] = soil_types.get(soil_type, 0) + thickness
        # Convert to percentages; a zero total depth (degenerate profile)
        # would otherwise divide by zero
        if total_depth > 0:
            soil_type_percentages = {k: (v / total_depth) * 100 for k, v in soil_types.items()}
        else:
            soil_type_percentages = {k: 0.0 for k in soil_types}
        thicknesses = [layer["depth_to"] - layer["depth_from"] for layer in layers]
        avg_thickness = float(np.mean(thicknesses))
        return {
            "total_depth": total_depth,
            "layer_count": layer_count,
            "average_layer_thickness": avg_thickness,
            "soil_type_distribution": soil_type_percentages,
            "thickest_layer": max(thicknesses),
            "thinnest_layer": min(thicknesses)
        }