import numpy as np
from typing import List, Dict, Any

import streamlit as st

from nearest_neighbor_grouping import NearestNeighborGrouping


class SoilLayerAnalyzer:
    """Analyze, validate, and optimize borehole soil-layer profiles.

    Provides depth-continuity validation, pairwise similarity scoring,
    merge/split suggestions, nearest-neighbor grouping (delegated to
    ``NearestNeighborGrouping``), and basic profile statistics.

    Layers are plain dicts; the keys this class reads are ``depth_from``,
    ``depth_to``, ``soil_type``, ``strength_value``, ``strength_parameter``,
    ``color``, ``moisture``, ``consistency``, ``description`` and
    ``layer_id``.  Depths are presumably metres below ground level —
    TODO(review): confirm units against the data source.
    """

    def __init__(self):
        # Ordinal ranking of consistency/density descriptors (1 = weakest,
        # 5 = strongest).  Cohesive ("soft".."hard") and granular
        # ("loose".."very dense") terms share ranks.
        self.consistency_mapping = {
            "soft": 1, "loose": 1,
            "medium": 2, "medium dense": 2,
            "stiff": 3, "dense": 3,
            "very stiff": 4, "very dense": 4,
            "hard": 5,
        }
        self.nn_grouping = NearestNeighborGrouping()

    def validate_layer_continuity(self, layers: List[Dict]) -> List[Dict]:
        """Validate and fix layer depth continuity.

        Sorts layers by ``depth_from``, forces the first layer to start at
        0 and each subsequent layer to start where the previous one ends.
        NOTE: mutates the layer dicts in place (callers may rely on this).

        Args:
            layers: layer dicts with ``depth_from``/``depth_to`` keys.

        Returns:
            The layers sorted by depth with contiguous ``depth_from`` values.
        """
        if not layers:
            return layers

        sorted_layers = sorted(layers, key=lambda x: x.get("depth_from", 0))

        validated_layers: List[Dict] = []
        for i, layer in enumerate(sorted_layers):
            # First layer is pinned to the surface; every later layer is
            # snapped to the bottom of the one above it.
            layer["depth_from"] = 0 if i == 0 else validated_layers[-1]["depth_to"]
            validated_layers.append(layer)

        return validated_layers

    def identify_similar_layers(self, layers: List[Dict],
                                similarity_threshold: float = 0.8) -> List[List[int]]:
        """Identify layers that could potentially be grouped together.

        Compares every pair of layers and unions pairs whose similarity
        score meets ``similarity_threshold`` into groups of indices.

        Bug fix vs. previous version: when the two members of a similar
        pair already belonged to *different* groups, the old code appended
        one index into the other's group without removing it from its own,
        leaving the same layer in two groups.  Groups are now merged.

        Args:
            layers: layer dicts to compare.
            similarity_threshold: minimum score (0..1) to link two layers.

        Returns:
            List of groups, each a list of layer indices (size >= 2).
        """
        similar_groups: List[List[int]] = []

        for i, layer1 in enumerate(layers):
            for j, layer2 in enumerate(layers[i + 1:], i + 1):
                if self._calculate_layer_similarity(layer1, layer2) < similarity_threshold:
                    continue

                group_i = next((g for g in similar_groups if i in g), None)
                group_j = next((g for g in similar_groups if j in g), None)

                if group_i is None and group_j is None:
                    similar_groups.append([i, j])
                elif group_i is group_j:
                    pass  # both already in the same group; nothing to do
                elif group_i is None:
                    group_j.append(i)
                elif group_j is None:
                    group_i.append(j)
                else:
                    # The pair bridges two existing groups: merge them so no
                    # layer index appears in more than one group.
                    group_i.extend(group_j)
                    similar_groups.remove(group_j)

        return similar_groups

    def _calculate_layer_similarity(self, layer1: Dict, layer2: Dict) -> float:
        """Calculate a weighted similarity score (0..1) between two layers.

        Weights: soil type 0.4, strength value 0.3 (only counted when both
        layers have one), consistency 0.2, color 0.1.  The score is
        normalized by the total weight of the comparable attributes.
        """
        score = 0.0
        total_weight = 0.0

        # Soil type similarity (weight: 0.4)
        if layer1.get("soil_type", "").lower() == layer2.get("soil_type", "").lower():
            score += 0.4
        total_weight += 0.4

        # Strength parameter similarity (weight: 0.3) — only comparable when
        # both layers carry a value.
        strength1 = layer1.get("strength_value")
        strength2 = layer2.get("strength_value")
        if strength1 is not None and strength2 is not None:
            denom = max(abs(strength1), abs(strength2))
            # denom == 0 means both values are 0, i.e. identical — count as
            # similar.  (Previously this raised ZeroDivisionError.)
            if denom == 0 or abs(strength1 - strength2) / denom < 0.3:
                score += 0.3
            total_weight += 0.3

        # Consistency similarity (weight: 0.2)
        consistency1 = self._extract_consistency(layer1.get("soil_type", ""))
        consistency2 = self._extract_consistency(layer2.get("soil_type", ""))
        if consistency1 == consistency2:
            score += 0.2
        total_weight += 0.2

        # Color similarity (weight: 0.1)
        color1 = layer1.get("color") or ""
        color2 = layer2.get("color") or ""
        if color1.lower() == color2.lower():
            score += 0.1
        total_weight += 0.1

        return score / total_weight if total_weight > 0 else 0.0

    def _extract_consistency(self, soil_type: str) -> str:
        """Extract the consistency descriptor from a soil type description.

        Terms are tried longest-first so that multi-word descriptors such
        as "medium dense" or "very stiff" win over their substrings
        ("medium", "stiff", "dense").  The previous insertion-order scan
        could never return the multi-word terms.

        Returns the matched descriptor, or "" when none is found.
        """
        soil_type_lower = soil_type.lower()
        for term in sorted(self.consistency_mapping, key=len, reverse=True):
            if term in soil_type_lower:
                return term
        return ""

    def suggest_layer_merging(self, layers: List[Dict]) -> Dict[str, Any]:
        """Suggest which groups of similar, depth-adjacent layers to merge.

        A group is suggested only when its layers' depth intervals line up
        end-to-end within a 0.5 m tolerance.

        Returns:
            ``{"suggestions": [...]}`` where each suggestion carries the
            layer indices, a reason string, and a pre-built merged layer.
        """
        similar_groups = self.identify_similar_layers(layers)
        suggestions = []

        for group in similar_groups:
            if len(group) < 2:
                continue
            group_layers = [layers[i] for i in group]

            # Sort the depth intervals so adjacency can be checked pairwise.
            depths = sorted((layer["depth_from"], layer["depth_to"])
                            for layer in group_layers)

            # Adjacent means each interval ends within 0.5 m of where the
            # next begins.
            is_adjacent = all(
                abs(depths[i][1] - depths[i + 1][0]) <= 0.5
                for i in range(len(depths) - 1)
            )

            if is_adjacent:
                suggestions.append({
                    "layer_indices": group,
                    "reason": "Similar soil properties and adjacent depths",
                    "merged_layer": self._create_merged_layer(group_layers),
                })

        return {"suggestions": suggestions}

    def _create_merged_layer(self, layers: List[Dict]) -> Dict:
        """Create a single merged layer from multiple similar layers.

        Depth range spans all inputs; descriptive attributes are taken from
        the first layer; strength is the mean of the available values.

        Returns an empty dict when ``layers`` is empty.
        """
        if not layers:
            return {}

        # Mean only over layers that actually carry a strength value.
        # (Previously np.mean over an empty list produced nan + a warning;
        # now None signals "no strength data".)
        strength_values = [layer["strength_value"] for layer in layers
                           if layer.get("strength_value") is not None]
        mean_strength = float(np.mean(strength_values)) if strength_values else None

        return {
            "layer_id": f"merged_{layers[0]['layer_id']}_{layers[-1]['layer_id']}",
            "depth_from": min(layer["depth_from"] for layer in layers),
            "depth_to": max(layer["depth_to"] for layer in layers),
            "soil_type": layers[0]["soil_type"],  # Use first layer's type
            "description": f"Merged layer: {', '.join([layer.get('description', '') for layer in layers])}",
            "strength_parameter": layers[0].get("strength_parameter", ""),
            "strength_value": mean_strength,
            "color": layers[0].get("color", ""),
            "moisture": layers[0].get("moisture", ""),
            "consistency": layers[0].get("consistency", ""),
        }

    def suggest_layer_splitting(self, layers: List[Dict]) -> Dict[str, Any]:
        """Suggest which layers should be split.

        Two triggers: thickness above 5 m (split into ~2.5 m sublayers),
        and descriptions containing variability keywords ("varying",
        "variable", "interbedded", "alternating").

        Returns:
            ``{"suggestions": [...]}`` with a reason and suggested split
            depths for each flagged layer.
        """
        suggestions = []

        for i, layer in enumerate(layers):
            thickness = layer["depth_to"] - layer["depth_from"]

            # Very thick layers (>5 m): propose evenly spaced sublayers.
            if thickness > 5.0:
                suggested_splits = int(thickness / 2.5)  # ~2.5 m sublayers
                suggestions.append({
                    "layer_index": i,
                    "reason": f"Layer is very thick ({thickness:.1f}m) - consider splitting into {suggested_splits} sublayers",
                    "suggested_depths": np.linspace(
                        layer["depth_from"], layer["depth_to"],
                        suggested_splits + 1
                    ).tolist(),
                })

            # Variability keywords in the log description suggest the layer
            # is not homogeneous.
            description = layer.get("description", "").lower()
            if any(word in description for word in
                   ["varying", "variable", "interbedded", "alternating"]):
                midpoint = (layer["depth_from"] + layer["depth_to"]) / 2
                suggestions.append({
                    "layer_index": i,
                    "reason": "Description indicates variable conditions - consider splitting based on detailed log",
                    "suggested_depths": [layer["depth_from"], midpoint, layer["depth_to"]],
                })

        return {"suggestions": suggestions}

    def optimize_layer_division(self, layers: List[Dict],
                                merge_similar=True, split_thick=True) -> Dict[str, Any]:
        """Collect merge/split suggestions and nearest-neighbor analysis.

        NOTE: this method only *suggests* changes; ``optimized_layers`` is
        a shallow copy of the input and is not actually modified.

        Args:
            layers: current layer list.
            merge_similar: compute merge suggestions when True.
            split_thick: compute split suggestions when True.

        Returns:
            Dict with the (unchanged) layers, a change log, both suggestion
            sets, and the nearest-neighbor analysis.
        """
        optimized_layers = layers.copy()
        changes_made = []

        # Traditional merge suggestions
        merge_suggestions = {"suggestions": []}
        if merge_similar:
            merge_suggestions = self.suggest_layer_merging(optimized_layers)
            for suggestion in merge_suggestions["suggestions"]:
                changes_made.append(
                    f"Merged layers {suggestion['layer_indices']}: {suggestion['reason']}"
                )

        # Nearest neighbor analysis
        nn_analysis = self.analyze_nearest_neighbors(optimized_layers)

        # Split suggestions
        split_suggestions = {"suggestions": []}
        if split_thick:
            split_suggestions = self.suggest_layer_splitting(optimized_layers)
            for suggestion in split_suggestions["suggestions"]:
                changes_made.append(
                    f"Suggested splitting layer {suggestion['layer_index']}: {suggestion['reason']}"
                )

        return {
            "optimized_layers": optimized_layers,
            "changes_made": changes_made,
            "merge_suggestions": merge_suggestions,
            "split_suggestions": split_suggestions,
            "nearest_neighbor_analysis": nn_analysis,
        }

    def analyze_nearest_neighbors(self, layers: List[Dict], k: int = 3,
                                  similarity_threshold: float = 0.55) -> Dict[str, Any]:
        """Perform nearest-neighbor analysis on soil layers.

        Delegates to ``NearestNeighborGrouping`` for grouping and the
        per-layer neighbor report.

        Args:
            layers: layer dicts to analyze.
            k: number of neighbors for the report.
            similarity_threshold: minimum similarity used by the grouper.

        Returns:
            Analysis dict; ``{"message": ...}`` when fewer than 2 layers,
            ``{"error": ...}`` when the underlying grouper raises (the
            error is also surfaced in the Streamlit UI).
        """
        if len(layers) < 2:
            return {"message": "Insufficient layers for neighbor analysis"}

        try:
            nn_suggestions = self.nn_grouping.suggest_layer_merging(
                layers, similarity_threshold
            )
            neighbor_report = self.nn_grouping.get_layer_neighbors_report(layers, k)

            return {
                "neighbor_groups": nn_suggestions.get("groups", []),
                "merge_recommendations": nn_suggestions.get("recommendations", []),
                "cluster_labels": nn_suggestions.get("cluster_labels", []),
                "neighbor_report": neighbor_report,
                "analysis_parameters": {
                    "similarity_threshold": similarity_threshold,
                    "k_neighbors": k,
                    "total_layers": len(layers),
                },
            }
        except Exception as e:
            # Surface the failure in the UI but keep the app running by
            # returning an error payload instead of raising.
            st.error(f"Error in nearest neighbor analysis: {str(e)}")
            return {"error": str(e)}

    def get_grouping_summary(self, layers: List[Dict]) -> Dict[str, Any]:
        """Summarize the nearest-neighbor grouping analysis.

        Returns the error payload unchanged when the underlying analysis
        failed; otherwise a dict with counts and per-group details.
        """
        nn_analysis = self.analyze_nearest_neighbors(layers)

        if "error" in nn_analysis:
            return nn_analysis

        summary = {
            "total_layers": len(layers),
            "identified_groups": len(nn_analysis.get("neighbor_groups", [])),
            "merge_recommendations": len(nn_analysis.get("merge_recommendations", [])),
            "group_details": [],
        }

        for i, group in enumerate(nn_analysis.get("neighbor_groups", [])):
            depth_range = group.get('depth_range', {})
            soil_types = group.get('soil_types', {})
            group_detail = {
                "group_id": group.get("group_id", i + 1),
                "layers_in_group": group.get("group_size", 0),
                "depth_range": f"{depth_range.get('min', 0):.1f}-{depth_range.get('max', 0):.1f}m",
                "total_thickness": depth_range.get('total_thickness', 0),
                # Most common soil type by count; "unknown" when none reported.
                "dominant_soil_type": (
                    max(soil_types.items(), key=lambda x: x[1])[0]
                    if soil_types else "unknown"
                ),
                "layer_ids": group.get("layer_ids", []),
            }
            summary["group_details"].append(group_detail)

        return summary

    def calculate_layer_statistics(self, layers: List[Dict]) -> Dict[str, Any]:
        """Calculate summary statistics for the soil profile.

        Returns an empty dict for an empty profile.  Soil-type distribution
        is thickness-weighted and expressed in percent of total depth
        (empty when total depth is 0, avoiding division by zero).
        """
        if not layers:
            return {}

        total_depth = max(layer["depth_to"] for layer in layers)
        layer_count = len(layers)

        # Thickness-weighted soil type distribution.
        soil_types: Dict[str, float] = {}
        for layer in layers:
            soil_type = layer.get("soil_type", "unknown")
            thickness = layer["depth_to"] - layer["depth_from"]
            soil_types[soil_type] = soil_types.get(soil_type, 0.0) + thickness

        # Guard: a degenerate profile (total depth 0) yields no percentages.
        soil_type_percentages = (
            {k: (v / total_depth) * 100 for k, v in soil_types.items()}
            if total_depth > 0 else {}
        )

        thicknesses = [layer["depth_to"] - layer["depth_from"] for layer in layers]

        return {
            "total_depth": total_depth,
            "layer_count": layer_count,
            "average_layer_thickness": np.mean(thicknesses),
            "soil_type_distribution": soil_type_percentages,
            "thickest_layer": max(thicknesses),
            "thinnest_layer": min(thicknesses),
        }