import pandas as pd
import requests
from pydantic import BaseModel, Field
from typing import List, Tuple, Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import os
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Depends, Header, Request
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
import json
import tempfile
import shutil
import PyPDF2
from dotenv import load_dotenv
import pdfplumber
import re
from db import *
import time
import asyncio
from contextlib import asynccontextmanager
import logging
from sqlalchemy import create_engine, text  # used directly below; may duplicate names re-exported by db
from sqlalchemy.pool import NullPool
from cloud_config import *
import uuid

# Load environment variables
load_dotenv()

# Configure logging for Cloud Run
logging.basicConfig(
    level=getattr(logging, LOG_LEVEL),
    format=LOG_FORMAT
)
logger = logging.getLogger(__name__)

# Global variable to store access token
access_token = None

# Startup/shutdown events
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    logger.info("Starting up Job Recommendation API...")
    # You can initialize connection pools here if needed
    yield
    # Shutdown
    logger.info("Shutting down Job Recommendation API...")
    # Close any open connections here

# Initialize FastAPI app with lifespan
app = FastAPI(
    title="Job Recommendation API",
    description="API for processing resumes and recommending jobs",
    lifespan=lifespan
)

# Add CORS middleware for cloud deployment
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure based on your needs
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Add request ID middleware for better tracing
@app.middleware("http")
async def add_request_id(request: Request, call_next):
    request_id = f"{time.time()}-{request.client.host}"
    request.state.request_id = request_id

    # Log the request
    logger.info(f"Request ID: {request_id} - {request.method} {request.url.path}")

    try:
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response
    except Exception as e:
        logger.error(f"Request ID: {request_id} - Error: {str(e)}")
        raise

# Security configuration
API_KEY = os.getenv("API_KEY")
security = HTTPBearer()

def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """
    Verify the API key from the Authorization header
    """
    if not API_KEY:
        logger.error("API key not configured")
        raise HTTPException(
            status_code=500,
            detail="API key not configured",
        )
    if credentials.credentials != API_KEY:
        logger.warning("Invalid API key attempt")
        raise HTTPException(
            status_code=401,
            detail="Invalid API key",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return credentials.credentials

# Initialize OpenAI client with error handling
try:
    llm = ChatOpenAI(
        model="gpt-4o-mini",
        temperature=0,
        api_key=os.getenv("OPENAI_API_KEY")
    )
    logger.info("OpenAI client initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize OpenAI client: {e}")
    raise

# Initialize database engine with connection pooling suitable for Cloud Run
def get_engine():
    """
    Get database engine with NullPool for Cloud Run
    """
    try:
        conn_string = (
            f"postgresql://{DB_PARAMS['user']}:{DB_PARAMS['password']}"
            f"@{DB_PARAMS['host']}:{DB_PARAMS['port']}/{DB_PARAMS['dbname']}"
        )
        # Use NullPool for Cloud Run to avoid connection issues
        engine = create_engine(conn_string, poolclass=NullPool, pool_pre_ping=True)
        logger.info("Database engine created successfully")
        return engine
    except Exception as e:
        logger.error(f"Failed to create database engine: {e}")
        raise

# Initialize database engine
engine = get_engine()

def get_access_token():
    """
    Get access token for the external API with better error handling
    """
    global access_token

    # If we already have a token, return it
    if access_token:
        return access_token

    try:
        login_url = str(os.getenv("login_url"))
        login_data = {
            "email": str(os.getenv("email")),
            "password": str(os.getenv("password"))
        }
        login_headers = {
            'accept': 'application/json',
            'Content-Type': 'application/json'
        }

        # Add timeout to prevent hanging (EXTERNAL_API_TIMEOUT comes from cloud_config)
        login_response = requests.post(
            login_url, headers=login_headers, json=login_data, timeout=EXTERNAL_API_TIMEOUT
        )

        if login_response.status_code == 200:
            login_result = login_response.json()
            access_token = login_result.get('data', {}).get('tokens', {}).get('accessToken')
            if access_token:
                logger.info("Successfully obtained access token")
                return access_token
            else:
                logger.error("Login successful but no access token found in response")
                return None
        else:
            logger.error(f"Login failed with status {login_response.status_code}: {login_response.text}")
            return None
    except requests.exceptions.Timeout:
        logger.error("Login request timed out")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Network error during login: {e}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error getting access token: {e}")
        return None
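
# The login response is expected to look roughly like this (shape inferred from
# the .get() chain above, not from any API documentation):
#
#   {"data": {"tokens": {"accessToken": "<token>"}}, ...}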

def generate_smart_hiring_collateral(job_description_text: str) -> tuple[str, str]:
    """
    Generate collateral using the smart-hiring/generate endpoint
    Returns a tuple of (collateral, job_id)
    """
    global access_token

    try:
        url = str(os.getenv("smart_hiring_url"))

        # Generate a unique job ID using UUID
        job_id = str(uuid.uuid4())

        # Prepare headers with authentication
        headers = {
            'accept': 'application/json',
            'Authorization': f'Bearer {get_access_token()}'
        }

        # Prepare payload
        payload = {
            'job_id': job_id,
            'job_description_text': job_description_text
        }

        # Make the API request
        response = requests.post(url, headers=headers, data=payload, timeout=EXTERNAL_API_TIMEOUT)

        if response.status_code == 200:
            logger.info("Smart hiring collateral generated successfully")
            # Parse the response to extract smart_hiring_criteria
            try:
                response_data = response.json()
                if response_data.get('success') and 'data' in response_data:
                    smart_hiring_criteria = response_data['data'].get('smart_hiring_criteria', '')
                    if smart_hiring_criteria:
                        logger.info("Successfully extracted smart hiring criteria")
                        return smart_hiring_criteria, job_id
                    else:
                        logger.warning("No smart_hiring_criteria found in response")
                        return "", job_id
                else:
                    logger.warning("Invalid response format from smart hiring API")
                    return "", job_id
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse smart hiring response as JSON: {e}")
                return "", job_id
        elif response.status_code == 401:
            logger.warning("Authentication failed for smart hiring, getting fresh token...")
            access_token = None  # Reset the token
            new_token = get_access_token()
            if new_token:
                headers['Authorization'] = f'Bearer {new_token}'
                response = requests.post(url, headers=headers, data=payload, timeout=EXTERNAL_API_TIMEOUT)
                if response.status_code == 200:
                    logger.info("Smart hiring collateral generated successfully with fresh token")
                    # Parse the response to extract smart_hiring_criteria
                    try:
                        response_data = response.json()
                        if response_data.get('success') and 'data' in response_data:
                            smart_hiring_criteria = response_data['data'].get('smart_hiring_criteria', '')
                            if smart_hiring_criteria:
                                logger.info("Successfully extracted smart hiring criteria with fresh token")
                                return smart_hiring_criteria, job_id
                            else:
                                logger.warning("No smart_hiring_criteria found in response with fresh token")
                                return "", job_id
                        else:
                            logger.warning("Invalid response format from smart hiring API with fresh token")
                            return "", job_id
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse smart hiring response as JSON with fresh token: {e}")
                        return "", job_id
                else:
                    logger.error(f"Smart hiring API call failed with status {response.status_code}")
                    return "", job_id
            else:
                logger.error("Could not obtain fresh token for smart hiring")
                return "", job_id
        else:
            logger.error(f"Smart hiring API call failed with status {response.status_code}: {response.text}")
            return "", job_id
    except requests.exceptions.Timeout:
        logger.error(f"Smart hiring API request timed out after {EXTERNAL_API_TIMEOUT} seconds")
        return "", ""
    except Exception as e:
        logger.error(f"Exception occurred in smart hiring generation: {str(e)}")
        return "", ""

class structure(BaseModel):
    name: str = Field(description="Name of the candidate")
    location: str = Field(description="The location of the candidate. Extract city and state if possible.")
    skills: List[str] = Field(description="List of individual skills of the candidate")
    ideal_jobs: str = Field(description="List of ideal jobs for the candidate based on past experience.")
    email: str = Field(description="The email of the candidate")
    yoe: str = Field(description="Years of experience of the candidate.")
    experience: str = Field(description="A brief summary of the candidate's past experience.")
    industry: str = Field(description="The industry the candidate has experience in (Tech, Legal, Finance/Accounting, Healthcare, Industrial, Logistics, Telecom, Admin, Other)")

class JobAnalysis(BaseModel):
    job_title: str
    company_name: str
    analysis: dict

def extract_text_from_pdf(pdf_file_path: str) -> str:
    """
    Extract text from PDF file using multiple methods for better accuracy
    """
    text = ""

    # Method 1: Try pdfplumber (better for complex layouts)
    try:
        with pdfplumber.open(pdf_file_path) as pdf:
            for page in pdf.pages:
                page_text = page.extract_text()
                if page_text:
                    text += page_text + "\n"
        if text.strip():
            logger.info(f"Successfully extracted text using pdfplumber: {len(text)} characters")
            return text.strip()
    except Exception as e:
        logger.warning(f"pdfplumber failed: {e}")

    # Method 2: Try PyPDF2 (fallback)
    try:
        with open(pdf_file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            for page in pdf_reader.pages:
                page_text = page.extract_text()
                if page_text:
                    text += page_text + "\n"
        if text.strip():
            logger.info(f"Successfully extracted text using PyPDF2: {len(text)} characters")
            return text.strip()
    except Exception as e:
        logger.error(f"PyPDF2 failed: {e}")

    # If both methods fail, return empty string
    logger.error("Failed to extract text from PDF")
    return ""

def extract_resume_info(resume_text: str) -> dict:
    """
    Extract structured information from resume using LLM
    """
    prompt = ChatPromptTemplate.from_template("""
    You are an expert resume parser. Extract the following information from the resume text provided and return it in a structured JSON format.

    Resume Text:
    {resume_text}

    Please extract and structure the information according to the following schema:
    - name: Full name of the candidate
    - location: City and state if available, otherwise general location
    - skills: List of technical skills, tools, technologies, programming languages, etc.
    - ideal_jobs: Based on their experience, what types of jobs would be ideal for this candidate
    - email: Email address of the candidate (if found in resume)
    - yoe: Years of experience (extract from work history)
    - experience: Brief summary of their work experience and background
    - industry: Categorize into one of these industries: Tech, Legal, Finance/Accounting, Healthcare, Industrial, Logistics, Telecom, Admin, Other

    Return ONLY a valid JSON object with these fields. Do not include any other text or explanations.
    """)

    try:
        str_llm = llm.with_structured_output(structure)
        chain = prompt | str_llm
        response = chain.invoke({"resume_text": resume_text})

        validated_data = {
            'name': response.name,
            'location': response.location,
            'email': response.email,
            'skills': response.skills,
            'ideal_jobs': response.ideal_jobs,
            'yoe': response.yoe,
            'experience': response.experience,
            'industry': response.industry
        }
        logger.info(f"Successfully extracted resume info for: {validated_data['name']}")
        return validated_data
    except Exception as e:
        logger.error(f"Failed to extract resume info: {e}")
        return {
            'name': "Unknown",
            'location': "Unknown",
            'email': "",
            'skills': [],
            'ideal_jobs': "Software Engineer",
            'yoe': "0",
            'experience': "No experience listed",
            'industry': "Tech"
        }

def filter_jobs_by_industry(jobs_df: pd.DataFrame, target_industry: str) -> pd.DataFrame:
    """
    Filter jobs by industry
    """
    # Map the extracted industry to database industry values
    industry_mapping = {
        'Tech': ['technology', 'VC Tech'],
        'Legal': ['Legal'],
        'Finance/Accounting': ['finance/Accounting'],
        'Healthcare': ['healthcare'],
        'Industrial': ['industrial'],
        'Logistics': ['logistics'],
        'Telecom': ['telecom'],
        'Admin': ['admin'],
        'Other': ['Other']
    }

    # Fall back to the Tech mapping if the extracted industry is unrecognized
    target_industries = industry_mapping.get(target_industry, industry_mapping['Tech'])

    # Filter jobs by industry (using database column name 'industry')
    filtered_jobs = jobs_df[jobs_df['industry'].isin(target_industries)]
    logger.info(f"Filtered {len(filtered_jobs)} jobs for industry: {target_industry}")
    return filtered_jobs

def filter_jobs_by_location(jobs_df: pd.DataFrame, candidate_location: str) -> pd.DataFrame:
    """
    Filter jobs by location matching the candidate's location
    """
    if not candidate_location or candidate_location.lower() in ['unknown', 'n/a', '']:
        logger.info(f"No location info provided, returning all {len(jobs_df)} jobs")
        return jobs_df  # Return all jobs if no location info

    # Clean and normalize candidate location
    candidate_location = candidate_location.lower().strip()
    logger.info(f"Filtering jobs for candidate location: {candidate_location}")

    # Extract state abbreviations and full names
    state_mapping = {
        'alabama': 'al', 'alaska': 'ak', 'arizona': 'az', 'arkansas': 'ar', 'california': 'ca',
        'colorado': 'co', 'connecticut': 'ct', 'delaware': 'de', 'district of columbia': 'dc', 'florida': 'fl', 'georgia': 'ga',
        'hawaii': 'hi', 'idaho': 'id', 'illinois': 'il', 'indiana': 'in', 'iowa': 'ia',
        'kansas': 'ks', 'kentucky': 'ky', 'louisiana': 'la', 'maine': 'me', 'maryland': 'md',
        'massachusetts': 'ma', 'michigan': 'mi', 'minnesota': 'mn', 'mississippi': 'ms', 'missouri': 'mo',
        'montana': 'mt', 'nebraska': 'ne', 'nevada': 'nv', 'new hampshire': 'nh', 'new jersey': 'nj',
        'new mexico': 'nm', 'new york': 'ny', 'north carolina': 'nc', 'north dakota': 'nd', 'ohio': 'oh',
        'oklahoma': 'ok', 'oregon': 'or', 'pennsylvania': 'pa', 'rhode island': 'ri', 'south carolina': 'sc',
        'south dakota': 'sd', 'tennessee': 'tn', 'texas': 'tx', 'utah': 'ut', 'vermont': 'vt',
        'virginia': 'va', 'washington': 'wa', 'west virginia': 'wv', 'wisconsin': 'wi', 'wyoming': 'wy'
    }

    # Create location patterns to match
    location_patterns = []

    # Add the original location
    location_patterns.append(candidate_location)

    # Add state variations
    for state_name, state_abbr in state_mapping.items():
        if state_name in candidate_location or state_abbr in candidate_location:
            location_patterns.extend([state_name, state_abbr])

    # Add common city variations (extract city name)
    city_match = re.search(r'^([^,]+)', candidate_location)
    if city_match:
        city_name = city_match.group(1).strip()
        location_patterns.append(city_name)

    # Add remote/anywhere patterns if location is remote
    if 'remote' in candidate_location or 'anywhere' in candidate_location:
        location_patterns.extend(['remote', 'anywhere', 'work from home', 'wfh'])

    logger.info(f"Location patterns to match: {location_patterns}")

    # Filter jobs by location
    matching_jobs = []
    for _, job_row in jobs_df.iterrows():
        job_location = str(job_row.get('job_location', '')).lower()

        # Check if any location pattern matches
        location_matches = any(pattern in job_location for pattern in location_patterns)

        # Also check for remote jobs if candidate location includes remote
        if 'remote' in candidate_location and any(remote_term in job_location for remote_term in ['remote', 'anywhere', 'work from home', 'wfh']):
            location_matches = True

        # Check for exact city/state matches
        if candidate_location in job_location or job_location in candidate_location:
            location_matches = True

        if location_matches:
            matching_jobs.append(job_row)

    result_df = pd.DataFrame(matching_jobs) if matching_jobs else jobs_df
    logger.info(f"Found {len(matching_jobs)} jobs matching location out of {len(jobs_df)} total jobs")
    return result_df

def extract_experience_requirement(requirements_text: str) -> dict:
    """
    Extract experience requirements from job requirements text
    Returns a dictionary with min_years, max_years, and level
    """
    if not requirements_text or pd.isna(requirements_text):
        return {'min_years': 0, 'max_years': 999, 'level': 'any'}

    requirements_text = str(requirements_text).lower()

    # Common experience patterns
    experience_patterns = [
        # Specific year ranges
        r'(\d+)[\-\+]\s*(\d+)\s*years?\s*experience',
        r'(\d+)\s*to\s*(\d+)\s*years?\s*experience',
        r'(\d+)\s*-\s*(\d+)\s*years?\s*experience',
        # Minimum years
        r'(\d+)\+?\s*years?\s*experience',
        r'minimum\s*(\d+)\s*years?\s*experience',
        r'at\s*least\s*(\d+)\s*years?\s*experience',
        # Level-based patterns
        r'(entry\s*level|junior|associate)',
        r'(mid\s*level|intermediate|mid\s*senior)',
        r'(senior|lead|principal|staff)',
        r'(executive|director|vp|chief|c\s*level)',
        # Specific year mentions
        r'(\d+)\s*years?\s*in\s*the\s*field',
        r'(\d+)\s*years?\s*of\s*professional\s*experience',
        r'(\d+)\s*years?\s*of\s*relevant\s*experience'
    ]

    min_years = 0
    max_years = 999
    level = 'any'

    # Check for specific year ranges
    for pattern in experience_patterns[:3]:  # First 3 patterns are for ranges
        matches = re.findall(pattern, requirements_text)
        if matches:
            try:
                min_years = int(matches[0][0])
                max_years = int(matches[0][1])
                break
            except (ValueError, IndexError):
                continue

    # Check for minimum years if no range found
    if min_years == 0:
        for pattern in experience_patterns[3:6]:  # Minimum year patterns
            matches = re.findall(pattern, requirements_text)
            if matches:
                try:
                    min_years = int(matches[0])
                    break
                except (ValueError, IndexError):
                    continue

    # Check for level-based requirements
    for pattern in experience_patterns[6:10]:  # Level patterns
        matches = re.findall(pattern, requirements_text)
        if matches:
            level_match = matches[0].lower()
            if 'entry' in level_match or 'junior' in level_match or 'associate' in level_match:
                level = 'entry'
                if min_years == 0:
                    min_years = 0
                    max_years = 2
            elif 'mid' in level_match or 'intermediate' in level_match:
                level = 'mid'
                if min_years == 0:
                    min_years = 2
                    max_years = 5
            elif 'senior' in level_match or 'lead' in level_match or 'principal' in level_match or 'staff' in level_match:
                level = 'senior'
                if min_years == 0:
                    min_years = 5
                    max_years = 10
            elif 'executive' in level_match or 'director' in level_match or 'vp' in level_match or 'chief' in level_match:
                level = 'executive'
                if min_years == 0:
                    min_years = 10
                    max_years = 999
            break

    # Check for specific year mentions if still no match
    if min_years == 0:
        for pattern in experience_patterns[10:]:  # Specific year mention patterns
            matches = re.findall(pattern, requirements_text)
            if matches:
                try:
                    min_years = int(matches[0])
                    max_years = min_years + 2  # Add buffer
                    break
                except (ValueError, IndexError):
                    continue

    return {
        'min_years': min_years,
        'max_years': max_years,
        'level': level
    }
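
# Illustrative examples of the heuristic above (a sketch; outputs follow from
# the patterns as written, not from any external specification):
#
#   extract_experience_requirement("Minimum 3 years experience with SQL")
#   # -> {'min_years': 3, 'max_years': 999, 'level': 'any'}
#
#   extract_experience_requirement("Entry level position, no experience required")
#   # -> {'min_years': 0, 'max_years': 2, 'level': 'entry'}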

def filter_jobs_by_experience(jobs_df: pd.DataFrame, candidate_yoe: str) -> pd.DataFrame:
    """
    Filter jobs by experience level matching the candidate's years of experience
    """
    if not candidate_yoe or candidate_yoe.lower() in ['unknown', 'n/a', '']:
        logger.info(f"No experience info provided, returning all {len(jobs_df)} jobs")
        return jobs_df

    # Extract numeric years from candidate experience
    try:
        # Handle various formats like "5 years", "5+ years", "5-7 years", etc.
        yoe_match = re.search(r'(\d+(?:\.\d+)?)', str(candidate_yoe))
        if yoe_match:
            candidate_years = float(yoe_match.group(1))
        else:
            logger.warning(f"Could not extract years from: {candidate_yoe}")
            return jobs_df
    except (ValueError, TypeError):
        logger.error(f"Invalid experience format: {candidate_yoe}")
        return jobs_df

    logger.info(f"Filtering jobs for candidate with {candidate_years} years of experience")

    # Filter jobs by experience requirements
    matching_jobs = []
    for _, job_row in jobs_df.iterrows():
        requirements_text = str(job_row.get('requirements', ''))
        experience_req = extract_experience_requirement(requirements_text)

        # Check if candidate's experience matches the job requirements
        if (candidate_years >= experience_req['min_years'] and
                candidate_years <= experience_req['max_years']):
            matching_jobs.append(job_row)

    result_df = pd.DataFrame(matching_jobs) if matching_jobs else jobs_df
    logger.info(f"Found {len(matching_jobs)} jobs matching experience out of {len(jobs_df)} total jobs")
    return result_df

def filter_jobs_by_priority(jobs_df: pd.DataFrame) -> pd.DataFrame:
    """
    Filter jobs to only include high priority jobs
    """
    if jobs_df.empty:
        logger.info("No jobs to filter by priority")
        return jobs_df

    # Filter jobs by priority - only include high priority jobs
    priority_filtered_jobs = jobs_df[jobs_df['priority'].str.lower() == 'high']
    logger.info(f"Found {len(priority_filtered_jobs)} high priority jobs out of {len(jobs_df)} total jobs")
    return priority_filtered_jobs

def create_job_description(job_row: pd.Series) -> str:
    """
    Create a comprehensive job description from job data
    """
    description_parts = []

    if pd.notna(job_row.get('company_blurb')):
        description_parts.append(f"Company: {job_row['company_blurb']}")
    if pd.notna(job_row.get('company_culture')):
        description_parts.append(f"Company Culture: {job_row['company_culture']}")
    if pd.notna(job_row.get('description')):
        description_parts.append(f"Description: {job_row['description']}")
    if pd.notna(job_row.get('requirements')):
        description_parts.append(f"Requirements: {job_row['requirements']}")
    if pd.notna(job_row.get('role_responsibilities')):
        description_parts.append(f"Role Responsibilities: {job_row['role_responsibilities']}")
    if pd.notna(job_row.get('job_location')):
        description_parts.append(f"Location: {job_row['job_location']}")

    return "\n\n".join(description_parts)

def create_jd_smart_hiring(job_row: pd.Series) -> str:
    """
    Create a smart hiring job description from job data
    """
    description_parts = []

    if pd.notna(job_row.get('description')):
        description_parts.append(f"Description: {job_row['description']}")
    if pd.notna(job_row.get('requirements')):
        description_parts.append(f"Requirements: {job_row['requirements']}")

    return "\n\n".join(description_parts)

def clean_analysis_result(analysis_result: dict) -> dict:
    """
    Clean up the analysis result to only include final_score and summary
    """
    if not isinstance(analysis_result, dict):
        return analysis_result

    # Remove user_context if present
    if 'user_context' in analysis_result:
        del analysis_result['user_context']

    # Clean up final_response if present
    if 'final_response' in analysis_result:
        try:
            # Handle both string and dict formats
            if isinstance(analysis_result['final_response'], str):
                final_response = json.loads(analysis_result['final_response'])
            else:
                final_response = analysis_result['final_response']

            # Extract and format the evaluation data
            if 'evaluation' in final_response and len(final_response['evaluation']) > 0:
                evaluation = final_response['evaluation'][0]

                # Create a minimal structure with only final_score and summary
                cleaned_response = {
                    'final_score': evaluation.get('final_score', 0),
                    'summary': {}
                }

                # Extract summary information
                if 'summary' in evaluation and len(evaluation['summary']) > 0:
                    summary = evaluation['summary'][0]
                    cleaned_response['summary'] = {
                        'strengths': summary.get('strengths', []),
                        'weaknesses': summary.get('weaknesses', []),
                        'opportunities': summary.get('opportunities', []),
                        'recommendations': summary.get('recommendations', [])
                    }

                analysis_result['final_response'] = cleaned_response
        except (json.JSONDecodeError, KeyError, IndexError) as e:
            logger.error(f"Error cleaning analysis result: {e}")
            # Keep original if cleaning fails
            pass

    return analysis_result
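
# Illustrative sketch of the shape clean_analysis_result() works with (field
# names inferred from the accessors above, not from the external API's docs):
#
#   raw = {
#       "user_context": {"...": "..."},
#       "final_response": {
#           "evaluation": [{
#               "final_score": 82,
#               "summary": [{
#                   "strengths": ["Strong Python background"],
#                   "weaknesses": [],
#                   "opportunities": [],
#                   "recommendations": []
#               }]
#           }]
#       }
#   }
#   clean_analysis_result(raw)
#   # -> {"final_response": {"final_score": 82,
#   #      "summary": {"strengths": ["Strong Python background"], "weaknesses": [],
#   #                  "opportunities": [], "recommendations": []}}}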

def sort_jobs_by_score(job_analyses: list) -> list:
    """
    Sort jobs by final_score in descending order (highest scores first)
    """
    def extract_score(job_analysis):
        try:
            analysis = job_analysis.get('analysis', {})
            if 'final_response' in analysis and isinstance(analysis['final_response'], dict):
                return analysis['final_response'].get('final_score', 0)
            return 0
        except Exception:
            return 0

    return sorted(job_analyses, key=extract_score, reverse=True)

async def analyze_job_fit_with_retry(job_description: str, resume_file_path: str, job_row: Optional[pd.Series] = None, max_retries: int = 3) -> dict:
    """
    Analyze job-candidate fit with retry logic for resilience
    """
    global access_token

    for attempt in range(max_retries):
        try:
            result = analyze_job_fit(job_description, resume_file_path, job_row)
            if "error" not in result:
                return result

            # If authentication error and not last attempt, retry
            if "Authentication failed" in result.get("error", "") and attempt < max_retries - 1:
                logger.warning(f"Authentication failed, retrying... (attempt {attempt + 1}/{max_retries})")
                access_token = None  # Reset token to force refresh
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
                continue

            # If timeout error and not last attempt, retry after a backoff delay
            if "timed out" in result.get("error", "").lower() and attempt < max_retries - 1:
                logger.warning(f"Request timed out, retrying... (attempt {attempt + 1}/{max_retries})")
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
                continue

            return result
        except Exception as e:
            logger.error(f"Attempt {attempt + 1}/{max_retries} failed: {str(e)}")
            if attempt == max_retries - 1:
                return {"error": f"Failed after {max_retries} attempts: {str(e)}"}
            await asyncio.sleep(2 ** attempt)

def analyze_job_fit(job_description: str, resume_file_path: str, job_row: Optional[pd.Series] = None) -> dict:
    """
    Analyze job-candidate fit using the external API
    """
    global access_token

    url = str(os.getenv("analyze_url"))

    # Check if resume file exists
    if not os.path.exists(resume_file_path):
        logger.error(f"Resume file not found: {resume_file_path}")
        return {"error": f"Resume file not found: {resume_file_path}"}

    # Prepare headers with authentication
    headers = {
        'accept': 'application/json',
        'Authorization': f'Bearer {get_access_token()}'
    }

    # Prepare form data
    files = {
        'resume': (os.path.basename(resume_file_path), open(resume_file_path, 'rb'), 'application/pdf')
    }
    data = {
        'jd_text': job_description
    }

    # Generate collateral if job_row is provided
    if job_row is not None:
        try:
            job_description_text = create_jd_smart_hiring(job_row)
            if job_description_text:
                collateral, job_id = generate_smart_hiring_collateral(job_description_text)
                if collateral:
                    data['collateral'] = collateral
                    data['job_id'] = job_id
                    logger.info(f"Added collateral and job_id ({job_id}) to job fit analysis request")
                elif job_id:
                    # Even if collateral is empty, we can still use the job_id
                    data['job_id'] = job_id
                    logger.info(f"Added job_id ({job_id}) to job fit analysis request (no collateral)")
        except Exception as e:
            logger.warning(f"Failed to generate collateral: {e}")
            # Continue without collateral if generation fails

    try:
        # Make the API request with the configured timeout
        response = requests.post(url, headers=headers, files=files, data=data, timeout=EXTERNAL_API_TIMEOUT)

        # If we get an authentication error, try to get a fresh token and retry once
        if response.status_code == 401:
            logger.warning("Authentication failed, getting fresh token...")
            access_token = None  # Reset the token
            new_token = get_access_token()
            if new_token:
                headers['Authorization'] = f'Bearer {new_token}'
                # Close the previous file and reopen
                files['resume'][1].close()
                files['resume'] = (os.path.basename(resume_file_path), open(resume_file_path, 'rb'), 'application/pdf')
                response = requests.post(url, headers=headers, files=files, data=data, timeout=EXTERNAL_API_TIMEOUT)
            else:
                # If we can't get a fresh token, return error
                return {"error": "Authentication failed and could not obtain fresh token"}

        if response.status_code == 200:
            logger.info("Job fit analysis completed successfully")
            return response.json()
        elif response.status_code == 401:
            # If we still get 401 after fresh token, return error
            return {"error": "Authentication failed even with fresh token"}
        else:
            logger.error(f"API call failed with status {response.status_code}")
            return {"error": f"API call failed with status {response.status_code}", "details": response.text}
    except requests.exceptions.Timeout:
        logger.error(f"API request timed out after {EXTERNAL_API_TIMEOUT} seconds")
        return {"error": f"API request timed out after {EXTERNAL_API_TIMEOUT} seconds"}
    except Exception as e:
        logger.error(f"Exception occurred: {str(e)}")
        return {"error": f"Exception occurred: {str(e)}"}
    finally:
        # Ensure the file is closed
        if 'resume' in files:
            try:
                files['resume'][1].close()
            except Exception:
                pass

# Main endpoint: process a resume and recommend jobs.
# NOTE: the route path below is an assumption; the original route registration
# was not preserved in this file. Adjust it to match your deployment.
@app.post("/process-resume")
async def process_resume_and_recommend_jobs(
    resume: UploadFile = File(...),
    resume_text: str = Form(""),
    api_key: str = Depends(verify_api_key)
):
""" | |
Process resume, extract information, filter jobs by industry, and analyze fit | |
""" | |
request_start_time = time.time() | |
try: | |
logger.info(f"Processing resume: {resume.filename}") | |
# Save uploaded file temporarily | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: | |
shutil.copyfileobj(resume.file, tmp_file) | |
tmp_file_path = tmp_file.name | |
try: | |
# Extract text from PDF if no resume_text provided | |
if not resume_text: | |
resume_text = extract_text_from_pdf(tmp_file_path) | |
if not resume_text: | |
logger.error("Could not extract text from PDF file") | |
return JSONResponse( | |
status_code=400, | |
content={"error": "Could not extract text from PDF file"} | |
) | |
# Extract resume information using LLM | |
resume_info = extract_resume_info(resume_text) | |
# Load jobs data from PostgreSQL database | |
try: | |
jobs_df = pd.read_sql_table("jobs", con=engine) | |
candidates_df = pd.read_sql_table("candidates", con=engine) | |
submissions_df = pd.read_sql_table("candidate_submissions", con=engine) | |
logger.info(f"Loaded {len(jobs_df)} jobs, {len(candidates_df)} candidates, {len(submissions_df)} submissions") | |
except Exception as db_error: | |
logger.error(f"Database error: {db_error}") | |
return JSONResponse( | |
status_code=500, | |
content={"error": "Database connection error"} | |
) | |
# Filter jobs by industry | |
filtered_jobs = filter_jobs_by_industry(jobs_df, resume_info['industry']) | |
if filtered_jobs.empty: | |
logger.warning(f"No jobs found for industry: {resume_info['industry']}") | |
return JSONResponse( | |
status_code=404, | |
content={"message": f"No jobs found for industry: {resume_info['industry']}"} | |
) | |
# Filter jobs by location | |
location_filtered_jobs = filter_jobs_by_location(filtered_jobs, resume_info['location']) | |
# Filter jobs by experience level | |
experience_filtered_jobs = filter_jobs_by_experience(location_filtered_jobs, resume_info['yoe']) | |
# Filter jobs by priority | |
priority_filtered_jobs = filter_jobs_by_priority(experience_filtered_jobs) | |
# Use priority filtered jobs if available, otherwise fall back to experience filtered jobs, then location filtered jobs | |
if not priority_filtered_jobs.empty: | |
jobs_to_analyze = priority_filtered_jobs | |
elif not experience_filtered_jobs.empty: | |
jobs_to_analyze = experience_filtered_jobs | |
else: | |
jobs_to_analyze = location_filtered_jobs | |
# Create filtered_submission_df with job_ids from jobs_to_analyze | |
job_ids_to_analyze = jobs_to_analyze['id'].tolist() | |
filtered_submission_df = submissions_df[submissions_df['jobId'].isin(job_ids_to_analyze)] | |
# Check if candidate email exists in candidates_df | |
candidate_id = None | |
if resume_info.get('email'): | |
candidate_match = candidates_df[candidates_df['email'] == resume_info['email']] | |
if not candidate_match.empty: | |
candidate_id = candidate_match.iloc[0]['id'] | |
logger.info(f"Found existing candidate with ID: {candidate_id}") | |
# Analyze job fit for each filtered job | |
job_analyses = [] | |
# Use configured number of jobs to analyze | |
for _, job_row in jobs_to_analyze.head(MAX_JOBS_TO_ANALYZE).iterrows(): | |
job_id = job_row.get('id') | |
# Check if we have an existing submission for this candidate and job | |
existing_submission = None | |
if candidate_id and job_id: | |
submission_match = filtered_submission_df[ | |
(filtered_submission_df['candidate_id'] == candidate_id) & | |
(filtered_submission_df['jobId'] == job_id) | |
] | |
if not submission_match.empty: | |
existing_submission = submission_match.iloc[0] | |
logger.info(f"Found existing submission for job_id: {job_id}, candidate_id: {candidate_id}") | |
if existing_submission is not None: | |
# Use existing fit score from submission | |
fit_score = existing_submission.get('fit_score', 0) | |
existing_analysis = { | |
'final_response': { | |
'final_score': fit_score, | |
'summary': { | |
'strengths': [], | |
'weaknesses': [], | |
'opportunities': [], | |
'recommendations': [] | |
} | |
}, | |
'source': 'existing_submission' | |
} | |
analysis_result = existing_analysis | |
else: | |
# Call API for new analysis with retry logic | |
job_description = create_job_description(job_row) | |
analysis_result = await analyze_job_fit_with_retry(job_description, tmp_file_path, job_row) | |
analysis_result['source'] = 'api_call' | |
# Clean up the analysis result | |
cleaned_analysis = clean_analysis_result(analysis_result) | |
job_analysis = JobAnalysis( | |
job_title=job_row.get('job_title', 'Unknown'), | |
company_name=job_row.get('company_name', 'Unknown'), | |
analysis=cleaned_analysis | |
) | |
job_analyses.append(job_analysis.dict()) | |
# Sort jobs by final_score in descending order (highest scores first) | |
job_analyses = sort_jobs_by_score(job_analyses) | |
# Count existing submissions vs API calls | |
existing_submissions_count = sum(1 for analysis in job_analyses if analysis.get('analysis', {}).get('source') == 'existing_submission') | |
api_calls_count = sum(1 for analysis in job_analyses if analysis.get('analysis', {}).get('source') == 'api_call') | |
# Clean up temporary file | |
os.unlink(tmp_file_path) | |
# Calculate processing time | |
processing_time = time.time() - request_start_time | |
logger.info(f"Request completed in {processing_time:.2f} seconds") | |
return { | |
"resume_info": resume_info, | |
"industry": resume_info['industry'], | |
"location": resume_info['location'], | |
"experience_years": resume_info['yoe'], | |
"jobs_analyzed": len(job_analyses), | |
"location_filtered": not location_filtered_jobs.empty, | |
"experience_filtered": not experience_filtered_jobs.empty, | |
"priority_filtered": not priority_filtered_jobs.empty, | |
"existing_submissions_used": existing_submissions_count, | |
"api_calls_made": api_calls_count, | |
"candidate_found": candidate_id is not None, | |
"processing_time_seconds": round(processing_time, 2), | |
"job_analyses": job_analyses | |
} | |
except Exception as e: | |
# Clean up temporary file in case of error | |
if os.path.exists(tmp_file_path): | |
os.unlink(tmp_file_path) | |
raise e | |
except Exception as e: | |
logger.error(f"Processing failed: {str(e)}", exc_info=True) | |
return JSONResponse( | |
status_code=500, | |
content={"error": f"Processing failed: {str(e)}"} | |
) | |

# Health check endpoint (exposed at /health, as advertised by the root endpoint below)
@app.get("/health")
async def health_check(api_key: str = Depends(verify_api_key)):
    """
    Health check endpoint with database connectivity check
    """
    health_status = {
        "status": "healthy",
        "message": "Job Recommendation API is running",
        "timestamp": time.time()
    }

    # Check database connectivity
    try:
        with engine.connect() as conn:
            result = conn.execute(text("SELECT 1"))
        health_status["database"] = "connected"
    except Exception as e:
        logger.error(f"Database health check failed: {e}")
        health_status["database"] = "disconnected"
        health_status["status"] = "degraded"

    return health_status

@app.get("/")
async def root():
    """
    Root endpoint
    """
    return {
        "message": "Job Recommendation API",
        "version": "1.0.0",
        "docs": "/docs",
        "health": "/health"
    }

if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("PORT", 8080))
    logger.info(f"Starting server on port {port}")
    uvicorn.run(app, host="0.0.0.0", port=port)
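
# ---------------------------------------------------------------------------
# Illustrative client usage (a sketch, not part of the service itself).
# Assumes the service is running locally on the default port 8080 and that the
# caller knows the same API_KEY configured above; the /health path comes from
# the root endpoint's response.
#
#   import os
#   import requests
#
#   resp = requests.get(
#       "http://localhost:8080/health",
#       headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"},
#       timeout=30,
#   )
#   print(resp.json())  # e.g. {"status": "healthy", "database": "connected", ...}
# ---------------------------------------------------------------------------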