Rajan Sharma committed on
Commit 8d23104 · verified · 1 Parent(s): 525ff8a

Create schema_profiler.py

Files changed (1)
  1. schema_profiler.py +103 -0
schema_profiler.py ADDED
@@ -0,0 +1,103 @@
# schema_profiler.py
from __future__ import annotations
from typing import Dict, Any, List, Tuple, Optional
import pandas as pd
import numpy as np
import re, math, os

# Optional embeddings for soft matching; falls back to lexical if missing
try:
    from sentence_transformers import SentenceTransformer
    _EMB = SentenceTransformer("all-MiniLM-L6-v2")
except Exception:
    _EMB = None

def profile_csv(path: str, max_rows: int = 10000) -> Dict[str, Any]:
    """Read up to max_rows of a CSV and return a lightweight per-column profile."""
    df = pd.read_csv(path, nrows=max_rows, low_memory=False)
    cols = []
    for c in df.columns:
        s = df[c]
        cols.append({
            "raw": str(c),
            "dtype": str(s.dtype),
            "nonnull": int(s.notna().sum()),
            "samples": s.dropna().astype(str).head(3).tolist(),
        })
    return {"kind": "csv", "name": os.path.basename(path), "rows": len(df), "columns": cols, "df": df}
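
# The dict returned by profile_csv looks roughly like the sketch below; the file
# name, counts, and sample values are hypothetical, not from this commit:
#
#   profile_csv("visits.csv") -> {
#       "kind": "csv",
#       "name": "visits.csv",
#       "rows": 8421,
#       "columns": [
#           {"raw": "wait_time_minutes", "dtype": "float64",
#            "nonnull": 8190, "samples": ["34.0", "12.5", "58.0"]},
#           ...
#       ],
#       "df": <the truncated pandas DataFrame itself>,
#   }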

def build_dynamic_label_space(scenario_text: str) -> List[str]:
    """
    Create a candidate label space from the scenario itself:
    - Nounish/metric-like phrases grabbed with a very permissive regex
    - Kept only if they carry unit/metric hints (%, hours, days, rate, cost, capacity, ...)
    - Deduped and capped at 128 candidates
    """
    t = (scenario_text or "").lower()
    # crude noun-ish grabs: a word plus up to three following words
    phrases = re.findall(r"[a-z][a-z0-9_./%-]*(?:\s+[a-z0-9_./%-]+){0,3}", t)
    phrases = [p.strip() for p in phrases if len(p.split()) <= 4 and len(p) >= 3]
    # keep likely metric-ish tokens
    keepers = []
    for p in phrases:
        if any(k in p for k in ["median", "mean", "p90", "p95", "rate", "cost", "capacity", "clients", "visits", "screen", "a1c", "bmi", "bp", "wait", "throughput", "budget", "per day", "per client", "percent", "%", "hours", "days", "delta", "change", "outcome"]):
            keepers.append(p)
    # dedupe and limit size
    seen = set()
    out = []
    for x in keepers:
        x = re.sub(r"\s+", " ", x).strip()
        if x not in seen:
            seen.add(x)
            out.append(x)
        if len(out) >= 128:
            break
    return out or ["value", "count", "rate", "cost", "capacity"]
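
# For a hypothetical scenario sentence (not from this commit), the label space
# comes out as short keyword-bearing phrases, roughly:
#
#   build_dynamic_label_space("Reduce median wait time and per client cost at the clinic")
#   -> ["reduce median wait time", "and per client cost"]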

def soft_bind_inputs_to_columns(
    required_inputs: List[str],
    column_bag: List[str],
    scenario_labels: List[str],
    min_score: float = 0.46
) -> Dict[str, Dict[str, Any]]:
    """
    For each required input "name", find the best candidate column from the union of:
    - uploaded headers
    - scenario-derived label space
    Returns {input_name: {"match": raw_col_or_label, "score": float, "source": "header|scenario"}}.
    If there is no confident match, 'match' is None.
    """
    req = [r.strip() for r in required_inputs if r and r.strip()]
    if not req:
        return {}

    # Vectorize all tokens if embeddings are present
    combined_pool = list(dict.fromkeys(column_bag + scenario_labels))
    if _EMB is not None and combined_pool:
        pool_vecs = _EMB.encode(combined_pool, normalize_embeddings=True)
        req_vecs = _EMB.encode(req, normalize_embeddings=True)
        sims = np.matmul(req_vecs, pool_vecs.T)  # cosine similarity, since embeddings are normalized
        mapping: Dict[str, Dict[str, Any]] = {}
        for i, name in enumerate(req):
            j = int(np.argmax(sims[i]))
            score = float(np.max(sims[i]))
            cand = combined_pool[j]
            src = "header" if cand in column_bag else "scenario"
            mapping[name] = {"match": cand if score >= min_score else None, "score": score, "source": src}
        return mapping

    # Fallback: lexical overlap (very conservative)
    def _lex_overlap(a: str, b: str) -> float:
        A = set(re.findall(r"[a-z0-9]+", a.lower()))
        B = set(re.findall(r"[a-z0-9]+", b.lower()))
        if not A or not B:
            return 0.0
        return len(A & B) / math.sqrt(len(A) * len(B))

    mapping: Dict[str, Dict[str, Any]] = {}
    for name in req:
        best = ("", 0.0, "")
        for cand in combined_pool:
            s = _lex_overlap(name, cand)
            if s > best[1]:
                best = (cand, s, "header" if cand in column_bag else "scenario")
        mapping[name] = {"match": best[0] if best[1] >= 0.34 else None, "score": best[1], "source": best[2]}
    return mapping
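
# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original commit). The scenario text,
# header names, and required inputs below are hypothetical placeholders that
# only show how the helpers compose; profile_csv is skipped here because it
# needs a real uploaded file on disk. Works with or without sentence-transformers.
if __name__ == "__main__":
    scenario = "Reduce median wait time and per client cost at the community clinic."
    headers = ["client_id", "visit_date", "wait_time_minutes", "visit_cost_usd"]

    labels = build_dynamic_label_space(scenario)
    binding = soft_bind_inputs_to_columns(
        required_inputs=["median wait time", "cost per visit"],
        column_bag=headers,
        scenario_labels=labels,
    )
    for name, info in binding.items():
        print(f"{name!r} -> {info['match']!r} (score={info['score']:.2f}, source={info['source']})")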