import datetime
import json
import logging
import os
import threading
from collections import deque
from contextlib import asynccontextmanager

import pandas as pd
import psycopg2
from psycopg2.extras import execute_values, Json
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.responses import HTMLResponse
from sentence_transformers import SentenceTransformer
# --- Configuration ---
SUPABASE_CONNECTION_STRING = os.getenv("SUPABASE_CONNECTION_STRING")

# --- Toggles & Tuning ---
PROCESSING_CHUNK_SIZE = 10   # jobs fetched (and locked) per batch
EMBEDDING_BATCH_SIZE = 32    # texts per model.encode() forward pass
DRY_RUN = False              # True = log everything but skip all DB writes

# --- Global State ---
model = None
execution_logs = deque(maxlen=50)   # newest batch logs first, capped at 50
processing_lock = threading.Lock()  # prevents overlapping batch runs
# --- Lifespan Manager ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    global model
    print("⏳ Loading Model...")
    # Using the Alibaba GTE ModernBERT as requested
    model = SentenceTransformer('Alibaba-NLP/gte-modernbert-base', trust_remote_code=True)
    print("✅ Model Loaded.")
    yield
    print("👋 Shutting down...")

app = FastAPI(lifespan=lifespan)
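
# The model loads once at startup (inside lifespan) and is reused by every
# batch, so requests after the initial boot do not pay the model-load cost.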
# --- Helper Functions ---
def fetch_and_lock_chunk(conn, chunk_size):
    """
    Fetches the next batch of jobs from the denormalized schema
    and locks them using FOR UPDATE SKIP LOCKED.
    """
    query = """
        WITH locked_jobs AS (
            SELECT
                id,
                title,
                company_name,
                location,
                work_model,
                employment_type,
                roles_and_responsibilities,
                qualification,
                min_experience
            FROM jobs
            WHERE
                -- Condition 1: embedding is missing (new job)
                embeddings IS NULL
                OR
                -- Condition 2: job created after the last embedding (retry/update logic).
                -- There is no 'updated_at' column, so we compare created_at to embeddings_created_at.
                (embeddings_created_at IS NOT NULL AND created_at > embeddings_created_at)
            LIMIT %s
            FOR UPDATE SKIP LOCKED
        )
        SELECT * FROM locked_jobs;
    """
    # pandas read_sql usually returns JSONB columns as plain Python objects (lists/dicts)
    return pd.read_sql_query(query, conn, params=(chunk_size,))
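
# NOTE: the FOR UPDATE locks taken above last only as long as the current
# transaction, so the caller must keep this connection un-committed until the
# embeddings are written back. Recent pandas versions also emit a UserWarning
# when handed a raw DBAPI connection instead of a SQLAlchemy engine; the query
# still runs fine.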
def clean_and_format_text(row):
    """
    Joins denormalized columns into a single semantic string for embedding.
    """
    # Configuration: maps DB column -> semantic tag
    # (column name in the DataFrame, label for the text)
    field_config = [
        ('title', 'Job Title'),
        ('company_name', 'Company'),
        ('location', 'Location'),
        ('work_model', 'Work Model'),
        ('min_experience', 'Minimum Experience (Years)'),
        ('roles_and_responsibilities', 'Responsibilities'),
        ('qualification', 'Qualifications')
    ]
    text_parts = []
    for col_name, tag in field_config:
        if col_name in row and row[col_name] is not None:
            data = row[col_name]
            # Case A: JSONB list (roles, qualifications)
            if isinstance(data, list):
                # Filter out empty strings and None values
                clean_items = [str(item).strip() for item in data if item and str(item).strip()]
                if clean_items:
                    text_parts.append(f"{tag}: " + ", ".join(clean_items))
            # Case B: standard string/int (title, company, experience)
            elif str(data).strip():
                clean_text = str(data).strip().replace('\r', '')
                text_parts.append(f"{tag}: {clean_text}")
    # Combine all parts with newlines
    return "\n".join(text_parts)
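
# Illustrative output for one row (hypothetical values, not real data):
#   Job Title: Senior Data Engineer
#   Company: Acme Corp
#   Location: Bengaluru
#   Work Model: Hybrid
#   Minimum Experience (Years): 5
#   Responsibilities: Design ETL pipelines, Own data quality
#   Qualifications: B.Tech in CS, SQL, Python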
def update_db_batch(conn, updates):
    if DRY_RUN:
        return
    # Update the 'embeddings' column and the 'embeddings_created_at' timestamp
    query = """
        UPDATE jobs AS j
        SET embeddings = data.vector::vector,
            embeddings_created_at = NOW()
        FROM (VALUES %s) AS data (id, vector)
        WHERE j.id = data.id::uuid
    """
    cursor = conn.cursor()
    try:
        execute_values(cursor, query, updates)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()
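
# NOTE: execute_values renders each Python list as a Postgres ARRAY literal, and
# pgvector accepts the array-to-vector cast used above. If your pgvector version
# rejects the cast, a common fallback is passing str(vector) per row instead,
# since pgvector parses the '[0.1, 0.2, ...]' text representation directly.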
def run_worker_logic():
    """
    Core logic: fetches, embeds, and updates one single batch of jobs.
    """
    log_buffer = []
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_buffer.append(f"<b>BATCH RUN: {timestamp}</b>")
    conn = None
    try:
        conn = psycopg2.connect(SUPABASE_CONNECTION_STRING)
        # 1. Fetch & Lock
        df = fetch_and_lock_chunk(conn, PROCESSING_CHUNK_SIZE)
        if df.empty:
            conn.rollback()
            log_buffer.append("😴 No pending jobs found.")
            return "No data"
        log_buffer.append(f"🔒 Locked & Processing {len(df)} jobs...")
        # 2. Clean Text
        df['full_text'] = df.apply(clean_and_format_text, axis=1)
        # 3. Log Inputs (for debugging/visibility)
        for _, row in df.iterrows():
            log_buffer.append("<div style='border:1px solid #ccc; margin:5px; padding:5px; background:#f9f9f9'>")
            log_buffer.append(f"<strong>ID: {row['id']} - {row.get('title', 'Unknown')}</strong>")
            log_buffer.append(f"<pre style='white-space: pre-wrap; font-size: 0.8em;'>{row['full_text']}</pre>")
            log_buffer.append("</div>")
        # 4. Generate Embeddings
        # Note: ensure the model dimension matches your DB vector size
        # (gte-modernbert-base produces 768-dim embeddings)
        embeddings = model.encode(
            df['full_text'].tolist(),
            batch_size=EMBEDDING_BATCH_SIZE,
            show_progress_bar=False,
            convert_to_numpy=True,
            normalize_embeddings=True
        )
        # 5. Update DB (the commit inside also releases the row locks)
        updates = list(zip(df['id'].tolist(), embeddings.tolist()))
        if not DRY_RUN:
            update_db_batch(conn, updates)
            log_buffer.append(f"✅ Successfully updated {len(df)} jobs.")
        else:
            conn.rollback()
            log_buffer.append("⚠️ Dry Run: No DB updates made.")
    except Exception as e:
        if conn:
            conn.rollback()
        log_buffer.append(f"❌ ERROR: {e}")
        print(f"Error: {e}")
    finally:
        if conn:
            conn.close()
        # Publish this run's log exactly once, including the "no pending jobs" path
        execution_logs.appendleft("<br>".join(log_buffer))
# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def read_root():
    html_content = """
    <html>
    <head>
        <title>Job Embedding Worker Logs</title>
        <style>
            body { font-family: monospace; padding: 20px; }
            h1 { color: #333; }
            .log-entry { margin-bottom: 20px; border-bottom: 2px solid #333; padding-bottom: 20px; }
        </style>
    </head>
    <body>
        <h1>🤖 Job Embedding Worker Logs</h1>
        <p><i>Most recent batches shown first.</i></p>
        <hr>
    """
    if not execution_logs:
        html_content += "<p>No logs yet. Hit the <code>/trigger-batch</code> endpoint to start processing.</p>"
    for entry in execution_logs:
        html_content += f"<div class='log-entry'>{entry}</div>"
    html_content += "</body></html>"
    return html_content
@app.get("/trigger-batch")
async def trigger_processing(background_tasks: BackgroundTasks):
    if processing_lock.locked():
        return {"status": "busy", "message": "Worker is currently processing a previous batch."}
    background_tasks.add_task(wrapped_worker)
    return {"status": "started", "message": "Batch processing started in background."}

def wrapped_worker():
    # Non-blocking acquire: if another batch slipped in between the .locked()
    # check above and this point, skip instead of queueing a second run.
    if processing_lock.acquire(blocking=False):
        try:
            run_worker_logic()
        finally:
            processing_lock.release()
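
# Local run sketch (assumptions: this file is saved as app.py, uvicorn is
# installed, and SUPABASE_CONNECTION_STRING is exported in the environment):
#   uvicorn app:app --host 0.0.0.0 --port 7860
# Open http://localhost:7860/ for the log view; hit /trigger-batch to process a batch.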