import gradio as gr
import requests
import json
import os
import time
from typing import List, Dict, Any
from dotenv import load_dotenv
import concurrent.futures
import git
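# Record the current commit so the UI can show which build is running
# (assumes the app is launched from inside a git checkout)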
repo = git.Repo(search_parent_directories=True)
sha = repo.head.object.hexsha
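# JS snippet passed to gr.Blocks below to force the UI into the light theme on load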
js_func = """
function refresh() {
const url = new URL(window.location);
if (url.searchParams.get('__theme') !== 'light') {
url.searchParams.set('__theme', 'light');
window.location.href = url.href;
}
}
"""
# Load environment variables
load_dotenv(".env.local")
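# .env.local is expected to define the API token, e.g.:
#   HUGGINGFACE_TOKEN=hf_xxxxxxxxxxxxxxxx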
# Load endpoints from JSON file
with open('endpoints.json', 'r') as f:
ENDPOINTS = json.load(f)
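# Illustrative sketch of the expected endpoints.json shape; the "job", "resume",
# and "skill_quality" keys are the ones this app reads, the URLs are placeholders:
# {
#     "job": "https://<job-endpoint>.endpoints.huggingface.cloud",
#     "resume": "https://<resume-endpoint>.endpoints.huggingface.cloud",
#     "skill_quality": "https://<skill-quality-endpoint>.endpoints.huggingface.cloud"
# }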
# Get HuggingFace API token from environment variable
HF_TOKEN = os.getenv('HUGGINGFACE_TOKEN')
if not HF_TOKEN:
print("Warning: HUGGINGFACE_TOKEN environment variable not set")
# API calling function with retry logic
def call_api(endpoint_url: str, payload: Dict[str, Any], max_retries: int = 5, retry_delay: int = 2) -> Dict:
"""Call API endpoint with retry logic"""
headers = {"Authorization": f"Bearer {HF_TOKEN}"}
for attempt in range(max_retries):
try:
response = requests.post(
endpoint_url,
json=payload,
headers=headers,
timeout=30
)
if response.status_code == 200:
return response.json()
elif response.status_code == 503:
print(f"Service temporarily unavailable (503). Retrying... (Attempt {attempt + 1}/{max_retries})")
                time.sleep(retry_delay * (attempt + 1))  # Linear backoff: wait longer on each attempt
continue
else:
print(f"Error calling API: {response.status_code}")
print(f"Response: {response.text}")
return {}
except requests.exceptions.Timeout:
print(f"Request timed out. Attempt {attempt + 1}/{max_retries}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
except Exception as e:
print(f"Exception while calling API: {str(e)}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
return {}
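# Example usage (hypothetical input; the {"inputs": ...} payload shape matches
# every caller below):
#   result = call_api(ENDPOINTS["job"], {"inputs": "We need a Python developer..."})
#   if result:
#       print(result.get("skills", []))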
def wake_servers(progress=gr.Progress()):
"""Send wake-up requests to all endpoints in parallel with real-time updates"""
results = {}
status_html = "
Server Wake-up Results:
"
for name in ENDPOINTS.keys():
results[name] = "Pending..."
status_html += f"
{name}: Pending...
"
status_html += "
"
# Initial status HTML
yield status_html
def update_status_html():
"""Generate HTML for current status"""
html = "
Server Wake-up Results:
"
for name, status in results.items():
status_color = "green" if "Status: 200" in status else "red" if "Error" in status or "Failed" in status else "gray"
html += f"
{name}: {status}
"
html += "
"
return html
def try_wake_endpoint(name, url):
"""Helper function to wake endpoint with retry logic"""
retry_delays = [10] * 30 # Seconds to wait between retries
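        # Retry budget: 30 attempts with 10s between them is roughly 5 minutes,
        # which should cover a typical cold start of a scale-to-zero endpoint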
for retry_count, retry_delay in enumerate(retry_delays):
try:
# Update status to show attempt
results[name] = f"Attempting to connect... (try {retry_count+1}/{len(retry_delays)+1})"
# Send a small payload just to wake up the server
minimal_payload = {"inputs": "Hello"}
response = requests.post(
url,
json=minimal_payload,
headers={"Authorization": f"Bearer {HF_TOKEN}"},
timeout=45
)
if response.status_code == 200:
results[name] = f"Status: {response.status_code}"
return
                else:
                    # Non-200 response; retry unless this was the last attempt
                    if retry_count < len(retry_delays) - 1:
                        results[name] = f"Status: {response.status_code}, retrying in {retry_delay}s... (attempt {retry_count+1}/{len(retry_delays)})"
                        time.sleep(retry_delay)
                    else:
                        # All retries exhausted
                        results[name] = f"Status: {response.status_code} (Failed after {len(retry_delays)} attempts)"
                        return
            except Exception as e:
                # Connection error; retry unless this was the last attempt
                if retry_count < len(retry_delays) - 1:
                    results[name] = f"Error connecting, retrying in {retry_delay}s... (attempt {retry_count+1}/{len(retry_delays)})"
                    time.sleep(retry_delay)
                else:
                    # All retries exhausted
                    results[name] = f"Error: {str(e)} (Failed after {len(retry_delays)} attempts)"
                    return
# Function to process a single endpoint and update UI
def process_endpoint(name, url):
try:
try_wake_endpoint(name, url)
finally:
# Return the updated status HTML
return update_status_html()
# Create a thread pool to wake up servers in parallel
with concurrent.futures.ThreadPoolExecutor() as executor:
# Start all tasks
futures = {executor.submit(process_endpoint, name, url): name
for name, url in ENDPOINTS.items()}
# Process results as they complete
for future in concurrent.futures.as_completed(futures):
name = futures[future]
try:
# Get the updated status HTML
status_html = future.result()
# Update progress
progress(sum(1 for r in results.values() if "Status: 200" in r) / len(ENDPOINTS),
desc=f"Waking up servers ({sum(1 for r in results.values() if r != 'Pending...')} of {len(ENDPOINTS)} processed)")
# Yield the updated status to show in real-time
yield status_html
except Exception as e:
print(f"Error processing {name}: {str(e)}")
results[name] = f"Error: Internal processing error"
yield update_status_html()
# Final update after all are complete
progress(1.0, desc="Complete!")
yield update_status_html()
def process_job_description(job_description: str) -> Dict:
"""Process job description and extract skills using the job endpoint"""
payload = {"inputs": job_description}
result = call_api(ENDPOINTS["job"], payload)
if not result:
# Return a fallback structure if API call fails
return {"skills": [], "total_skills": 0}
# Format the result to match expected structure
if "skills" in result:
# Add a "text" field to each skill for compatibility
for skill in result["skills"]:
skill["text"] = skill.get("name", "Unknown Skill")
result["total_skills"] = len(result["skills"])
else:
result = {"skills": [], "total_skills": 0}
return result
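# The job endpoint is assumed to return JSON shaped like:
#   {"skills": [{"name": "python"}, {"name": "react"}]}
# process_job_description copies each skill's "name" into a "text" field
# for compatibility with the rendering code below.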
def process_skill_quality(text: str) -> Dict:
"""Process a sentence through the skill quality endpoint"""
payload = {"inputs": text}
result = call_api(ENDPOINTS["skill_quality"], payload)
if not result:
return {"leadership": 0, "leadership_token": "No", "collaboration": 0, "collaboration_token": "No"}
return result
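# The skill quality endpoint is assumed to return a dict shaped like:
#   {"leadership": 0.91, "leadership_token": "Yes",
#    "collaboration": 0.12, "collaboration_token": "No"}
# which is exactly what the batch helper below inspects.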
def process_skill_quality_batch(sentences):
"""Process multiple sentences through the skill quality endpoint concurrently"""
results = []
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_sentence = {
executor.submit(process_skill_quality, sentence): sentence
for sentence in sentences
}
for future in concurrent.futures.as_completed(future_to_sentence):
sentence = future_to_sentence[future]
try:
quality_score = future.result()
is_leadership = quality_score["leadership_token"] == "Yes"
is_collaboration = not is_leadership and quality_score["collaboration_token"] == "Yes"
results.append({
"sentence": sentence,
"is_leadership": is_leadership,
"is_collaboration": is_collaboration,
"raw_score": quality_score
})
except Exception as e:
print(f"Error processing sentence: {sentence[:30]}... - {str(e)}")
results.append({
"sentence": sentence,
"is_leadership": False,
"is_collaboration": False,
"raw_score": {"leadership": 0, "leadership_token": "No", "collaboration": 0, "collaboration_token": "No"}
})
return results
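# Note: results are appended in completion order, not input order. Each entry
# carries its own sentence, so downstream per-role aggregation is unaffected.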
def process_single_resume(file_path, job_skills, progress=None, resume_index=0, total_resumes=1, resume_status=None):
"""Process a single resume file"""
progress_base = 0.4 + (0.5 * resume_index / total_resumes)
progress_cap = 0.4 + (0.5 * (resume_index + 1) / total_resumes)
resume_name = os.path.basename(file_path)
try:
if progress is not None:
progress(progress_base, desc=f"Processing resume {resume_index+1}/{total_resumes}: {resume_name}...")
# Update resume status if provided
if resume_status is not None:
resume_status[resume_name] = {"progress": 0, "status": "Starting analysis...", "sentences_processed": 0, "total_sentences": 0}
print(f"Initialized status for {resume_name}")
except Exception as e:
print(f"Error initializing status for {resume_name}: {str(e)}")
try:
with open(file_path, 'r', encoding='utf-8') as f:
resume_content = f.read()
print(f"Starting processing of {resume_name}")
result = process_resume(resume_content, job_skills,
progress=progress, progress_base=progress_base, progress_cap=progress_cap,
resume_status=resume_status, resume_name=resume_name)
print(f"Finished processing {resume_name}")
# Return both the result and the filename
return result, resume_name
except Exception as e:
print(f"Error processing {resume_name}: {str(e)}")
if resume_status is not None:
resume_status[resume_name]["status"] = f"Error: {str(e)}"
return {"skills": [], "total_skills": 0}, resume_name
def update_resume_status_html(resume_status):
"""Generate HTML table for resume processing status"""
html = "
Resume Processing Status:
"
html += "
"
html += "
Resume
"
html += "
Progress
"
html += "
Status
"
# Print the current status to aid debugging
print(f"Current resume status: {resume_status}")
for resume_name, status in resume_status.items():
progress_pct = 0
if status["total_sentences"] > 0:
progress_pct = round((status["sentences_processed"] / status["total_sentences"]) * 100)
progress_text = f"{status['sentences_processed']}/{status['total_sentences']} sentences ({progress_pct}%)"
# Color based on progress
if progress_pct == 100:
color = "green"
elif progress_pct > 0:
color = "orange"
else:
color = "gray"
html += f"
{resume_name}
"
html += f"
{progress_text}
"
html += f"
{status['status']}
"
html += "
"
return html
def process_resume(resume_text: str, job_skills: List[str], progress=None, progress_base=0.4, progress_cap=0.9, resume_status=None, resume_name=None) -> Dict:
"""Process resume using the resume endpoint"""
payload = {"inputs": resume_text}
# Thread-safe updating of resume status
try:
if resume_status is not None and resume_name is not None:
# Use a try-except block for thread safety when updating shared state
try:
resume_status[resume_name]["status"] = "Extracting roles..."
print(f"Updating status for {resume_name}: Extracting roles...")
except Exception as e:
print(f"Error updating initial status for {resume_name}: {str(e)}")
except Exception as e:
print(f"Error in initial status update: {str(e)}")
# Call the API
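    # The resume endpoint is assumed to return a list of role dicts shaped like:
    #   [{"title": ["Senior", "Engineer"],
    #     "dates": {"date_started": "Jan 2019", "date_ended": "present"},
    #     "role_length": 30,  # months
    #     "skills": [{"name": "python"}],
    #     "description": ["Led a team of five engineers.", "..."]}]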
result = call_api(ENDPOINTS["resume"], payload, max_retries=10, retry_delay=10)
if not result:
# Update status on failure and raise exception to indicate failure
if resume_status is not None and resume_name is not None:
resume_status[resume_name]["status"] = "Error: Failed to extract roles"
resume_status[resume_name]["failed"] = True # Mark as failed
raise Exception("Failed to extract roles from resume")
# Count total sentences for progress tracking
total_sentences = 0
for job in result:
if "description" in job:
total_sentences += len(job.get("description", []))
# Update status with total sentences - thread safe
try:
if resume_status is not None and resume_name is not None:
resume_status[resume_name]["total_sentences"] = total_sentences
print(f"Total sentences for {resume_name}: {total_sentences}")
except Exception as e:
print(f"Error updating total sentences for {resume_name}: {str(e)}")
# Extract all skills from all job experiences
all_skills = []
processed_sentences = 0
# Process skill quality for each role description
for job in result:
if "skills" in job:
for skill in job["skills"]:
# Add a "text" field for compatibility
skill["text"] = skill.get("name", "Unknown Skill")
all_skills.append(skill)
# Process skill quality for sentences
if "description" in job and job["description"]:
# Get all sentences for this job
sentences = job.get("description", [])
# Process all sentences for this job
quality_scores = process_skill_quality_batch(sentences)
# Update progress after batch processing - thread safe
processed_sentences += len(sentences)
try:
if resume_status is not None and resume_name is not None:
resume_status[resume_name]["sentences_processed"] = processed_sentences
resume_status[resume_name]["progress"] = round(processed_sentences/total_sentences*100)
resume_status[resume_name]["status"] = f"Analyzing skill quality... ({processed_sentences}/{total_sentences})"
print(f"Updated {resume_name} progress: {processed_sentences}/{total_sentences} sentences")
except Exception as e:
print(f"Error updating progress for {resume_name}: {str(e)}")
job["quality_scores"] = quality_scores
# Update status to complete - thread safe
try:
if resume_status is not None and resume_name is not None and not resume_status[resume_name].get("failed", False):
resume_status[resume_name]["status"] = "Analysis complete"
resume_status[resume_name]["sentences_processed"] = total_sentences
resume_status[resume_name]["progress"] = 100
print(f"Completed analysis for {resume_name}")
except Exception as e:
print(f"Error updating final status for {resume_name}: {str(e)}")
# Add fields to match expected structure
formatted_result = {
"skills": all_skills,
"total_skills": len(all_skills),
"roles": result # Keep the original roles data
}
return formatted_result
# Create a helper function to format years of experience in a readable format
def format_years_of_experience(years):
"""Format years as a combination of years and months"""
    full_years = int(years)
    months = int(round((years - full_years) * 12))
    if months == 12:
        # Rounding can push the month count to a full year (e.g. 1.99 -> "2y", not "1y 12m")
        full_years += 1
        months = 0
if full_years > 0 and months > 0:
return f"{full_years}y {months}m"
elif full_years > 0:
return f"{full_years}y"
elif months > 0:
return f"{months}m"
else:
return "0"
def create_html_output(job_result: Dict, resume_results: List[Dict], filenames: List[str] = None) -> str:
"""Create HTML output for the interface"""
html = "
"
# Remove the global script since it's not working
# Set default filenames if not provided
if not filenames:
filenames = [f"Resume {i}" for i in range(1, len(resume_results) + 1)]
# Job Description Section
html += "
Job Description Analysis
"
html += f"
Total Skills Found: {job_result['total_skills']}
"
html += "
Skills:
"
html += "
"
for skill in job_result['skills']:
html += f"{skill['text']}"
html += "
"
# Get job skills for matching
job_skills = [skill['text'].lower() for skill in job_result['skills']]
# Resume Analysis Section
html += "
Resume Analysis
"
# Check if we have multiple resumes to display summary table
multiple_resumes = len(resume_results) > 1
# Calculate leadership and collaboration counts, and total experience for each resume
leadership_counts = []
collaboration_counts = []
total_experiences = []
skill_experience_maps = []
skill_leadership_maps = [] # New map for tracking leadership skills
skill_collaboration_maps = [] # New map for tracking collaboration skills
for resume_result in resume_results:
# Count leadership and collaboration sentences
leadership_count = 0
collaboration_count = 0
# Calculate total experience
total_experience = 0
skill_experience = {}
skill_leadership = {} # Track which skills have leadership statements
skill_collaboration = {} # Track which skills have collaboration statements
if 'roles' in resume_result:
for role in resume_result['roles']:
# Count quality scores and track skills with leadership/collaboration
if 'quality_scores' in role:
for score in role['quality_scores']:
if score['is_leadership']:
leadership_count += 1
# Extract skills from leadership statement
for skill in role.get('skills', []):
skill_name = skill.get('name', '').lower()
if skill_name:
skill_leadership[skill_name] = True
elif score['is_collaboration']:
collaboration_count += 1
# Extract skills from collaboration statement
for skill in role.get('skills', []):
skill_name = skill.get('name', '').lower()
if skill_name:
skill_collaboration[skill_name] = True
# Calculate experience duration
if 'role_length' in role:
# Convert months to years (role_length is in months)
years_in_role = role['role_length'] / 12
total_experience += years_in_role
# Calculate experience per skill
for skill in role.get('skills', []):
skill_name = skill.get('name', '').lower()
if skill_name:
if skill_name in skill_experience:
skill_experience[skill_name] += years_in_role
else:
skill_experience[skill_name] = years_in_role
elif 'dates' in role and role['dates']:
# Fallback to old method if role_length is not available
start_date = role['dates'].get('date_started', '')
end_date = role['dates'].get('date_ended', '')
try:
# Try to extract years from dates
start_year = int(''.join(filter(str.isdigit, start_date[-4:]))) if start_date else 0
end_year = int(''.join(filter(str.isdigit, end_date[-4:]))) if end_date and end_date.lower() != 'present' else time.localtime().tm_year
years_in_role = max(0, end_year - start_year)
total_experience += years_in_role
# Calculate experience per skill
for skill in role.get('skills', []):
skill_name = skill.get('name', '').lower()
if skill_name:
if skill_name in skill_experience:
skill_experience[skill_name] += years_in_role
else:
skill_experience[skill_name] = years_in_role
except:
# Skip if date parsing fails
pass
leadership_counts.append(leadership_count)
collaboration_counts.append(collaboration_count)
total_experiences.append(total_experience)
skill_experience_maps.append(skill_experience)
skill_leadership_maps.append(skill_leadership)
skill_collaboration_maps.append(skill_collaboration)
# Calculate averages for leadership and collaboration
avg_leadership = sum(leadership_counts) / len(leadership_counts) if leadership_counts else 0
avg_collaboration = sum(collaboration_counts) / len(collaboration_counts) if collaboration_counts else 0
# Create summary table if multiple resumes
    if multiple_resumes:
        html += "<h3>Match Summary</h3>"
        html += "<table style='width: 100%; border-collapse: collapse;'>"
        html += "<tr>"
        html += "<th style='padding: 8px; border: 1px solid #ddd;'>JOB ID</th>"
        html += "<th style='padding: 8px; border: 1px solid #ddd;'>CANDIDATE</th>"
        html += "<th style='padding: 8px; border: 1px solid #ddd;'>% MATCHED SKILLS</th>"
        html += "<th style='padding: 8px; border: 1px solid #ddd;'>SKILL</th>"
        html += "<th style='padding: 8px; border: 1px solid #ddd;'>YEARS OF EXPERIENCE</th>"
        html += "<th style='padding: 8px; border: 1px solid #ddd;'>MATCH CATEGORY</th>"
        html += "<th style='padding: 8px; border: 1px solid #ddd;'>LEADERSHIP QUALITY</th>"
        html += "<th style='padding: 8px; border: 1px solid #ddd;'>COLLABORATION QUALITY</th>"
        html += "<th style='padding: 8px; border: 1px solid #ddd;'>ACTIONS</th>"
        html += "</tr>"
# Create a list of resume data for sorting
resume_data = []
for i, resume_result in enumerate(resume_results, 1):
            # Aggregate years of experience per skill for this resume
            primary_skills = {}
            # Keys in skill_experience_maps are already lowercased, so a plain copy suffices
            skill_exp = dict(skill_experience_maps[i-1])
# Get unique resume skills
resume_skills = set(skill['text'].lower() for skill in resume_result['skills'])
# Only include skills that are in both the resume and job requirements
matched_skills = []
for skill in job_skills:
if skill.lower() in resume_skills:
matched_skills.append(skill)
if skill.lower() in skill_exp:
primary_skills[skill] = skill_exp[skill.lower()]
            # Calculate the overall skill match percentage
match_count = len(matched_skills)
match_percentage = round((match_count / job_result['total_skills'] * 100) if job_result['total_skills'] > 0 else 0, 1)
# Get leadership and collaboration counts
leadership_count = leadership_counts[i-1]
collaboration_count = collaboration_counts[i-1]
# Get total experience and determine category
total_experience = total_experiences[i-1]
# Determine leadership quality (YES/NO)
leadership_quality = "YES" if leadership_count > avg_leadership * 1.2 else "NO"
# Determine thoroughness quality (YES/NO)
collaboration_quality = "YES" if collaboration_count > avg_collaboration * 1.2 else "NO"
# Determine match category (Strong, Close, Weak)
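            # Thresholds: >= 80% of job skills matched -> Strong, >= 50% -> Close,
            # otherwise Weak; a "Quality" variant applies when the candidate's
            # leadership/collaboration count is at least 20% above the pool average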
if match_percentage >= 80:
match_category = "Strong Match"
if leadership_quality == "YES" and collaboration_quality == "YES":
match_category = "Strong Quality Match (Leadership and Collaboration)"
elif leadership_quality == "YES":
match_category = "Strong Quality Match (Leadership)"
elif collaboration_quality == "YES":
match_category = "Strong Quality Match (Collaboration)"
elif match_percentage >= 50:
match_category = "Close Match"
if leadership_quality == "YES" and collaboration_quality == "YES":
match_category = "Close Quality Match (Leadership and Collaboration)"
elif leadership_quality == "YES":
match_category = "Close Quality Match (Leadership)"
elif collaboration_quality == "YES":
match_category = "Close Match (Collaboration)"
else:
match_category = "Weak Match"
if leadership_quality == "YES" and collaboration_quality == "YES":
match_category = "Weak Quality Match (Leadership and Collaboration)"
elif leadership_quality == "YES":
match_category = "Weak Quality Match (Leadership)"
elif collaboration_quality == "YES":
match_category = "Weak Quality Match (Collaboration)"
# Add to resume data list for sorting
resume_data.append({
'job_id': "JD-1",
'index': i,
'match_percentage': match_percentage,
'matched_skills': matched_skills,
'primary_skills': primary_skills,
'match_category': match_category,
'leadership_quality': leadership_quality,
'collaboration_quality': collaboration_quality,
'leadership_count': leadership_count,
'collaboration_count': collaboration_count,
'filename': filenames[i-1]
})
# Sort resumes by match percentage (highest first)
sorted_resumes = sorted(resume_data, key=lambda x: -x['match_percentage'])
# Add rows to summary table
        for candidate in sorted_resumes:
            file_name = candidate['filename']
            # Create a single row per candidate
            html += "<tr>"
            # Job ID
            html += f"<td style='padding: 8px; border: 1px solid #ddd;'>{candidate['job_id']}</td>"
            # Candidate
            html += f"<td style='padding: 8px; border: 1px solid #ddd;'>{file_name}</td>"
            # Match percentage
            html += f"<td style='padding: 8px; border: 1px solid #ddd;'>{candidate['match_percentage']}%</td>"
            # Skill and years of experience (first matched skill goes on the main row)
            if candidate['matched_skills']:
                skill = candidate['matched_skills'][0]
                years = candidate['primary_skills'].get(skill, 0)
                html += f"<td style='padding: 8px; border: 1px solid #ddd;'>{skill}</td>"
                html += f"<td style='padding: 8px; border: 1px solid #ddd;'>{format_years_of_experience(years)}</td>"
            else:
                # Keep the columns aligned when no skills matched
                html += "<td style='padding: 8px; border: 1px solid #ddd;'>-</td>"
                html += "<td style='padding: 8px; border: 1px solid #ddd;'>-</td>"
            # Match category
            html += f"<td style='padding: 8px; border: 1px solid #ddd;'>{candidate['match_category']}</td>"
            # Leadership and collaboration quality, with counts specific to this resume
            leadership_color = "#e6ffe6" if candidate['leadership_quality'] == "YES" else "#f5f5f5"
            collaboration_color = "#e6ffe6" if candidate['collaboration_quality'] == "YES" else "#f5f5f5"
            html += f"<td style='padding: 8px; border: 1px solid #ddd; background-color: {leadership_color};'>{candidate['leadership_quality']} ({candidate['leadership_count']})</td>"
            html += f"<td style='padding: 8px; border: 1px solid #ddd; background-color: {collaboration_color};'>{candidate['collaboration_quality']} ({candidate['collaboration_count']})</td>"
            # Actions: toggle the per-resume detail section (assumes inline JS handlers are allowed in gr.HTML)
            detail_id = f"resume-detail-{candidate['index']}"
            html += (f"<td style='padding: 8px; border: 1px solid #ddd;'>"
                     f"<button onclick=\"var d=document.getElementById('{detail_id}');"
                     f"d.style.display=(d.style.display==='none')?'block':'none';\">View Details</button></td>")
            html += "</tr>"
            # If there are additional matched skills, add them in subsequent rows
            if len(candidate['matched_skills']) > 1:
                for skill in candidate['matched_skills'][1:]:
                    html += "<tr>"
                    # Empty cells for the first three columns
                    html += "<td></td><td></td><td></td>"
                    # Skill and its years of experience
                    years = candidate['primary_skills'].get(skill, 0)
                    html += f"<td style='padding: 8px; border: 1px solid #ddd;'>{skill}</td>"
                    html += f"<td style='padding: 8px; border: 1px solid #ddd;'>{format_years_of_experience(years)}</td>"
                    # Empty cells for the remaining columns
                    html += "<td></td><td></td><td></td><td></td>"
                    html += "</tr>"
        html += "</table>"
# Add individual resume sections
for i, resume_result in enumerate(resume_results, 1):
# For multiple resumes, make detailed view hidden by default
display_style = "none" if multiple_resumes else "block"
# If single resume, show matched skills count
if not multiple_resumes:
resume_skills = [skill['text'].lower() for skill in resume_result['skills']]
matched_skills = [skill for skill in resume_skills if skill in job_skills]
match_count = len(matched_skills)
match_percentage = round((match_count / job_result['total_skills'] * 100) if job_result['total_skills'] > 0 else 0, 1)
html += f"
"
# Show leadership and collaboration counts
leadership_count = leadership_counts[i-1]
collaboration_count = collaboration_counts[i-1]
total_experience = total_experiences[i-1]
if total_experience < 3:
category = "Entry"
elif total_experience < 5:
category = "Intermediate"
else:
category = "Advanced"
# Add quality modifier if 50% above average
if leadership_count > avg_leadership * 1.5:
category = f"Quality {category} (Leadership)"
elif collaboration_count > avg_collaboration * 1.5:
category = f"Quality {category} (Collaboration)"
html += f"
Leadership Sentences: {leadership_count}
"
html += f"
Collaboration Sentences: {collaboration_count}
"
html += f"
Total Years of Experience: {format_years_of_experience(total_experience)}
"
html += f"
Category: {category}
"
# Get the filename for this resume
resume_file = filenames[i-1]
# Detailed resume section with visibility control
html += f"
"
html += f"
{resume_file} Details
"
# Skills Summary specific to this resume - Now inside the detail section
html += "
Skills Summary
"
html += "
"
html += f""""""
html += "
"
html += f"
"
html += "
Skill
"
html += "
Years of Experience
"
html += "
Leadership Quality Count
"
html += "
Collaboration Quality Count
"
html += "
Match
"
# Get all unique skills for this specific resume
resume_skills = set()
for skill in resume_result['skills']:
resume_skills.add(skill['text'].lower())
# Get skill maps for this resume
skill_experience = skill_experience_maps[i-1]
skill_leadership = skill_leadership_maps[i-1]
skill_collaboration = skill_collaboration_maps[i-1]
# Create a list of skill data for sorting
skill_data = []
added_skills = set() # Track which skills we've already added
# Count quality statements per skill
skill_leadership_counts = {}
skill_collaboration_counts = {}
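        # Coarse attribution: every leadership/collaboration sentence in a role is
        # credited to all of that role's skills, since the API does not tie
        # sentences to individual skills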
if 'roles' in resume_result:
for role in resume_result['roles']:
if 'quality_scores' in role:
role_skills = {skill['name'].lower() for skill in role.get('skills', [])}
for score in role['quality_scores']:
for skill in role_skills:
if score['is_leadership']:
skill_leadership_counts[skill] = skill_leadership_counts.get(skill, 0) + 1
elif score['is_collaboration']:
skill_collaboration_counts[skill] = skill_collaboration_counts.get(skill, 0) + 1
for skill in resume_skills:
# Only add if we haven't seen this skill before
if skill not in added_skills:
# Get years of experience for this skill
years = skill_experience.get(skill, 0)
# Get quality counts for this skill
leadership_count = skill_leadership_counts.get(skill, 0)
collaboration_count = skill_collaboration_counts.get(skill, 0)
# Check if skill matches job requirements
is_match = skill in job_skills
skill_data.append({
'skill': skill,
'years': years,
'leadership_count': leadership_count,
'collaboration_count': collaboration_count,
'is_match': is_match
})
added_skills.add(skill)
# Sort skills by years of experience (descending)
skill_data.sort(key=lambda x: (-x['years'], x['skill']))
# Add rows for each skill
for data in skill_data:
# Set initial display based on match (only show matched skills by default)
display = "none" if not data['is_match'] else ""
# Add row to summary table
html += f"
"
html += f"
{data['skill']}
"
html += f"
{format_years_of_experience(data['years'])}
"
html += f"
{data['leadership_count']}
"
html += f"
{data['collaboration_count']}
"
html += f"
{'Yes' if data['is_match'] else 'No'}
"
html += "
"
# Display all skills found in the resume
html += "
Skills Found:
"
html += "
"
# Keep track of skills we've already added
added_skills = set()
for skill in resume_result['skills']:
skill_text = skill['text'].lower()
# Only add if we haven't seen this skill before
if skill_text not in added_skills:
# Highlight matched skills
is_match = skill_text in job_skills
bg_color = "#c8e6c9" if is_match else "#e0e0e0" # Green tint for matches
# Add years of experience for this skill if available
skill_years = skill_experience.get(skill_text, 0)
experience_text = f" ({format_years_of_experience(skill_years)})" if skill_years > 0 else ""
html += f"{skill['text']}{experience_text}"
added_skills.add(skill_text)
html += "
"
# Job roles section
if 'roles' in resume_result and resume_result['roles']:
html += "
Job Experience:
"
for role in resume_result['roles']:
html += f"
"
html += f"
Title: {' '.join(role.get('title', ['Unknown']))}
"
if 'dates' in role and role['dates']:
html += f"
Period: {role['dates'].get('date_started', 'Unknown')} to {role['dates'].get('date_ended', 'Unknown')}
"
if 'role_length' in role:
years = role['role_length'] / 12
months = role['role_length'] % 12
duration_text = ""
if years >= 1:
duration_text += f"{int(years)} year{'s' if int(years) > 1 else ''}"
if months > 0:
if duration_text:
duration_text += " and "
duration_text += f"{int(months)} month{'s' if int(months) > 1 else ''}"
html += f"
Duration: {duration_text}
"
html += f"
Role Skills:
"
html += "
"
for skill in role.get('skills', []):
# Highlight matched skills in roles too
skill_name = skill.get('name', 'Unknown')
is_match = skill_name.lower() in job_skills
bg_color = "#c8e6c9" if is_match else "#e0e0e0" # Green tint for matches
html += f"{skill_name}"
html += "
"
# Display skill quality analysis
if 'quality_scores' in role and role['quality_scores']:
html += "
Skill Quality Analysis:
"
html += "
"
html += "
Statement
Leadership
Collaboration
"
for score in role['quality_scores']:
leadership_class = "green-text" if score['is_leadership'] else "red-text"
collab_class = "green-text" if score['is_collaboration'] else "red-text"
html += f"
{score['sentence']}
"
html += f"
{'Yes' if score['is_leadership'] else 'No'}
"
html += f"
{'Yes' if score['is_collaboration'] else 'No'}
"
html += "
"
html += "
"
html += "
"
html += "
"
return html
# Create Gradio interface
with gr.Blocks(title="Beyond Keywords: Resume Analysis System", js=js_func) as demo:
gr.Markdown("# Beyond Keywords: Job Description and Resume Analyzer")
gr.Markdown(f"Running on Commit: {sha[:7]}")
gr.Markdown("Upload a job description and resume(s) to analyze skill matches and quality.")
# Remove the JavaScript that isn't working
# Instead, use Gradio's built-in features for status updates
# Server wake-up section
with gr.Row():
        wake_btn = gr.Button("Wake Servers (Do this first!) - May take 1-3 minutes for cold starts.")
        wake_status = gr.HTML(label="Server Status", value="<p>Click 'Wake Servers' to initialize the system...</p>")
# Input section
with gr.Row():
with gr.Column():
job_description = gr.Textbox(
label="Job Description",
placeholder="Paste the job description here...",
                lines=13
)
with gr.Column():
resume_input = gr.Group()
with resume_input:
input_type = gr.Radio(
choices=["Paste Text", "Upload File(s)"],
label="Input Method",
value="Paste Text"
)
resume_text = gr.Textbox(
label="Resume Text",
placeholder="Paste the resume text here...",
                    lines=9,
visible=True
)
resume_file = gr.Files(
label="Upload Resume(s) (.txt files)",
file_types=[".txt"],
visible=False,
interactive=True,
type="filepath"
)
def toggle_input(choice):
return {
resume_text: gr.update(visible=choice=="Paste Text"),
resume_file: gr.update(visible=choice=="Upload File(s)")
}
input_type.change(
fn=toggle_input,
inputs=input_type,
outputs=[resume_text, resume_file]
)
submit_btn = gr.Button("Analyze", variant="primary")
# Place status table before the main results
    resume_status = gr.HTML(label="Resume Processing Status", elem_id="resume-status-div", value="<p>Click 'Analyze' to see processing status...</p>")
# Main output area
output = gr.HTML(label="Analysis Results", value="")
# Function to process with real-time updates using generators
def process_with_updates(job_description, input_type, resume_text, resume_files):
"""Process inputs with real-time status updates using generators"""
# Initialize resume status tracking
resume_status = {}
# Initial status HTML
status_html = "
Resume Processing Status:
Preparing to analyze resumes...
"
yield status_html, gr.update(value="
Processing job description...
")
# Process job description first
job_result = process_job_description(job_description)
job_skills = [skill['text'] for skill in job_result['skills']]
# Set up initial status table
resume_results = []
filenames = []
if input_type == "Paste Text":
# Process single resume
resume_name = "Pasted Resume"
resume_status[resume_name] = {
"progress": 0,
"status": "Starting analysis...",
"sentences_processed": 0,
"total_sentences": 0,
"failed": False
}
            status_html = update_resume_status_html(resume_status)
            yield status_html, gr.update(value="<p>Processing pasted resume...</p>")
            # Process the pasted resume text (mirrors the error handling of the file branch)
            try:
                result = process_resume(resume_text, job_skills, resume_status=resume_status, resume_name=resume_name)
                resume_results.append(result)
                filenames.append(resume_name)
            except Exception as e:
                print(f"Error processing pasted resume: {str(e)}")
                resume_status[resume_name]["status"] = f"Error: {str(e)}"
                resume_status[resume_name]["failed"] = True
            resume_count = 1
else:
# Process multiple resumes
resume_count = len(resume_files)
# Initialize status for each resume
for file_path in resume_files:
resume_name = os.path.basename(file_path)
resume_status[resume_name] = {
"progress": 0,
"status": "Queued",
"sentences_processed": 0,
"total_sentences": 0,
"failed": False
}
# Initial status update
status_html = update_resume_status_html(resume_status)
            yield status_html, gr.update(value=f"<p>Processing {resume_count} resumes...</p>")
# Create a thread pool to process resumes concurrently
with concurrent.futures.ThreadPoolExecutor() as executor:
# Create a dict to track all futures
future_to_resume = {}
# Submit all resume processing tasks
for file_path in resume_files:
resume_name = os.path.basename(file_path)
# Update status to "Processing"
resume_status[resume_name]["status"] = "Starting analysis..."
# Read the file content
try:
with open(file_path, 'r', encoding='utf-8') as f:
resume_content = f.read()
# Submit the processing task
future = executor.submit(
process_resume,
resume_content,
job_skills,
resume_status=resume_status,
resume_name=resume_name
)
future_to_resume[future] = resume_name
except Exception as e:
print(f"Error reading {resume_name}: {str(e)}")
resume_status[resume_name]["status"] = f"Error: {str(e)}"
resume_status[resume_name]["failed"] = True
# Process with status updates
pending = set(future_to_resume.keys())
completed = 0
# Use a while loop to check pending futures and update status
while pending:
# Yield current status every iteration
status_html = update_resume_status_html(resume_status)
                    yield status_html, gr.update(value=f"<p>Processed {completed}/{resume_count} resumes...</p>")
# Wait for the next future to complete (with short timeout)
done, pending = concurrent.futures.wait(
pending, timeout=1.0,
return_when=concurrent.futures.FIRST_COMPLETED
)
# Process completed futures
for future in done:
resume_name = future_to_resume[future]
try:
result = future.result()
if not resume_status[resume_name].get("failed", False):
resume_results.append(result)
filenames.append(resume_name)
completed += 1
print(f"Completed processing of {resume_name} ({completed}/{resume_count})")
except Exception as e:
print(f"Error processing resume {resume_name}: {str(e)}")
resume_status[resume_name]["status"] = f"Error: {str(e)}"
resume_status[resume_name]["failed"] = True
            # Final status update for the file-upload branch
            status_html = update_resume_status_html(resume_status)
            yield status_html, gr.update(value=f"<p>All {resume_count} resumes processed. Generating report...</p>")
        # Generate final output
        html_output = create_html_output(job_result, resume_results, filenames)
        # Final yield with complete status and output
        status_html = update_resume_status_html(resume_status)
        yield status_html, html_output
# Connect the submit button to the processing function with real-time updates
submit_btn.click(
fn=process_with_updates,
inputs=[job_description, input_type, resume_text, resume_file],
outputs=[resume_status, output]
)
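    # Because process_with_updates is a generator, Gradio streams each yielded
    # (status, output) pair to the UI as it is produced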
# Connect wake button
wake_btn.click(fn=wake_servers, inputs=None, outputs=wake_status)
if __name__ == "__main__":
demo.launch()