| | """ |
| | Mapper v5.0: SRE-Observable Entity/Industry Detection |
| | |
| | Changes: |
| | - Added Prometheus metrics for all Redis operations |
| | - Added circuit breaker for Redis failures |
| | - Added pub/sub events when entity/industry is detected |
| | - Added structured JSON logging for Loki/Splunk |
| | - Added health check endpoint |
| | - ZERO changes to core detection logic |
| | """ |
| |
|
| | import os |
| | import json |
| | import asyncio |
| | import pandas as pd |
| | import numpy as np |
| | from datetime import datetime, timedelta |
| | from concurrent.futures import ThreadPoolExecutor |
| | import time |
| | import logging |
| | from typing import Dict, Any, Optional |
| |
|
| | from app.db import get_conn, ensure_raw_table, transactional_conn, ensure_schema_versions_table |
| | from app.core.detection_engine import hybrid_detect_entity_type,hybrid_detect_industry_type |
| | from app.core.event_hub import event_hub |
| | from app.deps import get_sre_metrics |
| | from app.core.sre_logging import emit_mapper_log |
| | |
try:
    from prometheus_client import Counter, Histogram, Gauge
except ImportError:
    # No-op stand-ins so the module still imports without prometheus_client.
    # Fix: the previous stubs lacked .labels(), yet every metric call in this
    # module is chained as Metric.labels(...).inc()/observe()/set() — without
    # prometheus installed, each such call raised AttributeError. Each stub's
    # labels() now returns the stub itself so the chained calls are no-ops.
    class Counter:
        def __init__(self, *args, **kwargs): pass
        def labels(self, *args, **kwargs): return self
        def inc(self, amount=1): pass

    class Histogram:
        def __init__(self, *args, **kwargs): pass
        def labels(self, *args, **kwargs): return self
        def observe(self, value): pass

    class Gauge:
        def __init__(self, *args, **kwargs): pass
        def labels(self, *args, **kwargs): return self
        def set(self, value): pass
| |
|
# Module-level logger; named after the module path for structured log routing.
logger = logging.getLogger(__name__)
| |
|
| | |
| |
|
| | |
class MapperMetrics:
    """SRE: Metrics for mapper operations.

    Class-level Prometheus collectors shared by every function in this
    module. All counters/histograms are labelled per-org so dashboards can
    slice by tenant.
    """

    # Redis GET operations, labelled by outcome: cache_hit/success/miss/error.
    redis_reads = Counter(
        'mapper_redis_reads_total',
        'Total Redis read operations',
        ['org_id', 'status']
    )

    # Redis SETEX operations, labelled success/error.
    redis_writes = Counter(
        'mapper_redis_writes_total',
        'Total Redis write operations',
        ['org_id', 'status']
    )

    # Executions of the local detection fallbacks (entity/industry/combined/...).
    fallback_runs = Counter(
        'mapper_fallback_total',
        'Total fallback executions',
        ['org_id', 'fallback_type']
    )

    # End-to-end time to obtain an entity/industry detection result.
    detection_latency = Histogram(
        'mapper_detection_duration_seconds',
        'Time to detect entity/industry',
        ['org_id', 'detection_type']
    )

    # Current size of the in-process entity/industry caches.
    cache_size = Gauge(
        'mapper_cache_entries',
        'Number of cached entries',
        ['cache_type']
    )

    # Non-Redis mapper errors (e.g. detection failures in _fallback_combined).
    # Fix: this counter was referenced in _fallback_combined's error handler
    # but never declared, turning any fallback error into an AttributeError.
    stream_errors = Counter(
        'mapper_stream_errors_total',
        'Total non-Redis mapper errors',
        ['org_id', 'error_type']
    )
| |
|
| | |
# SRE: module-level circuit-breaker state guarding Redis operations.
# Opened after `threshold` consecutive failures; allowed to close again once
# `reset_timeout` seconds have passed since the most recent failure.
# NOTE(review): plain dict mutated from multiple call sites — assumed to be
# accessed from a single thread/process; confirm if workers share this module.
_circuit_breaker = {
    "failure_count": 0,         # consecutive Redis failures observed
    "last_failure_time": None,  # time.time() of the most recent failure
    "is_open": False,           # True => reject Redis ops, use fallbacks
    "threshold": 5,             # failures required to open the breaker
    "reset_timeout": 300        # seconds before a retry is permitted
}
| |
|
| | |
# Canonical column names mapped to the raw-source aliases that identify them.
# Matching is substring-based (see canonify_df), and this table is extended at
# runtime by load_dynamic_aliases()/save_dynamic_aliases() as new aliases are
# learned from incoming files.
CANONICAL = {
    "timestamp": ["timestamp", "date", "sale_date", "created_at"],
    "product_id": ["sku", "barcode", "plu", "product_id", "item_code"],
    "qty": ["qty", "quantity", "units", "pieces"],
    "total": ["total", "amount", "line_total", "sales_amount"],
    "store_id": ["store_id", "branch", "location", "outlet_id"],
    "category": ["category", "department", "cat", "family"],
    "promo_flag": ["promo", "promotion", "is_promo", "discount_code"],
    "expiry_date":["expiry_date", "best_before", "use_by", "expiration"],
}
| |
|
# JSON file where learned column aliases are persisted between runs.
ALIAS_FILE = "./db/alias_memory.json"

# In-process detection caches keyed by (org_id, source_id).
# A hit here avoids any Redis round-trip (see poll_for_entity/industry).
_ENTITY_CACHE = {}
_INDUSTRY_CACHE = {}
| |
|
| | |
| |
|
def _check_circuit_breaker() -> bool:
    """Return True when Redis operations are currently permitted.

    A closed breaker always allows traffic. An open breaker is re-closed
    (failure count reset) once the configured reset timeout has elapsed
    since the last recorded failure; until then calls are rejected.
    """
    breaker = _circuit_breaker
    if not breaker["is_open"]:
        return True

    last_failure = breaker["last_failure_time"]
    if last_failure and (time.time() - last_failure) > breaker["reset_timeout"]:
        # Half-open: timeout elapsed, let traffic through again.
        logger.warning("[CIRCUIT] 🔄 Closing breaker, retrying...")
        breaker["is_open"] = False
        breaker["failure_count"] = 0
        return True

    logger.error("[CIRCUIT] 🔴 Circuit breaker OPEN - rejecting Redis ops")
    return False
| |
|
def _record_redis_failure(error: str):
    """Register one Redis failure; open the breaker once the threshold is reached."""
    breaker = _circuit_breaker
    breaker["failure_count"] += 1
    breaker["last_failure_time"] = time.time()

    over_threshold = breaker["failure_count"] >= breaker["threshold"]
    if over_threshold:
        breaker["is_open"] = True
        logger.critical(f"[CIRCUIT] 🔴 Breaker opened! {breaker['failure_count']} failures")
| |
|
def _record_redis_success():
    """Clear the breaker's consecutive-failure counter after a good Redis call."""
    failures = _circuit_breaker["failure_count"]
    if failures > 0:
        logger.info(f"[CIRCUIT] ✅ Resetting failure count (was {failures})")
        _circuit_breaker["failure_count"] = 0
| |
|
def _publish_detection_event(org_id: str, source_id: str, detection_type: str, data: Dict):
    """
    🚀 Pub/Sub: Publish entity/industry detection event
    Frontend can subscribe to: `detection:events:{org_id}:{source_id}`

    Fix: the previous version unconditionally called asyncio.create_task(),
    which requires a *running* event loop — and every caller in this module
    is synchronous, so the call raised RuntimeError and no event was ever
    published (the error was only logged). We now schedule on the running
    loop when one exists and publish directly otherwise. Eventing stays
    best-effort: failures are logged, never propagated.
    """
    try:
        channel = f"detection:events:{org_id}:{source_id}"
        payload = {
            "type": f"{detection_type}.detected",
            "timestamp": datetime.utcnow().isoformat(),
            "org_id": org_id,
            "source_id": source_id,
            "data": data
        }
        message = json.dumps(payload)

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop is not None:
            # Async context: run the (blocking) publish off-thread so the
            # event loop is never stalled.
            loop.create_task(asyncio.to_thread(event_hub.publish, channel, message))
        else:
            # Synchronous caller: publish inline.
            event_hub.publish(channel, message)

        logger.info(f"[PUBSUB] 📡 Published {detection_type} detection event")

    except Exception as e:
        logger.error(f"[PUBSUB] ❌ Failed to publish detection event: {e}")
| |
|
| | |
| |
|
def map_pandas_to_duck(col: str, series: pd.Series) -> str:
    """Translate a pandas Series dtype into the matching DuckDB column type.

    Checks run in priority order (bool before int before float before
    datetime); anything unrecognized falls through to VARCHAR.
    """
    dtype_checks = (
        (pd.api.types.is_bool_dtype, "BOOLEAN"),
        (pd.api.types.is_integer_dtype, "BIGINT"),
        (pd.api.types.is_float_dtype, "DOUBLE"),
        (pd.api.types.is_datetime64_any_dtype, "TIMESTAMP"),
    )
    for type_check, duck_type in dtype_checks:
        if type_check(series):
            return duck_type
    return "VARCHAR"
| |
|
def load_dynamic_aliases() -> None:
    """Merge alias mappings persisted in ALIAS_FILE into CANONICAL, if present.

    Unknown canonical keys are added wholesale; known keys are extended with
    any aliases not already listed. Read failures are reported but ignored.
    """
    if not os.path.exists(ALIAS_FILE):
        return
    try:
        with open(ALIAS_FILE) as handle:
            stored = json.load(handle)
        for canon, aliases in stored.items():
            if canon in CANONICAL:
                known = CANONICAL[canon]
                new_aliases = [alias for alias in aliases if alias not in known]
                known.extend(new_aliases)
            else:
                CANONICAL[canon] = aliases
    except Exception as e:
        print(f"[mapper] ⚠️ Failed to load alias memory: {e}")
| |
|
def save_dynamic_aliases() -> None:
    """Persist the current CANONICAL alias table to ALIAS_FILE as pretty JSON."""
    target_dir = os.path.dirname(ALIAS_FILE)
    os.makedirs(target_dir, exist_ok=True)
    with open(ALIAS_FILE, "w") as handle:
        json.dump(CANONICAL, handle, indent=2)
| |
|
| | |
| |
|
def health_check_mapper(org_id: str = "test") -> Dict[str, Any]:
    """SRE: Snapshot of mapper health — breaker state, cache sizes, metrics.

    Status is "degraded" while the Redis circuit breaker is open,
    "healthy" otherwise.
    """
    breaker_open = _circuit_breaker["is_open"]
    report = {
        "status": "degraded" if breaker_open else "healthy",
        "circuit_breaker": {
            "open": breaker_open,
            "failure_count": _circuit_breaker["failure_count"]
        },
        "cache_size": {
            "entity": len(_ENTITY_CACHE),
            "industry": len(_INDUSTRY_CACHE)
        },
        "canonical_columns": len(CANONICAL),
        "metrics": get_sre_metrics()
    }
    return report
| |
|
| | |
| |
|
def poll_for_entity(org_id: str, source_id: str, timeout: int = 10) -> dict:
    """
    Poll Redis for entity detection result - NOW WITH SRE OBSERVABILITY

    Core logic: UNCHANGED
    - Checks cache first (zero Redis calls)
    - Polls Redis twice with 3s sleep
    - Falls back to combined detection

    Added:
    - Prometheus metrics for cache hits/misses
    - Circuit breaker protection
    - Pub/sub event when entity detected
    - Structured logging

    NOTE(review): `timeout` is currently unused — polling is hard-wired to
    two attempts with a 3s sleep between them; confirm before relying on it.
    """
    start_time = time.time()
    cache_key = (org_id, source_id)

    # Fast path: in-process cache hit — no Redis round-trip at all.
    if cache_key in _ENTITY_CACHE:
        logger.info(f"[ENTITY] 💾 CACHE HIT: {cache_key}")
        MapperMetrics.redis_reads.labels(org_id=org_id, status="cache_hit").inc()

        # Re-announce the cached result so late subscribers still get an event.
        _publish_detection_event(org_id, source_id, "entity", _ENTITY_CACHE[cache_key])

        return _ENTITY_CACHE[cache_key]

    # Breaker open: skip Redis entirely and detect locally.
    if not _check_circuit_breaker():
        logger.error("[ENTITY] 🔴 Circuit open - using fallback immediately")
        entity_info, _ = _fallback_combined(org_id, source_id)
        MapperMetrics.fallback_runs.labels(org_id=org_id, fallback_type="entity").inc()
        return entity_info

    try:
        entity_key = f"entity:{org_id}:{source_id}"
        logger.info(f"[ENTITY] ⏳ Polling for key: {entity_key}")

        # Two GET attempts, 3 seconds apart, before giving up on Redis.
        for attempt in range(2):
            redis_start = time.time()
            data = event_hub.get_key(entity_key)
            redis_latency = (time.time() - redis_start) * 1000  # NOTE(review): computed but never used

            if data:
                entity_info = json.loads(data)
                logger.info(f"[ENTITY] ✅ Redis hit: {entity_info['entity_type']} (attempt {attempt+1})")

                MapperMetrics.redis_reads.labels(org_id=org_id, status="success").inc()
                # NOTE(review): start_time already includes the inter-attempt
                # sleep, so `+ attempt * 3` likely double-counts wait time — confirm.
                MapperMetrics.detection_latency.labels(org_id=org_id, detection_type="entity").observe(
                    (time.time() - start_time) + attempt * 3
                )

                # Warm the in-process cache for subsequent callers.
                _ENTITY_CACHE[cache_key] = entity_info
                MapperMetrics.cache_size.labels(cache_type="entity").set(len(_ENTITY_CACHE))

                _publish_detection_event(org_id, source_id, "entity", entity_info)

                _record_redis_success()

                return entity_info

            if attempt == 0:
                logger.debug("[ENTITY] 🔄 First check failed, sleeping 3s...")
                time.sleep(3.0)
            MapperMetrics.redis_reads.labels(org_id=org_id, status="miss").inc()

        # Both polls missed: run the combined local detection.
        logger.warning("[ENTITY] ⚠️ Using fallback")
        MapperMetrics.fallback_runs.labels(org_id=org_id, fallback_type="entity").inc()
        entity_info, _ = _fallback_combined(org_id, source_id)

        return entity_info

    except Exception as e:
        # Any Redis/JSON failure feeds the breaker, then degrades to fallback.
        _record_redis_failure(str(e))
        MapperMetrics.redis_reads.labels(org_id=org_id, status="error").inc()
        logger.error(f"[ENTITY] ❌ Error: {e}, using fallback")

        entity_info, _ = _fallback_combined(org_id, source_id)
        return entity_info
| |
|
def poll_for_industry(org_id: str, source_id: str, timeout: int = 10) -> dict:
    """
    Poll Redis for industry detection result - NOW WITH SRE OBSERVABILITY

    Core logic: UNCHANGED
    Reuses data from poll_for_entity to avoid duplicate Redis calls

    Added:
    - Prometheus metrics for cache hits/misses
    - Circuit breaker protection
    - Pub/sub event when industry detected

    NOTE(review): `timeout` is unused — exactly one Redis GET is attempted.
    """
    start_time = time.time()
    cache_key = (org_id, source_id)

    # Fast path: in-process cache (typically warmed by poll_for_entity's fallback).
    if cache_key in _INDUSTRY_CACHE:
        logger.info(f"[INDUSTRY] 💾 CACHE HIT: {cache_key}")
        MapperMetrics.redis_reads.labels(org_id=org_id, status="cache_hit").inc()

        _publish_detection_event(org_id, source_id, "industry", _INDUSTRY_CACHE[cache_key])

        return _INDUSTRY_CACHE[cache_key]

    # Breaker open: skip Redis and detect locally.
    if not _check_circuit_breaker():
        logger.error("[INDUSTRY] 🔴 Circuit open - using fallback")
        industry_info = _fallback_industry_detection(org_id, source_id)
        MapperMetrics.fallback_runs.labels(org_id=org_id, fallback_type="industry").inc()
        return industry_info

    try:
        industry_key = f"industry:{org_id}:{source_id}"
        logger.info(f"[INDUSTRY] ⏳ Polling for key: {industry_key}")

        # Single GET — entity polling has usually populated this key already.
        redis_start = time.time()
        data = event_hub.get_key(industry_key)
        redis_latency = (time.time() - redis_start) * 1000  # NOTE(review): computed but never used

        if data:
            industry_info = json.loads(data)
            logger.info(f"[INDUSTRY] ✅ Redis hit: {industry_info['industry']}")

            MapperMetrics.redis_reads.labels(org_id=org_id, status="success").inc()
            MapperMetrics.detection_latency.labels(org_id=org_id, detection_type="industry").observe(
                time.time() - start_time
            )

            # Warm the in-process cache for subsequent callers.
            _INDUSTRY_CACHE[cache_key] = industry_info
            MapperMetrics.cache_size.labels(cache_type="industry").set(len(_INDUSTRY_CACHE))

            _publish_detection_event(org_id, source_id, "industry", industry_info)

            _record_redis_success()

            return industry_info

        # Redis miss: run the industry-only emergency fallback.
        logger.warning("[INDUSTRY] ⚠️ Cache miss, running emergency fallback")
        MapperMetrics.fallback_runs.labels(org_id=org_id, fallback_type="industry").inc()
        industry_info = _fallback_industry_detection(org_id, source_id)

        return industry_info

    except Exception as e:
        # Feed the breaker, then degrade to the emergency fallback.
        _record_redis_failure(str(e))
        MapperMetrics.redis_reads.labels(org_id=org_id, status="error").inc()
        logger.error(f"[INDUSTRY] ❌ Error: {e}, using fallback")

        industry_info = _fallback_industry_detection(org_id, source_id)
        return industry_info
| |
|
def _fallback_combined(org_id: str, source_id: str) -> tuple[dict, dict]:
    """
    SINGLE DuckDB query to detect BOTH entity and industry.
    Writes BOTH keys to Redis atomically.
    Updates caches WITHOUT immediately invalidating them.

    Core logic: UNCHANGED
    Fix: the detection except-handler referenced MapperMetrics.stream_errors,
    which was not declared on the class — when a detection error occurred the
    handler itself raised AttributeError and the fallback crashed instead of
    degrading to UNKNOWN. The metric is now looked up defensively.

    Returns:
        (entity_info, industry_info) — each a dict with a "confidence" key;
        both default to UNKNOWN/0.0 on any failure.
    """
    start_time = time.time()
    logger.info(f"[FALLBACK] 🚨 Running combined fallback for {org_id}/{source_id}")

    MapperMetrics.fallback_runs.labels(org_id=org_id, fallback_type="combined").inc()

    # Breaker open: skip DuckDB/Redis entirely and return placeholders.
    if not _check_circuit_breaker():
        logger.error("[FALLBACK] 🔴 Circuit open - returning UNKNOWN")
        entity_info = {"entity_type": "UNKNOWN", "confidence": 0.0}
        industry_info = {"industry": "UNKNOWN", "confidence": 0.0}
        return entity_info, industry_info

    # Defaults returned if detection fails anywhere below.
    entity_info = {"entity_type": "UNKNOWN", "confidence": 0.0}
    industry_info = {"industry": "UNKNOWN", "confidence": 0.0}

    try:
        conn = get_conn(org_id)
        rows = conn.execute("""
            SELECT row_data
            FROM main.raw_rows
            WHERE row_data IS NOT NULL
            USING SAMPLE 100
        """).fetchall()

        if rows:
            parsed = [json.loads(r[0]) for r in rows if r[0]]
            df = pd.DataFrame(parsed)
            df.columns = [str(col).lower().strip() for col in df.columns]

            def detect_entity():
                # Runs in a worker thread; must never raise.
                try:
                    return hybrid_detect_entity_type(org_id, df, source_id, use_llm=False)
                except Exception as e:
                    logger.error(f"[FALLBACK] Entity detection failed: {e}")
                    return ("UNKNOWN", 0.0, False)

            def detect_industry():
                # Runs in a worker thread; must never raise.
                try:
                    return hybrid_detect_industry_type(org_id, df, source_id, use_llm=False)
                except Exception as e:
                    logger.error(f"[FALLBACK] Industry detection failed: {e}")
                    return ("UNKNOWN", 0.0, False)

            # Detect entity and industry concurrently over the same sample.
            with ThreadPoolExecutor(max_workers=2) as ex:
                ent_future = ex.submit(detect_entity)
                ind_future = ex.submit(detect_industry)

                entity_type, ent_conf, _ = ent_future.result()
                industry, ind_conf, _ = ind_future.result()

            entity_info = {"entity_type": entity_type, "confidence": ent_conf}
            industry_info = {"industry": industry, "confidence": ind_conf}

            logger.info(
                f"[FALLBACK] ✅ Entity: {entity_type} ({ent_conf:.2%}), "
                f"Industry: {industry} ({ind_conf:.2%})"
            )

    except Exception as e:
        logger.error(f"[FALLBACK] ❌ Failed: {e}")
        # Defensive lookup: never let metric bookkeeping crash the error path.
        stream_errors = getattr(MapperMetrics, "stream_errors", None)
        if stream_errors is not None:
            stream_errors.labels(org_id=org_id, error_type="fallback_error").inc()

    # Persist both results (even UNKNOWN) so pollers stop re-running detection.
    try:
        e_key = f"entity:{org_id}:{source_id}"
        i_key = f"industry:{org_id}:{source_id}"

        redis_start = time.time()
        event_hub.setex(e_key, 3600, json.dumps(entity_info))
        event_hub.setex(i_key, 3600, json.dumps(industry_info))
        redis_latency = (time.time() - redis_start) * 1000

        logger.info(f"[FALLBACK] 💾 WRITTEN to Redis in {redis_latency:.2f}ms")

        MapperMetrics.redis_writes.labels(org_id=org_id, status="success").inc(2)
        MapperMetrics.detection_latency.labels(org_id=org_id, detection_type="combined").observe(
            time.time() - start_time
        )

        _publish_detection_event(org_id, source_id, "entity", entity_info)
        _publish_detection_event(org_id, source_id, "industry", industry_info)

        _record_redis_success()

    except Exception as re:
        _record_redis_failure(str(re))
        MapperMetrics.redis_writes.labels(org_id=org_id, status="error").inc(2)
        logger.error(f"[FALLBACK] ❌ Redis write failed: {re}")

    # Warm the in-memory caches so subsequent polls are Redis-free.
    cache_key = (org_id, source_id)
    _ENTITY_CACHE[cache_key] = entity_info
    _INDUSTRY_CACHE[cache_key] = industry_info
    MapperMetrics.cache_size.labels(cache_type="entity").set(len(_ENTITY_CACHE))
    MapperMetrics.cache_size.labels(cache_type="industry").set(len(_INDUSTRY_CACHE))

    return entity_info, industry_info
| |
|
def _fallback_industry_detection(org_id: str, source_id: str) -> dict:
    """
    Emergency fallback for industry only (rarely used).
    Core logic: UNCHANGED

    Fixes:
    - The error path wrote an UNKNOWN marker to Redis *outside* any
      try/except — if Redis itself was the failure, that second setex raised
      and escaped this function. The last-resort write is now guarded so the
      helper always returns a dict.
    - Removed a redundant function-local re-import of
      hybrid_detect_industry_type (already imported at module level).
    """
    logger.info(f"[FALLBACK_IND] 🚨 Emergency fallback for {org_id}/{source_id}")
    MapperMetrics.fallback_runs.labels(org_id=org_id, fallback_type="industry_emergency").inc()

    # Breaker open: do not touch DuckDB or Redis.
    if not _check_circuit_breaker():
        logger.error("[FALLBACK_IND] 🔴 Circuit open - returning UNKNOWN")
        return {"industry": "UNKNOWN", "confidence": 0.0}

    try:
        conn = get_conn(org_id)
        rows = conn.execute("""
            SELECT row_data
            FROM main.raw_rows
            WHERE row_data IS NOT NULL
            USING SAMPLE 100
        """).fetchall()

        if not rows:
            logger.warning("[FALLBACK_IND] No data found")
            return {"industry": "UNKNOWN", "confidence": 0.0}

        parsed = [json.loads(r[0]) for r in rows if r[0]]
        df = pd.DataFrame(parsed)
        df.columns = [str(col).lower().strip() for col in df.columns]

        industry, confidence, _ = hybrid_detect_industry_type(org_id, df, source_id, use_llm=False)

        industry_info = {"industry": industry, "confidence": confidence}
        logger.info(f"[FALLBACK_IND] ✅ Detected: {industry} ({confidence:.2%})")

        # Cache for one hour so pollers pick it up without re-detecting.
        redis_key = f"industry:{org_id}:{source_id}"
        event_hub.setex(redis_key, 3600, json.dumps(industry_info))
        logger.info(f"[FALLBACK_IND] 💾 WRITTEN to Redis: {redis_key}")

        MapperMetrics.redis_writes.labels(org_id=org_id, status="success").inc()
        _record_redis_success()

        _publish_detection_event(org_id, source_id, "industry", industry_info)

        return industry_info

    except Exception as e:
        _record_redis_failure(str(e))
        MapperMetrics.redis_writes.labels(org_id=org_id, status="error").inc()
        logger.error(f"[FALLBACK_IND] ❌ Failed: {e}")

        # Last-resort UNKNOWN marker — guarded so a dead Redis cannot make
        # this error handler raise a second exception.
        try:
            redis_key = f"industry:{org_id}:{source_id}"
            event_hub.setex(redis_key, 3600, json.dumps({"industry": "UNKNOWN", "confidence": 0.0}))
        except Exception as redis_err:
            logger.error(f"[FALLBACK_IND] ❌ Could not cache UNKNOWN marker: {redis_err}")
        return {"industry": "UNKNOWN", "confidence": 0.0}
| |
|
| | |
| |
|
def ensure_canonical_table(duck, df: pd.DataFrame, entity_type: str) -> str:
    """Create (if needed) the entity-specific canonical table and align its columns.

    Bug fix: PRAGMA table_info rows are (cid, name, type, ...). The previous
    code read r[0] — the numeric cid — as the column name, so every existing
    column looked "missing" and a redundant ALTER TABLE was attempted on each
    call (failing into the warning path). We now read r[1], matching the
    handling of the same PRAGMA in canonify_df.

    Args:
        duck: open DuckDB connection/cursor.
        df: DataFrame whose columns should exist on the table.
        entity_type: detected entity type; determines the table name.

    Returns:
        Fully-qualified table name ("main.<entity_type>_canonical").
    """
    table_name = f"main.{entity_type}_canonical"

    duck.execute(f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            id UUID DEFAULT uuid(),
            _ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    existing_cols_raw = duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
    # r[1] is the column *name*; r[0] is the ordinal cid.
    existing_cols = {str(r[1]).lower() for r in existing_cols_raw}

    for col in df.columns:
        col_name = str(col).lower().strip()
        if col_name not in existing_cols:
            try:
                dtype = map_pandas_to_duck(col_name, df[col])
                logger.info(f"[MAPPER] ➕ Adding column '{col_name}:{dtype}'")
                duck.execute(f"ALTER TABLE {table_name} ADD COLUMN {col_name} {dtype}")
            except Exception as e:
                # Best-effort: a single bad column must not abort ingestion.
                logger.warning(f"[MAPPER] ⚠️ Skipping column {col_name}: {e}")

    return table_name
| |
|
| | |
| |
|
def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
    """
    ENTERPRISE DATA INGESTION PIPELINE
    Safe, idempotent, and Redis-efficient.

    Core logic: UNCHANGED
    Added: SRE metrics, structured logging, pub/sub events

    Args:
        org_id: tenant identifier; selects the per-org DuckDB connection.
        source_id: ingestion source identifier (used in Redis keys/events).
        hours_window: only raw rows ingested within this many hours are read.

    Returns:
        (canonical DataFrame, detected industry, industry confidence).
        On any read/parse failure returns (empty DataFrame, "unknown", 0.0).
    """
    start_time = time.time()
    emit_mapper_log("info", f"🚀 Starting pipeline for {org_id}/{source_id}")

    # Merge previously learned column aliases into CANONICAL.
    load_dynamic_aliases()

    # --- Stage 1: read recent raw rows --------------------------------------
    with get_conn(org_id) as conn:
        ensure_raw_table(conn)
        # NOTE(review): naive local time — confirm ingested_at's timezone.
        cutoff_time = datetime.now() - timedelta(hours=hours_window)

        try:
            rows = conn.execute("""
                SELECT row_data FROM main.raw_rows
                WHERE row_data IS NOT NULL
                AND LENGTH(CAST(row_data AS TEXT)) > 0
                AND ingested_at >= ?
                ORDER BY ingested_at DESC
            """, (cutoff_time,)).fetchall()
        except Exception as e:
            emit_mapper_log("error", f"❌ SQL read error: {e}", error=str(e))
            return pd.DataFrame(), "unknown", 0.0

    if not rows:
        logger.warning("[MAPPER] ⚠️ No audit rows found")
        return pd.DataFrame(), "unknown", 0.0

    # --- Stage 2: tolerant JSON parsing -------------------------------------
    # Accepts several envelope shapes: {"rows": [...]}, {"data": [...]},
    # {"tables": {name: [...]}}, a bare dict (single record), or a bare list.
    parsed, malformed_count = [], 0
    for r in rows:
        raw = r[0]
        if not raw:
            malformed_count += 1
            continue

        try:
            obj = raw if isinstance(raw, (dict, list)) else json.loads(str(raw))
        except Exception:
            malformed_count += 1
            continue

        if isinstance(obj, dict):
            if "rows" in obj and isinstance(obj["rows"], list):
                parsed.extend(obj["rows"])
            elif "data" in obj and isinstance(obj["data"], list):
                parsed.extend(obj["data"])
            elif "tables" in obj and isinstance(obj["tables"], dict):
                for table_rows in obj["tables"].values():
                    if isinstance(table_rows, list):
                        parsed.extend(table_rows)
            else:
                parsed.append(obj)
        elif isinstance(obj, list):
            parsed.extend(obj)
        else:
            malformed_count += 1

    if malformed_count:
        logger.warning(f"[MAPPER] ⚠️ Skipped {malformed_count} malformed rows")
    if not parsed:
        logger.error("[MAPPER] ❌ No valid data after parsing")
        return pd.DataFrame(), "unknown", 0.0

    # --- Stage 3: build DataFrame, normalize headers -------------------------
    df = pd.DataFrame(parsed)
    df.columns = [str(col).lower().strip() for col in df.columns]
    # Keep only the first occurrence of any duplicated header.
    df = df.loc[:, ~df.columns.duplicated()]
    logger.info(f"[MAPPER] 📊 Parsed DataFrame: {len(df)} rows × {len(df.columns)} cols")

    # --- Stage 4: map raw columns onto canonical names -----------------------
    # Substring match against CANONICAL aliases; each canonical name is
    # claimed at most once (the first matching raw column wins).
    mapping, canonical_used = {}, set()
    for canon, aliases in CANONICAL.items():
        for col in df.columns:
            if any(str(alias).lower() in col for alias in aliases):
                if canon not in canonical_used:
                    mapping[col] = canon
                    canonical_used.add(canon)
                    logger.info(f"[MAPPER] 🔀 Mapped '{col}' → canonical '{canon}'")
                break

    # Learn raw columns that contain a canonical name as new aliases.
    for col in df.columns:
        for canon in CANONICAL.keys():
            if str(canon).lower() in col and col not in CANONICAL[canon]:
                CANONICAL[canon].append(col)
                logger.info(f"[MAPPER] 🧠 Learned new alias: {canon} ← {col}")

    save_dynamic_aliases()

    renamed = df.rename(columns=mapping)

    # Deduplicate canonical columns while preserving order; non-canonical
    # columns pass through unchanged.
    final_columns, seen = [], set()
    for col in renamed.columns:
        if col in CANONICAL.keys():
            if col not in seen:
                final_columns.append(col)
                seen.add(col)
        else:
            final_columns.append(col)

    df = renamed[final_columns].copy()
    logger.info(f"[MAPPER] ✅ Kept columns: {list(df.columns)}")

    # --- Stage 5: best-effort type coercion ----------------------------------
    try:
        if "timestamp" in df:
            df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
        if "expiry_date" in df:
            df["expiry_date"] = pd.to_datetime(df["expiry_date"], errors="coerce").dt.date
        if "promo_flag" in df:
            df["promo_flag"] = df["promo_flag"].astype(str).isin({"1", "true", "t", "yes"})
        for col in ("qty", "total"):
            if col in df:
                df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
    except Exception as e:
        logger.warning(f"[MAPPER] ⚠️ Type conversion warning: {e}")

    # --- Stage 6: detection (Redis-first, fallback inside the pollers) -------
    entity_info = poll_for_entity(org_id, source_id)
    entity_type = entity_info["entity_type"]

    industry_info = poll_for_industry(org_id, source_id)
    industry = industry_info["industry"]
    industry_confidence = industry_info["confidence"]
    logger.info(f"[MAPPER] 🎯 Entity: {entity_type}, Industry: {industry} ({industry_confidence:.2%})")

    # --- Stage 7: transactional write with schema versioning -----------------
    os.makedirs("./db", exist_ok=True)

    rows_inserted = 0

    with transactional_conn(org_id) as duck:
        ensure_schema_versions_table(duck)

        # Compare the inferred schema with the most recent applied version.
        current_schema = {col: map_pandas_to_duck(col, df[col]) for col in df.columns}
        existing_schema_row = duck.execute("""
            SELECT schema_json, version_id FROM main.schema_versions
            WHERE table_name = ? AND status = 'applied'
            ORDER BY version_id DESC LIMIT 1
        """, (f"{entity_type}_canonical",)).fetchone()

        is_new_schema = (
            not existing_schema_row or
            json.loads(existing_schema_row[0]) != current_schema
        )

        version_id = None
        if is_new_schema:
            # Record the new schema as 'pending'; flipped to 'applied' below.
            version_id = duck.execute("""
                INSERT INTO main.schema_versions
                (version_id, table_name, schema_json, status)
                VALUES (nextval('schema_version_seq'), ?, ?, 'pending')
                RETURNING version_id
            """, (f"{entity_type}_canonical", json.dumps(current_schema))).fetchone()[0]
            logger.info(f"[MAPPER] 📝 Created schema v{version_id} for {entity_type}_canonical")

        table_name = ensure_canonical_table(duck, df, entity_type)

        if not df.empty:
            # Insert only columns that actually exist on the target table.
            table_info = duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
            table_cols = [str(r[1]) for r in table_info]  # r[1] == column name

            df_to_insert = df[[col for col in df.columns if col in table_cols]]

            if not df_to_insert.empty:
                # Normalize non-finite floats to NULL before binding params.
                df_to_insert = df_to_insert.replace([np.inf, -np.inf, np.nan], None)

                cols_str = ", ".join(df_to_insert.columns)
                placeholders = ", ".join(["?"] * len(df_to_insert.columns))

                duck.executemany(
                    f"INSERT INTO {table_name} ({cols_str}) VALUES ({placeholders})",
                    df_to_insert.values.tolist()
                )
                rows_inserted = len(df_to_insert)
                logger.info(f"[MAPPER] 💾 Inserted {rows_inserted} rows into {table_name}")

        if is_new_schema and version_id:
            try:
                duck.execute("""
                    UPDATE main.schema_versions
                    SET applied_at = CURRENT_TIMESTAMP, status = 'applied'
                    WHERE version_id = ?
                """, (version_id,))
                logger.info(f"[MAPPER] ✅ Schema v{version_id} marked as applied")
            except Exception as e:
                logger.warning(f"[MAPPER] ⚠️ Schema update warning: {e}")

    # --- Stage 8: finalize + downstream trigger ------------------------------
    df = df.replace([np.inf, -np.inf, np.nan], None)
    duration_ms = (time.time() - start_time) * 1000
    logger.info(f"[MAPPER] ✅ Pipeline complete in {duration_ms:.2f}ms for {org_id}")

    try:
        # Guarantee detection keys exist before analytics fires.
        e_key = f"entity:{org_id}:{source_id}"
        i_key = f"industry:{org_id}:{source_id}"

        if not event_hub.exists(e_key) or not event_hub.exists(i_key):
            logger.warning("[MAPPER] ⚠️ Keys missing, running fallback to ensure")
            _fallback_combined(org_id, source_id)

        trigger_start = time.time()
        event_hub.emit_analytics_trigger(org_id, source_id, {
            "type": "kpi_compute",
            "entity_type": entity_type,
            "industry": industry,
            "rows_inserted": rows_inserted,
            "timestamp": datetime.now().isoformat()
        })
        trigger_latency = (time.time() - trigger_start) * 1000

        logger.info(f"[MAPPER] 🚀 Triggered analytics in {trigger_latency:.2f}ms")

    except Exception as e:
        # Trigger failure is non-fatal: data is already committed.
        logger.error(f"[MAPPER] ⚠️ Analytics trigger failed: {e}")
        _record_redis_failure(f"trigger_error:{e}")

    return df, industry, industry_confidence