| |
| import numpy as np |
| import pandas as pd |
| from collections import Counter |
| from sklearn.decomposition import PCA |
| from sklearn.cluster import DBSCAN |
| from sklearn.neighbors import LocalOutlierFactor |
| import hdbscan |
| from scipy.stats import circvar |
|
|
| from utils import add_zone_labels |
|
|
|
|
def get_dominant_zone(df: pd.DataFrame) -> str:
    """Return the most frequent value of the ``zone_label`` column.

    Args:
        df: Frame expected to carry a ``zone_label`` column.

    Returns:
        The label with the highest count, or ``"N/A"`` when *df* has no rows
        or has no ``zone_label`` column.
    """
    has_rows = len(df) > 0
    if not has_rows or 'zone_label' not in df.columns:
        return "N/A"
    zone_counts = Counter(df['zone_label'])
    # most_common(1) yields a single (label, count) pair; keep only the label.
    (top_zone, _count), = zone_counts.most_common(1)
    return top_zone
|
|
|
|
def circular_range_deg(angles_deg: np.ndarray) -> float:
    """Return the angular extent (degrees) of the smallest arc holding all angles.

    The angles are wrapped into [0, 360), sorted, and the largest empty gap
    between neighbouring angles (including the gap that wraps through 0/360)
    is subtracted from the full circle.

    Args:
        angles_deg: Angles in degrees.

    Returns:
        ``360 - largest_gap``; ``0.0`` when fewer than two angles are given.
    """
    if len(angles_deg) < 2:
        return 0.0
    wrapped = np.sort(np.array(angles_deg) % 360.0)
    interior_gaps = np.diff(wrapped)
    # Gap from the largest angle, through 360/0, back to the smallest one.
    wraparound_gap = 360.0 - wrapped[-1] + wrapped[0]
    largest_gap = max(np.max(interior_gaps), wraparound_gap)
    return 360.0 - largest_gap
|
|
|
|
def check_sector_coverage(theta_deg: np.ndarray, min_sectors: int = 8, n_sectors: int = 12) -> bool:
    """Check whether the angles occupy at least *min_sectors* angular sectors.

    The circle is split into *n_sectors* equal sectors (12 sectors of 30
    degrees by default) and each angle is assigned to the sector it falls in.

    Args:
        theta_deg: Angles in degrees; any array-like is accepted.
        min_sectors: Minimum number of distinct occupied sectors required.
        n_sectors: Number of equal sectors the circle is divided into.

    Returns:
        True when the number of distinct occupied sectors is at least
        *min_sectors*; False otherwise, including when there is no finite
        angle at all.

    Raises:
        ValueError: If *n_sectors* is not positive.
    """
    if n_sectors <= 0:
        raise ValueError("n_sectors must be positive")
    angles = np.asarray(theta_deg, dtype=float).ravel()
    # NaN/inf cannot be assigned to a sector; casting them to int would
    # produce an arbitrary index that is then counted as an occupied sector.
    angles = angles[np.isfinite(angles)]
    if angles.size == 0:
        return False
    sector_width = 360.0 / n_sectors
    # The trailing modulo folds an index equal to n_sectors (possible when the
    # wrapped angle rounds up to exactly 360.0) back onto sector 0.
    sector_indices = ((angles % 360.0) // sector_width).astype(int) % n_sectors
    return bool(np.unique(sector_indices).size >= min_sectors)
|
|
|
|
def fit_circle_least_squares(x: np.ndarray, y: np.ndarray):
    """Fit a circle to 2-D points with an algebraic least-squares fit.

    Solves ``x**2 + y**2 = a*x + b*y + c`` for ``(a, b, c)`` in the
    least-squares sense; the centre is ``(a/2, b/2)`` and the radius is
    ``sqrt((a**2 + b**2)/4 + c)``.

    Args:
        x: X coordinates (array-like).
        y: Y coordinates (array-like, same length as *x*).

    Returns:
        ``(center_x, center_y, radius, rmse)`` where *rmse* is the root mean
        square of (distance to centre - radius). When no circle can be fitted
        (fewer than 3 points, rank-deficient input such as collinear points,
        solver failure, or a non-finite result) the function returns
        ``(None, None, None, np.inf)``.
    """
    failure = (None, None, None, np.inf)
    xs = np.asarray(x, dtype=float).ravel()
    ys = np.asarray(y, dtype=float).ravel()
    if xs.size < 3:
        return failure

    design = np.column_stack([xs, ys, np.ones_like(xs)])
    target = xs**2 + ys**2
    try:
        (coef_x, coef_y, coef_c), _, rank, _ = np.linalg.lstsq(design, target, rcond=None)
    except np.linalg.LinAlgError:
        return failure
    # A rank below 3 means the points do not determine a circle; lstsq would
    # still hand back a minimum-norm solution, which is not a meaningful fit.
    if rank < 3:
        return failure

    center_x = coef_x / 2
    center_y = coef_y / 2
    radius_sq = (coef_x**2 + coef_y**2) / 4 + coef_c
    if not np.isfinite(radius_sq) or radius_sq < 0:
        return failure
    radius = np.sqrt(radius_sq)
    point_dists = np.sqrt((xs - center_x)**2 + (ys - center_y)**2)
    rmse = np.sqrt(np.mean((point_dists - radius)**2))
    if not np.isfinite(rmse):
        return failure
    return float(center_x), float(center_y), float(radius), float(rmse)
|
|
|
|
def filter_main_ring_band(df: pd.DataFrame, r_bin_width: float = 5.0, top_n_bins: int = 1,
                          r_limit: float = 150.0) -> pd.DataFrame:
    """Keep only the rows whose radius falls in the most populated radial bin(s).

    Radii in ``[0, r_limit]`` are histogrammed with bins of width
    *r_bin_width*; rows belonging to the *top_n_bins* bins with the highest
    counts are returned.

    Args:
        df: Frame with an ``r`` column (radius).
        r_bin_width: Width of each radial bin; must be positive.
        top_n_bins: Number of highest-count bins to keep.
        r_limit: Largest radius taken into account (150.0 by default).

    Returns:
        A copy of the selected rows. A copy of *df* is returned unchanged when
        it has no rows or no ``r`` column; an empty frame with the same columns
        is returned when no radius lies in ``[0, r_limit]``.

    Raises:
        ValueError: If *r_bin_width* is not positive.
    """
    if len(df) == 0 or 'r' not in df.columns:
        return df.copy()
    if r_bin_width <= 0:
        raise ValueError("r_bin_width must be positive")

    radii = df['r'].to_numpy(dtype=float)
    in_range = (radii >= 0) & (radii <= r_limit)
    if not in_range.any():
        return pd.DataFrame(columns=df.columns)

    bin_edges = np.arange(0, r_limit + r_bin_width, r_bin_width)
    counts, bin_edges = np.histogram(radii[in_range], bins=bin_edges)
    top_bins = np.argsort(counts)[::-1][:top_n_bins]

    last_bin = len(counts) - 1
    keep = np.zeros(len(df), dtype=bool)
    for bin_idx in top_bins:
        lower = bin_edges[bin_idx]
        upper = bin_edges[bin_idx + 1]
        if bin_idx == last_bin:
            # np.histogram closes the last bin on the right, so a radius equal
            # to the final edge was counted and must be selected as well.
            in_bin = (radii >= lower) & (radii <= upper)
        else:
            in_bin = (radii >= lower) & (radii < upper)
        # Only rows that took part in the histogram may be selected.
        keep |= in_bin & in_range
    return df[keep].copy()
|
|
|
|
def is_ring_pattern_robust(inlier_df: pd.DataFrame, cfg: dict) -> bool:
    """Decide whether the inlier points form a ring pattern.

    The points of the most populated radial band must pass every check below,
    evaluated in this order:

    1. enough points before and after band filtering (``ring_min_points``);
    2. radial spread within ``ring_r_absolute_tolerance``;
    3. angular extent of at least ``ring_min_angular_coverage`` degrees;
    4. at least ``ring_min_sectors`` occupied angular sectors;
    5. a finite circle-fit RMSE not above ``ring_fit_rmse_max``;
    6. fitted centre no farther from the origin than
       ``ring_center_max_offset`` (optional key, defaults to 10.0).

    Args:
        inlier_df: Frame with ``r``, ``theta_deg``, ``coor_x`` and ``coor_y``
            columns.
        cfg: Configuration dict; the thresholds are read from ``cfg['ring']``.

    Returns:
        True when all checks pass, False otherwise.
    """
    ring_cfg = cfg['ring']
    min_points = ring_cfg['ring_min_points']
    if len(inlier_df) < min_points:
        return False

    band_df = filter_main_ring_band(inlier_df, r_bin_width=ring_cfg['ring_band_width'], top_n_bins=1)
    if len(band_df) < min_points:
        return False

    r = band_df['r'].values
    theta_deg = band_df['theta_deg'].values
    x = band_df['coor_x'].values
    y = band_df['coor_y'].values

    if r.max() - r.min() > ring_cfg['ring_r_absolute_tolerance']:
        return False
    if circular_range_deg(theta_deg) < ring_cfg['ring_min_angular_coverage']:
        return False
    if not check_sector_coverage(theta_deg, min_sectors=ring_cfg['ring_min_sectors']):
        return False

    cx, cy, _radius, rmse = fit_circle_least_squares(x, y)
    # isfinite rejects both the inf failure marker and a NaN rmse; a NaN would
    # otherwise compare False against every threshold and slip through.
    if cx is None or cy is None or not np.isfinite(rmse):
        return False
    if rmse > ring_cfg['ring_fit_rmse_max']:
        return False

    max_center_offset = ring_cfg.get('ring_center_max_offset', 10.0)
    if np.hypot(cx, cy) > max_center_offset:
        return False
    return True
|
|
|
|
def _is_linear_set(coords: np.ndarray, cfg: dict) -> bool:
    """Decide whether a 2-D point set is laid out along a straight line.

    Checks, in order: at least 3 points; overall extent of at least
    ``linear_min_length``; PCA elongation (square root of the ratio of the two
    explained variances) of at least ``linear_pca_ratio_min``; mean absolute
    distance from the principal axis of at most ``linear_max_deviation``; and
    largest gap along the axis, relative to the total length, of at most
    ``linear_max_gap_ratio``. All thresholds come from ``cfg['linear']``.

    Args:
        coords: Point coordinates, one row per point.
        cfg: Configuration dict.

    Returns:
        True when every check passes, False otherwise.
    """
    n_points = len(coords)
    if n_points < 3:
        return False

    linear_cfg = cfg['linear']
    center = np.mean(coords, axis=0)
    farthest = np.max(np.linalg.norm(coords - center, axis=1))
    # Twice the largest distance from the centroid serves as the set's length.
    if 2 * farthest < linear_cfg['linear_min_length']:
        return False

    pca = PCA(n_components=min(2, n_points)).fit(coords)
    variances = pca.explained_variance_
    if len(variances) < 2:
        return False
    # The small constant keeps the division defined when the minor variance is 0.
    variance_ratio = variances[0] / (variances[1] + 1e-9)
    if np.sqrt(variance_ratio) < linear_cfg['linear_pca_ratio_min']:
        return False

    main_axis = pca.components_[0]
    normal_axis = np.array([-main_axis[1], main_axis[0]])
    centered = coords - pca.mean_
    mean_offset = np.mean(np.abs(np.dot(centered, normal_axis)))
    if mean_offset > linear_cfg['linear_max_deviation']:
        return False

    along_axis = np.sort(np.dot(centered, main_axis))
    span = along_axis[-1] - along_axis[0]
    if span > 0 and np.max(np.diff(along_axis)) / span > linear_cfg['linear_max_gap_ratio']:
        return False
    return True
|
|
|
|
def _is_centroids_linear(sub_coords_list: list, cfg: dict) -> bool:
    """Decide whether the centroids of several sub-clusters lie along a line.

    Requires at least 3 sub-clusters, a centroid spread of at least
    ``centroid_linear_min_length``, a PCA elongation (square root of the
    explained-variance ratio) of at least ``centroid_linear_pca_min`` and a
    mean absolute distance from the principal axis of at most
    ``centroid_linear_dev_max``. Thresholds come from ``cfg['linear']``.

    Args:
        sub_coords_list: One coordinate array per sub-cluster.
        cfg: Configuration dict.

    Returns:
        True when every check passes, False otherwise.
    """
    if len(sub_coords_list) < 3:
        return False

    linear_cfg = cfg['linear']
    centroids = np.array([np.mean(points, axis=0) for points in sub_coords_list])
    grand_center = np.mean(centroids, axis=0)
    spread = 2 * np.max(np.linalg.norm(centroids - grand_center, axis=1))
    if spread < linear_cfg['centroid_linear_min_length']:
        return False

    pca = PCA(n_components=2).fit(centroids)
    variances = pca.explained_variance_
    if len(variances) < 2:
        return False
    # The small constant keeps the division defined when the minor variance is 0.
    elongation = np.sqrt(variances[0] / (variances[1] + 1e-9))
    if elongation < linear_cfg['centroid_linear_pca_min']:
        return False

    main_axis = pca.components_[0]
    normal_axis = np.array([-main_axis[1], main_axis[0]])
    mean_offset = np.mean(np.abs(np.dot(centroids - pca.mean_, normal_axis)))
    if mean_offset > linear_cfg['centroid_linear_dev_max']:
        return False
    return True
|
|
|
|
def _classify_subcluster(sub_coords: np.ndarray, cfg: dict) -> str:
    """Label one sub-cluster as "선형" (linear) or "군집" (cluster).

    A sub-cluster is labelled linear only when it has at least 3 points, it
    extends beyond ``cfg['cluster']['cluster_compactness_radius']`` from its
    centroid, and it meets the ``linear_pca_ratio_min``,
    ``linear_max_deviation`` and ``linear_min_length`` thresholds of
    ``cfg['linear']``. Everything else is labelled as a cluster.

    Args:
        sub_coords: Point coordinates, one row per point.
        cfg: Configuration dict.

    Returns:
        "선형" or "군집".
    """
    cluster_label = "군집"
    linear_label = "선형"

    n_points = len(sub_coords)
    if n_points < 3:
        return cluster_label

    center = np.mean(sub_coords, axis=0)
    radial_dists = np.linalg.norm(sub_coords - center, axis=1)
    reach = np.max(radial_dists)
    if reach <= cfg['cluster']['cluster_compactness_radius']:
        return cluster_label

    pca = PCA(n_components=min(2, n_points)).fit(sub_coords)
    variances = pca.explained_variance_
    if len(variances) < 2:
        return cluster_label

    # The small constant keeps the division defined when the minor variance is 0.
    elongation = np.sqrt(variances[0] / (variances[1] + 1e-9))
    # Written as "not (>=)" so a NaN elongation still falls back to a cluster.
    if not (elongation >= cfg['linear']['linear_pca_ratio_min']):
        return cluster_label

    main_axis = pca.components_[0]
    normal_axis = np.array([-main_axis[1], main_axis[0]])
    mean_offset = np.mean(np.abs(np.dot(sub_coords - pca.mean_, normal_axis)))
    if not (mean_offset <= cfg['linear']['linear_max_deviation']):
        return cluster_label
    if not (2*reach >= cfg['linear']['linear_min_length']):
        return cluster_label
    return linear_label
|
|
|
|
def _dominant_zone_centroid(inlier_df: pd.DataFrame, inlier_coords: np.ndarray) -> tuple:
    """Return the dominant zone and the centroid of the inliers in that zone."""
    dominant_zone = get_dominant_zone(inlier_df)
    zone_points = inlier_df[inlier_df['zone_label'] == dominant_zone]
    if zone_points.empty:
        # No row carries the dominant label: fall back to all inliers.
        centroid = tuple(np.mean(inlier_coords, axis=0))
    else:
        centroid = tuple(np.mean(zone_points[['coor_x', 'coor_y']].values, axis=0))
    return dominant_zone, centroid


def _refine_inliers_with_lof(coords: np.ndarray, inlier_mask: np.ndarray, lof_cfg: dict) -> np.ndarray:
    """Return *inlier_mask* with the points flagged by LocalOutlierFactor removed."""
    inlier_coords = coords[inlier_mask]
    n_inlier = len(inlier_coords)
    if n_inlier < lof_cfg['lof_min_points']:
        return inlier_mask
    # n_neighbors is capped at n_inlier - 1 and LOF is skipped below 2.
    n_neighbors = min(lof_cfg['lof_n_neighbors'], n_inlier - 1)
    if n_neighbors < 2:
        return inlier_mask
    lof = LocalOutlierFactor(
        n_neighbors=n_neighbors,
        contamination=lof_cfg['lof_contamination'],
        metric="euclidean",
    )
    kept = lof.fit_predict(inlier_coords) == 1
    refined = np.zeros(len(inlier_mask), dtype=bool)
    refined[inlier_mask] = kept
    return refined


def classify_wafer_patterns(df: pd.DataFrame, cfg: dict) -> tuple:
    """Classify the spatial pattern formed by the points in *df*.

    Pipeline:

    1. Label zones with ``add_zone_labels`` and cluster the ``coor_x`` /
       ``coor_y`` coordinates with HDBSCAN, falling back to DBSCAN when
       HDBSCAN marks every point as noise.
    2. Optionally drop local outliers among the clustered points (LOF).
    3. Test the remaining inliers for a ring ("환형"), then for a single line
       ("선형").
    4. Otherwise split the inliers with DBSCAN. With two or more sub-clusters
       the result is "선형" when their centroids are collinear, else the label
       ("선형" or "군집") backed by the most points. With fewer sub-clusters the
       whole inlier set is labelled as one sub-cluster.

    Args:
        df: Input points; must provide ``coor_x`` and ``coor_y`` plus whatever
            ``add_zone_labels`` and the ring test need (the ring test reads
            ``r`` and ``theta_deg``).
        cfg: Nested configuration dict; sections read here are
            ``preprocessing``, ``misc``, ``clustering`` and ``lof``.

    Returns:
        ``(labeled_df, dominant_zone, patterns, centroid)``: the frame with a
        boolean ``inlier`` column, the dominant zone label ("데이터 없음" when no
        pattern could be evaluated), a one-element list with the pattern name,
        and the centroid as an ``(x, y)`` tuple or None.
    """
    if df.empty:
        # Carry the inlier column here too so every return has the same schema.
        return df.assign(inlier=np.zeros(len(df), dtype=bool)), "데이터 없음", ["None"], None

    df = df.copy().reset_index(drop=True)
    df = add_zone_labels(df, inner_radius=cfg['preprocessing']['inner_radius_mm'])
    coords = df[["coor_x", "coor_y"]].values
    if len(df) < cfg['misc']['min_points_for_clustering']:
        return df.assign(inlier=np.zeros(len(df), dtype=bool)), "데이터 없음", ["정상/미달"], None

    cluster_cfg = cfg['clustering']
    clusterer = hdbscan.HDBSCAN(
        min_cluster_size=cluster_cfg['min_cluster_size'],
        min_samples=cluster_cfg['min_samples'],
        cluster_selection_method=cluster_cfg['cluster_selection_method'],
        metric="euclidean",
        gen_min_span_tree=True,
    )
    labels = clusterer.fit_predict(coords)
    if np.all(labels == -1):
        # HDBSCAN found nothing but noise: retry with plain DBSCAN.
        labels = DBSCAN(
            eps=cluster_cfg['dbscan_eps'],
            min_samples=cluster_cfg['min_cluster_size'],
        ).fit(coords).labels_
    inlier_mask = labels != -1
    if not inlier_mask.any():
        return df.assign(inlier=inlier_mask), "데이터 없음", ["Others"], None

    inlier_mask = _refine_inliers_with_lof(coords, inlier_mask, cfg['lof'])
    labeled_df = df.assign(inlier=inlier_mask)
    inlier_df = df[inlier_mask].copy()
    inlier_coords = coords[inlier_mask]
    n_inlier = len(inlier_df)
    if n_inlier < cluster_cfg['min_cluster_size']:
        return labeled_df, "데이터 없음", ["Others"], None

    if is_ring_pattern_robust(inlier_df, cfg):
        # A ring reports the centroid of all inliers, not of one zone.
        ring_centroid = tuple(np.mean(inlier_coords, axis=0))
        return labeled_df, get_dominant_zone(inlier_df), ["환형"], ring_centroid

    is_single_line = _is_linear_set(inlier_coords, cfg)
    dominant_zone, centroid = _dominant_zone_centroid(inlier_df, inlier_coords)
    if is_single_line:
        return labeled_df, dominant_zone, ["선형"], centroid

    if n_inlier < 2:
        return labeled_df, dominant_zone, ["Others"], None

    sub_labels = DBSCAN(
        eps=cluster_cfg['cluster_dbscan_eps'],
        min_samples=cluster_cfg['min_cluster_size'],
    ).fit(inlier_coords).labels_
    # np.unique gives a sorted, deterministic order of the non-noise labels.
    cluster_ids = np.unique(sub_labels[sub_labels != -1])
    if len(cluster_ids) < 2:
        pattern = _classify_subcluster(inlier_coords, cfg)
        return labeled_df, dominant_zone, [pattern], centroid

    sub_coords_list = [inlier_coords[sub_labels == cluster_id] for cluster_id in cluster_ids]
    if _is_centroids_linear(sub_coords_list, cfg):
        return labeled_df, dominant_zone, ["선형"], centroid

    # Weight each sub-cluster's label by its point count and keep the heaviest.
    points_per_pattern = {}
    for sub_coords in sub_coords_list:
        sub_pattern = _classify_subcluster(sub_coords, cfg)
        points_per_pattern[sub_pattern] = points_per_pattern.get(sub_pattern, 0) + len(sub_coords)
    dominant_pattern = max(points_per_pattern, key=points_per_pattern.get)
    return labeled_df, dominant_zone, [dominant_pattern], centroid