reccomendation-engine / reccomendation.py
ak0601's picture
Update reccomendation.py
299e29c verified
import pandas as pd
import requests
from pydantic import BaseModel, Field
from typing import List, Tuple, Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import os
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Depends, Header, Request
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
import json
import tempfile
import shutil
import PyPDF2
from dotenv import load_dotenv
import pdfplumber
import re
from db import *
import time
import asyncio
from contextlib import asynccontextmanager
import logging
from sqlalchemy.pool import NullPool
from cloud_config import *
import uuid
# Load environment variables
load_dotenv()
# Configure logging for Cloud Run
logging.basicConfig(
level=getattr(logging, LOG_LEVEL),
format=LOG_FORMAT
)
logger = logging.getLogger(__name__)
# Global variable to store access token
access_token = None
# Startup/shutdown events
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logger.info("Starting up Job Recommendation API...")
# You can initialize connection pools here if needed
yield
# Shutdown
logger.info("Shutting down Job Recommendation API...")
# Close any open connections here
# Initialize FastAPI app with lifespan
app = FastAPI(
title="Job Recommendation API",
description="API for processing resumes and recommending jobs",
lifespan=lifespan
)
# Add CORS middleware for cloud deployment
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure based on your needs
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Add request ID middleware for better tracing
@app.middleware("http")
async def add_request_id(request: Request, call_next):
request_id = f"{time.time()}-{request.client.host}"
request.state.request_id = request_id
# Log the request
logger.info(f"Request ID: {request_id} - {request.method} {request.url.path}")
try:
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
return response
except Exception as e:
logger.error(f"Request ID: {request_id} - Error: {str(e)}")
raise
# Security configuration
API_KEY = os.getenv("API_KEY")
security = HTTPBearer()
def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""
Verify the API key from the Authorization header
"""
if not API_KEY:
logger.error("API key not configured")
raise HTTPException(
status_code=500,
detail="API key not configured",
)
if credentials.credentials != API_KEY:
logger.warning("Invalid API key attempt")
raise HTTPException(
status_code=401,
detail="Invalid API key",
headers={"WWW-Authenticate": "Bearer"},
)
return credentials.credentials
# Initialize OpenAI client with error handling
try:
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0,
api_key=os.getenv("OPENAI_API_KEY")
)
logger.info("OpenAI client initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize OpenAI client: {e}")
raise
# Initialize database engine with connection pooling suitable for Cloud Run
def get_engine():
"""
Get database engine with NullPool for Cloud Run
"""
try:
conn_string = f"postgresql://{DB_PARAMS['user']}:{DB_PARAMS['password']}@{DB_PARAMS['host']}:{DB_PARAMS['port']}/{DB_PARAMS['dbname']}"
# Use NullPool for Cloud Run to avoid connection issues
engine = create_engine(conn_string, poolclass=NullPool, pool_pre_ping=True)
logger.info("Database engine created successfully")
return engine
except Exception as e:
logger.error(f"Failed to create database engine: {e}")
raise
# Initialize database engine
engine = get_engine()
def get_access_token():
"""
Get access token for the external API with better error handling
"""
global access_token
# If we already have a token, return it
if access_token:
return access_token
try:
login_url = str(os.getenv("login_url"))
login_data = {
"email": str(os.getenv("email")),
"password": str(os.getenv("password"))
}
login_headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
# Add timeout to prevent hanging
login_response = requests.post(login_url, headers=login_headers, json=login_data, timeout=None)
if login_response.status_code == 200:
login_result = login_response.json()
access_token = login_result.get('data', {}).get('tokens', {}).get('accessToken')
if access_token:
logger.info("Successfully obtained access token")
return access_token
else:
logger.error("Login successful but no access token found in response")
return None
else:
logger.error(f"Login failed with status {login_response.status_code}: {login_response.text}")
return None
except requests.exceptions.Timeout:
logger.error("Login request timed out")
return None
except requests.exceptions.RequestException as e:
logger.error(f"Network error during login: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error getting access token: {e}")
return None
def generate_smart_hiring_collateral(job_description_text: str) -> tuple[str, str]:
"""
Generate collateral using the smart-hiring/generate endpoint
Returns a tuple of (collateral, job_id)
"""
try:
url = str(os.getenv("smart_hiring_url"))
# Generate a unique job ID using UUID
job_id = str(uuid.uuid4())
# Prepare headers with authentication
headers = {
'accept': 'application/json',
'Authorization': f'Bearer {get_access_token()}'
}
# Prepare payload
payload = {
'job_id': job_id,
'job_description_text': job_description_text
}
# Make the API request
response = requests.post(url, headers=headers, data=payload, timeout=None)
if response.status_code == 200:
logger.info("Smart hiring collateral generated successfully")
# Parse the response to extract smart_hiring_criteria
try:
response_data = response.json()
if response_data.get('success') and 'data' in response_data:
smart_hiring_criteria = response_data['data'].get('smart_hiring_criteria', '')
if smart_hiring_criteria:
logger.info("Successfully extracted smart hiring criteria")
return smart_hiring_criteria, job_id
else:
logger.warning("No smart_hiring_criteria found in response")
return "", job_id
else:
logger.warning("Invalid response format from smart hiring API")
return "", job_id
except json.JSONDecodeError as e:
logger.error(f"Failed to parse smart hiring response as JSON: {e}")
return "", job_id
elif response.status_code == 401:
logger.warning("Authentication failed for smart hiring, getting fresh token...")
global access_token
access_token = None # Reset the token
new_token = get_access_token()
if new_token:
headers['Authorization'] = f'Bearer {new_token}'
response = requests.post(url, headers=headers, data=payload, timeout=None)
if response.status_code == 200:
logger.info("Smart hiring collateral generated successfully with fresh token")
# Parse the response to extract smart_hiring_criteria
try:
response_data = response.json()
if response_data.get('success') and 'data' in response_data:
smart_hiring_criteria = response_data['data'].get('smart_hiring_criteria', '')
if smart_hiring_criteria:
logger.info("Successfully extracted smart hiring criteria with fresh token")
return smart_hiring_criteria, job_id
else:
logger.warning("No smart_hiring_criteria found in response with fresh token")
return "", job_id
else:
logger.warning("Invalid response format from smart hiring API with fresh token")
return "", job_id
except json.JSONDecodeError as e:
logger.error(f"Failed to parse smart hiring response as JSON with fresh token: {e}")
return "", job_id
else:
logger.error(f"Smart hiring API call failed with status {response.status_code}")
return "", job_id
else:
logger.error("Could not obtain fresh token for smart hiring")
return "", job_id
else:
logger.error(f"Smart hiring API call failed with status {response.status_code}: {response.text}")
return "", job_id
except requests.exceptions.Timeout:
logger.error(f"Smart hiring API request timed out after {EXTERNAL_API_TIMEOUT} seconds")
return "", ""
except Exception as e:
logger.error(f"Exception occurred in smart hiring generation: {str(e)}")
return "", ""
class structure(BaseModel):
name: str = Field(description="Name of the candidate")
location: str = Field(description="The location of the candidate. Extract city and state if possible.")
skills: List[str] = Field(description="List of individual skills of the candidate")
ideal_jobs: str = Field(description="List of ideal jobs for the candidate based on past experience.")
email: str = Field(description="The email of the candidate")
yoe: str = Field(description="Years of experience of the candidate.")
experience: str = Field(description="A brief summary of the candidate's past experience.")
industry: str = Field(description="The industry the candidate has experience in.(Tech,Legal,Finance/Accounting,Healthcare,Industrial,Logistics,Telecom,Admin,Other)")
class JobAnalysis(BaseModel):
job_title: str
company_name: str
analysis: dict
def extract_text_from_pdf(pdf_file_path: str) -> str:
"""
Extract text from PDF file using multiple methods for better accuracy
"""
text = ""
# Method 1: Try pdfplumber (better for complex layouts)
try:
with pdfplumber.open(pdf_file_path) as pdf:
for page in pdf.pages:
page_text = page.extract_text()
if page_text:
text += page_text + "\n"
if text.strip():
logger.info(f"Successfully extracted text using pdfplumber: {len(text)} characters")
return text.strip()
except Exception as e:
logger.warning(f"pdfplumber failed: {e}")
# Method 2: Try PyPDF2 (fallback)
try:
with open(pdf_file_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
for page in pdf_reader.pages:
page_text = page.extract_text()
if page_text:
text += page_text + "\n"
if text.strip():
logger.info(f"Successfully extracted text using PyPDF2: {len(text)} characters")
return text.strip()
except Exception as e:
logger.error(f"PyPDF2 failed: {e}")
# If both methods fail, return empty string
logger.error("Failed to extract text from PDF")
return ""
def extract_resume_info(resume_text: str) -> structure:
"""
Extract structured information from resume using LLM
"""
prompt = ChatPromptTemplate.from_template("""
You are an expert resume parser. Extract the following information from the resume text provided and return it in a structured JSON format.
Resume Text:
{resume_text}
Please extract and structure the information according to the following schema:
- name: Full name of the candidate
- location: City and state if available, otherwise general location
- skills: List of technical skills, tools, technologies, programming languages, etc.
- ideal_jobs: Based on their experience, what types of jobs would be ideal for this candidate
- email: Email address of the candidate (if found in resume)
- yoe: Years of experience (extract from work history)
- experience: Brief summary of their work experience and background
- industry: Categorize into one of these industries: Tech, Legal, Finance/Accounting, Healthcare, Industrial, Logistics, Telecom, Admin, Other
Return ONLY a valid JSON object with these fields. Do not include any other text or explanations.
""")
try:
str_llm = llm.with_structured_output(structure)
chain = prompt | str_llm
response = chain.invoke({"resume_text": resume_text})
validated_data = {
'name': response.name,
'location': response.location,
'email': response.email,
'skills': response.skills,
'ideal_jobs': response.ideal_jobs,
'yoe': response.yoe,
'experience': response.experience,
'industry': response.industry
}
logger.info(f"Successfully extracted resume info for: {validated_data['name']}")
return validated_data
except Exception as e:
logger.error(f"Failed to extract resume info: {e}")
return {
'name': "Unknown",
'location': "Unknown",
'email': "",
'skills': [],
'ideal_jobs': "Software Engineer",
'yoe': "0",
'experience': "No experience listed",
'industry': "Tech"
}
def filter_jobs_by_industry(jobs_df: pd.DataFrame, target_industry: str) -> pd.DataFrame:
"""
Filter jobs by industry
"""
# Map the extracted industry to database industry values
industry_mapping = {
'Tech': ['technology', 'VC Tech'],
'Legal': ['Legal'],
'Finance/Accounting': ['finance/Accounting'],
'Healthcare': ['healthcare'],
'Industrial': ['industrial'],
'Logistics': ['logistics'],
'Telecom': ['telecom'],
'Admin': ['admin'],
'Other': ['Other']
}
target_industries = industry_mapping.get(target_industry, ['Tech'])
# Filter jobs by industry (using database column name 'industry')
filtered_jobs = jobs_df[jobs_df['industry'].isin(target_industries)]
logger.info(f"Filtered {len(filtered_jobs)} jobs for industry: {target_industry}")
return filtered_jobs
def filter_jobs_by_location(jobs_df: pd.DataFrame, candidate_location: str) -> pd.DataFrame:
"""
Filter jobs by location matching the candidate's location
"""
if not candidate_location or candidate_location.lower() in ['unknown', 'n/a', '']:
logger.info(f"No location info provided, returning all {len(jobs_df)} jobs")
return jobs_df # Return all jobs if no location info
# Clean and normalize candidate location
candidate_location = candidate_location.lower().strip()
logger.info(f"Filtering jobs for candidate location: {candidate_location}")
# Extract state abbreviations and full names
state_mapping = {
'alabama': 'al', 'alaska': 'ak', 'arizona': 'az', 'arkansas': 'ar', 'california': 'ca',
'colorado': 'co', 'connecticut': 'ct', 'delaware': 'de', 'district of columbia': 'dc', 'florida': 'fl', 'georgia': 'ga',
'hawaii': 'hi', 'idaho': 'id', 'illinois': 'il', 'indiana': 'in', 'iowa': 'ia',
'kansas': 'ks', 'kentucky': 'ky', 'louisiana': 'la', 'maine': 'me', 'maryland': 'md',
'massachusetts': 'ma', 'michigan': 'mi', 'minnesota': 'mn', 'mississippi': 'ms', 'missouri': 'mo',
'montana': 'mt', 'nebraska': 'ne', 'nevada': 'nv', 'new hampshire': 'nh', 'new jersey': 'nj',
'new mexico': 'nm', 'new york': 'ny', 'north carolina': 'nc', 'north dakota': 'nd', 'ohio': 'oh',
'oklahoma': 'ok', 'oregon': 'or', 'pennsylvania': 'pa', 'rhode island': 'ri', 'south carolina': 'sc',
'south dakota': 'sd', 'tennessee': 'tn', 'texas': 'tx', 'utah': 'ut', 'vermont': 'vt',
'virginia': 'va', 'washington': 'wa', 'west virginia': 'wv', 'wisconsin': 'wi', 'wyoming': 'wy'
}
# Create location patterns to match
location_patterns = []
# Add the original location
location_patterns.append(candidate_location)
# Add state variations
for state_name, state_abbr in state_mapping.items():
if state_name in candidate_location or state_abbr in candidate_location:
location_patterns.extend([state_name, state_abbr])
# Add common city variations (extract city name)
city_match = re.search(r'^([^,]+)', candidate_location)
if city_match:
city_name = city_match.group(1).strip()
location_patterns.append(city_name)
# Add remote/anywhere patterns if location is remote
if 'remote' in candidate_location or 'anywhere' in candidate_location:
location_patterns.extend(['remote', 'anywhere', 'work from home', 'wfh'])
logger.info(f"Location patterns to match: {location_patterns}")
# Filter jobs by location
matching_jobs = []
for _, job_row in jobs_df.iterrows():
job_location = str(job_row.get('job_location', '')).lower()
# Check if any location pattern matches
location_matches = any(pattern in job_location for pattern in location_patterns)
# Also check for remote jobs if candidate location includes remote
if 'remote' in candidate_location and any(remote_term in job_location for remote_term in ['remote', 'anywhere', 'work from home', 'wfh']):
location_matches = True
# Check for exact city/state matches
if candidate_location in job_location or job_location in candidate_location:
location_matches = True
if location_matches:
matching_jobs.append(job_row)
result_df = pd.DataFrame(matching_jobs) if matching_jobs else jobs_df
logger.info(f"Found {len(matching_jobs)} jobs matching location out of {len(jobs_df)} total jobs")
return result_df
def extract_experience_requirement(requirements_text: str) -> dict:
"""
Extract experience requirements from job requirements text
Returns a dictionary with min_years, max_years, and level
"""
if not requirements_text or pd.isna(requirements_text):
return {'min_years': 0, 'max_years': 999, 'level': 'any'}
requirements_text = str(requirements_text).lower()
# Common experience patterns
experience_patterns = [
# Specific year ranges
r'(\d+)[\-\+]\s*(\d+)\s*years?\s*experience',
r'(\d+)\s*to\s*(\d+)\s*years?\s*experience',
r'(\d+)\s*-\s*(\d+)\s*years?\s*experience',
# Minimum years
r'(\d+)\+?\s*years?\s*experience',
r'minimum\s*(\d+)\s*years?\s*experience',
r'at\s*least\s*(\d+)\s*years?\s*experience',
# Level-based patterns
r'(entry\s*level|junior|associate)',
r'(mid\s*level|intermediate|mid\s*senior)',
r'(senior|lead|principal|staff)',
r'(executive|director|vp|chief|c\s*level)',
# Specific year mentions
r'(\d+)\s*years?\s*in\s*the\s*field',
r'(\d+)\s*years?\s*of\s*professional\s*experience',
r'(\d+)\s*years?\s*of\s*relevant\s*experience'
]
min_years = 0
max_years = 999
level = 'any'
# Check for specific year ranges
for pattern in experience_patterns[:3]: # First 3 patterns are for ranges
matches = re.findall(pattern, requirements_text)
if matches:
try:
min_years = int(matches[0][0])
max_years = int(matches[0][1])
break
except (ValueError, IndexError):
continue
# Check for minimum years if no range found
if min_years == 0:
for pattern in experience_patterns[3:6]: # Minimum year patterns
matches = re.findall(pattern, requirements_text)
if matches:
try:
min_years = int(matches[0])
break
except (ValueError, IndexError):
continue
# Check for level-based requirements
for pattern in experience_patterns[6:10]: # Level patterns
matches = re.findall(pattern, requirements_text)
if matches:
level_match = matches[0].lower()
if 'entry' in level_match or 'junior' in level_match or 'associate' in level_match:
level = 'entry'
if min_years == 0:
min_years = 0
max_years = 2
elif 'mid' in level_match or 'intermediate' in level_match:
level = 'mid'
if min_years == 0:
min_years = 2
max_years = 5
elif 'senior' in level_match or 'lead' in level_match or 'principal' in level_match or 'staff' in level_match:
level = 'senior'
if min_years == 0:
min_years = 5
max_years = 10
elif 'executive' in level_match or 'director' in level_match or 'vp' in level_match or 'chief' in level_match:
level = 'executive'
if min_years == 0:
min_years = 10
max_years = 999
break
# Check for specific year mentions if still no match
if min_years == 0:
for pattern in experience_patterns[10:]: # Specific year mention patterns
matches = re.findall(pattern, requirements_text)
if matches:
try:
min_years = int(matches[0])
max_years = min_years + 2 # Add buffer
break
except (ValueError, IndexError):
continue
return {
'min_years': min_years,
'max_years': max_years,
'level': level
}
def filter_jobs_by_experience(jobs_df: pd.DataFrame, candidate_yoe: str) -> pd.DataFrame:
"""
Filter jobs by experience level matching the candidate's years of experience
"""
if not candidate_yoe or candidate_yoe.lower() in ['unknown', 'n/a', '']:
logger.info(f"No experience info provided, returning all {len(jobs_df)} jobs")
return jobs_df
# Extract numeric years from candidate experience
try:
# Handle various formats like "5 years", "5+ years", "5-7 years", etc.
yoe_match = re.search(r'(\d+(?:\.\d+)?)', str(candidate_yoe))
if yoe_match:
candidate_years = float(yoe_match.group(1))
else:
logger.warning(f"Could not extract years from: {candidate_yoe}")
return jobs_df
except (ValueError, TypeError):
logger.error(f"Invalid experience format: {candidate_yoe}")
return jobs_df
logger.info(f"Filtering jobs for candidate with {candidate_years} years of experience")
# Filter jobs by experience requirements
matching_jobs = []
for _, job_row in jobs_df.iterrows():
requirements_text = str(job_row.get('requirements', ''))
experience_req = extract_experience_requirement(requirements_text)
# Check if candidate's experience matches the job requirements
if (candidate_years >= experience_req['min_years'] and
candidate_years <= experience_req['max_years']):
matching_jobs.append(job_row)
result_df = pd.DataFrame(matching_jobs) if matching_jobs else jobs_df
logger.info(f"Found {len(matching_jobs)} jobs matching experience out of {len(jobs_df)} total jobs")
return result_df
def filter_jobs_by_priority(jobs_df: pd.DataFrame) -> pd.DataFrame:
"""
Filter jobs to only include high priority jobs
"""
if jobs_df.empty:
logger.info("No jobs to filter by priority")
return jobs_df
# Filter jobs by priority - only include high priority jobs
priority_filtered_jobs = jobs_df[jobs_df['priority'].str.lower() == 'high']
logger.info(f"Found {len(priority_filtered_jobs)} high priority jobs out of {len(jobs_df)} total jobs")
return priority_filtered_jobs
def create_job_description(job_row: pd.Series) -> str:
"""
Create a comprehensive job description from job data
"""
description_parts = []
if pd.notna(job_row.get('company_blurb')):
description_parts.append(f"Company: {job_row['company_blurb']}")
if pd.notna(job_row.get('company_culture')):
description_parts.append(f"Company Culture: {job_row['company_culture']}")
if pd.notna(job_row.get('description')):
description_parts.append(f"Description: {job_row['description']}")
if pd.notna(job_row.get('requirements')):
description_parts.append(f"Requirements: {job_row['requirements']}")
if pd.notna(job_row.get('role_responsibilities')):
description_parts.append(f"Role Responsibilities: {job_row['role_responsibilities']}")
if pd.notna(job_row.get('job_location')):
description_parts.append(f"Location: {job_row['job_location']}")
return "\n\n".join(description_parts)
def create_jd_smart_hiring(job_row: pd.Series) -> str:
"""
Create a smart hiring job description from job data
"""
description_parts = []
if pd.notna(job_row.get('description')):
description_parts.append(f"Description: {job_row['description']}")
if pd.notna(job_row.get('requirements')):
description_parts.append(f"Requirements: {job_row['requirements']}")
return "\n\n".join(description_parts)
def clean_analysis_result(analysis_result: dict) -> dict:
"""
Clean up the analysis result to only include final_score and summary
"""
if not isinstance(analysis_result, dict):
return analysis_result
# Remove user_context if present
if 'user_context' in analysis_result:
del analysis_result['user_context']
# Clean up final_response if present
if 'final_response' in analysis_result:
try:
# Handle both string and dict formats
if isinstance(analysis_result['final_response'], str):
final_response = json.loads(analysis_result['final_response'])
else:
final_response = analysis_result['final_response']
# Extract and format the evaluation data
if 'evaluation' in final_response and len(final_response['evaluation']) > 0:
evaluation = final_response['evaluation'][0]
# Create a minimal structure with only final_score and summary
cleaned_response = {
'final_score': evaluation.get('final_score', 0),
'summary': {}
}
# Extract summary information
if 'summary' in evaluation and len(evaluation['summary']) > 0:
summary = evaluation['summary'][0]
cleaned_response['summary'] = {
'strengths': summary.get('strengths', []),
'weaknesses': summary.get('weaknesses', []),
'opportunities': summary.get('opportunities', []),
'recommendations': summary.get('recommendations', [])
}
analysis_result['final_response'] = cleaned_response
except (json.JSONDecodeError, KeyError, IndexError) as e:
logger.error(f"Error cleaning analysis result: {e}")
# Keep original if cleaning fails
pass
return analysis_result
def sort_jobs_by_score(job_analyses: list) -> list:
"""
Sort jobs by final_score in descending order (highest scores first)
"""
def extract_score(job_analysis):
try:
analysis = job_analysis.get('analysis', {})
if 'final_response' in analysis and isinstance(analysis['final_response'], dict):
return analysis['final_response'].get('final_score', 0)
return 0
except:
return 0
return sorted(job_analyses, key=extract_score, reverse=True)
async def analyze_job_fit_with_retry(job_description: str, resume_file_path: str, job_row: pd.Series = None, max_retries: int = 3) -> dict:
"""
Analyze job-candidate fit with retry logic for resilience
"""
for attempt in range(max_retries):
try:
result = analyze_job_fit(job_description, resume_file_path, job_row)
if "error" not in result:
return result
# If authentication error and not last attempt, retry
if "Authentication failed" in result.get("error", "") and attempt < max_retries - 1:
logger.warning(f"Authentication failed, retrying... (attempt {attempt + 1}/{max_retries})")
global access_token
access_token = None # Reset token to force refresh
await asyncio.sleep(2 ** attempt) # Exponential backoff
continue
# If timeout error and not last attempt, retry with longer timeout
if "timed out" in result.get("error", "").lower() and attempt < max_retries - 1:
logger.warning(f"Request timed out, retrying with longer timeout... (attempt {attempt + 1}/{max_retries})")
await asyncio.sleep(2 ** attempt) # Exponential backoff
continue
return result
except Exception as e:
logger.error(f"Attempt {attempt + 1}/{max_retries} failed: {str(e)}")
if attempt == max_retries - 1:
return {"error": f"Failed after {max_retries} attempts: {str(e)}"}
await asyncio.sleep(2 ** attempt)
def analyze_job_fit(job_description: str, resume_file_path: str, job_row: pd.Series = None) -> dict:
"""
Analyze job-candidate fit using the external API
"""
url = str(os.getenv("analyze_url"))
# Check if resume file exists
if not os.path.exists(resume_file_path):
logger.error(f"Resume file not found: {resume_file_path}")
return {"error": f"Resume file not found: {resume_file_path}"}
# Prepare headers with authentication
headers = {
'accept': 'application/json',
'Authorization': f'Bearer {get_access_token()}'
}
# Prepare form data
files = {
'resume': (os.path.basename(resume_file_path), open(resume_file_path, 'rb'), 'application/pdf')
}
data = {
'jd_text': job_description
}
# Generate collateral if job_row is provided
if job_row is not None:
try:
job_description_text = create_jd_smart_hiring(job_row)
if job_description_text:
collateral, job_id = generate_smart_hiring_collateral(job_description_text)
if collateral:
data['collateral'] = collateral
data['job_id'] = job_id
logger.info(f"Added collateral and job_id ({job_id}) to job fit analysis request")
elif job_id:
# Even if collateral is empty, we can still use the job_id
data['job_id'] = job_id
logger.info(f"Added job_id ({job_id}) to job fit analysis request (no collateral)")
except Exception as e:
logger.warning(f"Failed to generate collateral: {e}")
# Continue without collateral if generation fails
try:
# Make the API request with configured timeout
response = requests.post(url, headers=headers, files=files, data=data, timeout=None)
# If we get an authentication error, try to get a fresh token and retry once
if response.status_code == 401:
logger.warning("Authentication failed, getting fresh token...")
global access_token
access_token = None # Reset the token
new_token = get_access_token()
if new_token:
headers['Authorization'] = f'Bearer {new_token}'
# Close the previous file and reopen
files['resume'][1].close()
files['resume'] = (os.path.basename(resume_file_path), open(resume_file_path, 'rb'), 'application/pdf')
response = requests.post(url, headers=headers, files=files, data=data, timeout=None)
else:
# If we can't get a fresh token, return error
return {"error": "Authentication failed and could not obtain fresh token"}
if response.status_code == 200:
logger.info("Job fit analysis completed successfully")
return response.json()
elif response.status_code == 401:
# If we still get 401 after fresh token, return error
return {"error": "Authentication failed even with fresh token"}
else:
logger.error(f"API call failed with status {response.status_code}")
return {"error": f"API call failed with status {response.status_code}", "details": response.text}
except requests.exceptions.Timeout:
logger.error(f"API request timed out after {EXTERNAL_API_TIMEOUT} seconds")
return {"error": f"API request timed out after {EXTERNAL_API_TIMEOUT} seconds"}
except Exception as e:
logger.error(f"Exception occurred: {str(e)}")
return {"error": f"Exception occurred: {str(e)}"}
finally:
# Ensure the file is closed
if 'resume' in files:
try:
files['resume'][1].close()
except:
pass
@app.post("/process_resume_and_recommend_jobs")
async def process_resume_and_recommend_jobs(
resume: UploadFile = File(...),
resume_text: str = Form(""),
api_key: str = Depends(verify_api_key)
):
"""
Process resume, extract information, filter jobs by industry, and analyze fit
"""
request_start_time = time.time()
try:
logger.info(f"Processing resume: {resume.filename}")
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
shutil.copyfileobj(resume.file, tmp_file)
tmp_file_path = tmp_file.name
try:
# Extract text from PDF if no resume_text provided
if not resume_text:
resume_text = extract_text_from_pdf(tmp_file_path)
if not resume_text:
logger.error("Could not extract text from PDF file")
return JSONResponse(
status_code=400,
content={"error": "Could not extract text from PDF file"}
)
# Extract resume information using LLM
resume_info = extract_resume_info(resume_text)
# Load jobs data from PostgreSQL database
try:
jobs_df = pd.read_sql_table("jobs", con=engine)
candidates_df = pd.read_sql_table("candidates", con=engine)
submissions_df = pd.read_sql_table("candidate_submissions", con=engine)
logger.info(f"Loaded {len(jobs_df)} jobs, {len(candidates_df)} candidates, {len(submissions_df)} submissions")
except Exception as db_error:
logger.error(f"Database error: {db_error}")
return JSONResponse(
status_code=500,
content={"error": "Database connection error"}
)
# Filter jobs by industry
filtered_jobs = filter_jobs_by_industry(jobs_df, resume_info['industry'])
if filtered_jobs.empty:
logger.warning(f"No jobs found for industry: {resume_info['industry']}")
return JSONResponse(
status_code=404,
content={"message": f"No jobs found for industry: {resume_info['industry']}"}
)
# Filter jobs by location
location_filtered_jobs = filter_jobs_by_location(filtered_jobs, resume_info['location'])
# Filter jobs by experience level
experience_filtered_jobs = filter_jobs_by_experience(location_filtered_jobs, resume_info['yoe'])
# Filter jobs by priority
priority_filtered_jobs = filter_jobs_by_priority(experience_filtered_jobs)
# Use priority filtered jobs if available, otherwise fall back to experience filtered jobs, then location filtered jobs
if not priority_filtered_jobs.empty:
jobs_to_analyze = priority_filtered_jobs
elif not experience_filtered_jobs.empty:
jobs_to_analyze = experience_filtered_jobs
else:
jobs_to_analyze = location_filtered_jobs
# Create filtered_submission_df with job_ids from jobs_to_analyze
job_ids_to_analyze = jobs_to_analyze['id'].tolist()
filtered_submission_df = submissions_df[submissions_df['jobId'].isin(job_ids_to_analyze)]
# Check if candidate email exists in candidates_df
candidate_id = None
if resume_info.get('email'):
candidate_match = candidates_df[candidates_df['email'] == resume_info['email']]
if not candidate_match.empty:
candidate_id = candidate_match.iloc[0]['id']
logger.info(f"Found existing candidate with ID: {candidate_id}")
# Analyze job fit for each filtered job
job_analyses = []
# Use configured number of jobs to analyze
for _, job_row in jobs_to_analyze.head(MAX_JOBS_TO_ANALYZE).iterrows():
job_id = job_row.get('id')
# Check if we have an existing submission for this candidate and job
existing_submission = None
if candidate_id and job_id:
submission_match = filtered_submission_df[
(filtered_submission_df['candidate_id'] == candidate_id) &
(filtered_submission_df['jobId'] == job_id)
]
if not submission_match.empty:
existing_submission = submission_match.iloc[0]
logger.info(f"Found existing submission for job_id: {job_id}, candidate_id: {candidate_id}")
if existing_submission is not None:
# Use existing fit score from submission
fit_score = existing_submission.get('fit_score', 0)
existing_analysis = {
'final_response': {
'final_score': fit_score,
'summary': {
'strengths': [],
'weaknesses': [],
'opportunities': [],
'recommendations': []
}
},
'source': 'existing_submission'
}
analysis_result = existing_analysis
else:
# Call API for new analysis with retry logic
job_description = create_job_description(job_row)
analysis_result = await analyze_job_fit_with_retry(job_description, tmp_file_path, job_row)
analysis_result['source'] = 'api_call'
# Clean up the analysis result
cleaned_analysis = clean_analysis_result(analysis_result)
job_analysis = JobAnalysis(
job_title=job_row.get('job_title', 'Unknown'),
company_name=job_row.get('company_name', 'Unknown'),
analysis=cleaned_analysis
)
job_analyses.append(job_analysis.dict())
# Sort jobs by final_score in descending order (highest scores first)
job_analyses = sort_jobs_by_score(job_analyses)
# Count existing submissions vs API calls
existing_submissions_count = sum(1 for analysis in job_analyses if analysis.get('analysis', {}).get('source') == 'existing_submission')
api_calls_count = sum(1 for analysis in job_analyses if analysis.get('analysis', {}).get('source') == 'api_call')
# Clean up temporary file
os.unlink(tmp_file_path)
# Calculate processing time
processing_time = time.time() - request_start_time
logger.info(f"Request completed in {processing_time:.2f} seconds")
return {
"resume_info": resume_info,
"industry": resume_info['industry'],
"location": resume_info['location'],
"experience_years": resume_info['yoe'],
"jobs_analyzed": len(job_analyses),
"location_filtered": not location_filtered_jobs.empty,
"experience_filtered": not experience_filtered_jobs.empty,
"priority_filtered": not priority_filtered_jobs.empty,
"existing_submissions_used": existing_submissions_count,
"api_calls_made": api_calls_count,
"candidate_found": candidate_id is not None,
"processing_time_seconds": round(processing_time, 2),
"job_analyses": job_analyses
}
except Exception as e:
# Clean up temporary file in case of error
if os.path.exists(tmp_file_path):
os.unlink(tmp_file_path)
raise e
except Exception as e:
logger.error(f"Processing failed: {str(e)}", exc_info=True)
return JSONResponse(
status_code=500,
content={"error": f"Processing failed: {str(e)}"}
)
@app.get("/health")
async def health_check(api_key: str = Depends(verify_api_key)):
"""
Health check endpoint with database connectivity check
"""
health_status = {
"status": "healthy",
"message": "Job Recommendation API is running",
"timestamp": time.time()
}
# Check database connectivity
try:
with engine.connect() as conn:
result = conn.execute(text("SELECT 1"))
health_status["database"] = "connected"
except Exception as e:
logger.error(f"Database health check failed: {e}")
health_status["database"] = "disconnected"
health_status["status"] = "degraded"
return health_status
@app.get("/")
async def root():
"""
Root endpoint
"""
return {
"message": "Job Recommendation API",
"version": "1.0.0",
"docs": "/docs",
"health": "/health"
}
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("PORT", 8080))
logger.info(f"Starting server on port {port}")
uvicorn.run(app, host="0.0.0.0", port=port)