File size: 13,270 Bytes
2c200f8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
import numpy as np
from typing import List, Dict, Any
import streamlit as st
from nearest_neighbor_grouping import NearestNeighborGrouping

class SoilLayerAnalyzer:
    def __init__(self):
        """Create the consistency lookup table and the nearest-neighbor helper."""
        # Consistency/density descriptor -> integer rank (1-5).  The pairs are
        # listed in a fixed order because lookups elsewhere in the class
        # iterate the mapping's keys in insertion order.
        ranked_descriptors = [
            ("soft", 1),
            ("loose", 1),
            ("medium", 2),
            ("medium dense", 2),
            ("stiff", 3),
            ("dense", 3),
            ("very stiff", 4),
            ("very dense", 4),
            ("hard", 5),
        ]
        self.consistency_mapping = dict(ranked_descriptors)

        # Helper object used by analyze_nearest_neighbors().
        self.nn_grouping = NearestNeighborGrouping()
    
    def validate_layer_continuity(self, layers: List[Dict]) -> List[Dict]:
        """Return the layers ordered by depth with gap-free top depths.

        The layers are sorted by ``depth_from`` (a missing key sorts as 0).
        The shallowest layer is then made to start at 0 and every following
        layer's ``depth_from`` is overwritten with the ``depth_to`` of the
        layer directly above it.  ``depth_to`` values are never changed.

        Note:
            The layer dicts are modified in place; the returned list contains
            the same dict objects that were passed in.  An empty (falsy)
            input is returned unchanged.
        """
        if not layers:
            return layers

        ordered = sorted(layers, key=lambda entry: entry.get("depth_from", 0))

        chained = []
        for entry in ordered:
            # Top of this layer = bottom of the one above, or 0 for the first.
            entry["depth_from"] = chained[-1]["depth_to"] if chained else 0
            chained.append(entry)

        return chained
    
    def identify_similar_layers(self, layers: List[Dict], similarity_threshold: float = 0.8) -> List[List[int]]:
        """Identify layers that could potentially be grouped together"""
        similar_groups = []
        
        for i, layer1 in enumerate(layers):
            for j, layer2 in enumerate(layers[i+1:], i+1):
                similarity_score = self._calculate_layer_similarity(layer1, layer2)
                
                if similarity_score >= similarity_threshold:
                    # Check if either layer is already in a group
                    group_found = False
                    for group in similar_groups:
                        if i in group:
                            if j not in group:
                                group.append(j)
                            group_found = True
                            break
                        elif j in group:
                            if i not in group:
                                group.append(i)
                            group_found = True
                            break
                    
                    if not group_found:
                        similar_groups.append([i, j])
        
        return similar_groups
    
    def _calculate_layer_similarity(self, layer1: Dict, layer2: Dict) -> float:
        """Calculate similarity score between two layers"""
        score = 0.0
        total_weight = 0.0
        
        # Soil type similarity (weight: 0.4)
        if layer1.get("soil_type", "").lower() == layer2.get("soil_type", "").lower():
            score += 0.4
        total_weight += 0.4
        
        # Strength parameter similarity (weight: 0.3)
        strength1 = layer1.get("strength_value")
        strength2 = layer2.get("strength_value")
        if strength1 is not None and strength2 is not None:
            if abs(strength1 - strength2) / max(strength1, strength2) < 0.3:
                score += 0.3
        total_weight += 0.3
        
        # Consistency similarity (weight: 0.2)
        consistency1 = self._extract_consistency(layer1.get("soil_type", ""))
        consistency2 = self._extract_consistency(layer2.get("soil_type", ""))
        if consistency1 == consistency2:
            score += 0.2
        total_weight += 0.2
        
        # Color similarity (weight: 0.1)
        color1 = layer1.get("color") or ""
        color2 = layer2.get("color") or ""
        if color1.lower() == color2.lower():
            score += 0.1
        total_weight += 0.1
        
        return score / total_weight if total_weight > 0 else 0.0
    
    def _extract_consistency(self, soil_type: str) -> str:
        """Extract consistency from soil type description"""
        soil_type_lower = soil_type.lower()
        for consistency in self.consistency_mapping.keys():
            if consistency in soil_type_lower:
                return consistency
        return ""
    
    def suggest_layer_merging(self, layers: List[Dict]) -> Dict[str, Any]:
        """Suggest which layers could be merged"""
        similar_groups = self.identify_similar_layers(layers)
        suggestions = []
        
        for group in similar_groups:
            if len(group) >= 2:
                group_layers = [layers[i] for i in group]
                
                # Check if layers are adjacent or close
                depths = [(layer["depth_from"], layer["depth_to"]) for layer in group_layers]
                depths.sort()
                
                # Check for adjacency
                is_adjacent = True
                for i in range(len(depths) - 1):
                    if abs(depths[i][1] - depths[i+1][0]) > 0.5:  # 0.5m tolerance
                        is_adjacent = False
                        break
                
                if is_adjacent:
                    suggestions.append({
                        "layer_indices": group,
                        "reason": "Similar soil properties and adjacent depths",
                        "merged_layer": self._create_merged_layer(group_layers)
                    })
        
        return {"suggestions": suggestions}
    
    def _create_merged_layer(self, layers: List[Dict]) -> Dict:
        """Create a merged layer from multiple similar layers"""
        if not layers:
            return {}
        
        merged = {
            "layer_id": f"merged_{layers[0]['layer_id']}_{layers[-1]['layer_id']}",
            "depth_from": min(layer["depth_from"] for layer in layers),
            "depth_to": max(layer["depth_to"] for layer in layers),
            "soil_type": layers[0]["soil_type"],  # Use first layer's type
            "description": f"Merged layer: {', '.join([layer.get('description', '') for layer in layers])}",
            "strength_parameter": layers[0].get("strength_parameter", ""),
            "strength_value": np.mean([layer.get("strength_value", 0) for layer in layers if layer.get("strength_value") is not None]),
            "color": layers[0].get("color", ""),
            "moisture": layers[0].get("moisture", ""),
            "consistency": layers[0].get("consistency", "")
        }
        
        return merged
    
    def suggest_layer_splitting(self, layers: List[Dict]) -> Dict[str, Any]:
        """Propose splits for thick layers and for layers described as variable.

        Two independent checks are applied to each layer, so one layer can
        receive two suggestions:

        * thickness above 5 m: split evenly into ``int(thickness / 2.5)``
          sublayers;
        * description mentions varying/variable/interbedded/alternating:
          split once at mid-depth.

        Returns:
            ``{"suggestions": [...]}`` where each entry has ``layer_index``,
            ``reason`` and ``suggested_depths`` (boundary depths, top to
            bottom).
        """
        variability_terms = ("varying", "variable", "interbedded", "alternating")
        proposals = []

        for index, layer in enumerate(layers):
            bottom = layer["depth_to"]
            top = layer["depth_from"]
            thickness = bottom - top

            if thickness > 5.0:
                # Aim for sublayers of roughly 2.5 m each.
                piece_count = int(thickness / 2.5)
                boundaries = np.linspace(top, bottom, piece_count + 1).tolist()
                proposals.append({
                    "layer_index": index,
                    "reason": f"Layer is very thick ({thickness:.1f}m) - consider splitting into {piece_count} sublayers",
                    "suggested_depths": boundaries,
                })

            text = layer.get("description", "").lower()
            if any(term in text for term in variability_terms):
                midpoint = (top + bottom) / 2
                proposals.append({
                    "layer_index": index,
                    "reason": "Description indicates variable conditions - consider splitting based on detailed log",
                    "suggested_depths": [top, midpoint, bottom],
                })

        return {"suggestions": proposals}
    
    def optimize_layer_division(self, layers: List[Dict], merge_similar=True, split_thick=True) -> Dict[str, Any]:
        """Optimize layer division by merging similar layers and splitting thick ones"""
        optimized_layers = layers.copy()
        changes_made = []
        
        # Traditional merge suggestions
        merge_suggestions = {"suggestions": []}
        if merge_similar:
            merge_suggestions = self.suggest_layer_merging(optimized_layers)
            for suggestion in merge_suggestions["suggestions"]:
                changes_made.append(f"Merged layers {suggestion['layer_indices']}: {suggestion['reason']}")
        
        # Nearest neighbor analysis
        nn_analysis = self.analyze_nearest_neighbors(optimized_layers)
        
        # Split suggestions
        split_suggestions = {"suggestions": []}
        if split_thick:
            split_suggestions = self.suggest_layer_splitting(optimized_layers)
            for suggestion in split_suggestions["suggestions"]:
                changes_made.append(f"Suggested splitting layer {suggestion['layer_index']}: {suggestion['reason']}")
        
        return {
            "optimized_layers": optimized_layers,
            "changes_made": changes_made,
            "merge_suggestions": merge_suggestions,
            "split_suggestions": split_suggestions,
            "nearest_neighbor_analysis": nn_analysis
        }
    
    def analyze_nearest_neighbors(self, layers: List[Dict], k: int = 3, similarity_threshold: float = 0.55) -> Dict[str, Any]:
        """Run the nearest-neighbor grouping helper over the layers.

        Args:
            layers: Layer dicts to analyse.
            k: Neighbor count passed to the helper's neighbor report.
            similarity_threshold: Threshold passed to the helper's merge
                suggestion call.

        Returns:
            * ``{"message": ...}`` when fewer than two layers are given;
            * ``{"error": ...}`` when the helper raises (the error is also
              shown via ``st.error``);
            * otherwise a dict with ``neighbor_groups``,
              ``merge_recommendations``, ``cluster_labels``,
              ``neighbor_report`` and ``analysis_parameters``.
        """
        if len(layers) < 2:
            return {"message": "Insufficient layers for neighbor analysis"}

        try:
            grouping = self.nn_grouping.suggest_layer_merging(layers, similarity_threshold)
            report = self.nn_grouping.get_layer_neighbors_report(layers, k)

            analysis = {
                "neighbor_groups": grouping.get("groups", []),
                "merge_recommendations": grouping.get("recommendations", []),
                "cluster_labels": grouping.get("cluster_labels", []),
                "neighbor_report": report,
            }
            # Echo the inputs so consumers can see how the result was produced.
            analysis["analysis_parameters"] = {
                "similarity_threshold": similarity_threshold,
                "k_neighbors": k,
                "total_layers": len(layers),
            }
            return analysis

        except Exception as exc:
            # Any helper failure is surfaced in the UI and returned as data
            # rather than propagated to the caller.
            st.error(f"Error in nearest neighbor analysis: {str(exc)}")
            return {"error": str(exc)}
    
    def get_grouping_summary(self, layers: List[Dict]) -> Dict[str, Any]:
        """Get a comprehensive summary of layer grouping analysis"""
        
        nn_analysis = self.analyze_nearest_neighbors(layers)
        
        if "error" in nn_analysis:
            return nn_analysis
        
        summary = {
            "total_layers": len(layers),
            "identified_groups": len(nn_analysis.get("neighbor_groups", [])),
            "merge_recommendations": len(nn_analysis.get("merge_recommendations", [])),
            "group_details": []
        }
        
        # Add details for each group
        for i, group in enumerate(nn_analysis.get("neighbor_groups", [])):
            group_detail = {
                "group_id": group.get("group_id", i+1),
                "layers_in_group": group.get("group_size", 0),
                "depth_range": f"{group.get('depth_range', {}).get('min', 0):.1f}-{group.get('depth_range', {}).get('max', 0):.1f}m",
                "total_thickness": group.get('depth_range', {}).get('total_thickness', 0),
                "dominant_soil_type": max(group.get('soil_types', {}).items(), key=lambda x: x[1])[0] if group.get('soil_types') else "unknown",
                "layer_ids": group.get("layer_ids", [])
            }
            summary["group_details"].append(group_detail)
        
        return summary
    
    def calculate_layer_statistics(self, layers: List[Dict]) -> Dict[str, Any]:
        """Calculate summary statistics for the soil profile.

        Args:
            layers: Layer dicts with ``depth_from`` and ``depth_to``;
                ``soil_type`` falls back to "unknown" when missing.

        Returns:
            ``{}`` for an empty profile, otherwise a dict with:

            * ``total_depth``: the largest ``depth_to``
            * ``layer_count``: number of layers
            * ``average_layer_thickness``: mean thickness as a ``float``
            * ``soil_type_distribution``: per soil type, summed thickness as
              a percentage of ``total_depth`` (all 0.0 when that is 0)
            * ``thickest_layer`` / ``thinnest_layer``: extreme thicknesses
        """
        if not layers:
            return {}

        total_depth = max(layer["depth_to"] for layer in layers)
        thicknesses = [layer["depth_to"] - layer["depth_from"] for layer in layers]

        # Accumulate thickness per soil type.
        thickness_by_type: Dict[str, float] = {}
        for layer, thickness in zip(layers, thicknesses):
            soil_type = layer.get("soil_type", "unknown")
            thickness_by_type[soil_type] = thickness_by_type.get(soil_type, 0) + thickness

        # NOTE(review): percentages are relative to the deepest depth_to, not
        # to the summed thickness, so they only add up to 100 when the
        # profile starts at 0 without gaps - confirm this is intended.
        if total_depth:
            soil_type_percentages = {
                name: (amount / total_depth) * 100
                for name, amount in thickness_by_type.items()
            }
        else:
            # A zero-depth profile has no meaningful shares; avoid dividing
            # by zero.
            soil_type_percentages = {name: 0.0 for name in thickness_by_type}

        return {
            "total_depth": total_depth,
            "layer_count": len(layers),
            "average_layer_thickness": float(np.mean(thicknesses)),
            "soil_type_distribution": soil_type_percentages,
            "thickest_layer": max(thicknesses),
            "thinnest_layer": min(thicknesses),
        }