# latch_candidates/main.py
import psycopg2
from psycopg2.extras import execute_values
import pandas as pd
from sentence_transformers import SentenceTransformer
import os
import datetime
from collections import deque
from fastapi import FastAPI, BackgroundTasks
from contextlib import asynccontextmanager
from fastapi.responses import HTMLResponse
import threading
# --- Configuration ---
SUPABASE_CONNECTION_STRING = os.getenv("SUPABASE_CONNECTION_STRING")
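# Optional guard (a sketch, not required by the worker): warn early if the
# connection string is missing, since every batch run needs it to connect.
if not SUPABASE_CONNECTION_STRING:
    print("⚠️ SUPABASE_CONNECTION_STRING is not set; batch runs will fail to connect.")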
# --- Toggles & Tuning ---
PROCESSING_CHUNK_SIZE = 32  # rows fetched and locked per batch
EMBEDDING_BATCH_SIZE = 32   # encode() batch size for the model
DRY_RUN = False             # when True, compute embeddings but skip DB writes
# --- Global State ---
model = None
execution_logs = deque(maxlen=50)   # most recent batch logs, newest first
processing_lock = threading.Lock()  # prevents overlapping batch runs
# --- Lifespan Manager ---
@asynccontextmanager
async def lifespan(app: FastAPI):
global model
print("⏳ Loading Model...")
model = SentenceTransformer('Alibaba-NLP/gte-modernbert-base', trust_remote_code=True)
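    # gte-modernbert-base returns 768-dimensional vectors; the pgvector
    # column receiving them must be declared with a matching dimension.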
print("βœ… Model Loaded.")
yield
print("πŸ›‘ Shutting down...")
app = FastAPI(lifespan=lifespan)
# --- Helper Functions ---
def fetch_and_lock_chunk(conn, chunk_size):
"""
    Fetches and row-locks candidates whose embeddings are missing or stale.
"""
query = """
SELECT
id,
name,
summary,
work_experience,
projects,
education,
achievements,
certifications,
volunteering,
skills,
languages
FROM public.candidates
WHERE
        -- Condition 1: Embedding is missing (new candidate)
embeddings IS NULL
OR
        -- Condition 2: Candidate created after the last embedding (retry/update logic)
        -- Note: there is no 'updated_at' column, so we compare created_at to embeddings_created_at
(embeddings_created_at IS NOT NULL AND created_at > embeddings_created_at)
FOR UPDATE SKIP LOCKED
LIMIT %s
"""
# Note: If you add an 'updated_at' column later, change WHERE to:
# WHERE embeddings IS NULL OR updated_at > embeddings_created_at
return pd.read_sql_query(query, conn, params=(chunk_size,))
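# Note: FOR UPDATE SKIP LOCKED only holds the row locks while the fetching
# transaction stays open, so the fetch and the subsequent UPDATE must run on
# the same connection (as run_worker_logic does). Committing, rolling back,
# or closing early releases the locks and lets another worker grab the rows.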
def clean_and_format_text(row):
"""
Parses the JSONB and Array columns from the new schema to create a
rich text representation for embedding.
"""
text_parts = []
# 1. Basic Info
if row.get('name'):
text_parts.append(f"Name: {row['name']}")
if row.get('summary'):
text_parts.append(f"Summary: {row['summary']}")
# 2. Skills (Postgres Array -> Python List)
if row.get('skills') and isinstance(row['skills'], list):
# Filter out empty strings/None
valid_skills = [s for s in row['skills'] if s]
if valid_skills:
text_parts.append(f"Skills: {', '.join(valid_skills)}")
# 3. Work Experience (JSONB List of Dicts)
# Schema keys: role, company, description, duration
if row.get('work_experience') and isinstance(row['work_experience'], list):
exps = []
for item in row['work_experience']:
if isinstance(item, dict):
role = item.get('role', '')
company = item.get('company', '')
desc = item.get('description', '')
                # Format: "Role at Company: Description" (skip " at " when company is empty)
                entry = role.strip()
                if company:
                    entry += f" at {company}"
if desc:
entry += f": {desc}"
exps.append(entry)
if exps:
text_parts.append("Work Experience:\n" + "\n".join(exps))
# 4. Projects (JSONB List of Dicts)
# Schema keys: title, description, link
if row.get('projects') and isinstance(row['projects'], list):
projs = []
for item in row['projects']:
if isinstance(item, dict):
title = item.get('title', '')
desc = item.get('description', '')
entry = f"{title}".strip()
if desc:
entry += f": {desc}"
projs.append(entry)
if projs:
text_parts.append("Projects:\n" + "\n".join(projs))
# 5. Education (JSONB List of Dicts)
# Schema keys: degree, institution, year
if row.get('education') and isinstance(row['education'], list):
edus = []
for item in row['education']:
if isinstance(item, dict):
degree = item.get('degree', '')
inst = item.get('institution', '')
entry = f"{degree} from {inst}".strip()
edus.append(entry)
if edus:
text_parts.append("Education: " + ", ".join(edus))
# 6. Certifications (JSONB List of Dicts)
# Schema keys: name, issuer
if row.get('certifications') and isinstance(row['certifications'], list):
certs = []
for item in row['certifications']:
if isinstance(item, dict):
name = item.get('name', '')
issuer = item.get('issuer', '')
entry = f"{name} by {issuer}".strip()
certs.append(entry)
if certs:
text_parts.append("Certifications: " + ", ".join(certs))
# 7. Achievements (JSONB List of Dicts)
if row.get('achievements') and isinstance(row['achievements'], list):
achievements = []
for item in row['achievements']:
if isinstance(item, dict):
title = item.get('title', '')
desc = item.get('description', '')
entry = f"{title}: {desc}".strip()
achievements.append(entry)
if achievements:
text_parts.append("Achievements: " + "; ".join(achievements))
return "\n\n".join(text_parts)
def update_db_batch(conn, updates):
if DRY_RUN: return
    # Targets public.candidates; ids arrive as text in VALUES and are cast back to uuid
query = """
UPDATE public.candidates AS c
SET embeddings = data.vector::vector,
embeddings_created_at = NOW()
FROM (VALUES %s) AS data (id, vector)
WHERE c.id = data.id::uuid
"""
cursor = conn.cursor()
try:
execute_values(cursor, query, updates)
conn.commit()
except Exception as e:
conn.rollback()
        raise
finally:
cursor.close()
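# Illustrative shape of `updates` (hypothetical values): a list of tuples like
#   ("9b2f3c1e-...-uuid-as-text", [0.0123, -0.0456, ...])
# execute_values expands these into the VALUES list; psycopg2 adapts each
# Python list to a Postgres array, which pgvector's array-to-vector cast
# then converts into the embeddings column type.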
def run_worker_logic():
"""
    Runs a single processing batch: fetch, embed, and update one chunk.
"""
log_buffer = []
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_buffer.append(f"<b>BATCH RUN: {timestamp}</b>")
conn = None
try:
conn = psycopg2.connect(SUPABASE_CONNECTION_STRING, sslmode='require')
# 1. Fetch & Lock
df = fetch_and_lock_chunk(conn, PROCESSING_CHUNK_SIZE)
if df.empty:
conn.rollback()
log_buffer.append("πŸ’€ No pending candidates found.")
execution_logs.appendleft("<br>".join(log_buffer))
return "No data"
log_buffer.append(f"πŸ”’ Locked & Processing {len(df)} candidates...")
# 2. Clean Text
df['full_text'] = df.apply(clean_and_format_text, axis=1)
# 3. Log Inputs (For the Root API view)
for index, row in df.iterrows():
log_buffer.append(f"<div style='border:1px solid #ccc; margin:5px; padding:5px; background:#f9f9f9'>")
            # row['id'] is the candidate's UUID
log_buffer.append(f"<strong>ID: {row['id']} ({row.get('name', 'Unknown')})</strong>")
log_buffer.append(f"<pre style='white-space: pre-wrap;'>{row['full_text']}</pre>")
log_buffer.append("</div>")
# 4. Generate Embeddings
embeddings = model.encode(
df['full_text'].tolist(),
batch_size=EMBEDDING_BATCH_SIZE,
show_progress_bar=False,
convert_to_numpy=True,
normalize_embeddings=True
)
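        # normalize_embeddings=True L2-normalizes each vector, so cosine
        # similarity downstream reduces to a plain inner product.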
# 5. Update DB
        # Cast UUID ids to strings so execute_values can adapt them
updates = list(zip(df['id'].astype(str).tolist(), embeddings.tolist()))
if not DRY_RUN:
update_db_batch(conn, updates)
log_buffer.append(f"βœ… Successfully updated {len(df)} profiles.")
else:
conn.rollback()
log_buffer.append("⚠️ Dry Run: No DB updates made.")
except Exception as e:
if conn: conn.rollback()
log_buffer.append(f"❌ ERROR: {str(e)}")
print(f"Error: {e}")
finally:
if conn: conn.close()
execution_logs.appendleft("<br>".join(log_buffer))
# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def read_root():
html_content = """
<html>
<head>
<title>Embedding Worker Logs</title>
<style>
body { font-family: monospace; padding: 20px; }
h1 { color: #333; }
.log-entry { margin-bottom: 20px; border-bottom: 2px solid #333; padding-bottom: 20px; }
</style>
</head>
<body>
<h1>πŸ“œ Candidates Embedding Worker</h1>
<p><i>Most recent batches shown first.</i></p>
<hr>
"""
if not execution_logs:
html_content += "<p>No logs yet. Hit the <code>/trigger-batch</code> endpoint to start processing.</p>"
for entry in execution_logs:
html_content += f"<div class='log-entry'>{entry}</div>"
html_content += "</body></html>"
return html_content
@app.get("/trigger-batch")
async def trigger_processing(background_tasks: BackgroundTasks):
if processing_lock.locked():
return {"status": "busy", "message": "Worker is currently processing a previous batch."}
background_tasks.add_task(wrapped_worker)
return {"status": "started", "message": "Batch processing started in background."}
def wrapped_worker():
if processing_lock.acquire(blocking=False):
try:
run_worker_logic()
finally:
processing_lock.release()
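# Example usage (illustrative; host/port depend on how the app is launched):
#   uvicorn main:app --host 0.0.0.0 --port 7860
#   curl http://localhost:7860/trigger-batch
# An external scheduler can ping /trigger-batch on an interval; the
# non-blocking lock acquire above makes overlapping pings a harmless no-op.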