# core/variable_loader.py
import os
import glob
import json
import time
import logging

logger = logging.getLogger(__name__)

try:
    import pandas as pd
except Exception:
    pd = None

# cache path (temporary)
CACHE_PATH = "/tmp/ct_var_cache.json"
CACHE_TTL_SECONDS = 60 * 60  # 1 hour; adjust as needed

# candidate filename patterns used to detect relevant Excel files
DEFAULT_PATTERNS = [
    "*SDTM*.xls*", "*SDTMIG*.xls*", "*SDTM_*.xls*", "SDTM*.xls*",
    "*ADaM*.xls*", "*ADaMIG*.xls*", "ADaM*.xls*",
    "*CDASH*.xls*", "*CDASHIG*.xls*", "CDASH*.xls*"
]

# typical column-name candidates
VAR_COL_CANDIDATES = [
    "variable", "variable name", "varname", "var", "column", "fieldname"
]
LABEL_COL_CANDIDATES = [
    "label", "variable label", "var label", "column label"
]
DESC_COL_CANDIDATES = [
    "description", "definition", "long name", "comments", "notes"
]
ROLE_COL_CANDIDATES = [
    "role", "type", "datatype", "origin"
]


def _first_existing(columns, candidates):
    """Return the first original column whose lowercased name equals or contains a candidate."""
    if not columns:
        return None
    # str() guards against non-string headers (e.g., numeric header rows)
    low = {str(c).strip().lower(): c for c in columns}
    for cand in candidates:
        for k, orig in low.items():
            if cand == k or cand in k:
                return orig
    return None
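
# Illustrative matches (substring matching is case-insensitive):
#   _first_existing(["Variable Name", "Label"], VAR_COL_CANDIDATES) -> "Variable Name"
#   _first_existing(["Label"], VAR_COL_CANDIDATES) -> None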

def _discover_files(search_paths=None, patterns=None):
    """Glob the search paths (recursively) for candidate SDTM/ADaM/CDASH Excel files."""
    patterns = patterns or DEFAULT_PATTERNS
    search_paths = search_paths or [
        ".", "/workspace/data", "/mnt/data", os.getcwd(),
        "/root/.cache/huggingface/hub", "/home/user/.cache/huggingface/hub",
        "/root/.cache/huggingface/hub/datasets--essprasad--CT-Chat-Docs",
        "/home/user/.cache/huggingface/hub/datasets--essprasad--CT-Chat-Docs",
    ]
    found = []
    for base in search_paths:
        if not base or not os.path.exists(base):
            continue
        for pat in patterns:
            try:
                # "**" is required for recursive=True to actually descend into
                # subdirectories (e.g., huggingface hub snapshot folders)
                matches = glob.glob(os.path.join(base, "**", pat), recursive=True)
                for m in matches:
                    if os.path.isfile(m) and m.lower().endswith((".xls", ".xlsx")):
                        found.append(os.path.abspath(m))
            except Exception:
                continue
    # dedupe but keep order
    seen = set()
    unique = []
    for p in found:
        if p not in seen:
            seen.add(p)
            unique.append(p)
    return unique
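
# Illustrative call (the returned paths are hypothetical):
#   _discover_files(search_paths=["/workspace/data"])
#   -> ["/workspace/data/SDTMIG_v3.4.xlsx", "/workspace/data/ADaMIG_v1.3.xlsx"]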

def _cell(row, col):
    """Safely fetch a cell from a row as a stripped string ('' if missing/unreadable)."""
    if col is None:
        return ""
    try:
        return str(row.get(col, "") or "").strip()
    except Exception:
        return ""


def _extract_from_df(df, filename):
    """
    Given a dataframe, find likely variable/label/description columns and extract rows.
    Returns a list of dicts.
    """
    out = []
    if df is None or df.empty:
        return out
    cols = list(df.columns)
    term_col = _first_existing(cols, VAR_COL_CANDIDATES)
    label_col = _first_existing(cols, LABEL_COL_CANDIDATES)
    desc_col = _first_existing(cols, DESC_COL_CANDIDATES)
    role_col = _first_existing(cols, ROLE_COL_CANDIDATES)
    # if we absolutely cannot find a term column, fall back to the first column
    if not term_col:
        term_col = cols[0] if cols else None
    # if there are no usable columns at all, give up
    if term_col is None:
        return out
    for _, row in df.iterrows():
        term = _cell(row, term_col)
        if not term:
            continue
        label = _cell(row, label_col)
        desc = _cell(row, desc_col)
        role = _cell(row, role_col)
        # compose a clean definition
        parts = []
        if label:
            parts.append(f"Label: {label}")
        if desc:
            parts.append(f"Description: {desc}")
        if role:
            parts.append(f"Role/Origin: {role}")
        definition = "\n".join(parts).strip() or (label or desc or "")
        out.append({
            "term": term,
            "definition": definition,
            "file": os.path.basename(filename),
            "type": "variable",
            "sources": [os.path.basename(filename)]
        })
    return out
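
# Illustrative example (a sketch; the headers below are hypothetical but typical
# of SDTMIG variable sheets, and pandas is assumed to be installed):
#   df = pd.DataFrame({"Variable Name": ["USUBJID"],
#                      "Variable Label": ["Unique Subject Identifier"]})
#   _extract_from_df(df, "SDTMIG_v3.4.xlsx")
#   -> [{"term": "USUBJID", "definition": "Label: Unique Subject Identifier",
#        "file": "SDTMIG_v3.4.xlsx", "type": "variable",
#        "sources": ["SDTMIG_v3.4.xlsx"]}]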

def load_variable_metadata(search_paths=None, use_cache=True, verbose=True):
    """
    Discover SDTM/ADaM/CDASH Excel files and extract variable metadata.
    Returns a list of dicts: {'term', 'definition', 'file', 'type', 'sources'}
    """
    # fail fast if pandas is not installed
    if pd is None:
        logger.warning("pandas not available; variable metadata loading skipped.")
        return []
    # cache handling
    try:
        if use_cache and os.path.exists(CACHE_PATH):
            mtime = os.path.getmtime(CACHE_PATH)
            if time.time() - mtime < CACHE_TTL_SECONDS:
                if verbose:
                    logger.info("Loading variable metadata from cache: %s", CACHE_PATH)
                with open(CACHE_PATH, "r", encoding="utf-8") as f:
                    return json.load(f)
    except Exception:
        # continue if the cache read fails
        pass
    files = _discover_files(search_paths=search_paths)
    if verbose:
        logger.info("Variable loader discovered %d candidate Excel files.", len(files))
    all_entries = []
    for fx in files:
        try:
            # read all sheets (ExcelFile is faster for many sheets)
            xls = pd.ExcelFile(fx)
            # iterate sheets, reusing the open ExcelFile handle
            for sheet in xls.sheet_names:
                try:
                    df = pd.read_excel(xls, sheet_name=sheet)
                    # drop rows where all cells are NaN
                    df = df.dropna(how="all")
                    entries = _extract_from_df(df, fx)
                    if entries:
                        # annotate with the sheet name to improve provenance
                        for e in entries:
                            e["sources"].append(f"{os.path.basename(fx)}::{sheet}")
                        all_entries.extend(entries)
                except Exception:
                    # try the next sheet
                    continue
        except Exception:
            # fallback: try a single-sheet read
            try:
                df = pd.read_excel(fx)
                df = df.dropna(how="all")
                entries = _extract_from_df(df, fx)
                all_entries.extend(entries)
            except Exception as e:
                logger.debug("Failed reading excel %s: %s", fx, e)
                continue
    # dedupe by term (keep the first occurrence)
    seen = set()
    deduped = []
    for e in all_entries:
        key = e["term"].strip().lower()
        if key and key not in seen:
            seen.add(key)
            deduped.append(e)
    # write cache (best effort)
    try:
        with open(CACHE_PATH, "w", encoding="utf-8") as f:
            json.dump(deduped, f, ensure_ascii=False, indent=2)
    except Exception:
        pass
    if verbose:
        logger.info("Variable loader extracted %d unique variables.", len(deduped))
    return deduped
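
# Typical usage (a sketch; the variable names are illustrative):
#   entries = load_variable_metadata(use_cache=False)
#   by_term = {e["term"].lower(): e for e in entries}
#   print(by_term.get("usubjid", {}).get("definition", "not found"))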

if __name__ == "__main__":
    # quick CLI for debugging
    items = load_variable_metadata(verbose=True)
    print(f"[variable_loader] extracted {len(items)} items")
    if items:
        print("Sample:", items[:5])