| """ | |
| usage_logging.py | |
| ---------------- | |
| Purpose: | |
| This module implements privacy-preserving telemetry for the | |
| AI Recruiting Agent Hugging Face Space. | |
| Its sole purpose is to measure anonymous usage and adoption | |
| metrics in order to: | |
| - Understand how the tool is being used | |
| - Improve reliability and performance | |
| - Gauge sense of real-world adoption | |
| - Support research and evaluation of responsible AI practices | |
| Privacy Principles: | |
| This module is explicitly designed to minimize data collection | |
| and avoid storing any personally identifiable information (PII). | |
| It DOES NOT collect or store: | |
| - Raw IP addresses | |
| - User names or Hugging Face account IDs | |
| - Resume contents or job descriptions | |
| - Emails, phone numbers, or file names | |
| - Full user-agent strings or device fingerprints | |
| - Any demographic attributes about users | |
| It ONLY records: | |
| - Approximate country and city (derived from IP, not stored) | |
| - UTC timestamp of the event | |
| - Space URL | |
| - High-level event type (e.g., "app_open") | |
| - Non-identifying, aggregate metadata (e.g., counts, booleans, latencies) | |
| All usage logs are: | |
| - Anonymized | |
| - Append-only | |
| - Persisted in a public Hugging Face Dataset repository (https://huggingface.co/datasets/19arjun89/ai_recruiting_agent_usage) | |
| - Versioned via immutable commit history for auditability | |
| Ethical Safeguards: | |
| - Logging failures never break application functionality | |
| - No raw identifiers are persisted at any time | |
| - All telemetry is optional and best-effort | |
| - The system is intended for transparency and improvement, | |
| not for surveillance or profiling | |
| Transparency: | |
| A public-facing usage reporting Space will be provided to allow | |
| independent verification of aggregate adoption metrics. | |
| Author: | |
| Arjun Singh | |
| Last Updated: | |
| 2026-01-27 | |
| """ | |

import os
import json
import time
import uuid
import ipaddress
from datetime import datetime
from io import BytesIO

import requests
import gradio as gr
import pycountry
from huggingface_hub import HfApi, list_repo_files, hf_hub_download

SPACE_URL = "https://huggingface.co/spaces/19arjun89/AI_Recruiting_Agent"
USAGE_DATASET_REPO = "19arjun89/ai_recruiting_agent_usage"
USAGE_EVENTS_DIR = "usage/events"
LEGACY_JSONL_PATH = "usage/visits_legacy.jsonl"
ROLLUP_PATH = "usage/visits.jsonl"
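
# Layout of the usage dataset repo, as written by the functions below:
#
#   usage/events/<timestamp>_<uuid8>.json   one immutable JSON file per event
#   usage/visits_legacy.jsonl               archived pre-migration log (kept as-is)
#   usage/visits.jsonl                      rollup rebuilt from usage/events/
#
# Illustrative shape of a single stored event (the field values here are
# hypothetical examples, not real data):
#
#   {
#     "ts_utc": "2026-01-27T14:03:22.123456Z",
#     "space_url": "https://huggingface.co/spaces/19arjun89/AI_Recruiting_Agent",
#     "event": "usage_start",
#     "country": "United States",
#     "country_code": "US",
#     "country_source": "header",
#     "city": ""
#   }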


def _hf_api():
    token = os.environ.get("HF_TOKEN")
    if not token:
        return None
    return HfApi(token=token)


def _is_public_ip(ip: str) -> bool:
    try:
        obj = ipaddress.ip_address(ip)
        return not (
            obj.is_private
            or obj.is_loopback
            or obj.is_reserved
            or obj.is_multicast
            or obj.is_link_local
        )
    except Exception:
        return False


def _get_client_ip(request: gr.Request) -> str:
    if request:
        xff = request.headers.get("x-forwarded-for")
        if xff:
            for part in xff.split(","):
                ip = part.strip()
                if _is_public_ip(ip):
                    return ip
        if request.client:
            host = request.client.host
            return host if _is_public_ip(host) else ""
    return ""


def _country_lookup(ip: str) -> tuple[str, str]:
    token = os.environ.get("IPINFO_TOKEN")
    if not token:
        return ("", "")
    try:
        url = f"https://ipinfo.io/{ip}/json?token={token}"
        r = requests.get(url, timeout=4)
        if r.status_code != 200:
            return ("", "")
        data = r.json()
        # Some plans: country="US"
        # Some plans: country_code="US" and country="United States"
        cc = (data.get("country_code") or data.get("country") or "").strip().upper()
        name = (data.get("country") or "").strip()
        # If name is actually a code like "US", expand it
        if len(name) == 2 and name.upper() == cc:
            name = _expand_country_code(cc)
        # If name is missing but cc exists, expand
        if not name and cc:
            name = _expand_country_code(cc)
        return (cc, name)
    except Exception:
        return ("", "")


def append_visit_to_dataset(
    country: str,
    city: str,
    event_type: str = "usage_start",
    country_source: str = "unknown",
    country_code: str = "",
    **extra_fields,
):
    api = _hf_api()
    if not api:
        return

    event = {
        "ts_utc": datetime.utcnow().isoformat() + "Z",
        "space_url": SPACE_URL,
        "event": event_type,
        "country": country or "Unknown",
        "country_code": (country_code or "").strip().upper(),
        "country_source": country_source or "unknown",
        "city": city or "",
    }
    if extra_fields:
        # Drop None values so no JSON nulls are stored
        event.update({k: v for k, v in extra_fields.items() if v is not None})

    # Unique file path per event (prevents collisions)
    ts = datetime.utcnow().strftime("%Y%m%dT%H%M%S%f")
    uid = uuid.uuid4().hex[:8]
    path_in_repo = f"{USAGE_EVENTS_DIR}/{ts}_{uid}.json"

    try:
        api.upload_file(
            repo_id=USAGE_DATASET_REPO,
            repo_type="dataset",
            path_in_repo=path_in_repo,
            path_or_fileobj=BytesIO(json.dumps(event).encode("utf-8")),
            commit_message=f"log {event_type}",
        )
    except Exception as e:
        # Telemetry is best-effort: a failed upload must never break the app
        print("telemetry upload failed:", repr(e))


def record_visit(request: gr.Request):
    # 1) Header hint
    country_hint = _country_from_headers(request)
    if _is_valid_country_code(country_hint):
        append_visit_to_dataset(
            country=_expand_country_code(country_hint),
            city="",
            event_type="usage_start",
            country_source="header",
            country_code=country_hint.strip().upper(),
        )
        return

    # 2) IP-based lookup
    ip = _get_client_ip(request)
    if ip:
        cc, name = _country_lookup(ip)
        if _is_valid_country_code(cc):
            append_visit_to_dataset(
                country=name or _expand_country_code(cc),
                city="",
                event_type="usage_start",
                country_source="ipinfo",
                country_code=cc,
            )
        else:
            append_visit_to_dataset(
                country="Unknown",
                city="",
                event_type="usage_start",
                country_source="ipinfo_unknown",
                country_code="",
            )
        return

    # 3) Nothing usable
    append_visit_to_dataset(
        country="Unknown",
        city="",
        event_type="usage_start",
        country_source="none",
        country_code="",
    )


def _country_from_headers(request: gr.Request) -> str:
    if not request:
        return ""
    return (
        request.headers.get("cf-ipcountry")
        or request.headers.get("x-country")
        or request.headers.get("x-geo-country")
        or ""
    ).strip().upper()


def _is_valid_country_code(code: str) -> bool:
    if not code:
        return False
    code = code.strip().upper()
    # Common "unknown" markers from CDNs / proxies
    if code in {"XX", "ZZ", "UNKNOWN", "NA", "N/A", "NONE", "-"}:
        return False
    # ISO2 should be exactly 2 letters
    return len(code) == 2 and code.isalpha()


def _expand_country_code(code: str) -> str:
    if not code or len(code) != 2:
        return "Unknown"
    try:
        country = pycountry.countries.get(alpha_2=code.upper())
        return country.name if country else "Unknown"
    except Exception:
        return "Unknown"


def migrate_legacy_jsonl_to_event_files(
    max_rows: int = 100000,
    sleep_s: float = 0.0,
) -> str:
    """
    One-time migration:
      - Reads usage/visits_legacy.jsonl
      - Writes each row as its own event file under usage/events/legacy_<ts>_<n>.json
      - Skips if the legacy file doesn't exist
      - Does NOT delete the legacy file (you can keep it as an archive)
    """
    api = _hf_api()
    if not api:
        return "HF_TOKEN not available. Migration requires write access."

    # 1) Download legacy JSONL from dataset repo
    try:
        legacy_local = hf_hub_download(
            repo_id=USAGE_DATASET_REPO,
            repo_type="dataset",
            filename=LEGACY_JSONL_PATH,
        )
    except Exception as e:
        return f"Legacy file not found or not accessible: {LEGACY_JSONL_PATH} ({repr(e)})"

    # 2) Read legacy rows
    rows = []
    with open(legacy_local, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                rows.append(json.loads(line))
            except Exception:
                pass
    if not rows:
        return "Legacy file exists but contained 0 parseable rows."
    rows = rows[:max_rows]

    # 3) (Optional) check if migration already happened by looking for any legacy_* files
    try:
        files = list_repo_files(repo_id=USAGE_DATASET_REPO, repo_type="dataset")
        already = any(p.startswith(f"{USAGE_EVENTS_DIR}/legacy_") for p in files)
        if already:
            return "Migration appears to have already run (found legacy_ files in usage/events). Aborting."
    except Exception:
        # If listing fails, proceed cautiously
        pass

    # 4) Upload each row as its own event file
    uploaded = 0
    skipped = 0
    for i, evt in enumerate(rows):
        # Ensure minimal schema
        ts = (evt.get("ts_utc") or "").strip()
        if not ts:
            # If no timestamp, synthesize one to avoid empty sorting later
            ts = datetime.utcnow().isoformat() + "Z"
            evt["ts_utc"] = ts

        # Sanitize filename timestamp (avoid ":" which is annoying in filenames)
        safe_ts = (
            ts.replace(":", "")
            .replace("-", "")
            .replace(".", "")
            .replace("Z", "")
        )
        path_in_repo = f"{USAGE_EVENTS_DIR}/legacy_{safe_ts}_{i:05d}.json"

        try:
            api.upload_file(
                repo_id=USAGE_DATASET_REPO,
                repo_type="dataset",
                path_in_repo=path_in_repo,
                path_or_fileobj=BytesIO(json.dumps(evt, ensure_ascii=False).encode("utf-8")),
                commit_message="migrate legacy telemetry row",
            )
            uploaded += 1
        except Exception as e:
            skipped += 1
            print("legacy migration upload failed:", path_in_repo, repr(e))

        if sleep_s:
            time.sleep(sleep_s)

    return f"Legacy migration complete. Uploaded={uploaded}, Skipped={skipped}, TotalRowsRead={len(rows)}."


def rebuild_visits_rollup_from_event_files() -> str:
    """
    Rebuilds usage/visits.jsonl from immutable per-event JSON files in usage/events/.
    This is safe if triggered manually (admin button).
    """
    api = _hf_api()
    if not api:
        return "HF_TOKEN not available. Rollup requires write access."

    # 1) List files
    try:
        files = list_repo_files(repo_id=USAGE_DATASET_REPO, repo_type="dataset")
    except Exception as e:
        return f"Could not list repo files: {repr(e)}"

    event_files = [
        f for f in files
        if f.startswith(f"{USAGE_EVENTS_DIR}/") and f.endswith(".json")
    ]
    if not event_files:
        return f"No event files found under {USAGE_EVENTS_DIR}/"

    events = []
    bad = 0

    # 2) Download & parse each event
    for path in event_files:
        try:
            local_path = hf_hub_download(
                repo_id=USAGE_DATASET_REPO,
                repo_type="dataset",
                filename=path,
            )
            with open(local_path, "r", encoding="utf-8") as f:
                events.append(json.load(f))
        except Exception:
            bad += 1

    if not events:
        return f"Found {len(event_files)} event files, but 0 were parseable (bad={bad})."

    # 3) Sort by ts_utc
    events.sort(key=lambda e: (e.get("ts_utc") or ""))

    # 4) Write JSONL
    buf = BytesIO()
    for evt in events:
        buf.write((json.dumps(evt, ensure_ascii=False) + "\n").encode("utf-8"))
    buf.seek(0)

    # 5) Upload rollup
    try:
        api.upload_file(
            repo_id=USAGE_DATASET_REPO,
            repo_type="dataset",
            path_in_repo=ROLLUP_PATH,
            path_or_fileobj=buf,
            commit_message=f"rebuild {ROLLUP_PATH} from {USAGE_EVENTS_DIR}",
        )
    except Exception as e:
        return f"Rollup upload failed: {repr(e)}"

    return f"Rollup rebuilt: {ROLLUP_PATH} rows={len(events)} (bad_files={bad})."