"""
usage_logging.py
----------------
Purpose:
This module implements privacy-preserving telemetry for the
AI Recruiting Agent Hugging Face Space.
Its sole purpose is to measure anonymous usage and adoption
metrics in order to:
- Understand how the tool is being used
- Improve reliability and performance
- Gauge real-world adoption
- Support research and evaluation of responsible AI practices
Privacy Principles:
This module is explicitly designed to minimize data collection
and avoid storing any personally identifiable information (PII).
It DOES NOT collect or store:
- Raw IP addresses
- User names or Hugging Face account IDs
- Resume contents or job descriptions
- Emails, phone numbers, or file names
- Full user-agent strings or device fingerprints
- Any demographic attributes about users
It ONLY records:
- Approximate country and city (derived from the IP address, which itself is never stored)
- UTC timestamp of the event
- Space URL
- High-level event type (e.g., "usage_start")
- Non-identifying, aggregate metadata (e.g., counts, booleans, latencies)
All usage logs are:
- Anonymized
- Append-only
- Persisted in a public Hugging Face Dataset repository (https://huggingface.co/datasets/19arjun89/ai_recruiting_agent_usage)
- Versioned via immutable commit history for auditability
Ethical Safeguards:
- Logging failures never break application functionality
- No raw identifiers are persisted at any time
- All telemetry is optional and best-effort
- The system is intended for transparency and improvement,
not for surveillance or profiling
Transparency:
A public-facing usage reporting Space will be provided to allow
independent verification of aggregate adoption metrics.
Author:
Arjun Singh
Last Updated:
2026-01-27
"""
import os
import json
from datetime import datetime
import requests
import gradio as gr
from huggingface_hub import HfApi, list_repo_files, hf_hub_download
import ipaddress
import pycountry
from io import BytesIO
import uuid
import time
SPACE_URL = "https://huggingface.co/spaces/19arjun89/AI_Recruiting_Agent"
USAGE_DATASET_REPO = "19arjun89/ai_recruiting_agent_usage"
USAGE_EVENTS_DIR = "usage/events"
LEGACY_JSONL_PATH = "usage/visits_legacy.jsonl"
ROLLUP_PATH = "usage/visits.jsonl"
def _hf_api():
token = os.environ.get("HF_TOKEN")
if not token:
return None
return HfApi(token=token)
def _is_public_ip(ip: str) -> bool:
try:
obj = ipaddress.ip_address(ip)
return not (obj.is_private or obj.is_loopback or obj.is_reserved or obj.is_multicast or obj.is_link_local)
except Exception:
return False
def _get_client_ip(request: gr.Request) -> str:
if request:
xff = request.headers.get("x-forwarded-for")
if xff:
for part in xff.split(","):
ip = part.strip()
if _is_public_ip(ip):
return ip
if request.client:
host = request.client.host
return host if _is_public_ip(host) else ""
return ""
def _country_lookup(ip: str) -> tuple[str, str]:
token = os.environ.get("IPINFO_TOKEN")
if not token:
return ("", "")
try:
url = f"https://ipinfo.io/{ip}/json?token={token}"
r = requests.get(url, timeout=4)
if r.status_code != 200:
return ("", "")
data = r.json()
# Some plans: country="US"
# Some plans: country_code="US" and country="United States"
cc = (data.get("country_code") or data.get("country") or "").strip().upper()
name = (data.get("country") or "").strip()
# If name is actually a code like "US", expand it
if len(name) == 2 and name.upper() == cc:
name = _expand_country_code(cc)
# If name is missing but cc exists, expand
if not name and cc:
name = _expand_country_code(cc)
return (cc, name)
except Exception:
return ("", "")
def append_visit_to_dataset(
country: str,
city: str,
event_type: str = "usage_start",
country_source: str = "unknown",
country_code: str = "",
**extra_fields
):
api = _hf_api()
if not api:
return
event = {
"ts_utc": datetime.utcnow().isoformat() + "Z",
"space_url": SPACE_URL,
"event": event_type,
"country": country or "Unknown",
"country_code": (country_code or "").strip().upper(),
"country_source": country_source or "unknown",
"city": city or "",
}
if extra_fields:
# Prevent JSON nulls
event.update({k: v for k, v in extra_fields.items() if v is not None})
# Unique file path per event (prevents collisions)
ts = datetime.utcnow().strftime("%Y%m%dT%H%M%S%f")
uid = uuid.uuid4().hex[:8]
path_in_repo = f"{USAGE_EVENTS_DIR}/{ts}_{uid}.json"
try:
api.upload_file(
repo_id=USAGE_DATASET_REPO,
repo_type="dataset",
path_in_repo=path_in_repo,
path_or_fileobj=BytesIO(json.dumps(event).encode("utf-8")),
commit_message=f"log {event_type}",
)
except Exception as e:
print("telemetry upload failed:", repr(e))
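# Example of logging a richer, still-anonymous event via **extra_fields
# (a sketch only; the event name and metadata keys below are illustrative
# assumptions, not a schema used elsewhere in this module):
#
#   append_visit_to_dataset(
#       country="Unknown",
#       city="",
#       event_type="resume_scored",
#       country_source="none",
#       latency_ms=1830,
#       success=True,
#   )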
def record_visit(request: gr.Request):
# 1) Header hint
country_hint = _country_from_headers(request)
if _is_valid_country_code(country_hint):
append_visit_to_dataset(
country=_expand_country_code(country_hint),
city="",
event_type="usage_start",
country_source="header",
country_code=country_hint.strip().upper(),
)
return
# 2) IP-based lookup
ip = _get_client_ip(request)
if ip:
cc, name = _country_lookup(ip)
if _is_valid_country_code(cc):
append_visit_to_dataset(
country=name or _expand_country_code(cc),
city="",
event_type="usage_start",
country_source="ipinfo",
country_code=cc,
)
else:
append_visit_to_dataset(
country="Unknown",
city="",
event_type="usage_start",
country_source="ipinfo_unknown",
country_code="",
)
return
# 3) Nothing usable
append_visit_to_dataset(
country="Unknown",
city="",
event_type="usage_start",
country_source="none",
country_code="",
)
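# Example wiring (a sketch, assuming a Gradio Blocks app named `demo` in the
# Space's app.py; the variable name is an assumption and the app is not shown here):
#
#   with gr.Blocks() as demo:
#       ...
#       # Gradio injects the typed gr.Request argument automatically, so this
#       # records one best-effort "usage_start" event per page load.
#       demo.load(record_visit)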
def _country_from_headers(request: gr.Request) -> str:
if not request:
return ""
return (
request.headers.get("cf-ipcountry") or
request.headers.get("x-country") or
request.headers.get("x-geo-country") or
""
).strip().upper()
def _is_valid_country_code(code: str) -> bool:
if not code:
return False
code = code.strip().upper()
# Common "unknown" markers from CDNs / proxies
if code in {"XX", "ZZ", "UNKNOWN", "NA", "N/A", "NONE", "-"}:
return False
# ISO2 should be exactly 2 letters
return len(code) == 2 and code.isalpha()
def _expand_country_code(code: str) -> str:
if not code or len(code) != 2:
return "Unknown"
try:
country = pycountry.countries.get(alpha_2=code.upper())
return country.name if country else "Unknown"
except Exception:
return "Unknown"
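# Behavior sketch for the two helpers above (deterministic given the code,
# though the expanded name ultimately comes from pycountry's ISO 3166 data):
#
#   _is_valid_country_code("US")  -> True
#   _is_valid_country_code("XX")  -> False   # CDN "unknown" marker
#   _expand_country_code("US")    -> "United States"
#   _expand_country_code("")      -> "Unknown"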
def migrate_legacy_jsonl_to_event_files(
max_rows: int = 100000,
sleep_s: float = 0.0,
) -> str:
"""
One-time migration:
- Reads usage/visits_legacy.jsonl
- Writes each row as its own event file under usage/events/legacy_<ts>_<n>.json
- Returns early (with a message) if the legacy file doesn't exist
- Does NOT delete the legacy file (it can be kept as an archive)
"""
api = _hf_api()
if not api:
return "HF_TOKEN not available. Migration requires write access."
# 1) Download legacy JSONL from dataset repo
try:
legacy_local = hf_hub_download(
repo_id=USAGE_DATASET_REPO,
repo_type="dataset",
filename=LEGACY_JSONL_PATH,
)
except Exception as e:
return f"Legacy file not found or not accessible: {LEGACY_JSONL_PATH} ({repr(e)})"
# 2) Read legacy rows
rows = []
with open(legacy_local, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
rows.append(json.loads(line))
except Exception:
pass
if not rows:
return "Legacy file exists but contained 0 parseable rows."
rows = rows[:max_rows]
# 3) (Optional) check if migration already happened by looking for any legacy_* files
try:
files = list_repo_files(repo_id=USAGE_DATASET_REPO, repo_type="dataset")
already = any(p.startswith(f"{USAGE_EVENTS_DIR}/legacy_") for p in files)
if already:
return "Migration appears to have already run (found legacy_ files in usage/events). Aborting."
except Exception:
# If listing fails, skip the already-migrated check and proceed (best-effort)
pass
# 4) Upload each row as its own event file
uploaded = 0
skipped = 0
for i, evt in enumerate(rows):
# Ensure minimal schema
ts = (evt.get("ts_utc") or "").strip()
if not ts:
# If no timestamp, synthesize one to avoid empty sorting later
ts = datetime.utcnow().isoformat() + "Z"
evt["ts_utc"] = ts
# Sanitize filename timestamp (avoid ":" which is annoying in filenames)
safe_ts = (
ts.replace(":", "")
.replace("-", "")
.replace(".", "")
.replace("Z", "")
)
path_in_repo = f"{USAGE_EVENTS_DIR}/legacy_{safe_ts}_{i:05d}.json"
try:
api.upload_file(
repo_id=USAGE_DATASET_REPO,
repo_type="dataset",
path_in_repo=path_in_repo,
path_or_fileobj=BytesIO(json.dumps(evt, ensure_ascii=False).encode("utf-8")),
commit_message="migrate legacy telemetry row",
)
uploaded += 1
except Exception as e:
skipped += 1
print("legacy migration upload failed:", path_in_repo, repr(e))
if sleep_s:
time.sleep(sleep_s)
return f"Legacy migration complete. Uploaded={uploaded}, Skipped={skipped}, TotalRowsRead={len(rows)}."
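# One-time invocation (a sketch; requires an HF_TOKEN with write access to the
# usage dataset, e.g. run once from a temporary admin hook or console; the
# sleep value below is an arbitrary illustrative choice to pace Hub commits):
#
#   status = migrate_legacy_jsonl_to_event_files(max_rows=100000, sleep_s=0.2)
#   print(status)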
def rebuild_visits_rollup_from_event_files() -> str:
"""
Rebuilds usage/visits.jsonl from immutable per-event JSON files in usage/events/.
This is safe if triggered manually (admin button).
"""
api = _hf_api()
if not api:
return "HF_TOKEN not available. Rollup requires write access."
# 1) List files
try:
files = list_repo_files(repo_id=USAGE_DATASET_REPO, repo_type="dataset")
except Exception as e:
return f"Could not list repo files: {repr(e)}"
event_files = [
f for f in files
if f.startswith(f"{USAGE_EVENTS_DIR}/") and f.endswith(".json")
]
if not event_files:
return f"No event files found under {USAGE_EVENTS_DIR}/"
events = []
bad = 0
# 2) Download & parse each event
for path in event_files:
try:
local_path = hf_hub_download(
repo_id=USAGE_DATASET_REPO,
repo_type="dataset",
filename=path,
)
with open(local_path, "r", encoding="utf-8") as f:
events.append(json.load(f))
except Exception:
bad += 1
if not events:
return f"Found {len(event_files)} event files, but 0 were parseable (bad={bad})."
# 3) Sort by ts_utc
events.sort(key=lambda e: (e.get("ts_utc") or ""))
# 4) Write JSONL
buf = BytesIO()
for evt in events:
buf.write((json.dumps(evt, ensure_ascii=False) + "\n").encode("utf-8"))
buf.seek(0)
# 5) Upload rollup
try:
api.upload_file(
repo_id=USAGE_DATASET_REPO,
repo_type="dataset",
path_in_repo=ROLLUP_PATH,
path_or_fileobj=buf,
commit_message=f"rebuild {ROLLUP_PATH} from {USAGE_EVENTS_DIR}",
)
except Exception as e:
return f"Rollup upload failed: {repr(e)}"
return f"Rollup rebuilt: {ROLLUP_PATH} rows={len(events)} (bad_files={bad})."
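# The module docstring mentions a public-facing usage reporting Space for
# independent verification of aggregate adoption metrics. The helper below is
# a minimal sketch of the kind of aggregation such a Space could perform over
# the public rollup file; it is illustrative only and not wired into the app.
def summarize_rollup_by_country() -> dict:
    """
    Download usage/visits.jsonl from the public dataset repo and return
    anonymous per-country event counts (sketch for a reporting Space).
    """
    from collections import Counter

    local_path = hf_hub_download(
        repo_id=USAGE_DATASET_REPO,
        repo_type="dataset",
        filename=ROLLUP_PATH,
    )
    counts = Counter()
    with open(local_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                counts[json.loads(line).get("country", "Unknown")] += 1
            except Exception:
                # Skip unparseable rows; counts stay approximate, never PII
                continue
    return dict(counts)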