import gradio as gr import requests import json import os import time from typing import List, Dict, Any from dotenv import load_dotenv import concurrent.futures import git repo = git.Repo(search_parent_directories=True) sha = repo.head.object.hexsha js_func = """ function refresh() { const url = new URL(window.location); if (url.searchParams.get('__theme') !== 'light') { url.searchParams.set('__theme', 'light'); window.location.href = url.href; } } """ # Load environment variables load_dotenv(".env.local") # Load endpoints from JSON file with open('endpoints.json', 'r') as f: ENDPOINTS = json.load(f) # Get HuggingFace API token from environment variable HF_TOKEN = os.getenv('HUGGINGFACE_TOKEN') if not HF_TOKEN: print("Warning: HUGGINGFACE_TOKEN environment variable not set") # API calling function with retry logic def call_api(endpoint_url: str, payload: Dict[str, Any], max_retries: int = 5, retry_delay: int = 2) -> Dict: """Call API endpoint with retry logic""" headers = {"Authorization": f"Bearer {HF_TOKEN}"} for attempt in range(max_retries): try: response = requests.post( endpoint_url, json=payload, headers=headers, timeout=30 ) if response.status_code == 200: return response.json() elif response.status_code == 503: print(f"Service temporarily unavailable (503). Retrying... (Attempt {attempt + 1}/{max_retries})") time.sleep(retry_delay * (attempt + 1)) # Exponential backoff continue else: print(f"Error calling API: {response.status_code}") print(f"Response: {response.text}") return {} except requests.exceptions.Timeout: print(f"Request timed out. Attempt {attempt + 1}/{max_retries}") if attempt < max_retries - 1: time.sleep(retry_delay) except Exception as e: print(f"Exception while calling API: {str(e)}") if attempt < max_retries - 1: time.sleep(retry_delay) return {} def wake_servers(progress=gr.Progress()): """Send wake-up requests to all endpoints in parallel with real-time updates""" results = {} status_html = "

Server Wake-up Results:

" # Initial status HTML yield status_html def update_status_html(): """Generate HTML for current status""" html = "

Server Wake-up Results:

" return html def try_wake_endpoint(name, url): """Helper function to wake endpoint with retry logic""" retry_delays = [10] * 30 # Seconds to wait between retries for retry_count, retry_delay in enumerate(retry_delays): try: # Update status to show attempt results[name] = f"Attempting to connect... (try {retry_count+1}/{len(retry_delays)+1})" # Send a small payload just to wake up the server minimal_payload = {"inputs": "Hello"} response = requests.post( url, json=minimal_payload, headers={"Authorization": f"Bearer {HF_TOKEN}"}, timeout=45 ) if response.status_code == 200: results[name] = f"Status: {response.status_code}" return else: # Non-200 response, prepare for retry if retry_count < len(retry_delays): results[name] = f"Status: {response.status_code}, retrying in {retry_delay}s... (attempt {retry_count+1}/{len(retry_delays)})" time.sleep(retry_delay) else: # All retries failed results[name] = f"Status: {response.status_code} (Failed after {len(retry_delays)} retries)" return except Exception as e: # Connection error, prepare for retry if retry_count < len(retry_delays): results[name] = f"Error connecting, retrying in {retry_delay}s... (attempt {retry_count+1}/{len(retry_delays)})" time.sleep(retry_delay) else: # All retries failed results[name] = f"Error: {str(e)} (Failed after {len(retry_delays)} retries)" return # Function to process a single endpoint and update UI def process_endpoint(name, url): try: try_wake_endpoint(name, url) finally: # Return the updated status HTML return update_status_html() # Create a thread pool to wake up servers in parallel with concurrent.futures.ThreadPoolExecutor() as executor: # Start all tasks futures = {executor.submit(process_endpoint, name, url): name for name, url in ENDPOINTS.items()} # Process results as they complete for future in concurrent.futures.as_completed(futures): name = futures[future] try: # Get the updated status HTML status_html = future.result() # Update progress progress(sum(1 for r in results.values() if "Status: 200" in r) / len(ENDPOINTS), desc=f"Waking up servers ({sum(1 for r in results.values() if r != 'Pending...')} of {len(ENDPOINTS)} processed)") # Yield the updated status to show in real-time yield status_html except Exception as e: print(f"Error processing {name}: {str(e)}") results[name] = f"Error: Internal processing error" yield update_status_html() # Final update after all are complete progress(1.0, desc="Complete!") yield update_status_html() def process_job_description(job_description: str) -> Dict: """Process job description and extract skills using the job endpoint""" payload = {"inputs": job_description} result = call_api(ENDPOINTS["job"], payload) if not result: # Return a fallback structure if API call fails return {"skills": [], "total_skills": 0} # Format the result to match expected structure if "skills" in result: # Add a "text" field to each skill for compatibility for skill in result["skills"]: skill["text"] = skill.get("name", "Unknown Skill") result["total_skills"] = len(result["skills"]) else: result = {"skills": [], "total_skills": 0} return result def process_skill_quality(text: str) -> Dict: """Process a sentence through the skill quality endpoint""" payload = {"inputs": text} result = call_api(ENDPOINTS["skill_quality"], payload) if not result: return {"leadership": 0, "leadership_token": "No", "collaboration": 0, "collaboration_token": "No"} return result def process_skill_quality_batch(sentences): """Process multiple sentences through the skill quality endpoint concurrently""" results = [] with concurrent.futures.ThreadPoolExecutor() as executor: future_to_sentence = { executor.submit(process_skill_quality, sentence): sentence for sentence in sentences } for future in concurrent.futures.as_completed(future_to_sentence): sentence = future_to_sentence[future] try: quality_score = future.result() is_leadership = quality_score["leadership_token"] == "Yes" is_collaboration = not is_leadership and quality_score["collaboration_token"] == "Yes" results.append({ "sentence": sentence, "is_leadership": is_leadership, "is_collaboration": is_collaboration, "raw_score": quality_score }) except Exception as e: print(f"Error processing sentence: {sentence[:30]}... - {str(e)}") results.append({ "sentence": sentence, "is_leadership": False, "is_collaboration": False, "raw_score": {"leadership": 0, "leadership_token": "No", "collaboration": 0, "collaboration_token": "No"} }) return results def process_single_resume(file_path, job_skills, progress=None, resume_index=0, total_resumes=1, resume_status=None): """Process a single resume file""" progress_base = 0.4 + (0.5 * resume_index / total_resumes) progress_cap = 0.4 + (0.5 * (resume_index + 1) / total_resumes) resume_name = os.path.basename(file_path) try: if progress is not None: progress(progress_base, desc=f"Processing resume {resume_index+1}/{total_resumes}: {resume_name}...") # Update resume status if provided if resume_status is not None: resume_status[resume_name] = {"progress": 0, "status": "Starting analysis...", "sentences_processed": 0, "total_sentences": 0} print(f"Initialized status for {resume_name}") except Exception as e: print(f"Error initializing status for {resume_name}: {str(e)}") try: with open(file_path, 'r', encoding='utf-8') as f: resume_content = f.read() print(f"Starting processing of {resume_name}") result = process_resume(resume_content, job_skills, progress=progress, progress_base=progress_base, progress_cap=progress_cap, resume_status=resume_status, resume_name=resume_name) print(f"Finished processing {resume_name}") # Return both the result and the filename return result, resume_name except Exception as e: print(f"Error processing {resume_name}: {str(e)}") if resume_status is not None: resume_status[resume_name]["status"] = f"Error: {str(e)}" return {"skills": [], "total_skills": 0}, resume_name def update_resume_status_html(resume_status): """Generate HTML table for resume processing status""" html = "

Resume Processing Status:

" html += "" html += "" html += "" html += "" # Print the current status to aid debugging print(f"Current resume status: {resume_status}") for resume_name, status in resume_status.items(): progress_pct = 0 if status["total_sentences"] > 0: progress_pct = round((status["sentences_processed"] / status["total_sentences"]) * 100) progress_text = f"{status['sentences_processed']}/{status['total_sentences']} sentences ({progress_pct}%)" # Color based on progress if progress_pct == 100: color = "green" elif progress_pct > 0: color = "orange" else: color = "gray" html += f"" html += f"" html += f"" html += "
ResumeProgressStatus
{resume_name}{progress_text}{status['status']}
" return html def process_resume(resume_text: str, job_skills: List[str], progress=None, progress_base=0.4, progress_cap=0.9, resume_status=None, resume_name=None) -> Dict: """Process resume using the resume endpoint""" payload = {"inputs": resume_text} # Thread-safe updating of resume status try: if resume_status is not None and resume_name is not None: # Use a try-except block for thread safety when updating shared state try: resume_status[resume_name]["status"] = "Extracting roles..." print(f"Updating status for {resume_name}: Extracting roles...") except Exception as e: print(f"Error updating initial status for {resume_name}: {str(e)}") except Exception as e: print(f"Error in initial status update: {str(e)}") # Call the API result = call_api(ENDPOINTS["resume"], payload, max_retries=10, retry_delay=10) if not result: # Update status on failure and raise exception to indicate failure if resume_status is not None and resume_name is not None: resume_status[resume_name]["status"] = "Error: Failed to extract roles" resume_status[resume_name]["failed"] = True # Mark as failed raise Exception("Failed to extract roles from resume") # Count total sentences for progress tracking total_sentences = 0 for job in result: if "description" in job: total_sentences += len(job.get("description", [])) # Update status with total sentences - thread safe try: if resume_status is not None and resume_name is not None: resume_status[resume_name]["total_sentences"] = total_sentences print(f"Total sentences for {resume_name}: {total_sentences}") except Exception as e: print(f"Error updating total sentences for {resume_name}: {str(e)}") # Extract all skills from all job experiences all_skills = [] processed_sentences = 0 # Process skill quality for each role description for job in result: if "skills" in job: for skill in job["skills"]: # Add a "text" field for compatibility skill["text"] = skill.get("name", "Unknown Skill") all_skills.append(skill) # Process skill quality for sentences if "description" in job and job["description"]: # Get all sentences for this job sentences = job.get("description", []) # Process all sentences for this job quality_scores = process_skill_quality_batch(sentences) # Update progress after batch processing - thread safe processed_sentences += len(sentences) try: if resume_status is not None and resume_name is not None: resume_status[resume_name]["sentences_processed"] = processed_sentences resume_status[resume_name]["progress"] = round(processed_sentences/total_sentences*100) resume_status[resume_name]["status"] = f"Analyzing skill quality... ({processed_sentences}/{total_sentences})" print(f"Updated {resume_name} progress: {processed_sentences}/{total_sentences} sentences") except Exception as e: print(f"Error updating progress for {resume_name}: {str(e)}") job["quality_scores"] = quality_scores # Update status to complete - thread safe try: if resume_status is not None and resume_name is not None and not resume_status[resume_name].get("failed", False): resume_status[resume_name]["status"] = "Analysis complete" resume_status[resume_name]["sentences_processed"] = total_sentences resume_status[resume_name]["progress"] = 100 print(f"Completed analysis for {resume_name}") except Exception as e: print(f"Error updating final status for {resume_name}: {str(e)}") # Add fields to match expected structure formatted_result = { "skills": all_skills, "total_skills": len(all_skills), "roles": result # Keep the original roles data } return formatted_result # Create a helper function to format years of experience in a readable format def format_years_of_experience(years): """Format years as a combination of years and months""" full_years = int(years) months = int(round((years - full_years) * 12)) if full_years > 0 and months > 0: return f"{full_years}y {months}m" elif full_years > 0: return f"{full_years}y" elif months > 0: return f"{months}m" else: return "0" def create_html_output(job_result: Dict, resume_results: List[Dict], filenames: List[str] = None) -> str: """Create HTML output for the interface""" html = "
" # Remove the global script since it's not working # Set default filenames if not provided if not filenames: filenames = [f"Resume {i}" for i in range(1, len(resume_results) + 1)] # Job Description Section html += "

Job Description Analysis

" html += f"

Total Skills Found: {job_result['total_skills']}

" html += "

Skills:

" html += "
" for skill in job_result['skills']: html += f"{skill['text']}" html += "
" # Get job skills for matching job_skills = [skill['text'].lower() for skill in job_result['skills']] # Resume Analysis Section html += "

Resume Analysis

" # Check if we have multiple resumes to display summary table multiple_resumes = len(resume_results) > 1 # Calculate leadership and collaboration counts, and total experience for each resume leadership_counts = [] collaboration_counts = [] total_experiences = [] skill_experience_maps = [] skill_leadership_maps = [] # New map for tracking leadership skills skill_collaboration_maps = [] # New map for tracking collaboration skills for resume_result in resume_results: # Count leadership and collaboration sentences leadership_count = 0 collaboration_count = 0 # Calculate total experience total_experience = 0 skill_experience = {} skill_leadership = {} # Track which skills have leadership statements skill_collaboration = {} # Track which skills have collaboration statements if 'roles' in resume_result: for role in resume_result['roles']: # Count quality scores and track skills with leadership/collaboration if 'quality_scores' in role: for score in role['quality_scores']: if score['is_leadership']: leadership_count += 1 # Extract skills from leadership statement for skill in role.get('skills', []): skill_name = skill.get('name', '').lower() if skill_name: skill_leadership[skill_name] = True elif score['is_collaboration']: collaboration_count += 1 # Extract skills from collaboration statement for skill in role.get('skills', []): skill_name = skill.get('name', '').lower() if skill_name: skill_collaboration[skill_name] = True # Calculate experience duration if 'role_length' in role: # Convert months to years (role_length is in months) years_in_role = role['role_length'] / 12 total_experience += years_in_role # Calculate experience per skill for skill in role.get('skills', []): skill_name = skill.get('name', '').lower() if skill_name: if skill_name in skill_experience: skill_experience[skill_name] += years_in_role else: skill_experience[skill_name] = years_in_role elif 'dates' in role and role['dates']: # Fallback to old method if role_length is not available start_date = role['dates'].get('date_started', '') end_date = role['dates'].get('date_ended', '') try: # Try to extract years from dates start_year = int(''.join(filter(str.isdigit, start_date[-4:]))) if start_date else 0 end_year = int(''.join(filter(str.isdigit, end_date[-4:]))) if end_date and end_date.lower() != 'present' else time.localtime().tm_year years_in_role = max(0, end_year - start_year) total_experience += years_in_role # Calculate experience per skill for skill in role.get('skills', []): skill_name = skill.get('name', '').lower() if skill_name: if skill_name in skill_experience: skill_experience[skill_name] += years_in_role else: skill_experience[skill_name] = years_in_role except: # Skip if date parsing fails pass leadership_counts.append(leadership_count) collaboration_counts.append(collaboration_count) total_experiences.append(total_experience) skill_experience_maps.append(skill_experience) skill_leadership_maps.append(skill_leadership) skill_collaboration_maps.append(skill_collaboration) # Calculate averages for leadership and collaboration avg_leadership = sum(leadership_counts) / len(leadership_counts) if leadership_counts else 0 avg_collaboration = sum(collaboration_counts) / len(collaboration_counts) if collaboration_counts else 0 # Create summary table if multiple resumes if multiple_resumes: html += "

Match Summary

" html += "" html += "" html += "" html += "" html += "" html += "" html += "" html += "" html += "" html += "" html += "" # Create a list of resume data for sorting resume_data = [] for i, resume_result in enumerate(resume_results, 1): # Get primary skills (Java and React in this case, as per image) primary_skills = {} skill_exp = {} # Create a new dict to aggregate experience # First, aggregate all experience for each skill for skill_name, years in skill_experience_maps[i-1].items(): skill_name_lower = skill_name.lower() if skill_name_lower in skill_exp: skill_exp[skill_name_lower] = max(skill_exp[skill_name_lower], years) # Take the max experience else: skill_exp[skill_name_lower] = years # Get unique resume skills resume_skills = set(skill['text'].lower() for skill in resume_result['skills']) # Only include skills that are in both the resume and job requirements matched_skills = [] for skill in job_skills: if skill.lower() in resume_skills: matched_skills.append(skill) if skill.lower() in skill_exp: primary_skills[skill] = skill_exp[skill.lower()] # Calculate skill match # resume_skills = [skill['text'].lower() for skill in resume_result['skills']] # matched_skills = [skill for skill in resume_skills if skill in job_skills] match_count = len(matched_skills) match_percentage = round((match_count / job_result['total_skills'] * 100) if job_result['total_skills'] > 0 else 0, 1) # Get leadership and collaboration counts leadership_count = leadership_counts[i-1] collaboration_count = collaboration_counts[i-1] # Get total experience and determine category total_experience = total_experiences[i-1] # Determine leadership quality (YES/NO) leadership_quality = "YES" if leadership_count > avg_leadership * 1.2 else "NO" # Determine thoroughness quality (YES/NO) collaboration_quality = "YES" if collaboration_count > avg_collaboration * 1.2 else "NO" # Determine match category (Strong, Close, Weak) if match_percentage >= 80: match_category = "Strong Match" if leadership_quality == "YES" and collaboration_quality == "YES": match_category = "Strong Quality Match (Leadership and Collaboration)" elif leadership_quality == "YES": match_category = "Strong Quality Match (Leadership)" elif collaboration_quality == "YES": match_category = "Strong Quality Match (Collaboration)" elif match_percentage >= 50: match_category = "Close Match" if leadership_quality == "YES" and collaboration_quality == "YES": match_category = "Close Quality Match (Leadership and Collaboration)" elif leadership_quality == "YES": match_category = "Close Quality Match (Leadership)" elif collaboration_quality == "YES": match_category = "Close Match (Collaboration)" else: match_category = "Weak Match" if leadership_quality == "YES" and collaboration_quality == "YES": match_category = "Weak Quality Match (Leadership and Collaboration)" elif leadership_quality == "YES": match_category = "Weak Quality Match (Leadership)" elif collaboration_quality == "YES": match_category = "Weak Quality Match (Collaboration)" # Add to resume data list for sorting resume_data.append({ 'job_id': "JD-1", 'index': i, 'match_percentage': match_percentage, 'matched_skills': matched_skills, 'primary_skills': primary_skills, 'match_category': match_category, 'leadership_quality': leadership_quality, 'collaboration_quality': collaboration_quality, 'leadership_count': leadership_count, 'collaboration_count': collaboration_count, 'filename': filenames[i-1] }) # Sort resumes by match percentage (highest first) sorted_resumes = sorted(resume_data, key=lambda x: -x['match_percentage']) # Add rows to summary table for resume_data in sorted_resumes: # Generate table row for each candidate file_name = resume_data['filename'] # Convert 1,2,3 to A,B,C # Create a single row per candidate html += "" # Job ID html += f"" # Candidate html += f"" # Match Percentage html += f"" # Skills and Experience if resume_data['matched_skills']: # Show the first matched skill and its experience skill = resume_data['matched_skills'][0] html += f"" html += f"" else: # Show dashes for no matches html += "" html += "" # Match Category html += f"" # Leadership and Collaboration Quality with counts specific to this resume leadership_color = "#e6ffe6" if resume_data['leadership_quality'] == "YES" else "#f5f5f5" collaboration_color = "#e6ffe6" if resume_data['collaboration_quality'] == "YES" else "#f5f5f5" html += f"" html += f"" # Actions html += f"" html += "" # If there are additional matched skills, add them in subsequent rows if len(resume_data['matched_skills']) > 1: for skill in resume_data['matched_skills'][1:]: html += "" # Empty cells for the first three columns html += "" html += "" html += "" # Skill and Experience html += f"" html += f"" # Empty cells for the remaining columns html += "" html += "" html += "" html += "" html += "" html += "
JOB IDCANDIDATE% MATCHED SKILLSSKILLYEARS OF EXPERIENCEMATCH CATEGORYLEADERSHIP QUALITYCOLLABORATION QUALITYACTIONS
{resume_data['job_id']}{file_name}{resume_data['match_percentage']}%{skill}{format_years_of_experience(resume_data['primary_skills'].get(skill, 0))}--{resume_data['match_category']}Leadership: {resume_data['leadership_quality']} ({resume_data['leadership_count']})Collaboration: {resume_data['collaboration_quality']} ({resume_data['collaboration_count']})" html += f"" html += "
{skill}{format_years_of_experience(resume_data['primary_skills'].get(skill, 0))}
" # Add individual resume sections for i, resume_result in enumerate(resume_results, 1): # For multiple resumes, make detailed view hidden by default display_style = "none" if multiple_resumes else "block" # If single resume, show matched skills count if not multiple_resumes: resume_skills = [skill['text'].lower() for skill in resume_result['skills']] matched_skills = [skill for skill in resume_skills if skill in job_skills] match_count = len(matched_skills) match_percentage = round((match_count / job_result['total_skills'] * 100) if job_result['total_skills'] > 0 else 0, 1) html += f"

Skills Matched: {match_count}/{job_result['total_skills']} ({match_percentage}%)

" # Show leadership and collaboration counts leadership_count = leadership_counts[i-1] collaboration_count = collaboration_counts[i-1] total_experience = total_experiences[i-1] if total_experience < 3: category = "Entry" elif total_experience < 5: category = "Intermediate" else: category = "Advanced" # Add quality modifier if 50% above average if leadership_count > avg_leadership * 1.5: category = f"Quality {category} (Leadership)" elif collaboration_count > avg_collaboration * 1.5: category = f"Quality {category} (Collaboration)" html += f"

Leadership Sentences: {leadership_count}

" html += f"

Collaboration Sentences: {collaboration_count}

" html += f"

Total Years of Experience: {format_years_of_experience(total_experience)}

" html += f"

Category: {category}

" # Get the filename for this resume resume_file = filenames[i-1] # Detailed resume section with visibility control html += f"
" html += f"

{resume_file} Details

" # Skills Summary specific to this resume - Now inside the detail section html += "

Skills Summary

" html += "
" html += f"""""" html += "
" html += f"" html += "" html += "" html += "" html += "" html += "" # Get all unique skills for this specific resume resume_skills = set() for skill in resume_result['skills']: resume_skills.add(skill['text'].lower()) # Get skill maps for this resume skill_experience = skill_experience_maps[i-1] skill_leadership = skill_leadership_maps[i-1] skill_collaboration = skill_collaboration_maps[i-1] # Create a list of skill data for sorting skill_data = [] added_skills = set() # Track which skills we've already added # Count quality statements per skill skill_leadership_counts = {} skill_collaboration_counts = {} if 'roles' in resume_result: for role in resume_result['roles']: if 'quality_scores' in role: role_skills = {skill['name'].lower() for skill in role.get('skills', [])} for score in role['quality_scores']: for skill in role_skills: if score['is_leadership']: skill_leadership_counts[skill] = skill_leadership_counts.get(skill, 0) + 1 elif score['is_collaboration']: skill_collaboration_counts[skill] = skill_collaboration_counts.get(skill, 0) + 1 for skill in resume_skills: # Only add if we haven't seen this skill before if skill not in added_skills: # Get years of experience for this skill years = skill_experience.get(skill, 0) # Get quality counts for this skill leadership_count = skill_leadership_counts.get(skill, 0) collaboration_count = skill_collaboration_counts.get(skill, 0) # Check if skill matches job requirements is_match = skill in job_skills skill_data.append({ 'skill': skill, 'years': years, 'leadership_count': leadership_count, 'collaboration_count': collaboration_count, 'is_match': is_match }) added_skills.add(skill) # Sort skills by years of experience (descending) skill_data.sort(key=lambda x: (-x['years'], x['skill'])) # Add rows for each skill for data in skill_data: # Set initial display based on match (only show matched skills by default) display = "none" if not data['is_match'] else "" # Add row to summary table html += f"" html += f"" html += f"" html += f"" html += f"" html += f"" html += "
SkillYears of ExperienceLeadership Quality CountCollaboration Quality CountMatch
{data['skill']}{format_years_of_experience(data['years'])}{data['leadership_count']}{data['collaboration_count']}{'Yes' if data['is_match'] else 'No'}
" # Display all skills found in the resume html += "

Skills Found:

" html += "
" # Keep track of skills we've already added added_skills = set() for skill in resume_result['skills']: skill_text = skill['text'].lower() # Only add if we haven't seen this skill before if skill_text not in added_skills: # Highlight matched skills is_match = skill_text in job_skills bg_color = "#c8e6c9" if is_match else "#e0e0e0" # Green tint for matches # Add years of experience for this skill if available skill_years = skill_experience.get(skill_text, 0) experience_text = f" ({format_years_of_experience(skill_years)})" if skill_years > 0 else "" html += f"{skill['text']}{experience_text}" added_skills.add(skill_text) html += "
" # Job roles section if 'roles' in resume_result and resume_result['roles']: html += "

Job Experience:

" for role in resume_result['roles']: html += f"
" html += f"

Title: {' '.join(role.get('title', ['Unknown']))}

" if 'dates' in role and role['dates']: html += f"

Period: {role['dates'].get('date_started', 'Unknown')} to {role['dates'].get('date_ended', 'Unknown')}

" if 'role_length' in role: years = role['role_length'] / 12 months = role['role_length'] % 12 duration_text = "" if years >= 1: duration_text += f"{int(years)} year{'s' if int(years) > 1 else ''}" if months > 0: if duration_text: duration_text += " and " duration_text += f"{int(months)} month{'s' if int(months) > 1 else ''}" html += f"

Duration: {duration_text}

" html += f"

Role Skills:

" html += "
" for skill in role.get('skills', []): # Highlight matched skills in roles too skill_name = skill.get('name', 'Unknown') is_match = skill_name.lower() in job_skills bg_color = "#c8e6c9" if is_match else "#e0e0e0" # Green tint for matches html += f"{skill_name}" html += "
" # Display skill quality analysis if 'quality_scores' in role and role['quality_scores']: html += "

Skill Quality Analysis:

" html += "" html += "" for score in role['quality_scores']: leadership_class = "green-text" if score['is_leadership'] else "red-text" collab_class = "green-text" if score['is_collaboration'] else "red-text" html += f"" html += f"" html += f"" html += "
StatementLeadershipCollaboration
{score['sentence']}{'Yes' if score['is_leadership'] else 'No'}{'Yes' if score['is_collaboration'] else 'No'}
" html += "
" html += "
" html += "
" return html # Create Gradio interface with gr.Blocks(title="Beyond Keywords: Resume Analysis System", js=js_func) as demo: gr.Markdown("# Beyond Keywords: Job Description and Resume Analyzer") gr.Markdown(f"Running on Commit: {sha[:7]}") gr.Markdown("Upload a job description and resume(s) to analyze skill matches and quality.") # Remove the JavaScript that isn't working # Instead, use Gradio's built-in features for status updates # Server wake-up section with gr.Row(): wake_btn = gr.Button("Wake Servers (Do this first!) - Might take a 1-3 minutes for cold starts.") wake_status = gr.HTML(label="Server Status", value="
Click 'Wake Servers'
to initialize the system...
") # Input section with gr.Row(): with gr.Column(): job_description = gr.Textbox( label="Job Description", placeholder="Paste the job description here...", lines=13.10 ) with gr.Column(): resume_input = gr.Group() with resume_input: input_type = gr.Radio( choices=["Paste Text", "Upload File(s)"], label="Input Method", value="Paste Text" ) resume_text = gr.Textbox( label="Resume Text", placeholder="Paste the resume text here...", lines=8.85, visible=True ) resume_file = gr.Files( label="Upload Resume(s) (.txt files)", file_types=[".txt"], visible=False, interactive=True, type="filepath" ) def toggle_input(choice): return { resume_text: gr.update(visible=choice=="Paste Text"), resume_file: gr.update(visible=choice=="Upload File(s)") } input_type.change( fn=toggle_input, inputs=input_type, outputs=[resume_text, resume_file] ) submit_btn = gr.Button("Analyze", variant="primary") # Place status table before the main results resume_status = gr.HTML(label="Resume Processing Status", elem_id="resume-status-div", value="
Click 'Analyze'
to see processing status...
") # Main output area output = gr.HTML(label="Analysis Results", value="
") # Function to process with real-time updates using generators def process_with_updates(job_description, input_type, resume_text, resume_files): """Process inputs with real-time status updates using generators""" # Initialize resume status tracking resume_status = {} # Initial status HTML status_html = "

Resume Processing Status:

Preparing to analyze resumes...

" yield status_html, gr.update(value="
Processing job description...
") # Process job description first job_result = process_job_description(job_description) job_skills = [skill['text'] for skill in job_result['skills']] # Set up initial status table resume_results = [] filenames = [] if input_type == "Paste Text": # Process single resume resume_name = "Pasted Resume" resume_status[resume_name] = { "progress": 0, "status": "Starting analysis...", "sentences_processed": 0, "total_sentences": 0, "failed": False } status_html = update_resume_status_html(resume_status) yield status_html, gr.update(value="
Extracting resume roles...
") try: resume_result = process_resume(resume_text, job_skills, resume_status=resume_status, resume_name=resume_name) resume_results.append(resume_result) filenames.append(resume_name) except Exception as e: print(f"Error processing pasted resume: {str(e)}") resume_status[resume_name]["failed"] = True resume_status[resume_name]["status"] = f"Error: {str(e)}" # Update status status_html = update_resume_status_html(resume_status) yield status_html, gr.update(value="
Generating final report...
") else: # Process multiple resumes resume_count = len(resume_files) # Initialize status for each resume for file_path in resume_files: resume_name = os.path.basename(file_path) resume_status[resume_name] = { "progress": 0, "status": "Queued", "sentences_processed": 0, "total_sentences": 0, "failed": False } # Initial status update status_html = update_resume_status_html(resume_status) yield status_html, gr.update(value=f"
Processing {resume_count} resumes...
") # Create a thread pool to process resumes concurrently with concurrent.futures.ThreadPoolExecutor() as executor: # Create a dict to track all futures future_to_resume = {} # Submit all resume processing tasks for file_path in resume_files: resume_name = os.path.basename(file_path) # Update status to "Processing" resume_status[resume_name]["status"] = "Starting analysis..." # Read the file content try: with open(file_path, 'r', encoding='utf-8') as f: resume_content = f.read() # Submit the processing task future = executor.submit( process_resume, resume_content, job_skills, resume_status=resume_status, resume_name=resume_name ) future_to_resume[future] = resume_name except Exception as e: print(f"Error reading {resume_name}: {str(e)}") resume_status[resume_name]["status"] = f"Error: {str(e)}" resume_status[resume_name]["failed"] = True # Process with status updates pending = set(future_to_resume.keys()) completed = 0 # Use a while loop to check pending futures and update status while pending: # Yield current status every iteration status_html = update_resume_status_html(resume_status) yield status_html, gr.update(value=f"
Processed {completed}/{resume_count} resumes...
") # Wait for the next future to complete (with short timeout) done, pending = concurrent.futures.wait( pending, timeout=1.0, return_when=concurrent.futures.FIRST_COMPLETED ) # Process completed futures for future in done: resume_name = future_to_resume[future] try: result = future.result() if not resume_status[resume_name].get("failed", False): resume_results.append(result) filenames.append(resume_name) completed += 1 print(f"Completed processing of {resume_name} ({completed}/{resume_count})") except Exception as e: print(f"Error processing resume {resume_name}: {str(e)}") resume_status[resume_name]["status"] = f"Error: {str(e)}" resume_status[resume_name]["failed"] = True # Final status update status_html = update_resume_status_html(resume_status) yield status_html, gr.update(value=f"
All {resume_count} resumes processed. Generating report...
") # Generate final output html_output = create_html_output(job_result, resume_results, filenames) # Final yield with complete status and output status_html = update_resume_status_html(resume_status) yield status_html, html_output # Connect the submit button to the processing function with real-time updates submit_btn.click( fn=process_with_updates, inputs=[job_description, input_type, resume_text, resume_file], outputs=[resume_status, output] ) # Connect wake button wake_btn.click(fn=wake_servers, inputs=None, outputs=wake_status) gr.Markdown("""
""") if __name__ == "__main__": demo.launch()