Spaces:
Running
Running
| import json | |
| import os | |
| import secrets | |
| import tempfile | |
| from datetime import datetime | |
| from typing import Dict, Any, Optional | |
| from huggingface_hub import HfApi, hf_hub_download | |
| from huggingface_hub.utils import EntryNotFoundError | |
# Ledger storage settings. The Hugging Face Dataset backend is used only when
# LEDGER_DATASET_ID and HF_TOKEN are both provided; otherwise the functions
# below fall back to a local JSONL file (handy for local development).
LEDGER_REPO = os.getenv("LEDGER_DATASET_ID", "")
LEDGER_FILE = "subscriptions.jsonl"
HF_TOKEN = os.getenv("HF_TOKEN")

# Local fallback location.
# NOTE(review): this path is resolved against the process working directory,
# not this module's directory — confirm that is intended.
LOCAL_LEDGER_FILE = "../subscriptions_ledger/subscriptions.jsonl"

# Shared Hub API client; stays None when no token is configured.
api = None
if HF_TOKEN:
    api = HfApi(token=HF_TOKEN)
def _use_hf_storage() -> bool:
    """Report whether the Hugging Face Dataset backend is fully configured.

    The dataset id, the token and the API client must all be set (truthy).
    When this returns False, callers use the local ledger file instead.
    """
    configured = all((LEDGER_REPO, HF_TOKEN, api))
    return configured
def _download_ledger() -> Optional[str]:
    """Download the ledger file from the HF Dataset repository.

    Returns:
        The local filesystem path returned by ``hf_hub_download``. Returns
        None when HF storage is not configured, when the file does not exist
        in the dataset yet, or when the download fails for any other reason
        (that error is printed).

    NOTE: a None result does not tell the caller *why* nothing was returned;
    "file missing" and "download failed" look the same.
    """
    if _use_hf_storage():
        try:
            return hf_hub_download(
                repo_id=LEDGER_REPO,
                filename=LEDGER_FILE,
                repo_type="dataset",
                token=HF_TOKEN,
            )
        except EntryNotFoundError:
            # Nothing has been written to the dataset yet.
            pass
        except Exception as e:
            print(f"Error downloading ledger: {e}")
    return None
def _upload_ledger(local_path: str) -> bool:
    """Upload a local file to the HF Dataset repository as LEDGER_FILE.

    Args:
        local_path: Path of the local ledger file to push.

    Returns:
        True when ``api.upload_file`` returns without raising; False when HF
        storage is not configured or the upload raised (the error is printed).
    """
    if not _use_hf_storage():
        return False
    try:
        message = f"Update subscriptions ledger - {datetime.utcnow().isoformat()}"
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=LEDGER_FILE,
            repo_id=LEDGER_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message=message,
        )
    except Exception as e:
        print(f"Error uploading ledger: {e}")
        return False
    return True
def _get_ledger_path() -> str:
    """Pick the file the ledger should be read from.

    Prefers a freshly downloaded copy from the HF Dataset. If HF storage is
    disabled, or the download returns nothing, the path falls back to
    LOCAL_LEDGER_FILE.

    NOTE(review): when HF storage is configured but the download fails, this
    silently reads the local file instead, which may be absent or stale.
    """
    hf_path = _download_ledger() if _use_hf_storage() else None
    return hf_path or LOCAL_LEDGER_FILE
def append_subscription_event(event: Dict[str, Any]) -> bool:
    """
    Append one subscription event to the ledger.

    With HF Dataset storage, the current ledger is downloaded, copied into a
    temporary file with the event appended, and the copy is uploaded back.
    Otherwise the event is appended to LOCAL_LEDGER_FILE, and its parent
    directory is created if needed.

    ``event`` is mutated in place: when it has no ``created_at`` key, one is
    added holding the current UTC time in ISO format with a trailing "Z".

    Args:
        event: JSON-serializable event record.

    Returns:
        True on success. In HF mode, returns False when the existing ledger
        could not be fetched (nothing is uploaded, so a transient fetch error
        cannot overwrite the existing history) or when the upload fails.

    NOTE(review): the HF path is an unlocked read-modify-write, so two
    concurrent writers can still drop each other's events.
    """
    if "created_at" not in event:
        event["created_at"] = datetime.utcnow().isoformat() + "Z"
    line = json.dumps(event) + "\n"

    if not _use_hf_storage():
        # Local file storage
        parent_dir = os.path.dirname(LOCAL_LEDGER_FILE)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(LOCAL_LEDGER_FILE, "a", encoding="utf-8") as f:
            f.write(line)
        return True

    # Fetch the current ledger directly rather than through _download_ledger(),
    # which returns None both for "file does not exist yet" and "download
    # failed". Treating a failed download as an empty ledger would upload a
    # file containing only this event and wipe all previous records.
    try:
        current_path: Optional[str] = hf_hub_download(
            repo_id=LEDGER_REPO,
            filename=LEDGER_FILE,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    except EntryNotFoundError:
        current_path = None  # First event ever written to the dataset.
    except Exception as e:
        print(f"Error downloading ledger: {e}")
        return False

    tmp_path: Optional[str] = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".jsonl", delete=False, encoding="utf-8"
        ) as tmp:
            tmp_path = tmp.name
            if current_path and os.path.exists(current_path):
                with open(current_path, "r", encoding="utf-8") as f:
                    existing = f.read()
                tmp.write(existing)
                # Without this, a ledger whose last line lacks a newline
                # would be fused with the new event into one unparseable line.
                if existing and not existing.endswith("\n"):
                    tmp.write("\n")
            tmp.write(line)
        # The temp file is closed (and flushed) before it is uploaded.
        return _upload_ledger(tmp_path)
    finally:
        # Always remove the temp file, even if reading or uploading raised.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
def get_all_subscriptions() -> Dict[tuple, Dict[str, Any]]:
    """
    Fold the ledger into the latest record for each (hf_user, dataset_id).

    The ledger is append-only JSONL, so a later line for the same key
    replaces an earlier one. Each bad line is skipped on its own, so one bad
    line cannot hide the records that follow it. Skipped lines are:
      - blank lines;
      - lines that are not valid JSON;
      - JSON values that are not objects;
      - records missing ``hf_user`` or ``dataset_id``;
      - records with unhashable ids.

    Returns:
        Dict mapping (hf_user, dataset_id) -> latest event record. The dict
        is empty if the ledger file does not exist. It is partial (and the
        error is printed) if reading the file fails midway.
    """
    subscriptions: Dict[tuple, Dict[str, Any]] = {}
    ledger_path = _get_ledger_path()
    if not ledger_path or not os.path.exists(ledger_path):
        return subscriptions
    try:
        with open(ledger_path, "r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    event = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # Calling .get() on a non-object JSON value (list, string,
                # number) would raise, and the outer handler would then
                # abort the whole read.
                if not isinstance(event, dict):
                    continue
                hf_user = event.get("hf_user")
                dataset_id = event.get("dataset_id")
                if not (hf_user and dataset_id):
                    continue
                try:
                    # Append-only ledger: latest record is current state.
                    subscriptions[(hf_user, dataset_id)] = event
                except TypeError:
                    # Unhashable identifier (e.g. a JSON list); skip it.
                    continue
    except Exception as e:
        print(f"Error reading ledger: {e}")
    return subscriptions
def get_active_subscription(hf_user: str, dataset_id: str) -> Optional[Dict[str, Any]]:
    """Return the latest ledger record for (hf_user, dataset_id) if unexpired.

    Returns None in any of these cases:
      - there is no record;
      - the record has no ``subscription_end``;
      - that value is not a parseable ISO timestamp (the problem is printed);
      - the end time is not in the future.

    A trailing "Z" or an explicit UTC offset is accepted. Timestamps without
    an offset are compared as UTC.
    """
    sub = get_all_subscriptions().get((hf_user, dataset_id))
    if not sub:
        return None

    subscription_end = sub.get("subscription_end")
    if not subscription_end:
        return None
    if not isinstance(subscription_end, str):
        # Previously raised AttributeError on .endswith().
        print(f"Error parsing date: {subscription_end}")
        return None

    # datetime.fromisoformat() before Python 3.11 rejects a "Z" suffix.
    if subscription_end.endswith("Z"):
        subscription_end = subscription_end[:-1]
    try:
        end_date = datetime.fromisoformat(subscription_end)
    except ValueError:
        print(f"Error parsing date: {subscription_end}")
        return None

    if end_date.tzinfo is not None:
        # Convert offset-aware values to naive UTC; comparing an aware
        # datetime with naive utcnow() raises TypeError.
        end_date = end_date.replace(tzinfo=None) - end_date.utcoffset()
    return sub if end_date > datetime.utcnow() else None
def get_user_subscriptions(hf_user: str) -> list:
    """List the latest subscription record for every dataset of ``hf_user``.

    Each item is a shallow copy of the ledger record with an added
    ``is_active`` flag. The flag is True only when ``subscription_end`` is a
    parseable ISO timestamp in the future. A trailing "Z" or a UTC offset is
    accepted, and offset-less values are compared as UTC.
    """
    now = datetime.utcnow()  # One reference time for the whole listing.
    user_subs = []
    for (user, _dataset_id), sub in get_all_subscriptions().items():
        if user != hf_user:
            continue
        is_active = False
        end_str = sub.get("subscription_end", "")
        if end_str and isinstance(end_str, str):
            if end_str.endswith("Z"):
                end_str = end_str[:-1]
            try:
                end_date = datetime.fromisoformat(end_str)
            except ValueError:
                end_date = None
            if end_date is not None:
                if end_date.tzinfo is not None:
                    # Convert to naive UTC. Comparing aware with naive raises
                    # TypeError, which the old bare `except` turned into
                    # "inactive".
                    end_date = end_date.replace(tzinfo=None) - end_date.utcoffset()
                is_active = end_date > now
        user_subs.append({**sub, "is_active": is_active})
    return user_subs
def generate_access_token() -> str:
    """Create a new unguessable access token.

    The token is the fixed prefix ``hfdata_`` followed by
    ``secrets.token_urlsafe(32)``, i.e. 32 random bytes as URL-safe text.
    """
    random_part = secrets.token_urlsafe(32)
    return "hfdata_" + random_part
def validate_access_token(access_token: str) -> Optional[Dict[str, Any]]:
    """
    Validate an access token and return the subscription info if valid.

    Only the latest record per (hf_user, dataset_id) is consulted. A token
    that was replaced by a newer event for the same pair therefore no longer
    validates.

    Returns:
        ``{"hf_user": ..., "dataset_id": ..., "subscription": record}`` when a
        record's ``access_token`` matches and its ``subscription_end`` is a
        parseable ISO timestamp in the future. A trailing "Z" or a UTC offset
        is accepted, and offset-less values are compared as UTC.

        None otherwise. Empty or non-string tokens never match, so ``None``
        or ``""`` can no longer match records that have no stored token.
    """
    if not isinstance(access_token, str) or not access_token:
        return None
    presented = access_token.encode("utf-8")

    for (hf_user, dataset_id), sub in get_all_subscriptions().items():
        stored = sub.get("access_token")
        if not isinstance(stored, str):
            continue
        # Constant-time comparison so response timing does not reveal how
        # much of a guessed token is correct.
        if not secrets.compare_digest(stored.encode("utf-8"), presented):
            continue

        # Token found: it is valid only while the subscription is current.
        end_str = sub.get("subscription_end", "")
        if not end_str or not isinstance(end_str, str):
            return None
        if end_str.endswith("Z"):
            end_str = end_str[:-1]
        try:
            end_date = datetime.fromisoformat(end_str)
        except ValueError:
            return None
        if end_date.tzinfo is not None:
            # Convert to naive UTC; comparing aware with naive raises TypeError.
            end_date = end_date.replace(tzinfo=None) - end_date.utcoffset()
        if end_date <= datetime.utcnow():
            return None  # Token found but subscription expired
        return {
            "hf_user": hf_user,
            "dataset_id": dataset_id,
            "subscription": sub,
        }
    return None  # Token not found
def get_subscription_by_token(access_token: str) -> Optional[Dict[str, Any]]:
    """Look up the active subscription that owns ``access_token``.

    A readability alias that delegates to ``validate_access_token`` and
    returns its result unchanged (a dict on success, otherwise None).
    """
    result = validate_access_token(access_token)
    return result