- forensic_oil_complete_reform.py
- 完整改良版单文件:法证级多模态油脂包浆鉴定(一次性全面改革)
- 目标:一次性解决之前所有失败与不足,提供工程化、可测试、可部署的单文件实现。
- 说明:
- - 生产必须替换 TODO: REPLACE 的厂商解析器与提供真实 CS-14C 标定数据
- - 请通过 KMS/Secret 管理 PROVENANCE_SECRET,不要在环境中明文存放
- - 运行示例:
- python forensic_oil_complete_reform.py --mode build-dummy
- python forensic_oil_complete_reform.py --mode train --manifest train_manifest.json --model-dir models
- python forensic_oil_complete_reform.py --mode infer-file --sample-file sample.json --model-dir models
- python forensic_oil_complete_reform.py --mode auto
- uvicorn forensic_oil_complete_reform:app --host 0.0.0.0 --port 8000
- numpy compatibility
- Optional heavy deps (graceful fallback)
- Web API optional
- Logging
- ----------------------------
- Default configuration
- ----------------------------
- ----------------------------
- Utilities
- ----------------------------
- ----------------------------
- Error codes & helpers
- ----------------------------
- ----------------------------
- Robust parsing utilities (comprehensive)
- - Handles placeholders, single-column CSVs, non-image bytes, and tries multiple fallbacks
- ----------------------------
- ----------------------------
- Parsers (robust, with synthetic fallback)
- ----------------------------
- ----------------------------
- Signal processing & feature helpers (unchanged but robust)
- ----------------------------
- ----------------------------
- Image & AFM quantification (robust)
- ----------------------------
- ----------------------------
- Dataset & feature assembly (robust with synthetic minimal features)
- ----------------------------
- ----------------------------
- Synthetic minimal feature generators (when modality missing)
- ----------------------------
- ----------------------------
- Spectrum features helper
- ----------------------------
- ----------------------------
- GC-MS feature extraction (precise)
- ----------------------------
- ----------------------------
- Submodels (XGBoost)
- ----------------------------
- ----------------------------
- Fusion & Bayesian age posterior (MCMC template + conjugate fallback)
- ----------------------------
- ----------------------------
- Calibration & evaluation
- ----------------------------
- ----------------------------
- Training pipeline (robust)
- ----------------------------
- ----------------------------
- Inference engine (robust)
- ----------------------------
- ----------------------------
- Blind-test harness
- ----------------------------
- ----------------------------
- AutoOrchestrator (robust)
- ----------------------------
- ----------------------------
- FastAPI UI prototype (optional)
- ----------------------------
- ----------------------------
- CLI helpers & tests
- ----------------------------
- ----------------------------
- Main CLI
- ----------------------------
#!/usr/bin/env python3
forensic_oil_complete_reform.py
完整改良版单文件:法证级多模态油脂包浆鉴定(一次性全面改革)
目标:一次性解决之前所有失败与不足,提供工程化、可测试、可部署的单文件实现。
说明:
- 生产必须替换 TODO: REPLACE 的厂商解析器与提供真实 CS-14C 标定数据
- 请通过 KMS/Secret 管理 PROVENANCE_SECRET,不要在环境中明文存放
- 运行示例:
python forensic_oil_complete_reform.py --mode build-dummy
python forensic_oil_complete_reform.py --mode train --manifest train_manifest.json --model-dir models
python forensic_oil_complete_reform.py --mode infer-file --sample-file sample.json --model-dir models
python forensic_oil_complete_reform.py --mode auto
uvicorn forensic_oil_complete_reform:app --host 0.0.0.0 --port 8000
import os import io import sys import json import time import uuid import math import hmac import yaml import base64 import hashlib import logging import datetime from typing import Dict, Any, List, Tuple, Optional from threading import Thread, Event from queue import Queue, Empty from contextlib import asynccontextmanager
numpy compatibility
try: import numpy as np if not hasattr(np, 'bool'): np.bool = np.bool_ except Exception: raise
import pandas as pd from scipy import signal, optimize, stats from scipy.ndimage import gaussian_filter from sklearn.isotonic import IsotonicRegression from sklearn.metrics import roc_auc_score, precision_recall_fscore_support from sklearn.model_selection import train_test_split import xgboost as xgb
Optional heavy deps (graceful fallback)
try: import pymzml except Exception: pymzml = None try: import tifffile except Exception: tifffile = None try: import pymc3 as pm import arviz as az except Exception: pm = None try: import shap except Exception: shap = None try: from PIL import Image, UnidentifiedImageError except Exception: Image = None UnidentifiedImageError = Exception
Web API optional
try: from fastapi import FastAPI, UploadFile, File, HTTPException from fastapi.responses import HTMLResponse, JSONResponse from pydantic import BaseModel except Exception: FastAPI = None
Logging
# Root logging configuration (INFO and above) applied at import time.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    level=logging.INFO,
)
# Named logger shared by every helper in this module.
logger = logging.getLogger("forensic_oil_complete_reform")
----------------------------
Default configuration
----------------------------
# Default settings for every pipeline stage, grouped by modality / stage name.
DEFAULT_CONFIG = dict(
    model_version="5.0.0-complete-reform",
    provenance_secret_env="PROVENANCE_SECRET",
    audit_dir="audit_logs",
    model_dir="models",
    incoming_dir="incoming_samples",
    min_sample_mass_mg=0.2,
    gcms=dict(
        binned_bins=1024,
        marker_windows=[[270, 290], [300, 320], [340, 360], [400, 420]],
        topk=256,
        peak_prominence=1e3,
        peak_width=(1, 200),
    ),
    spectrum=dict(binned_bins=512, marker_peaks=[1700, 1740, 2850, 2920, 1650]),
    image=dict(patch_size=256, hist_bins=128),
    afm=dict(features=["max_force", "min_force", "adhesion", "slope", "auc"]),
    fusion=dict(confidence_high=0.92, confidence_low=0.35),
    training=dict(xgb_rounds=200, random_seed=42),
    blind_test=dict(n_min=200, auc_target=0.95, fpr_target=0.05),
    bayes=dict(mcmc_draws=1000, mcmc_tune=500),
    auto=dict(
        watch_dir="incoming_samples",
        poll_interval_s=5,
        max_workers=2,
        human_review_confidence_threshold=0.75,
        auto_accept_confidence=0.92,
        max_queue_size=1000,
    ),
)
----------------------------
Utilities
----------------------------
def now_iso() -> str:
    """Return the current UTC time as ISO-8601 with a trailing ``Z``.

    Microseconds are dropped, e.g. ``2024-01-31T12:34:56Z``.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC time
    # and strip tzinfo so isoformat() does not append "+00:00" before our "Z".
    now = datetime.datetime.now(datetime.timezone.utc)
    return now.replace(microsecond=0, tzinfo=None).isoformat() + "Z"
def ensure_dir(p: str):
    """Create directory *p* including missing parents; do nothing if it already exists."""
    os.makedirs(name=p, exist_ok=True)
def canonical_json(obj: Any) -> str:
    """Serialise *obj* deterministically.

    Keys are sorted, separators carry no whitespace, and non-ASCII characters are
    emitted as-is, so equal objects always give byte-identical text.
    """
    compact = (',', ':')
    return json.dumps(obj, ensure_ascii=False, sort_keys=True, separators=compact)
def compute_provenance(meta: Dict[str, Any], files_bytes: List[bytes], secret: str) -> Dict[str,str]:
    """Hash sample metadata plus file payloads and authenticate the digest.

    The SHA-256 covers ``canonical_json(meta)`` followed by every non-empty payload
    in *files_bytes*, in order. Empty or None payloads are skipped. The hex digest
    is then HMAC-SHA256-signed with *secret*.

    Returns ``{"hash": <hex digest>, "hmac": <hex HMAC of the digest>}``.

    NOTE(review): payloads are concatenated without length framing, so different
    splits of the same byte stream produce the same hash.
    """
    hasher = hashlib.sha256(canonical_json(meta).encode('utf-8'))
    for payload in (chunk for chunk in files_bytes if chunk):
        hasher.update(payload)
    digest = hasher.hexdigest()
    signer = hmac.new(secret.encode('utf-8'), digest.encode('utf-8'), hashlib.sha256)
    return {"hash": digest, "hmac": signer.hexdigest()}
----------------------------
Error codes & helpers
----------------------------
# Symbolic failure names -> stable error codes written into audit records.
# E1xxx: parsing failures, E2xxx: input completeness, E3xxx: model/inference.
ERROR_CODES = dict(
    PARSE_GCMS_FAIL="E1001",
    PARSE_SPECTRUM_FAIL="E1002",
    PARSE_IMAGE_FAIL="E1003",
    PARSE_AFM_FAIL="E1004",
    MISSING_MODALITY="E2001",
    MODEL_LOAD_FAIL="E3001",
    INFERENCE_FAIL="E3002",
)
def audit_error(audit_dir: str, sample_id: str, code: str, message: str, raw_bytes: Optional[bytes]=None):
    """Write one JSON error record for *sample_id* into *audit_dir*.

    The record holds ``sample_id``, ``error_code``, ``message`` and a UTC
    ``timestamp``. When *raw_bytes* is non-empty it also holds ``raw_hash``, the
    SHA-256 of the offending payload; the payload itself is never stored. Every
    call creates a new file named ``error_<sample>_<unix-seconds>_<6 hex>.json``.

    Args:
        audit_dir: Directory for audit records; created if missing.
        sample_id: Sample identifier, stored verbatim in the record. It is
            sanitised only where it appears in the filename.
        code: Error code, typically a value of ``ERROR_CODES``.
        message: Human-readable error description.
        raw_bytes: Optional payload whose hash should be recorded.
    """
    import re

    ensure_dir(audit_dir)
    rec = {"sample_id": sample_id, "error_code": code, "message": message, "timestamp": now_iso()}
    if raw_bytes:
        rec['raw_hash'] = hashlib.sha256(raw_bytes).hexdigest()
    # sample_id can come from manifests or uploads. Replace path separators and
    # other unsafe characters so the record can never land outside audit_dir.
    safe_id = re.sub(r'[^\w.-]', '_', str(sample_id)) or 'unknown'
    fname = os.path.join(audit_dir, f"error_{safe_id}_{int(time.time())}_{uuid.uuid4().hex[:6]}.json")
    with open(fname, 'w', encoding='utf-8') as f:
        json.dump(rec, f, ensure_ascii=False, indent=2)
    logger.debug("Audit error written: %s", fname)
----------------------------
Robust parsing utilities (comprehensive)
- Handles placeholders, single-column CSVs, non-image bytes, and tries multiple fallbacks
----------------------------
# Lower-case byte markers that identify dummy/placeholder payloads.
# is_placeholder() looks for them in lower-cased payloads shorter than 1024 bytes.
# NOTE(review): b"tiff", b"image" and b"placeholder" are broad. A genuinely small
# data file containing those words would also be treated as a placeholder.
PLACEHOLDER_MARKERS = set((
    b"mzml_placeholder",
    b"spectrum_placeholder",
    b"tiff_placeholder",
    b"afm_placeholder",
    b"cs14c_placeholder",
    b"tiff",
    b"image",
    b"placeholder",
))
def is_placeholder(b: Optional[bytes]) -> bool:
    """Tell whether a modality payload should be treated as absent.

    Returns True for None or empty payloads. It also returns True for ``bytes``
    payloads shorter than 1024 bytes that, after stripping whitespace and
    lower-casing, contain any entry of ``PLACEHOLDER_MARKERS``. Everything else,
    including any payload of 1024 bytes or more, returns False.
    """
    if not b:
        return True
    is_short_bytes = isinstance(b, bytes) and len(b) < 1024
    if not is_short_bytes:
        return False
    lowered = b.strip().lower()
    return any(marker in lowered for marker in PLACEHOLDER_MARKERS)
def try_read_csv_bytes(b: bytes, sep_guesses: Tuple[str, ...] = (',', '\t', ';', ' ')) -> pd.DataFrame:
    """Best-effort parse of delimited text bytes into a DataFrame with at least 2 columns.

    Strategy:
    1. Reject text that looks like JSON (starts with ``{`` or ``[``).
    2. Try pandas' default reader, then each separator in *sep_guesses*, and
       return the first result that has two or more columns. The first line is
       treated as the header.
    3. Fall back to splitting every line on commas, tabs or spaces and keeping the
       first two tokens. This yields a DataFrame of strings with integer column labels.

    Args:
        b: Raw bytes, decoded as UTF-8 with undecodable bytes dropped.
        sep_guesses: Separators to try after the default reader. This is an
            immutable tuple rather than a list to avoid a shared mutable default.

    Raises:
        ValueError: If the text looks like JSON or no two-column layout can be found.
    """
    s = b.decode('utf-8', errors='ignore')
    if s.strip().startswith(('{', '[')):
        raise ValueError("Not CSV")

    # None means "pandas default separator"; then the explicit guesses.
    for sep in (None, *sep_guesses):
        try:
            if sep is None:
                df = pd.read_csv(io.StringIO(s))
            else:
                df = pd.read_csv(io.StringIO(s), sep=sep)
        except Exception:  # deliberate best-effort: any reader failure -> next guess
            continue
        if df.shape[1] >= 2:
            return df

    # Last resort: take the first two whitespace/comma/tab-separated tokens per line.
    rows = []
    for ln in s.splitlines():
        parts = ln.replace(',', ' ').replace('\t', ' ').split()
        if len(parts) >= 2:
            rows.append(parts[:2])
    if rows:
        return pd.DataFrame(rows)
    raise ValueError("CSV parse failed or single-column without numeric pairs")
----------------------------
Parsers (robust, with synthetic fallback)
----------------------------
def _mzml_peaks_via_pymzml(data: bytes) -> pd.DataFrame:
    """Collect (mz, intensity, rt) rows for every MS1 peak with pymzml; raise if there are none."""
    chunks = []
    with io.BytesIO(data) as bio:
        for spec in pymzml.run.Reader(bio):
            if getattr(spec, 'ms_level', 1) != 1:
                continue
            mzs = np.asarray(getattr(spec, 'mz', []), dtype=np.float64)
            ints = np.asarray(getattr(spec, 'i', []), dtype=np.float64)
            n = min(len(mzs), len(ints))  # pair like zip(): truncate to the shorter array
            if n == 0:
                continue
            try:
                rt = float(spec.scan_time_in_minutes())
            except Exception:  # missing/odd scan time -> unknown retention time
                rt = np.nan
            chunks.append(np.column_stack([mzs[:n], ints[:n], np.full(n, rt)]))
    if not chunks:
        raise ValueError("No MS1 peaks found")
    return pd.DataFrame(np.vstack(chunks), columns=['mz', 'intensity', 'rt'])


def _mzml_peaks_via_csv(data: bytes) -> Optional[Tuple[pd.DataFrame, str]]:
    """Read a delimited peak list; return (peaks, parser_tag), or None if no usable columns exist."""
    df = try_read_csv_bytes(data)
    # str() because the heuristic CSV path yields integer column labels.
    lowered = [str(c).strip().lower() for c in df.columns]
    if 'mz' in lowered and 'intensity' in lowered:
        out = df.copy()
        out.columns = lowered
        if 'rt' not in out.columns:
            out['rt'] = np.nan
        return out[['mz', 'intensity', 'rt']].copy(), 'csv_fallback'

    def is_numeric(col) -> bool:
        try:
            pd.to_numeric(col)
        except (ValueError, TypeError):
            return False
        return True

    if df.shape[1] >= 2 and all(is_numeric(df[c]) for c in df.columns[:2]):
        out = df.copy()
        out.columns = ['mz', 'intensity'] + list(df.columns[2:])
        # The heuristic parser returns strings; the strict check above proved they convert.
        out['mz'] = pd.to_numeric(out['mz'])
        out['intensity'] = pd.to_numeric(out['intensity'])
        if 'rt' not in out.columns:
            out['rt'] = np.nan
        return out[['mz', 'intensity', 'rt']].copy(), 'csv_guess'
    return None


def parse_mzml_precise(mzml_bytes: Optional[bytes]) -> Dict[str, Any]:
    """Parse a GC-MS payload into a peak table.

    Returns ``{'peaks': DataFrame[mz, intensity, rt], 'meta': {'parser': tag}, 'note': tag}``.
    The tag is chosen in this order:

    - ``'placeholder'``: the payload is None/empty/placeholder; peaks are empty.
    - ``'pymzml'``: pymzml is installed and found MS1 peaks.
    - ``'csv_fallback'``: delimited text with ``mz`` and ``intensity`` headers
      (case-insensitive); ``rt`` is NaN if absent.
    - ``'csv_guess'``: delimited text whose first two columns are numeric,
      read as mz and intensity.
    - ``'failed'``: nothing worked; peaks are empty.

    No audit record is written here. Failures are only logged, and callers decide
    how to report them.
    """
    empty_columns = ['mz', 'intensity', 'rt']
    if is_placeholder(mzml_bytes):
        return {'peaks': pd.DataFrame(columns=empty_columns), 'meta': {'parser': 'placeholder'}, 'note': 'placeholder'}
    if pymzml is not None:
        try:
            peaks = _mzml_peaks_via_pymzml(mzml_bytes)
            return {'peaks': peaks, 'meta': {'parser': 'pymzml'}, 'note': 'pymzml'}
        except Exception as e:
            logger.warning("pymzml parse failed: %s. Will try CSV fallback.", e)
    try:
        found = _mzml_peaks_via_csv(mzml_bytes)
        if found is not None:
            peaks, tag = found
            return {'peaks': peaks, 'meta': {'parser': tag}, 'note': tag}
    except Exception as e:
        logger.exception("parse_mzml_precise failed: %s", e)
    return {'peaks': pd.DataFrame(columns=empty_columns), 'meta': {'parser': 'failed'}, 'note': 'failed'}
def parse_jcamp_precise(b: Optional[bytes]) -> Dict[str, Any]: if is_placeholder(b): return {'wn': np.array([], dtype=np.float32), 'intensity': np.array([], dtype=np.float32), 'meta': {'parser':'placeholder'}, 'note':'placeholder'} s = b.decode('utf-8', errors='ignore') if '##' in s and 'JCAMP' in s.upper(): header = {} data = [] in_xy = False for ln in s.splitlines(): ln = ln.strip() if ln.startswith('##'): parts = ln[2:].split('=',1) if len(parts) == 2: header[parts[0].strip()] = parts[1].strip() if ln.upper().startswith('##XYDATA'): in_xy = True continue if in_xy: for token in ln.split(): try: data.append(float(token)) except: pass if len(data) >= 4 and len(data) % 2 == 0: wn = np.array(data[0::2], dtype=np.float32) inten = np.array(data[1::2], dtype=np.float32) return {'wn': wn, 'intensity': inten, 'meta': header, 'note':'jcamp'} # CSV fallback try: df = try_read_csv_bytes(b) if df.shape[1] >= 2: wn = pd.to_numeric(df.iloc[:,0], errors='coerce').fillna(0).values.astype(np.float32) inten = pd.to_numeric(df.iloc[:,1], errors='coerce').fillna(0).values.astype(np.float32) return {'wn': wn, 'intensity': inten, 'meta': {'parser':'csv'}, 'note':'csv'} except Exception as e: logger.exception("parse_jcamp_precise failed: %s", e) return {'wn': np.array([], dtype=np.float32), 'intensity': np.array([], dtype=np.float32), 'meta': {'parser':'failed'}, 'note':'failed'}
def _tiff_page_meta(page) -> Dict[str, Any]:
    """Stringify the tags of a tifffile page and pull a pixel size in nm from ImageDescription."""
    import re

    meta: Dict[str, Any] = {}
    try:
        # Modern tifffile keys page.tags by numeric code, so iterate the tag
        # objects and use their .name (e.g. 'ImageDescription') as the key.
        tags = {str(tag.name): str(tag.value) for tag in page.tags.values()}
    except Exception:  # odd/unsupported tag structures: tags are optional metadata
        return meta
    meta['tiff_tags'] = tags
    desc = tags.get('ImageDescription', '')
    match = re.search(r'(\d+(?:\.\d+)?)\s*nm', desc)
    if match:
        meta['pixel_size_nm'] = float(match.group(1))
    return meta


def parse_sem_tiff_precise(b: Optional[bytes]) -> Tuple[np.ndarray, Dict[str,Any]]:
    """Decode an SEM image payload into a pixel array and metadata.

    Order of attempts:
    - placeholder/None payload -> zero ``patch_size`` x ``patch_size`` uint8
      array, ``{'parser': 'placeholder'}``.
    - tifffile (if installed): first page. Meta carries ``tiff_tags`` and, when
      ImageDescription mentions "<number> nm", ``pixel_size_nm``.
    - PIL (if installed): any format PIL can open; meta is ``{}``.
    - otherwise -> zero array, ``{'parser': 'failed_fallback'}``.
    """
    size = DEFAULT_CONFIG['image']['patch_size']
    if is_placeholder(b):
        return np.zeros((size, size), dtype=np.uint8), {'parser': 'placeholder'}
    if tifffile is not None:
        try:
            # Close both the TiffFile handle and the buffer deterministically.
            with io.BytesIO(b) as bio, tifffile.TiffFile(bio) as tf:
                page = tf.pages[0]
                arr = page.asarray()
                meta = _tiff_page_meta(page)
            return arr, meta
        except Exception as e:
            logger.warning("tifffile parse failed: %s. Trying PIL fallback.", e)
    try:
        if Image is None:
            raise RuntimeError("PIL not available")
        with io.BytesIO(b) as bio, Image.open(bio) as img:
            arr = np.array(img)
        return arr, {}
    except UnidentifiedImageError as e:
        logger.warning("PIL cannot identify image: %s", e)
    except Exception as e:
        logger.exception("parse_sem_tiff_precise failed: %s", e)
    return np.zeros((size, size), dtype=np.uint8), {'parser': 'failed_fallback'}
def parse_afm_precise(b: Optional[bytes]) -> Dict[str,Any]:
    """Parse an AFM force-curve payload into float32 distance/force arrays.

    Returns ``{'distance', 'force', 'meta': {'parser': tag}}`` where tag is:

    - ``'placeholder'``: the payload is None/empty/placeholder; arrays are empty.
    - ``'csv'``: the first two fully numeric columns are taken as distance and force.
    - ``'csv_singlecol_synth_distance'``: only one numeric column exists, so it is
      read as force and distance is synthesised as ``linspace(0, 1, n)``.
    - ``'failed'``: nothing usable was found; arrays are empty.

    Non-numeric cells in the chosen columns become 0.
    """
    def empty(tag: str) -> Dict[str, Any]:
        return {'distance': np.array([], dtype=np.float32), 'force': np.array([], dtype=np.float32), 'meta': {'parser': tag}}

    def is_numeric(col) -> bool:
        try:
            pd.to_numeric(col)
        except (ValueError, TypeError):
            return False
        return True

    def as_f32(col) -> np.ndarray:
        return pd.to_numeric(col, errors='coerce').fillna(0).values.astype(np.float32)

    if is_placeholder(b):
        return empty('placeholder')
    try:
        df = try_read_csv_bytes(b)
        numeric_cols = [c for c in df.columns if is_numeric(df[c])]
        if len(numeric_cols) >= 2:
            dist_col, force_col = numeric_cols[0], numeric_cols[1]
            return {'distance': as_f32(df[dist_col]), 'force': as_f32(df[force_col]), 'meta': {'parser': 'csv'}}
        if len(numeric_cols) == 1:
            force = as_f32(df[numeric_cols[0]])
            distance = np.linspace(0.0, 1.0, len(force)).astype(np.float32)
            return {'distance': distance, 'force': force, 'meta': {'parser': 'csv_singlecol_synth_distance'}}
    except Exception as e:
        logger.exception("parse_afm_precise failed: %s", e)
    return empty('failed')
def parse_cs14c_precise(b: Optional[bytes]) -> Dict[str,Any]:
    """Parse compound-specific radiocarbon (CS-14C) results into record dicts.

    Each record must provide ``sample_id``, ``compound_id``, ``age_bp`` and
    ``age_sigma``. Column names are matched case-insensitively. Exact matches
    give ``parser='csv'``. Otherwise a column whose lower-cased name *contains*
    the required name is used (e.g. ``"Age_BP (yr)"``), giving ``parser='csv_mapped'``.

    Returns ``{'records': [...], 'meta': {'parser': tag}}``. Records are empty
    for placeholder payloads and when the columns cannot be resolved.
    """
    if is_placeholder(b):
        return {'records': [], 'meta': {'parser': 'placeholder'}}
    required = ['sample_id', 'compound_id', 'age_bp', 'age_sigma']
    try:
        df = try_read_csv_bytes(b)
        # str() because the heuristic CSV path yields integer column labels.
        lowered = [str(c).strip().lower() for c in df.columns]
        if all(r in lowered for r in required):
            df.columns = lowered
            return {'records': df.to_dict(orient='records'), 'meta': {'parser': 'csv'}}
        # Map variants by substring. Rename via the ORIGINAL labels: renaming with
        # lower-cased names silently did nothing for mixed-case headers.
        # Each source column is used for at most one required field.
        rename: Dict[Any, str] = {}
        for r in required:
            for original, low in zip(df.columns, lowered):
                if r in low and original not in rename:
                    rename[original] = r
                    break
        if len(rename) == len(required):
            return {'records': df.rename(columns=rename).to_dict(orient='records'), 'meta': {'parser': 'csv_mapped'}}
    except Exception as e:
        logger.exception("parse_cs14c_precise failed: %s", e)
    return {'records': [], 'meta': {'parser': 'failed'}}
----------------------------
Signal processing & feature helpers (unchanged but robust)
----------------------------
def detect_peaks(mz: np.ndarray, inten: np.ndarray, prominence: float, width: Tuple[float,float]) -> np.ndarray:
    """Return indices of peaks in *inten* after Savitzky-Golay smoothing.

    Smoothing uses polyorder 2 and an odd window of 11 samples, or the largest
    odd window that fits shorter signals. Peaks are then found with
    ``scipy.signal.find_peaks(prominence=..., width=...)``. An empty int array is
    returned for signals shorter than 3 samples or when smoothing/peak-finding fails.
    """
    empty = np.array([], dtype=int)
    if len(mz) < 3:
        return empty
    n = len(inten)
    # savgol_filter needs an odd window no longer than the signal. The old formula
    # (n//2*2+1) produced n+1 for even n < 11, so it always raised and no peaks
    # were ever reported for those lengths.
    window = 11 if n >= 11 else (n if n % 2 else n - 1)
    if window < 3:  # polyorder 2 requires a window of at least 3
        return empty
    try:
        inten_s = signal.savgol_filter(inten, window_length=window, polyorder=2)
        peaks_idx, _ = signal.find_peaks(inten_s, prominence=prominence, width=width)
        return peaks_idx
    except Exception:  # deliberate best-effort: malformed input -> no peaks
        return empty
def fit_gaussian_peak(x: np.ndarray, y: np.ndarray) -> Tuple[float,float,float,float]:
    """Fit ``y = A * exp(-0.5 * ((x - mu) / sigma) ** 2)`` to one peak.

    Returns ``(mu, sigma, area, height)`` with ``area = A * sigma * sqrt(2*pi)``
    and ``height = A``.

    Fallbacks:
    - fewer than 3 points: ``(mean(x), 0, 0, max(y))``, using 0 for empty inputs.
    - fit failure: ``(mean(x), 0, trapezoidal area, max(y))``.
    """
    # np.trapz is deprecated in NumPy 2.0 in favour of np.trapezoid.
    trapezoid = getattr(np, 'trapezoid', None) or np.trapz
    if len(x) < 3:
        return (float(np.mean(x)) if len(x) > 0 else 0.0, 0.0, 0.0, float(np.max(y)) if len(y) > 0 else 0.0)

    def gauss(t, A, mu, sigma):
        return A * np.exp(-0.5 * ((t - mu) / sigma) ** 2)

    p0 = [float(np.max(y)), float(x[np.argmax(y)]), max(1e-6, float((x.max() - x.min()) / 6.0))]
    try:
        popt, _ = optimize.curve_fit(gauss, x, y, p0=p0, maxfev=5000)
    except Exception:  # non-convergence / degenerate data -> numeric fallback
        return (float(np.mean(x)), 0.0, float(trapezoid(y, x)), float(np.max(y)))
    A, mu, sigma = popt
    # The model depends only on sigma**2, so the optimiser may return a negative
    # sigma. Report its magnitude so that width and area are non-negative.
    sigma = abs(sigma)
    area = A * sigma * math.sqrt(2 * math.pi)
    return float(mu), float(sigma), float(area), float(A)
def bin_generic(x: np.ndarray, y: np.ndarray, bins: int = 256, xmin: Optional[float] = None, xmax: Optional[float] = None) -> np.ndarray:
    """Sum *y* into *bins* equal-width bins over ``[xmin, xmax]`` and normalise to unit sum.

    *xmin*/*xmax* default to the range of *x*. Values outside the range, including
    ``x == xmax``, are clamped into the first/last bin. Returns float32 zeros when
    *x* is empty, when the range is degenerate, or when the weighted sum is not
    positive (in that last case the unnormalised sums are returned).
    """
    if len(x) == 0:
        return np.zeros(bins, dtype=np.float32)
    lo = float(np.min(x)) if xmin is None else xmin
    hi = float(np.max(x)) if xmax is None else xmax
    if lo == hi:
        return np.zeros(bins, dtype=np.float32)
    edges = np.linspace(lo, hi, bins + 1)
    idx = np.clip(np.digitize(x, edges) - 1, 0, bins - 1)
    # Vectorised replacement for the per-sample Python loop; only the first
    # len(x) weights are used, as before.
    weights = np.asarray(y, dtype=np.float64)[:len(x)]
    binned = np.bincount(idx, weights=weights, minlength=bins)[:bins]
    total = binned.sum()
    if total > 0:
        binned = binned / total
    return binned.astype(np.float32)
def asls_baseline(intensity: np.ndarray, lam: float = 1e5, p: float = 0.01, niter: int = 10) -> np.ndarray:
    """Subtract an asymmetric-least-squares (Eilers & Boelens) baseline.

    Each iteration solves ``(W + lam * D^T D) z = W y``. Here D is the
    (L-2) x L second-difference operator, and the weights are *p* for points
    above the current baseline and 1-p for points below it.

    Returns ``intensity - z`` with negative values clipped to 0. This is the
    corrected signal, not the baseline itself. Inputs shorter than 3 samples are
    returned unchanged (the same object).
    """
    from scipy import sparse
    from scipy.sparse.linalg import spsolve

    L = len(intensity)
    if L < 3:
        return intensity
    y = np.asarray(intensity, dtype=np.float64)
    # Correct orientation: D is (L-2) x L so D^T D is L x L. The previous dense
    # np.diff(np.eye(L), 2) was L x (L-2), and adding it to W always raised a
    # broadcasting error. Sparse storage also replaces an O(L^3) dense solve.
    D = sparse.diags([1.0, -2.0, 1.0], [0, 1, 2], shape=(L - 2, L))
    penalty = lam * (D.T @ D)
    w = np.ones(L)
    z = y
    for _ in range(niter):
        W = sparse.diags(w, 0, shape=(L, L))
        z = spsolve((W + penalty).tocsc(), w * y)
        w = p * (y > z) + (1 - p) * (y < z)
    corrected = y - z
    corrected[corrected < 0] = 0.0
    return corrected
----------------------------
Image & AFM quantification (robust)
----------------------------
def pixel_size_from_meta(meta: Dict[str,Any]) -> Optional[float]:
    """Return the image pixel size in nm, or None if it cannot be determined.

    An explicit ``meta['pixel_size_nm']`` is used first. Otherwise the first
    value in ``meta['tiff_tags']`` that contains "<number> nm" supplies it.
    """
    import re

    if not meta:
        return None
    if 'pixel_size_nm' in meta:
        try:
            return float(meta['pixel_size_nm'])
        except (TypeError, ValueError):
            pass  # unusable explicit value: fall back to the TIFF tags
    # `or {}` tolerates tiff_tags being present but None.
    tags = meta.get('tiff_tags') or {}
    for value in tags.values():
        if not (isinstance(value, str) and 'nm' in value):
            continue
        # Require digits (optionally with a decimal part); the old [\d.]+ also
        # matched a lone ".".
        match = re.search(r'(\d+(?:\.\d+)?)\s*nm', value)
        if match:
            return float(match.group(1))
    return None
def penetration_proxy(img: np.ndarray) -> float:
    """Return a unitless radial-darkening score for an image.

    The image is converted to grey (channel mean for 3-D arrays) and min-max
    normalised. Mean intensity is computed for each integer radius from the image
    centre, out to ``min(cx, cy)``. The function returns the negated slope of a
    linear fit to that profile, so larger values mean the image is brighter at
    the centre than at the edge. Returns 0.0 for None/empty images and when fewer
    than 5 radii are available.
    """
    if img is None or img.size == 0:
        return 0.0
    gray = np.mean(img.astype(np.float32), axis=2) if img.ndim == 3 else img.astype(np.float32)
    gray = gray - gray.min()
    peak = gray.max()
    if peak > 0:
        gray = gray / peak
    h, w = gray.shape[:2]
    cy, cx = h // 2, w // 2
    ys, xs = np.indices((h, w))
    radius = np.sqrt((xs - cx) ** 2 + (ys - cy) ** 2).astype(np.int32)
    maxr = min(cx, cy)
    if maxr < 5:
        return 0.0
    # One bincount pass gives every ring's sum and size; the old per-radius
    # boolean mask cost O(maxr * h * w).
    flat_r = radius.ravel()
    counts = np.bincount(flat_r, minlength=maxr)[:maxr]
    sums = np.bincount(flat_r, weights=gray.ravel().astype(np.float64), minlength=maxr)[:maxr]
    profile = np.where(counts > 0, sums / np.maximum(counts, 1), 0.0)
    slope = stats.linregress(np.arange(maxr), profile)[0]
    return float(-slope)
def estimate_penetration_nm(img: np.ndarray, meta: Dict[str,Any], calibration_curve: Optional[Tuple[float,float,float]] = None) -> Tuple[float,float]:
    """Estimate penetration and its uncertainty from ``penetration_proxy(img)``.

    Regimes, in priority order:
    - *calibration_curve* ``(slope, intercept, sigma)``: linear map of the proxy
      to nm; uncertainty is ``|sigma| + 5 %`` of the estimate.
    - pixel size known from *meta*: proxy times the mean image side length in nm;
      uncertainty is 20 % of the estimate.
    - otherwise: the unitless proxy itself, with uncertainty ``0.5 * proxy + 1``.

    Returns ``(penetration, uncertainty)``. In the first two regimes both values
    are clamped to be >= 0.
    """
    proxy = penetration_proxy(img)
    pixel_nm = pixel_size_from_meta(meta)
    if calibration_curve:
        slope, intercept, sigma = calibration_curve
        # Clamp first, then derive the uncertainty. Using the raw (possibly
        # negative) estimate shrank the reported uncertainty below |sigma|.
        penetration_nm = max(0.0, slope * proxy + intercept)
        return float(penetration_nm), float(abs(sigma) + 0.05 * penetration_nm)
    if pixel_nm:
        h, w = img.shape[:2]
        mean_dim_nm = ((h + w) / 2.0) * pixel_nm
        # A negative proxy used to yield a negative uncertainty here.
        penetration_nm = max(0.0, proxy * mean_dim_nm)
        return float(penetration_nm), float(0.2 * penetration_nm)
    return float(proxy), float(proxy * 0.5 + 1.0)
def image_features(img: np.ndarray, meta: Dict[str,Any], cfg: Dict[str,Any]) -> np.ndarray:
    """Return a float32 image descriptor of length ``cfg['hist_bins'] + 8``.

    Layout: ``[mean, std, mean|d/dx|, mean|d/dy|, fraction of pixels < 0.1,
    penetration, penetration uncertainty, pixel-size-known flag]`` followed by a
    ``hist_bins``-bin normalised intensity histogram. Statistics are computed on
    the min-max-normalised grey image resized to ``patch_size`` x ``patch_size``.
    A None or empty image yields all zeros of the same length.
    """
    n_out = cfg['hist_bins'] + 8
    if img is None or img.size == 0:
        return np.zeros(n_out, dtype=np.float32)
    arr = np.mean(img.astype(np.float32), axis=2) if img.ndim == 3 else img.astype(np.float32)
    arr = arr - arr.min()
    if arr.max() > 0:
        arr = arr / arr.max()
    target = (cfg['patch_size'], cfg['patch_size'])
    try:
        pil = Image.fromarray((arr * 255).astype(np.uint8))
        pil = pil.resize(target, Image.BILINEAR)
        arr2 = np.array(pil).astype(np.float32) / 255.0
    except Exception:
        # PIL unavailable (Image is None) or resize failed: smooth, crop, then tile.
        arr2 = gaussian_filter(arr, sigma=1.0)
        arr2 = arr2[:target[0], :target[1]]
    if arr2.shape != target:
        arr2 = np.resize(arr2, target)
    mean = float(arr2.mean())
    std = float(arr2.std())
    grad_x = float(np.mean(np.abs(np.diff(arr2, axis=1))))
    grad_y = float(np.mean(np.abs(np.diff(arr2, axis=0))))
    hist, _ = np.histogram(arr2, bins=cfg['hist_bins'], range=(0, 1))
    hist = hist.astype(np.float32) / (hist.sum() + 1e-9)
    tail_ratio = float(np.sum(arr2 < 0.1) / (arr2.size + 1e-9))
    penetration_nm, penetration_unc = estimate_penetration_nm(img, meta, calibration_curve=None)
    # 8th scalar: whether the penetration values are in nm (pixel size known)
    # or a unitless proxy. It also restores the hist_bins + 8 length that the
    # empty branch, synthesize_image_minimal and train_pipeline all assume. With
    # only 7 scalars, every downstream modality slice was shifted by one.
    pixel_known = 1.0 if pixel_size_from_meta(meta) else 0.0
    scalars = np.array([mean, std, grad_x, grad_y, tail_ratio, penetration_nm, penetration_unc, pixel_known], dtype=np.float32)
    return np.concatenate([scalars, hist.astype(np.float32)])
def calibrate_afm(force_raw: np.ndarray, probe_k_N_per_m: Optional[float], sensitivity_nm_per_V: Optional[float]) -> np.ndarray:
    """Convert a raw AFM deflection signal to force in nN.

    Conversion applies only when both the spring constant (N/m) and the
    deflection sensitivity (nm/V) are truthy: deflection_nm = raw * sensitivity,
    then F[N] = k * deflection_nm * 1e-9, reported in nN. Otherwise *force_raw*
    is returned unchanged (the same object).
    """
    if not (probe_k_N_per_m and sensitivity_nm_per_V):
        return force_raw
    deflection_nm = force_raw * sensitivity_nm_per_V
    force_newton = probe_k_N_per_m * (deflection_nm * 1e-9)
    return force_newton * 1e9
def afm_features(distance: np.ndarray, force_raw: np.ndarray, probe_k: Optional[float], sensitivity: Optional[float]) -> Tuple[np.ndarray, Dict[str,Any]]:
    """Summarise one AFM force curve as 5 float32 features.

    Features: ``[max force, min force, adhesion (= -min if min < 0 else 0),
    end-to-end slope, area under force vs. distance]``. Force is in nN when
    ``calibrate_afm`` can calibrate, otherwise in raw units.

    Returns ``(features, {'probe_k': ..., 'sensitivity': ...})``. Missing data or
    fewer than 3 samples give all-zero features. Arrays of different lengths are
    not guarded and raise from the integration step.
    """
    # np.trapz is deprecated in NumPy 2.0 in favour of np.trapezoid.
    trapezoid = getattr(np, 'trapezoid', None) or np.trapz
    meta = {'probe_k': probe_k, 'sensitivity': sensitivity}
    if distance is None or force_raw is None or len(force_raw) == 0:
        return np.zeros(5, dtype=np.float32), meta
    force_nN = calibrate_afm(force_raw, probe_k, sensitivity)
    if len(force_nN) < 3:
        return np.zeros(5, dtype=np.float32), meta
    max_f = float(np.max(force_nN))
    min_f = float(np.min(force_nN))
    adhesion = float(-min_f) if min_f < 0 else 0.0
    # 1e-9 guards against identical first/last distance values.
    slope = float((force_nN[-1] - force_nN[0]) / (distance[-1] - distance[0] + 1e-9))
    auc = float(trapezoid(force_nN, distance))
    return np.array([max_f, min_f, adhesion, slope, auc], dtype=np.float32), meta
----------------------------
Dataset & feature assembly (robust with synthetic minimal features)
----------------------------
class MultimodalSample:
    """Raw inputs for one sample: per-modality payload bytes, metadata and an optional label.

    Any modality payload may be None when that measurement is absent. *meta*
    falls back to ``{}`` when falsy. *label* is None for unlabelled samples.
    """

    # Must be __init__ (was `init`): otherwise MultimodalSample(...) raised TypeError.
    def __init__(self, sample_id: str, meta: Dict[str,Any], gcms_bytes: Optional[bytes]=None, spectrum_bytes: Optional[bytes]=None, image_bytes: Optional[bytes]=None, afm_bytes: Optional[bytes]=None, cs14c_bytes: Optional[bytes]=None, label: Optional[int]=None):
        self.sample_id = sample_id
        self.meta = meta or {}
        self.gcms_bytes = gcms_bytes
        self.spectrum_bytes = spectrum_bytes
        self.image_bytes = image_bytes
        self.afm_bytes = afm_bytes
        self.cs14c_bytes = cs14c_bytes
        self.label = label
class MultimodalDataset:
    """Index-addressable view that turns samples into fixed-layout feature vectors.

    ``ds[i]`` returns ``{'x', 'y', 'sample_id', 'meta'}``:

    - ``x`` concatenates, in order, the GC-MS, spectrum, image and AFM features
      and 3 metadata features (temperature, relative humidity, material code).
    - ``y`` is an int64 array of shape (1,) holding the label, or -1 when unlabelled.

    When parsing or featurising a modality raises, the error is logged, an audit
    record is written and that modality is replaced by its all-zero synthetic
    vector, so one bad file does not drop the whole sample.
    """

    def __init__(self, samples: List['MultimodalSample'], cfg: Dict[str,Any]):
        self.samples = samples
        self.cfg = cfg

    def __len__(self):
        return len(self.samples)

    def _record_failure(self, s, label: str, error_key: str, exc: Exception, raw: Optional[bytes]):
        """Log a modality failure for sample *s* and write the matching audit record."""
        logger.exception("%s error %s: %s", label, s.sample_id, exc)
        audit_error(self.cfg.get('audit_dir', 'audit_logs'), s.sample_id, ERROR_CODES[error_key], str(exc), raw)

    @staticmethod
    def _meta_features(meta: Dict[str, Any]) -> np.ndarray:
        """Return [temperature_c (default 20), relative_humidity (default 40), material code]."""
        # `or {}` / None checks: explicit nulls in metadata fall back to defaults
        # instead of crashing the whole sample.
        env = meta.get('environment') or {}

        def env_float(key: str, default: float) -> float:
            value = env.get(key)
            return float(default if value is None else value)

        material = meta.get('object_material', 'unknown')
        material_code = 0.0
        if isinstance(material, str):
            kind = material.lower()
            if kind in ('ceramic', 'pottery', 'porcelain'):
                material_code = 1.0
            elif kind in ('bronze', 'metal'):
                material_code = 2.0
        return np.array([env_float('temperature_c', 20.0), env_float('relative_humidity', 40.0), material_code], dtype=np.float32)

    def __getitem__(self, idx):
        s = self.samples[idx]
        feats = {}
        # GC-MS
        try:
            parsed = parse_mzml_precise(s.gcms_bytes)
            peaks = parsed.get('peaks', pd.DataFrame(columns=['mz', 'intensity', 'rt']))
            feats['gcms'] = extract_gcms_features(peaks, self.cfg['gcms'])
        except Exception as e:
            self._record_failure(s, "GCMS", 'PARSE_GCMS_FAIL', e, s.gcms_bytes)
            feats['gcms'] = synthesize_gcms_minimal(self.cfg['gcms'])
        # Spectrum
        try:
            sp = parse_jcamp_precise(s.spectrum_bytes)
            feats['spectrum'] = spectrum_features(sp.get('wn', np.array([])), sp.get('intensity', np.array([])), self.cfg['spectrum']['marker_peaks'], self.cfg['spectrum']['binned_bins'])
        except Exception as e:
            self._record_failure(s, "Spectrum", 'PARSE_SPECTRUM_FAIL', e, s.spectrum_bytes)
            feats['spectrum'] = synthesize_spectrum_minimal(self.cfg['spectrum'])
        # Image
        try:
            img, img_meta = parse_sem_tiff_precise(s.image_bytes)
            feats['image'] = image_features(img, img_meta, self.cfg['image'])
        except Exception as e:
            self._record_failure(s, "Image", 'PARSE_IMAGE_FAIL', e, s.image_bytes)
            feats['image'] = synthesize_image_minimal(self.cfg['image'])
        # AFM
        try:
            afm = parse_afm_precise(s.afm_bytes)
            feats['afm'], _ = afm_features(afm.get('distance', np.array([])), afm.get('force', np.array([])), s.meta.get('afm_probe_k_N_per_m'), s.meta.get('afm_sensitivity_nm_per_V'))
        except Exception as e:
            self._record_failure(s, "AFM", 'PARSE_AFM_FAIL', e, s.afm_bytes)
            feats['afm'] = synthesize_afm_minimal()
        meta_feat = self._meta_features(s.meta)
        x = np.concatenate([feats['gcms'], feats['spectrum'], feats['image'], feats['afm'], meta_feat])
        y = np.array([s.label if s.label is not None else -1], dtype=np.int64)
        return {'x': x, 'y': y, 'sample_id': s.sample_id, 'meta': s.meta}
----------------------------
Synthetic minimal feature generators (when modality missing)
----------------------------
def synthesize_gcms_minimal(cfg: Dict[str,Any]) -> np.ndarray:
    """Return an all-zero GC-MS feature vector of the normal length.

    The length is ``binned_bins + len(marker_windows) + topk`` (topk defaults to 256).
    Entries are deterministic zeros; no random noise is added.
    """
    length = cfg['binned_bins']
    length += len(cfg.get('marker_windows', []))
    length += cfg.get('topk', 256)
    return np.zeros(length, dtype=np.float32)
def synthesize_spectrum_minimal(cfg: Dict[str,Any]) -> np.ndarray:
    """Return an all-zero spectrum feature vector: ``binned_bins + len(marker_peaks) + 1`` float32 zeros."""
    n_markers = len(cfg.get('marker_peaks', []))
    return np.zeros(cfg['binned_bins'] + n_markers + 1, dtype=np.float32)
def synthesize_image_minimal(cfg: Dict[str,Any]) -> np.ndarray:
    """Return an all-zero image feature vector: ``hist_bins + 8`` float32 zeros."""
    n_scalar_features = 8
    return np.zeros(cfg['hist_bins'] + n_scalar_features, dtype=np.float32)
def synthesize_afm_minimal() -> np.ndarray:
    """Return the all-zero AFM feature vector (5 float32 zeros)."""
    return np.zeros(shape=(5,), dtype=np.float32)
----------------------------
Spectrum features helper
----------------------------
def spectrum_features(wn: np.ndarray, inten: np.ndarray, marker_peaks: List[float], bins: int) -> np.ndarray:
    """Build the fixed-length vibrational-spectrum descriptor.

    The intensity is first passed through ``asls_baseline`` (baseline-corrected,
    clipped at 0). The output layout is:

    - ``bins`` values: normalised histogram of corrected intensity over the
      observed wavenumber range;
    - one value per entry of *marker_peaks*: corrected intensity at the nearest
      wavenumber sample divided by the total corrected intensity;
    - one value: the count of samples more than 2 std above the mean.

    Empty inputs yield the all-zero vector from ``synthesize_spectrum_minimal``.
    """
    if not (len(wn) and len(inten)):
        return synthesize_spectrum_minimal({'binned_bins': bins, 'marker_peaks': marker_peaks})
    corrected = asls_baseline(inten)
    histogram = bin_generic(wn, corrected, bins=bins, xmin=wn.min(), xmax=wn.max())
    denominator = corrected.sum() + 1e-9
    nearest = (np.argmin(np.abs(wn - target)) for target in marker_peaks)
    marker_vals = np.array([float(corrected[i]) / denominator for i in nearest], dtype=np.float32)
    threshold = np.mean(corrected) + 2 * np.std(corrected)
    n_strong = float(np.sum(corrected > threshold))
    return np.concatenate([histogram, marker_vals, np.array([n_strong], dtype=np.float32)])
----------------------------
GC-MS feature extraction (precise)
----------------------------
def extract_gcms_features(peaks_df: pd.DataFrame, cfg: Dict[str,Any]) -> np.ndarray:
    """Turn a GC-MS peak table into a fixed-length float32 vector.

    Layout:
    - ``binned_bins`` values: normalised intensity histogram over the observed m/z range;
    - one value per ``marker_windows`` entry ``[lo, hi]``: the share of total
      intensity with ``lo <= mz <= hi``;
    - ``topk`` values: m/z of the K most intense peaks, sorted ascending and
      encoded as ``(mz % 1000) / 1000``, zero-padded.

    Empty tables, or tables whose mz/intensity columns cannot be read as floats,
    give the all-zero vector from ``synthesize_gcms_minimal``.
    """
    if peaks_df is None or len(peaks_df) == 0:
        return synthesize_gcms_minimal(cfg)
    try:
        mz = peaks_df['mz'].values.astype(np.float64)
        inten = peaks_df['intensity'].values.astype(np.float64)
    except Exception:
        return synthesize_gcms_minimal(cfg)

    histogram = bin_generic(mz, inten, bins=cfg['binned_bins'], xmin=mz.min(), xmax=mz.max())
    total = inten.sum() + 1e-9
    window_shares = np.array(
        [float(inten[(mz >= lo) & (mz <= hi)].sum()) / total for lo, hi in cfg.get('marker_windows', [])],
        dtype=np.float32,
    )

    k = cfg.get('topk', 256)
    try:
        ranked = np.argsort(inten)
        strongest = ranked[-k:] if len(inten) >= k else ranked
        top_mz = np.sort(mz[strongest])[:k]
        top_vec = np.zeros(k, dtype=np.float32)
        top_vec[:len(top_mz)] = (top_mz % 1000) / 1000.0
    except Exception:
        top_vec = np.zeros(k, dtype=np.float32)
    return np.concatenate([histogram.astype(np.float32), window_shares, top_vec])
----------------------------
Submodels (XGBoost)
----------------------------
class XGBModel:
    """Thin wrapper around an ``xgboost`` Booster used for one per-modality submodel."""

    # Must be __init__ (was `init`): otherwise params/model were never set.
    def __init__(self, params: Optional[Dict[str,Any]] = None):
        # 'use_label_encoder' belongs to the sklearn XGBClassifier wrapper. The
        # native xgb.train API does not know it and only warns that it is unused.
        self.params = params or {'objective': 'binary:logistic', 'eval_metric': 'auc'}
        self.model = None

    def train(self, X: np.ndarray, y: np.ndarray, num_round: int = 200):
        """Train a Booster on (X, y) for *num_round* boosting rounds."""
        dtrain = xgb.DMatrix(X, label=y)
        self.model = xgb.train(self.params, dtrain, num_round)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return Booster predictions for *X*. Raises RuntimeError before training/loading."""
        if self.model is None:
            raise RuntimeError("Model not trained")
        return self.model.predict(xgb.DMatrix(X))

    def save(self, path: str):
        """Save the Booster to *path*, creating parent directories as needed."""
        if self.model is None:
            raise RuntimeError("Model not trained")
        ensure_dir(os.path.dirname(path) or '.')
        self.model.save_model(path)

    def load(self, path: str):
        """Replace the current model with a Booster loaded from *path*."""
        self.model = xgb.Booster()
        self.model.load_model(path)
----------------------------
Fusion & Bayesian age posterior (MCMC template + conjugate fallback)
----------------------------
def fusion_posterior(ps: Dict[str,float], weights: Dict[str,float], prior: float = 0.5) -> float:
    """Fuse per-modality probabilities in log-odds space.

    For each modality (gcms, image, spectrum, afm), the logit of its probability
    is taken, with a missing probability defaulting to 0.5 and a missing weight to
    0.25. The weighted logits are summed with the prior's logit, and the result is
    mapped back through the logistic function. A 1e-9 offset keeps logits finite
    at p = 0 or 1.
    """
    eps = 1e-9
    modalities = ('gcms', 'image', 'spectrum', 'afm')

    def to_logit(p):
        return math.log((p + eps) / (1 - p + eps))

    logit_vec = np.array([to_logit(ps.get(m, 0.5)) for m in modalities])
    weight_vec = np.array([weights.get(m, 0.25) for m in modalities])
    score = np.dot(weight_vec, logit_vec) + to_logit(prior)
    return 1.0 / (1.0 + math.exp(-score))
def bayesian_age_mcmc(prior_mu: float, prior_sigma: float, evidence: List[Tuple[float,float]], cs14c_records: Optional[List[Dict[str,Any]]] = None, draws: int = 1000, tune: int = 500):
    """Posterior of a single latent ``age`` under a Normal prior and Normal observations.

    Args:
        prior_mu, prior_sigma: Normal prior on age. Presumably years BP, to match
            the ``age_bp`` records; TODO confirm.
        evidence: ``(mu_i, sigma_i)`` estimates, each observed as
            Normal(age, max(1, sigma_i)).
        cs14c_records: Optional dicts with ``age_bp`` (required, float-convertible)
            and ``age_sigma`` (default 20), observed as Normal(age, max(1, age_sigma)).
        draws, tune: Sampler settings passed to ``pm.sample`` (2 chains, 1 core).

    Returns:
        ``{'mu', 'sigma', 'ci95', 'method'}``. With pymc3/arviz available,
        ``method='mcmc'`` and ci95 is the 95 % HDI. Otherwise (``pm is None``) the
        closed-form ``bayesian_age_conjugate`` result is returned with
        ``method='conjugate_fallback'`` and its ci95 interval.
    """
    if pm is None:
        # pymc3 or arviz failed to import at module load: closed-form fallback.
        mu, sigma, ci = bayesian_age_conjugate((prior_mu, prior_sigma), evidence, cs14c_records)
        return {'mu': mu, 'sigma': sigma, 'ci95': ci, 'method': 'conjugate_fallback'}
    with pm.Model() as model:  # `model` is not used further; the context registers the RVs
        age = pm.Normal('age', mu=prior_mu, sigma=prior_sigma)
        for i, (mu_i, sigma_i) in enumerate(evidence):
            # Floor sigma at 1.0 so a near-zero reported error cannot dominate.
            pm.Normal(f'evidence_{i}', mu=age, sigma=max(1.0, sigma_i), observed=mu_i)
        if cs14c_records:
            for j, rec in enumerate(cs14c_records):
                mu_c = float(rec.get('age_bp'))  # raises TypeError if age_bp is missing
                sigma_c = float(rec.get('age_sigma', 20.0))
                pm.Normal(f'cs14c_{j}', mu=age, sigma=max(1.0, sigma_c), observed=mu_c)
        trace = pm.sample(draws=draws, tune=tune, chains=2, cores=1, return_inferencedata=True, progressbar=False)
    # Posterior mean/sd from the arviz summary; the interval is the 95 % HDI of `age`.
    summary = az.summary(trace, hdi_prob=0.95)
    mu_post = float(summary.loc['age','mean'])
    sd_post = float(summary.loc['age','sd'])
    hdi = az.hdi(trace, var_names=['age'], hdi_prob=0.95)
    ci95 = (float(hdi['age'][0]), float(hdi['age'][1]))
    return {'mu': mu_post, 'sigma': sd_post, 'ci95': ci95, 'method': 'mcmc'}
def bayesian_age_conjugate(prior: Tuple[float,float], evidence: List[Tuple[float,float]], cs14c_records: Optional[List[Dict[str,Any]]] = None):
    """Closed-form Normal-Normal update of an age estimate.

    Starting from ``prior = (mu0, sigma0)``, each observation ``(mu_i, sigma_i)``
    is folded in by precision weighting:

        var_new = 1 / (1/var_old + 1/var_i)
        mu_new  = var_new * (mu_old/var_old + mu_i/var_i)

    Observations are the *evidence* tuples followed by *cs14c_records*, which
    supply ``age_bp`` and ``age_sigma`` (default 20). Every observation variance
    is floored at 1.0, matching the ``sigma >= 1`` floor used by the MCMC model.
    A zero-width prior is a point mass and is returned unchanged.

    Returns:
        ``(mu_n, sigma_n, (mu_n - 1.96*sigma_n, mu_n + 1.96*sigma_n))``.
    """
    mu_n, s0 = prior
    mu_n = float(mu_n)
    var_n = float(s0) ** 2
    if var_n <= 0:
        return mu_n, 0.0, (mu_n, mu_n)

    observations = [(float(m), max(1.0, float(s) ** 2)) for m, s in evidence]
    for rec in cs14c_records or []:
        sigma_c = float(rec.get('age_sigma', 20.0))
        # Floor like the evidence terms; an unfloored 0 used to divide by zero.
        observations.append((float(rec.get('age_bp')), max(1.0, sigma_c ** 2)))

    for mu_i, var_i in observations:
        var_new = 1.0 / (1.0 / var_n + 1.0 / var_i)
        # Weight the previous mean by the PREVIOUS variance. Reusing the updated
        # variance made every step add var_new*mu_i/var_i on top of the old mean.
        mu_n = var_new * (mu_n / var_n + mu_i / var_i)
        var_n = var_new

    sigma_n = math.sqrt(var_n)
    ci95 = (mu_n - 1.96 * sigma_n, mu_n + 1.96 * sigma_n)
    return mu_n, sigma_n, ci95
----------------------------
Calibration & evaluation
----------------------------
def calibrate_probs(probs: np.ndarray, labels: np.ndarray) -> Dict[str,Any]:
    """Fit an isotonic calibrator mapping raw probabilities to calibrated ones.

    Returns ``{'type': 'isotonic', 'model': fitted IsotonicRegression}`` on success.
    Inputs outside the training range are clipped. If fitting raises, it returns
    ``{'type': 'identity'}`` so that ``apply_calib`` passes probabilities through.
    """
    calibrator = IsotonicRegression(out_of_bounds='clip')
    try:
        calibrator.fit(probs, labels)
    except Exception:
        return {'type': 'identity'}
    return {'type': 'isotonic', 'model': calibrator}
def apply_calib(calib: Dict[str, Any], probs: np.ndarray) -> np.ndarray:
    """Map raw probabilities through a calibrator built by ``calibrate_probs``.

    An isotonic calibrator returns ``calib['model'].transform(probs)``. A missing
    calibrator (``None``), ``'identity'``, or any unrecognised type returns *probs*
    itself, unchanged and uncopied.
    """
    kind = None if calib is None else calib.get('type')
    if kind != 'isotonic':
        return probs
    return calib['model'].transform(probs)
# ----------------------------
# Training pipeline (robust)
# ----------------------------
def _modality_column_slices(cfg: Dict[str, Any]) -> Dict[str, slice]:
    """Return the column slice of each modality inside the concatenated feature vector.

    Layout, in order: gcms | spectrum | image | afm | meta. Widths are derived from *cfg*:
    - gcms: bins + marker windows + top-k
    - spectrum: bins + marker peaks + 1
    - image: histogram bins + 8
    - afm: 5
    - meta: 3
    """
    widths = (
        ('gcms', cfg['gcms']['binned_bins'] + len(cfg['gcms'].get('marker_windows', []))
         + cfg['gcms'].get('topk', 256)),
        ('spectrum', cfg['spectrum']['binned_bins'] + len(cfg['spectrum'].get('marker_peaks', [])) + 1),
        ('image', cfg['image']['hist_bins'] + 8),
        ('afm', 5),
        ('meta', 3),
    )
    slices: Dict[str, slice] = {}
    start = 0
    for name, width in widths:
        slices[name] = slice(start, start + width)
        start += width
    return slices


def train_pipeline(samples: List[MultimodalSample], cfg: Dict[str, Any], out_dir: str) -> Dict[str, Any]:
    """Train per-modality XGBoost submodels, fuse and calibrate them, and save all artifacts.

    Steps:
    1. Build feature vectors with ``MultimodalDataset``.
    2. Make an 80/20 train/validation split, stratified when the class counts allow it.
    3. Train one ``XGBModel`` per modality on that modality's column slice.
    4. Weight modalities by validation ROC AUC (floored at 0.01, normalised to sum 1).
    5. Fuse with ``fusion_posterior`` and fit an isotonic calibrator on the fused scores.
    6. Write ``gcms.xgb``, ``spectrum.xgb``, ``image.xgb``, ``afm.xgb``,
       ``fusion_meta.json`` and ``calibration.pkl`` into *out_dir*.

    Args:
        samples: Labelled samples.
        cfg: Configuration; reads the ``gcms``, ``spectrum``, ``image`` and ``training``
            sections and ``model_version``.
        out_dir: Directory that receives the model artifacts (created if needed).

    Returns:
        dict with fused validation ``auc``, ``precision``, ``recall``, ``f1`` and the
        modality ``weights``.

    Raises:
        ValueError: if *samples* yields no items.
    """
    import pickle

    ensure_dir(out_dir)
    ds = MultimodalDataset(samples, cfg)
    items = [ds[i] for i in range(len(ds))]
    if not items:
        raise ValueError("train_pipeline needs at least one sample")
    X = np.stack([item['x'] for item in items]).astype(np.float32)
    Y = np.array([int(item['y'][0]) for item in items]).astype(np.int64)

    seed = cfg['training']['random_seed']
    try:
        X_train, X_val, y_train, y_val = train_test_split(
            X, Y, test_size=0.2, stratify=Y, random_state=seed)
    except ValueError as exc:
        # Stratification needs >= 2 members per class and a large enough validation split;
        # degrade to a plain random split instead of aborting training.
        logger.warning("Stratified split failed (%s); using an unstratified split", exc)
        X_train, X_val, y_train, y_val = train_test_split(
            X, Y, test_size=0.2, random_state=seed)

    slices = _modality_column_slices(cfg)
    layout_width = slices['meta'].stop
    if X.shape[1] != layout_width:
        logger.warning("Feature width %d differs from configured layout width %d; "
                       "modality slices may be misaligned", X.shape[1], layout_width)

    rounds = cfg['training']['xgb_rounds']
    # AFM uses a quarter of the boosting rounds (minimum 50); the others use the full budget.
    round_plan = {
        'gcms': rounds,
        'spectrum': rounds,
        'image': rounds,
        'afm': max(50, rounds // 4),
    }
    models: Dict[str, Any] = {}
    for name, n_round in round_plan.items():
        model = XGBModel()
        model.train(X_train[:, slices[name]], y_train, num_round=n_round)
        models[name] = model
    val_preds = {name: model.predict(X_val[:, slices[name]]) for name, model in models.items()}

    def auc_safe(y_true, y_score) -> float:
        """ROC AUC, or 0.5 when it is undefined (e.g. only one class in *y_true*)."""
        try:
            return roc_auc_score(y_true, y_score)
        except ValueError:
            return 0.5

    fusion_order = ('gcms', 'image', 'spectrum', 'afm')
    aucs = {name: auc_safe(y_val, val_preds[name]) for name in fusion_order}
    raw = np.array([max(0.01, aucs[name]) for name in fusion_order])
    weights_dict = dict(zip(fusion_order, (raw / raw.sum()).tolist()))
    logger.info("Per-modality validation AUCs: %s", aucs)
    logger.info("Modality weights: %s", weights_dict)

    fused = np.array([
        fusion_posterior({name: float(val_preds[name][i]) for name in fusion_order},
                         weights_dict, prior=0.5)
        for i in range(len(y_val))
    ])

    # NOTE: the calibrator is fitted and scored on the same validation split, so the
    # fused metrics logged below are optimistic; use a separate hold-out for unbiased ones.
    calib = calibrate_probs(fused, y_val)
    fused_cal = apply_calib(calib, fused)
    auc_fused = auc_safe(y_val, fused_cal)
    preds = (fused_cal >= 0.5).astype(int)
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_val, preds, average='binary', zero_division=0)
    logger.info("Fused metrics: AUC=%.4f Precision=%.4f Recall=%.4f F1=%.4f",
                auc_fused, precision, recall, f1)

    for name, model in models.items():
        model.save(os.path.join(out_dir, f"{name}.xgb"))
    meta = {
        'weights': weights_dict,
        'calibration': {'type': calib.get('type', 'identity')},
        'cfg': cfg,
        'model_version': cfg.get('model_version'),
    }
    with open(os.path.join(out_dir, "fusion_meta.json"), "w", encoding='utf-8') as f:
        json.dump(meta, f, ensure_ascii=False, indent=2)
    with open(os.path.join(out_dir, "calibration.pkl"), "wb") as f:
        pickle.dump(calib, f)
    logger.info("Training complete. Models saved to %s", out_dir)
    return {'auc': auc_fused, 'precision': precision, 'recall': recall, 'f1': f1, 'weights': weights_dict}
# ----------------------------
# Inference engine (robust)
# ----------------------------
class InferenceEngine:
    """Score one MultimodalSample with trained submodels and write an audited report.

    Artifacts are read from *model_dir* using the file names ``train_pipeline`` writes.
    Missing pieces degrade rather than fail:
    - a submodel whose prediction raises contributes 0.5;
    - fusion weights fall back to ``DEFAULT_WEIGHTS``;
    - calibration falls back to identity.
    """

    # Fusion weights used when fusion_meta.json is missing or unreadable.
    DEFAULT_WEIGHTS = {'gcms': 0.4, 'image': 0.3, 'spectrum': 0.2, 'afm': 0.1}

    def __init__(self, model_dir: str, cfg: Dict[str, Any], secret: str):
        self.model_dir = model_dir
        self.cfg = cfg
        # Key passed to compute_provenance for every report.
        self.secret = secret
        self._load()

    def _load(self):
        """Load submodels, fusion weights and calibrator from ``self.model_dir``."""
        self.model_g = XGBModel()
        self.model_s = XGBModel()
        self.model_i = XGBModel()
        self.model_a = XGBModel()
        # Load each file on its own so one missing model does not leave the rest unloaded.
        for model, fname in ((self.model_g, "gcms.xgb"), (self.model_s, "spectrum.xgb"),
                             (self.model_i, "image.xgb"), (self.model_a, "afm.xgb")):
            path = os.path.join(self.model_dir, fname)
            try:
                model.load(path)
            except Exception:
                logger.warning("Could not load model file %s; inference will use defaults for it", path)
        try:
            with open(os.path.join(self.model_dir, "fusion_meta.json"), "r", encoding='utf-8') as f:
                meta = json.load(f)
            self.weights = meta.get('weights', dict(self.DEFAULT_WEIGHTS))
        except Exception:
            self.weights = dict(self.DEFAULT_WEIGHTS)
        try:
            import pickle
            # SECURITY: unpickling can execute arbitrary code; model_dir must be trusted.
            with open(os.path.join(self.model_dir, "calibration.pkl"), "rb") as f:
                self.calib = pickle.load(f)
        except Exception:
            self.calib = {'type': 'identity'}
        logger.info("Engine loaded from %s", self.model_dir)

    @staticmethod
    def _modality_slices(cfg: Dict[str, Any]) -> Dict[str, slice]:
        """Column slices of gcms | spectrum | image | afm in the feature vector (widths from *cfg*)."""
        widths = (
            ('gcms', cfg['gcms']['binned_bins'] + len(cfg['gcms'].get('marker_windows', []))
             + cfg['gcms'].get('topk', 256)),
            ('spectrum', cfg['spectrum']['binned_bins'] + len(cfg['spectrum'].get('marker_peaks', [])) + 1),
            ('image', cfg['image']['hist_bins'] + 8),
            ('afm', 5),
        )
        slices = {}
        start = 0
        for name, width in widths:
            slices[name] = slice(start, start + width)
            start += width
        return slices

    @staticmethod
    def _predict_one(model: Any, x: np.ndarray, cols: slice) -> float:
        """Score the single row of *x* restricted to *cols*; 0.5 if prediction fails."""
        try:
            return float(model.predict(x[:, cols])[0])
        except Exception:
            return 0.5

    @staticmethod
    def _safe_filename(value: Any) -> str:
        """Reduce *value* to one path component.

        Keeps only alphanumerics, '-', '_' and '.', and strips leading/trailing dots,
        so a user-supplied sample_id cannot escape the audit directory.
        """
        cleaned = ''.join(c if (c.isalnum() or c in '-_.') else '_' for c in str(value))
        return cleaned.strip('.') or 'sample'

    def _age_posterior(self, sample: MultimodalSample) -> Dict[str, Any]:
        """Age posterior from CS-14C records (if any); never raises."""
        cs14c_recs = None
        if sample.cs14c_bytes:
            try:
                cs14c_recs = parse_cs14c_precise(sample.cs14c_bytes).get('records', [])
            except Exception:
                logger.warning("CS-14C payload for %s could not be parsed; ignoring it", sample.sample_id)
                cs14c_recs = None
        evidence_age = []  # TODO: add submodel-derived age proxies as (mean, sigma) pairs
        bayes_cfg = self.cfg.get('bayes', {})
        try:
            # bayesian_age_mcmc falls back to the conjugate solution itself when PyMC is absent.
            return bayesian_age_mcmc(200.0, 200.0, evidence_age, cs14c_recs,
                                     draws=bayes_cfg.get('mcmc_draws', 1000),
                                     tune=bayes_cfg.get('mcmc_tune', 500))
        except Exception as exc:
            logger.warning("Age posterior failed for %s: %s", sample.sample_id, exc)
            return {'mu': None, 'sigma': None, 'ci95': (None, None), 'method': 'failed', 'error': str(exc)}

    def _explain(self, x: np.ndarray, gcms_cols: slice, psd: Dict[str, float]) -> Dict[str, Any]:
        """SHAP summary for the GC-MS model when shap is installed, else a weight*|logit| proxy."""
        explain: Dict[str, Any] = {}
        if shap is not None:
            try:
                explainer = shap.TreeExplainer(self.model_g.model)
                shap_vals = explainer.shap_values(x[:, gcms_cols])
                explain['gcms_shap_mean_abs'] = float(np.mean(np.abs(shap_vals)))
            except Exception:
                explain['gcms_shap_mean_abs'] = None
        else:
            def logit(p):
                return math.log((p + 1e-9) / (1 - p + 1e-9))
            explain['modality_contrib_proxy'] = {
                k: float(self.weights.get(k, 0.25) * abs(logit(psd.get(k, 0.5)))) for k in psd
            }
        return explain

    def infer(self, sample: MultimodalSample, return_explain: bool = False) -> Dict[str, Any]:
        """Score *sample*, write the report to the audit directory and return it.

        ``confidence`` is the calibrated fused posterior. Values >= cfg['fusion']['confidence_high']
        give ``'ancient_oil'``, values < cfg['fusion']['confidence_low'] give ``'modern_oil'``,
        and anything between gives ``'inconclusive'``. *return_explain* is accepted for
        API compatibility; the ``explainability`` section is always included.

        On any unexpected failure an ``audit_error`` record is written and
        ``{'sample_id', 'verdict': 'error', 'confidence': 0.0, 'error'}`` is returned.
        """
        try:
            ds = MultimodalDataset([sample], self.cfg)
            x = ds[0]['x'].reshape(1, -1)
            slices = self._modality_slices(self.cfg)
            psd = {
                'gcms': self._predict_one(self.model_g, x, slices['gcms']),
                'image': self._predict_one(self.model_i, x, slices['image']),
                'spectrum': self._predict_one(self.model_s, x, slices['spectrum']),
                'afm': self._predict_one(self.model_a, x, slices['afm']),
            }
            posterior = fusion_posterior(psd, self.weights, prior=0.5)
            posterior_cal = float(apply_calib(self.calib, np.array([posterior]))[0])
            fusion_cfg = self.cfg['fusion']
            if posterior_cal >= fusion_cfg['confidence_high']:
                verdict = "ancient_oil"
            elif posterior_cal < fusion_cfg['confidence_low']:
                verdict = "modern_oil"
            else:
                verdict = "inconclusive"

            files_bytes = [sample.gcms_bytes or b'', sample.spectrum_bytes or b'', sample.image_bytes or b'',
                           sample.afm_bytes or b'', sample.cs14c_bytes or b'']
            report = {
                "sample_id": sample.sample_id,
                "verdict": verdict,
                "confidence": posterior_cal,
                "age_posterior": self._age_posterior(sample),
                "evidence_probs": psd,
                "explainability": self._explain(x, slices['gcms'], psd),
                "modality_weights": self.weights,
                "model_version": self.cfg.get('model_version'),
                "provenance": compute_provenance(sample.meta, files_bytes, self.secret),
                "timestamp": now_iso(),
                "notes": "Algorithmic result. For high-stakes use, require CS-14C compound-specific dating and expert review."
            }
            audit_dir = self.cfg.get('audit_dir', DEFAULT_CONFIG['audit_dir'])
            ensure_dir(audit_dir)
            # Sanitised id + random suffix: no path traversal, no same-second overwrites.
            fname = os.path.join(
                audit_dir,
                f"{self._safe_filename(sample.sample_id)}_{int(time.time())}_{uuid.uuid4().hex[:8]}.json")
            with open(fname, "w", encoding='utf-8') as f:
                json.dump(report, f, ensure_ascii=False, indent=2)
            return report
        except Exception as e:
            logger.exception("Inference failed for %s: %s", sample.sample_id, e)
            audit_error(self.cfg.get('audit_dir', 'audit_logs'), sample.sample_id, ERROR_CODES['INFERENCE_FAIL'], str(e))
            return {"sample_id": sample.sample_id, "verdict": "error", "confidence": 0.0, "error": str(e)}
# ----------------------------
# Blind-test harness
# ----------------------------
def run_blind_test(engine: InferenceEngine, samples: List[MultimodalSample]) -> Dict[str, Any]:
    """Run *engine* over labelled samples and compute ROC AUC, precision, recall and F1.

    Label source: ``sample.label``, else ``sample.meta['label']``, else 0. The report's
    ``confidence`` is the score; binary predictions threshold it at 0.5. Reports with
    verdict ``'error'`` are left out of the metrics, since their 0.0 confidence is a
    placeholder rather than a model score. They are counted in ``n_errors`` and kept
    in ``reports``.

    Returns:
        dict with ``n`` (samples run), ``n_errors``, ``auc`` (0.5 when undefined),
        ``precision``, ``recall``, ``f1`` and the per-sample ``reports``.
    """
    y_true: List[int] = []
    scores: List[float] = []
    reports: List[Dict[str, Any]] = []
    n_errors = 0
    for s in samples:
        r = engine.infer(s)
        reports.append(r)
        if r.get('verdict') == 'error':
            n_errors += 1
            continue
        label = s.label
        if label is None:
            meta_label = (s.meta or {}).get('label')
            label = 0 if meta_label is None else meta_label
        y_true.append(int(label))
        scores.append(float(r.get('confidence', 0.0)))

    if not y_true:
        return {"n": len(samples), "n_errors": n_errors, "auc": 0.5,
                "precision": 0.0, "recall": 0.0, "f1": 0.0, "reports": reports}

    y = np.array(y_true)
    p = np.array(scores)
    auc = roc_auc_score(y, p) if len(np.unique(y)) > 1 else 0.5
    preds = (p >= 0.5).astype(int)
    precision, recall, f1, _ = precision_recall_fscore_support(y, preds, average='binary', zero_division=0)
    return {"n": len(samples), "n_errors": n_errors, "auc": auc, "precision": precision,
            "recall": recall, "f1": f1, "reports": reports}
# ----------------------------
# AutoOrchestrator (robust)
# ----------------------------
class AutoOrchestrator:
    """Watch-folder daemon: classify dropped files, queue jobs, run them on worker threads.

    ``detect_input`` classifies a file, ``route_task`` turns that into a job dict, and
    ``max_workers`` threads execute jobs from a bounded queue. Jobs can also be enqueued
    directly; the HTTP upload endpoint submits ``{'type': 'infer', 'sample': ...}``.
    Train, infer and CS-14C jobs write audit records under ``audit_dir``. Inference
    reports are either published or packaged for human review.
    """

    def __init__(self, cfg: Dict[str, Any]):
        self.cfg = cfg
        auto_cfg = cfg.get('auto', {})
        auto_defaults = DEFAULT_CONFIG['auto']
        self.watch_dir = auto_cfg.get('watch_dir', auto_defaults['watch_dir'])
        self.poll_interval = auto_cfg.get('poll_interval_s', auto_defaults['poll_interval_s'])
        self.max_workers = auto_cfg.get('max_workers', auto_defaults['max_workers'])
        self.human_threshold = auto_cfg.get('human_review_confidence_threshold',
                                            auto_defaults['human_review_confidence_threshold'])
        self.auto_accept = auto_cfg.get('auto_accept_confidence', auto_defaults['auto_accept_confidence'])
        self.queue = Queue(maxsize=auto_cfg.get('max_queue_size', auto_defaults['max_queue_size']))
        self.workers: List[Thread] = []
        self.stop_event = Event()
        self.engine = None
        ensure_dir(self.watch_dir)
        ensure_dir(self.cfg.get('audit_dir', DEFAULT_CONFIG['audit_dir']))

    # ---- small helpers ---------------------------------------------------

    @staticmethod
    def _read_optional(path: Optional[str]) -> Optional[bytes]:
        """Return the bytes at *path*, or None if the path is empty or does not exist."""
        if not path or not os.path.exists(path):
            return None
        with open(path, 'rb') as f:
            return f.read()

    @staticmethod
    def _load_json(path: str) -> Any:
        """Parse the UTF-8 JSON file at *path*, closing the handle deterministically."""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def _safe_filename(value: Any) -> str:
        """Reduce *value* to one path component.

        Keeps alphanumerics, '-', '_' and '.', and strips leading/trailing dots, so an
        untrusted sample_id cannot escape the output directory.
        """
        cleaned = ''.join(c if (c.isalnum() or c in '-_.') else '_' for c in str(value))
        return cleaned.strip('.') or 'sample'

    def _sample_from_record(self, rec: Dict[str, Any], sample_id: str, meta: Dict[str, Any],
                            **extra: Any) -> MultimodalSample:
        """Build a MultimodalSample from a record holding ``gcms_path`` ... ``cs14c_path``."""
        return MultimodalSample(sample_id=sample_id, meta=meta,
                                gcms_bytes=self._read_optional(rec.get('gcms_path')),
                                spectrum_bytes=self._read_optional(rec.get('spectrum_path')),
                                image_bytes=self._read_optional(rec.get('image_path')),
                                afm_bytes=self._read_optional(rec.get('afm_path')),
                                cs14c_bytes=self._read_optional(rec.get('cs14c_path')),
                                **extra)

    # ---- detection & routing ----------------------------------------------

    def detect_input(self, path: str) -> Dict[str, Any]:
        """Classify *path* as ``{'task': ..., 'payload': {...}}``.

        - ``.json``: a non-empty list whose entries are all dicts with ``sample_id``
          -> ``train_manifest``; an object with ``"type": "sample"`` -> ``sample_json``.
        - ``.csv``: header has ``age_bp`` and ``compound_id`` -> ``cs14c_report``;
          header has ``mz`` and ``intensity`` -> ``peak_table``.
        - ``.tif/.tiff/.png/.jpg/.jpeg`` -> ``image_file``.
        - Anything else, including unreadable files -> ``unknown``.
        """
        name = os.path.basename(path).lower()
        if name.endswith('.json'):
            try:
                obj = self._load_json(path)
            except Exception as exc:
                logger.debug("Could not read %s as JSON: %s", path, exc)
            else:
                if isinstance(obj, list) and obj and all(isinstance(r, dict) and 'sample_id' in r for r in obj):
                    return {'task': 'train_manifest', 'payload': {'manifest_path': path}}
                if isinstance(obj, dict) and obj.get('type') == 'sample':
                    return {'task': 'sample_json', 'payload': {'sample_json': path}}
        if name.endswith('.csv'):
            try:
                with open(path, 'r', encoding='utf-8') as f:
                    header = f.readline().lower()
            except Exception as exc:
                logger.debug("Could not read CSV header of %s: %s", path, exc)
            else:
                if 'age_bp' in header and 'compound_id' in header:
                    return {'task': 'cs14c_report', 'payload': {'path': path}}
                if 'mz' in header and 'intensity' in header:
                    return {'task': 'peak_table', 'payload': {'path': path}}
        if name.endswith(('.tif', '.tiff', '.png', '.jpg', '.jpeg')):
            return {'task': 'image_file', 'payload': {'path': path}}
        return {'task': 'unknown', 'payload': {'path': path}}

    def route_task(self, detected: Dict[str, Any]):
        """Turn a ``detect_input`` result into a queued job; unknown inputs are quarantined."""
        task = detected.get('task')
        payload = detected.get('payload', {})
        if task == 'train_manifest':
            self.enqueue({'type': 'train', 'manifest_path': payload['manifest_path']})
        elif task == 'sample_json':
            self.enqueue({'type': 'infer_from_json', 'sample_json': payload['sample_json']})
        elif task == 'cs14c_report':
            self.enqueue({'type': 'calibrate_cs14c', 'path': payload['path']})
        elif task == 'peak_table':
            self.enqueue({'type': 'infer_from_files', 'files': {'gcms': payload['path']}})
        elif task == 'image_file':
            self.enqueue({'type': 'infer_from_files', 'files': {'image': payload['path']}})
        else:
            logger.info("Unknown input type for %s; moving to quarantine", payload.get('path'))
            self.quarantine_file(payload.get('path'))

    def enqueue(self, job: Dict[str, Any]) -> bool:
        """Queue *job* without blocking; return False (after logging) if it was rejected."""
        try:
            self.queue.put_nowait(job)
        except Exception as e:
            logger.exception("Queue full or error enqueuing job: %s", e)
            return False
        logger.info("Enqueued job: %s", job.get('type'))
        return True

    # ---- execution --------------------------------------------------------

    def worker_loop(self):
        """Pull and execute jobs until ``stop_event`` is set; one failing job never kills the worker."""
        while not self.stop_event.is_set():
            try:
                job = self.queue.get(timeout=1)
            except Empty:
                continue
            try:
                self.execute_job(job)
            except Exception as e:
                logger.exception("Job execution failed: %s", e)
            finally:
                self.queue.task_done()

    def execute_job(self, job: Dict[str, Any]):
        """Dispatch *job* on its ``type`` key."""
        jtype = job.get('type')
        logger.info("Executing job type: %s", jtype)
        if jtype == 'train':
            self._do_train(job.get('manifest_path'))
        elif jtype == 'infer':
            # Submitted by the HTTP upload endpoint with an already-built sample.
            sample = job.get('sample')
            if sample is None:
                logger.warning("Infer job without a sample; ignoring")
            else:
                self._do_infer_sample(sample)
        elif jtype == 'infer_from_json':
            self._do_infer_from_json(job.get('sample_json'))
        elif jtype == 'infer_from_files':
            self._do_infer_from_files(job.get('files', {}))
        elif jtype == 'calibrate_cs14c':
            self._do_calibrate_cs14c(job.get('path'))
        else:
            logger.warning("Unknown job type: %s", jtype)

    def _ensure_engine(self) -> 'InferenceEngine':
        """Return the cached InferenceEngine, creating it on first use."""
        engine = self.engine
        if engine is None:
            model_dir = self.cfg.get('model_dir', 'models')
            secret = os.environ.get(self.cfg.get('provenance_secret_env', 'PROVENANCE_SECRET'), 'change_this_secret')
            engine = InferenceEngine(model_dir, self.cfg, secret)
            self.engine = engine
        return engine

    def _do_train(self, manifest_path: str):
        """Train from a JSON manifest (list of sample records) and audit the result."""
        logger.info("Starting training from manifest: %s", manifest_path)
        manifest = self._load_json(manifest_path)
        samples = []
        for rec in manifest:
            meta_path = rec.get('meta_json_path')
            meta = self._load_json(meta_path) if meta_path else {}
            samples.append(self._sample_from_record(rec, rec['sample_id'], meta, label=rec.get('label')))
        res = train_pipeline(samples, self.cfg, self.cfg.get('model_dir', 'models'))
        # Drop the cached engine so subsequent inferences load the freshly trained models.
        self.engine = None
        logger.info("Training finished: %s", res)
        self._write_audit({'task': 'train', 'manifest': manifest_path, 'result': res, 'timestamp': now_iso()})

    def _do_infer_from_json(self, sample_json: str):
        """Infer a sample described by a JSON file with ``*_path`` entries."""
        rec = self._load_json(sample_json)
        sample = self._sample_from_record(rec, rec.get('sample_id', str(uuid.uuid4())), rec.get('meta', {}))
        self._do_infer_sample(sample)

    def _do_infer_from_files(self, files: Dict[str, str]):
        """Infer a new anonymous sample from raw modality files (gcms/spectrum/image/afm)."""
        sample = MultimodalSample(sample_id=str(uuid.uuid4()), meta={},
                                  gcms_bytes=self._read_optional(files.get('gcms')),
                                  spectrum_bytes=self._read_optional(files.get('spectrum')),
                                  image_bytes=self._read_optional(files.get('image')),
                                  afm_bytes=self._read_optional(files.get('afm')))
        self._do_infer_sample(sample)

    def _do_infer_sample(self, sample: MultimodalSample):
        """Infer *sample*, audit the report, then publish it or send it to human review.

        The report's ``confidence`` is the fused posterior: high values mean
        ``ancient_oil``, low values mean ``modern_oil``. Certainty in the verdict is
        therefore ``max(p, 1 - p)``. Errors always go to review, and only a definite
        ancient/modern verdict can be auto-accepted.
        """
        engine = self._ensure_engine()
        report = engine.infer(sample)
        logger.info("Inference report: %s", json.dumps(report, ensure_ascii=False))
        self._write_audit({'task': 'infer', 'sample_id': sample.sample_id, 'report': report, 'timestamp': now_iso()})
        verdict = report.get('verdict')
        if verdict == 'error':
            logger.info("Inference error -> trigger human review for %s", sample.sample_id)
            self._trigger_human_review(report, reason="inference_error")
            return
        posterior = float(report.get('confidence', 0.0))
        conf = max(posterior, 1.0 - posterior)
        if conf >= self.auto_accept and verdict in ('ancient_oil', 'modern_oil'):
            logger.info("Auto-accepting result for %s (conf=%.3f)", sample.sample_id, conf)
            self._publish_report(report)
        elif conf < self.human_threshold:
            logger.info("Low confidence -> trigger human review for %s (conf=%.3f)", sample.sample_id, conf)
            self._trigger_human_review(report, reason="low_confidence")
        else:
            logger.info("Intermediate confidence -> queue for human review: %s (conf=%.3f)", sample.sample_id, conf)
            self._trigger_human_review(report, reason="intermediate_confidence")

    def _do_calibrate_cs14c(self, path: str):
        """Parse a CS-14C report and audit how many records it contained (or the error)."""
        logger.info("Processing CS-14C calibration report: %s", path)
        try:
            with open(path, 'rb') as f:
                raw = f.read()
            recs = parse_cs14c_precise(raw)
            n_records = len(recs.get('records', []))
            self._write_audit({'task': 'calibrate_cs14c', 'path': path, 'records_count': n_records, 'timestamp': now_iso()})
            logger.info("CS-14C calibration records processed: %d", n_records)
        except Exception as e:
            logger.exception("Failed to process CS-14C: %s", e)
            self._write_audit({'task': 'calibrate_cs14c', 'path': path, 'error': str(e), 'timestamp': now_iso()})

    # ---- outputs ------------------------------------------------------------

    def _unique_json_path(self, directory: str, sample_id: Any) -> str:
        """``<directory>/<sanitised id>_<epoch>_<random>.json``; avoids traversal and collisions."""
        return os.path.join(directory,
                            f"{self._safe_filename(sample_id)}_{int(time.time())}_{uuid.uuid4().hex[:8]}.json")

    def _publish_report(self, report: Dict[str, Any]):
        """Write an auto-accepted report under ``<audit_dir>/published``."""
        out_dir = os.path.join(self.cfg.get('audit_dir', 'audit_logs'), 'published')
        ensure_dir(out_dir)
        fname = self._unique_json_path(out_dir, report.get('sample_id'))
        with open(fname, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        logger.info("Published report to %s", fname)

    def _trigger_human_review(self, report: Dict[str, Any], reason: str):
        """Write a review package ``{report, reason, timestamp}`` under ``<audit_dir>/human_review``."""
        review_dir = os.path.join(self.cfg.get('audit_dir', 'audit_logs'), 'human_review')
        ensure_dir(review_dir)
        fname = self._unique_json_path(review_dir, report.get('sample_id'))
        review_pkg = {'report': report, 'reason': reason, 'timestamp': now_iso()}
        with open(fname, 'w', encoding='utf-8') as f:
            json.dump(review_pkg, f, ensure_ascii=False, indent=2)
        logger.info("Human review package created: %s", fname)

    def _write_audit(self, audit: Dict[str, Any]):
        """Write one audit record as a uniquely named JSON file in ``audit_dir``."""
        audit_dir = self.cfg.get('audit_dir', 'audit_logs')
        ensure_dir(audit_dir)
        fname = os.path.join(audit_dir, f"audit_{int(time.time())}_{uuid.uuid4().hex[:8]}.json")
        with open(fname, 'w', encoding='utf-8') as f:
            json.dump(audit, f, ensure_ascii=False, indent=2)
        logger.debug("Audit written: %s", fname)

    def quarantine_file(self, path: Optional[str]):
        """Move *path* into ``<watch_dir>/quarantine`` without overwriting an existing file."""
        if not path:
            logger.warning("quarantine_file called without a path; nothing to do")
            return
        qdir = os.path.join(self.watch_dir, 'quarantine')
        ensure_dir(qdir)
        try:
            dest = os.path.join(qdir, os.path.basename(path))
            if os.path.exists(dest):
                root, ext = os.path.splitext(dest)
                dest = f"{root}_{uuid.uuid4().hex[:8]}{ext}"
            os.rename(path, dest)
            logger.info("Moved unknown file to quarantine: %s", dest)
        except Exception:
            logger.exception("Failed to quarantine file: %s", path)

    # ---- lifecycle -----------------------------------------------------------

    def start_workers(self):
        """Start ``max_workers`` daemon threads running ``worker_loop``."""
        for _ in range(self.max_workers):
            t = Thread(target=self.worker_loop, daemon=True)
            t.start()
            self.workers.append(t)
        logger.info("Started %d worker threads", len(self.workers))

    def stop(self):
        """Signal all loops to stop and give each worker up to 2 s to finish."""
        self.stop_event.set()
        logger.info("Stop signal set; waiting for workers to finish")
        for t in self.workers:
            t.join(timeout=2)

    def watch_loop(self):
        """Poll ``watch_dir`` and route every new non-hidden file exactly once per process.

        NOTE: a file is picked up as soon as it appears. Producers should write to a
        temporary name and rename it into place so that partial files are not read.
        """
        logger.info("Starting watch loop on %s", self.watch_dir)
        seen = set()
        while not self.stop_event.is_set():
            try:
                files = [os.path.join(self.watch_dir, f) for f in os.listdir(self.watch_dir) if not f.startswith('.')]
            except Exception:
                files = []
            for p in files:
                if p in seen:
                    continue
                if os.path.isdir(p):
                    seen.add(p)
                    continue
                try:
                    self.route_task(self.detect_input(p))
                    seen.add(p)
                except Exception as e:
                    logger.exception("Error detecting/routing file %s: %s", p, e)
                    self.quarantine_file(p)
            time.sleep(self.poll_interval)

    def run_daemon(self):
        """Run workers plus the watch loop until KeyboardInterrupt or ``stop()``."""
        logger.info("AutoOrchestrator starting daemon")
        self.start_workers()
        watch_thread = Thread(target=self.watch_loop, daemon=True)
        watch_thread.start()
        try:
            while not self.stop_event.is_set():
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("KeyboardInterrupt received; stopping orchestrator")
        self.stop()
        watch_thread.join(timeout=1)
        logger.info("Orchestrator stopped")
# ----------------------------
# FastAPI UI prototype (optional)
# ----------------------------
if FastAPI is not None:
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Run an AutoOrchestrator worker pool for the lifetime of the app.

        Config is read from the YAML file named by ``CONFIG_PATH``; DEFAULT_CONFIG is used
        when the variable is unset or the file is missing or empty. ``MODEL_DIR``, when
        set, overrides ``cfg['model_dir']``. If the orchestrator cannot be created,
        ``app.state.orchestrator`` is None and /upload answers 503.
        """
        cfg_path = os.environ.get("CONFIG_PATH", None)
        cfg = None
        if cfg_path and os.path.exists(cfg_path):
            with open(cfg_path, 'r', encoding='utf-8') as f:
                cfg = yaml.safe_load(f)
        if not cfg:
            cfg = DEFAULT_CONFIG
        try:
            model_dir = os.environ.get("MODEL_DIR")
            if model_dir:
                # Shallow copy so the override never leaks into DEFAULT_CONFIG.
                cfg = {**cfg, 'model_dir': model_dir}
            app.state.orchestrator = AutoOrchestrator(cfg)
            app.state.orchestrator.start_workers()
            logger.info("Orchestrator initialized in FastAPI lifespan")
        except Exception as e:
            logger.exception("Failed to initialize orchestrator: %s", e)
            app.state.orchestrator = None
        yield
        if app.state.orchestrator:
            app.state.orchestrator.stop()

    app = FastAPI(title="ForensicOilCompleteReform", version=DEFAULT_CONFIG['model_version'], lifespan=lifespan)

    @app.post("/upload")
    async def upload_sample(sample_meta: str, gcms: Optional[UploadFile] = File(None),
                            spectrum: Optional[UploadFile] = File(None), image: Optional[UploadFile] = File(None),
                            afm: Optional[UploadFile] = File(None), cs14c: Optional[UploadFile] = File(None)):
        """Queue one sample for asynchronous inference.

        *sample_meta* must be a JSON object. Its optional ``sample_id`` names the sample
        (a UUID is generated otherwise) and its optional ``meta`` object becomes the
        sample metadata. The endpoint returns immediately with ``status: queued``; the
        job is submitted as ``{'type': 'infer', 'sample': ...}``.
        """
        orchestrator = app.state.orchestrator
        if orchestrator is None:
            raise HTTPException(status_code=503, detail="Orchestrator not ready")
        try:
            meta_obj = json.loads(sample_meta)
        except (TypeError, ValueError):
            raise HTTPException(status_code=400, detail="Invalid meta JSON") from None
        if not isinstance(meta_obj, dict):
            raise HTTPException(status_code=400, detail="Meta JSON must be an object")
        inner_meta = meta_obj.get('meta', {})
        if not isinstance(inner_meta, dict):
            raise HTTPException(status_code=400, detail="'meta' must be a JSON object")
        sid = str(meta_obj.get('sample_id') or uuid.uuid4())
        gcms_b = await gcms.read() if gcms else None
        spec_b = await spectrum.read() if spectrum else None
        img_b = await image.read() if image else None
        afm_b = await afm.read() if afm else None
        cs14c_b = await cs14c.read() if cs14c else None
        sample = MultimodalSample(sample_id=sid, meta=inner_meta, gcms_bytes=gcms_b, spectrum_bytes=spec_b,
                                  image_bytes=img_b, afm_bytes=afm_b, cs14c_bytes=cs14c_b)
        orchestrator.enqueue({'type': 'infer', 'sample': sample})
        return JSONResponse({"sample_id": sid, "status": "queued"})

    @app.get("/", response_class=HTMLResponse)
    async def index():
        """Minimal landing page for the prototype UI."""
        html = """
        <html><head><title>Forensic Oil Review</title></head><body>
        <h2>Forensic Oil Residue System (Complete Reform)</h2>
        <p>Use /upload to POST sample meta and files. This is a prototype UI.</p>
        </body></html>
        """
        return HTMLResponse(html)
# ----------------------------
# CLI helpers & tests
# ----------------------------
def build_dummy_samples(n=200) -> List[MultimodalSample]:
    """Create *n* synthetic samples for smoke tests.

    The first ``n // 2`` samples are labelled 1 and the rest 0. Every modality carries
    a short placeholder byte string instead of a real instrument file, so the parsers'
    robust fallback paths are exercised. Temperature and humidity are drawn from the
    global ``np.random`` state, so runs are reproducible only if the caller seeds it.
    """
    positives = n // 2
    result = []
    for idx in range(n):
        sid = f"synthetic_{idx}"
        environment = {
            "temperature_c": 20.0 + np.random.randn(),
            "relative_humidity": 40.0 + np.random.randn() * 5,
        }
        meta = {"sample_id": sid, "object_material": "ceramic", "environment": environment}
        result.append(MultimodalSample(
            sample_id=sid,
            meta=meta,
            gcms_bytes=b"mzml_placeholder",
            spectrum_bytes=b"spectrum_placeholder",
            image_bytes=b"tiff_placeholder",
            afm_bytes=b"afm_placeholder",
            label=1 if idx < positives else 0,
        ))
    return result
def generate_deploy_artifacts(out_dir: str):
    """Write a self-contained Docker build context into *out_dir*.

    The context holds a copy of this script plus a Dockerfile that installs the
    runtime dependencies and starts the API (``--mode run-api`` on port 8000). The
    provenance secret is deliberately not baked into the image: the header requires
    it to come from a secret manager, so it must be injected at container runtime.
    """
    import shutil

    ensure_dir(out_dir)
    script_path = os.path.abspath(__file__)
    script_name = os.path.basename(script_path)
    # COPY paths are relative to the build context, so the script has to live in out_dir.
    target = os.path.join(out_dir, script_name)
    if os.path.abspath(target) != script_path:
        shutil.copy2(script_path, target)

    dockerfile = "\n".join([
        "FROM python:3.10-slim",
        "WORKDIR /app",
        f"COPY {script_name} /app/forensic_oil_complete_reform.py",
        # pyyaml: CONFIG_PATH loading in the API; python-multipart: FastAPI File/UploadFile forms.
        "RUN pip install --no-cache-dir fastapi uvicorn numpy pandas scipy scikit-learn xgboost "
        "pillow pymzml tifffile pymc3 arviz shap pyyaml python-multipart",
        "# PROVENANCE_SECRET is intentionally not set here; inject it at runtime,",
        "# e.g. `docker run -e PROVENANCE_SECRET=...` or an orchestrator-managed secret.",
        "EXPOSE 8000",
        'CMD ["python", "forensic_oil_complete_reform.py", "--mode", "run-api"]',
        "",
    ])
    with open(os.path.join(out_dir, 'Dockerfile'), 'w', encoding='utf-8') as f:
        f.write(dockerfile)
    logger.info("Deployment artifacts generated at %s", out_dir)
# ----------------------------
# Main CLI
# ----------------------------
def main():
    """Command-line entry point; behaviour is selected with ``--mode``.

    Modes:
    - build-dummy: train on synthetic placeholder samples.
    - train: train from ``--manifest``.
    - infer-file: score ``--sample-file`` and write the report to ``--out``.
    - auto: run the watch-folder daemon.
    - run-api: serve the FastAPI app on port 8000.
    - gen-deploy: write deployment artifacts to ``--deploy-out``.
    - self-test: a small train + infer round trip.
    """
    import argparse
    import copy

    parser = argparse.ArgumentParser(description="Forensic Oil Complete Reform CLI")
    parser.add_argument("--mode", choices=["build-dummy", "train", "infer-file", "auto", "run-api", "gen-deploy", "self-test"],
                        required=False, default="build-dummy")
    parser.add_argument("--manifest", type=str, default="train_manifest.json")
    parser.add_argument("--model-dir", type=str, default=DEFAULT_CONFIG['model_dir'])
    parser.add_argument("--out", type=str, default="out.json")
    parser.add_argument("--sample-file", type=str, default=None)
    parser.add_argument("--deploy-out", type=str, default="deploy_artifacts")
    args = parser.parse_args()

    # Deep copy: a shallow copy would share nested dicts (e.g. cfg['auto']) with DEFAULT_CONFIG.
    cfg = copy.deepcopy(DEFAULT_CONFIG)
    cfg['model_dir'] = args.model_dir

    def read_opt(path):
        """Bytes at *path*, or None when the path is empty or missing."""
        if not path or not os.path.exists(path):
            return None
        with open(path, 'rb') as fh:
            return fh.read()

    def load_json(path):
        """Parse a UTF-8 JSON file, closing the handle deterministically."""
        with open(path, 'r', encoding='utf-8') as fh:
            return json.load(fh)

    def provenance_secret():
        """Secret from the configured env var; warns when falling back to the placeholder."""
        env_name = cfg.get('provenance_secret_env', 'PROVENANCE_SECRET')
        secret = os.environ.get(env_name)
        if secret is None:
            logger.warning("%s is not set; using an insecure placeholder provenance secret", env_name)
            return 'change_this_secret'
        return secret

    def sample_from_record(rec, sample_id, meta, **extra):
        """MultimodalSample whose payloads are read from the record's ``*_path`` entries."""
        return MultimodalSample(sample_id=sample_id, meta=meta,
                                gcms_bytes=read_opt(rec.get('gcms_path')),
                                spectrum_bytes=read_opt(rec.get('spectrum_path')),
                                image_bytes=read_opt(rec.get('image_path')),
                                afm_bytes=read_opt(rec.get('afm_path')),
                                cs14c_bytes=read_opt(rec.get('cs14c_path')),
                                **extra)

    if args.mode == "build-dummy":
        samples = build_dummy_samples(200)
        ensure_dir(args.model_dir)
        res = train_pipeline(samples, cfg, args.model_dir)
        print("Dummy training result:", res)
    elif args.mode == "train":
        if not os.path.exists(args.manifest):
            print("Create train_manifest.json listing samples with fields: sample_id, meta_json_path, gcms_path, spectrum_path, image_path, afm_path, cs14c_path (optional), label")
            sys.exit(1)
        samples = []
        for rec in load_json(args.manifest):
            meta = load_json(rec['meta_json_path']) if rec.get('meta_json_path') else {}
            samples.append(sample_from_record(rec, rec['sample_id'], meta, label=rec.get('label')))
        res = train_pipeline(samples, cfg, args.model_dir)
        print("Training result:", res)
    elif args.mode == "infer-file":
        if not args.sample_file or not os.path.exists(args.sample_file):
            print("Provide sample JSON file describing paths.")
            sys.exit(1)
        rec = load_json(args.sample_file)
        sample = sample_from_record(rec, rec.get('sample_id', str(uuid.uuid4())), rec.get('meta', {}))
        engine = InferenceEngine(args.model_dir, cfg, provenance_secret())
        report = engine.infer(sample)
        with open(args.out, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        print("Inference saved to", args.out)
    elif args.mode == "auto":
        orchestrator = AutoOrchestrator(cfg)
        try:
            orchestrator.run_daemon()
        except Exception as e:
            logger.exception("Orchestrator failed: %s", e)
    elif args.mode == "run-api":
        if FastAPI is None:
            print("FastAPI not installed; cannot run API")
            sys.exit(1)
        import uvicorn
        uvicorn.run("forensic_oil_complete_reform:app", host="0.0.0.0", port=8000, reload=False)
    elif args.mode == "gen-deploy":
        generate_deploy_artifacts(args.deploy_out)
        print("Deployment artifacts generated at", args.deploy_out)
    elif args.mode == "self-test":
        # A few unit-like checks that the robust parsing paths work end to end.
        print("Running self-test...")
        samples = build_dummy_samples(10)
        ensure_dir(cfg['model_dir'])
        res = train_pipeline(samples, cfg, cfg['model_dir'])
        print("Self-test train result:", res)
        engine = InferenceEngine(cfg['model_dir'], cfg, provenance_secret())
        report = engine.infer(samples[0])
        print("Self-test infer sample:", report['sample_id'], report['verdict'], report['confidence'])
    else:
        print("Unknown mode:", args.mode)
        sys.exit(1)
# Script entry point. The mangled `name == "main"` form raised NameError at import time.
if __name__ == "__main__":
    main()