Spaces:
Running
Running
| """ | |
| Candidate source APIs - compute metrics from actual data. | |
| AUTO-GENERATED by scripts/generate_hf.sh - DO NOT EDIT DIRECTLY | |
| Edit candidate_source.py in main repo and regenerate. | |
| """ | |
| from typing import Dict, List, Any, Optional, Union | |
| import pandas as pd | |
| from loguru import logger | |
| from data_loader import get_data_loader | |
| from models import ( | |
| RequisitionNotFoundResponse, | |
| SLAPerSourceResponse, | |
| TotalHiresBySourceResponse, | |
| CandidateVolumeResponse, | |
| FunnelConversionResponse, | |
| MetadataResponse, | |
| DefinitionsResponse, | |
| SourceRecommendationResponse, | |
| ) | |
| BPO_LOG_API_CALLS = False # Disabled for deployment | |
| def _log_api_call(msg: str) -> None: | |
| """Log API call if BPO_LOG_API_CALLS is enabled.""" | |
| if BPO_LOG_API_CALLS: | |
| logger.info(msg) | |
def _check_requisition_valid(requisition_id: str) -> Optional[RequisitionNotFoundResponse]:
    """
    Validate a requisition ID against the shared data loader.

    Args:
        requisition_id: The requisition ID to look up.

    Returns:
        None when the loader reports the ID as valid; otherwise a
        RequisitionNotFoundResponse carrying the error code, a message naming
        the ID, and whatever suggestions the loader returns for it.
    """
    data_loader = get_data_loader()

    # Happy path first: a known ID needs no error payload.
    if data_loader.is_valid_requisition(requisition_id):
        return None

    return RequisitionNotFoundResponse(
        error="requisition_not_found",
        message=f"No job can be found with the ID {requisition_id}.",
        suggested_requisition_ids=data_loader.get_suggested_requisitions(requisition_id),
    )
def get_sla_per_source(requisition_id: str) -> Union[SLAPerSourceResponse, RequisitionNotFoundResponse]:
    """
    Retrieves the SLA percentage for each sourcing channel.

    Only rows whose ``reviewed`` flag is set are considered. For each
    ``source_name`` the percentage is ``sum(sla_met) / count(sla_met) * 100``
    rounded to a whole number.

    Args:
        requisition_id: The specific requisition ID to filter SLA data for.

    Returns:
        A RequisitionNotFoundResponse when the ID fails validation; otherwise
        an SLAPerSourceResponse whose ``metrics`` list holds one
        ``{"source_name", "sla_percentage"}`` entry per source, sorted by
        ascending percentage. Sources with no non-missing ``sla_met`` value
        among their reviewed rows are omitted.
    """
    _log_api_call(f"API call: get_sla_per_source(requisition_id={requisition_id})")

    # Check if requisition ID is valid
    error = _check_requisition_valid(requisition_id)
    if error:
        return error

    loader = get_data_loader()
    data = loader.get_similar_requisitions(requisition_id)

    # Filter to only reviewed candidates (SLA only applies to reviewed candidates)
    reviewed_data = data[data['reviewed']]

    # Group by source and calculate SLA met percentage
    sla_by_source = reviewed_data.groupby('source_name').agg(
        total=('sla_met', 'count'),
        sla_met=('sla_met', 'sum'),
    )

    # 'count' skips missing values, so a source whose reviewed rows all lack an
    # sla_met value has total == 0. Its ratio would be 0/0 = NaN, which cannot
    # be cast to int, so such sources are dropped before the division.
    sla_by_source = sla_by_source[sla_by_source['total'] > 0]

    sla_percentage = (
        sla_by_source['sla_met'] / sla_by_source['total'] * 100
    ).round(0).astype(int)

    metrics = [
        {
            "source_name": source,
            "sla_percentage": int(percentage),
        }
        for source, percentage in sla_percentage.items()
    ]

    # Sort by SLA percentage (ascending) for consistency; the sort is stable,
    # so ties keep the group order produced by groupby.
    metrics.sort(key=lambda x: x['sla_percentage'])

    return SLAPerSourceResponse(metrics=metrics)
def get_total_hires_by_source(requisition_id: str) -> Union[TotalHiresBySourceResponse, RequisitionNotFoundResponse]:
    """
    Count hires per sourcing channel across the similar-requisition data.

    Args:
        requisition_id: The specific requisition ID to filter hiring data for.

    Returns:
        A RequisitionNotFoundResponse when the ID fails validation; otherwise
        a TotalHiresBySourceResponse with the requisition ID, one
        ``{"source_name", "total_hires"}`` entry per source that has at least
        one hired row (largest count first), and the overall hire count.
    """
    _log_api_call(f"API call: get_total_hires_by_source(requisition_id={requisition_id})")

    not_found = _check_requisition_valid(requisition_id)
    if not_found:
        return not_found

    data = get_data_loader().get_similar_requisitions(requisition_id)

    # Keep only hired rows, then tally them per source.
    hired_rows = data[data['hired']]
    hire_counts = hired_rows.groupby('source_name').size()

    unsorted_metrics = []
    for source, count in hire_counts.items():
        unsorted_metrics.append({
            "source_name": source,
            "total_hires": int(count),
        })

    # Largest hire count first; the sort is stable so ties keep groupby order.
    ranked_metrics = sorted(
        unsorted_metrics,
        key=lambda entry: entry['total_hires'],
        reverse=True,
    )

    return TotalHiresBySourceResponse(
        job_id=requisition_id,
        metrics=ranked_metrics,
        total_hires=int(data['hired'].sum()),
    )
def get_candidate_volume_by_source(
    requisition_id: str,
    sources: Optional[List[str]] = None
) -> Union[CandidateVolumeResponse, RequisitionNotFoundResponse]:
    """
    Report how many candidates each sourcing channel contributed.

    Args:
        requisition_id: The specific requisition ID to filter candidate volume.
        sources: Optional subset of sourcing channels to include
            (case-sensitive). A falsy value (None or empty) keeps every source.

    Returns:
        A RequisitionNotFoundResponse when the ID fails validation; otherwise
        a CandidateVolumeResponse. Each metric carries the source name, its
        candidate count, and that count as a whole-number percentage of all
        rows. The total and the percentages are not rescaled when ``sources``
        narrows the list. Metrics are ordered by volume, largest first.
    """
    _log_api_call(f"API call: get_candidate_volume_by_source(requisition_id={requisition_id}, sources={sources})")

    not_found = _check_requisition_valid(requisition_id)
    if not_found:
        return not_found

    data = get_data_loader().get_similar_requisitions(requisition_id)
    total_volume = len(data)

    # One row count per source name.
    counts_per_source = data.groupby('source_name').size()

    selected = []
    for source, count in counts_per_source.items():
        # Apply the optional source filter while building the list.
        if sources and source not in sources:
            continue
        selected.append({
            "source_name": source,
            "candidate_volume": int(count),
            "percentage": int(round(count/total_volume*100)),
        })

    # Highest volume first; stable, so equal volumes keep groupby order.
    ranked = sorted(selected, key=lambda entry: entry['candidate_volume'], reverse=True)

    heading = (
        f"For requisitions similar to {requisition_id}, there were {total_volume} candidates over "
        "the past three years. Here's how many candidates came from each source "
        "(with percentages from the total number):"
    )

    return CandidateVolumeResponse(
        job_id=requisition_id,
        total_candidate_volume=total_volume,
        metrics=ranked,
        heading=heading,
    )
def get_funnel_conversion_by_source(requisition_id: str) -> Union[FunnelConversionResponse, RequisitionNotFoundResponse]:
    """
    Compute per-source funnel rates as a share of that source's candidates.

    Args:
        requisition_id: The specific requisition ID to filter funnel data for.

    Returns:
        A RequisitionNotFoundResponse when the ID fails validation; otherwise
        a FunnelConversionResponse with one entry per source, sorted by source
        name. Every rate is ``flag_sum / candidate_count * 100`` rounded to one
        decimal place.
    """
    _log_api_call(f"API call: get_funnel_conversion_by_source(requisition_id={requisition_id})")

    not_found = _check_requisition_valid(requisition_id)
    if not_found:
        return not_found

    candidates = get_data_loader().get_similar_requisitions(requisition_id)

    # Output key -> column whose sum is divided by the source's row count.
    # NOTE(review): "offer_acceptance_rate" is derived from the
    # 'offer_extended' column over all candidates of the source, not from
    # accepted offers. Confirm this is the intended definition.
    stage_columns = (
        ("first_round_review_percentage", 'reviewed'),
        ("interview_rate", 'interviewed'),
        ("offer_acceptance_rate", 'offer_extended'),
    )

    rows = []
    for source in candidates['source_name'].unique():
        subset = candidates[candidates['source_name'] == source]
        candidate_count = len(subset)
        # Skip values that match no rows, which would divide by zero.
        if candidate_count == 0:
            continue

        entry = {"source_name": source}
        for metric_key, column in stage_columns:
            entry[metric_key] = round(subset[column].sum() / candidate_count * 100, 1)
        rows.append(entry)

    # Alphabetical by source name for a deterministic order.
    ordered = sorted(rows, key=lambda entry: entry['source_name'])

    return FunnelConversionResponse(
        job_id=requisition_id,
        metrics=ordered,
    )
def get_metadata_and_timeframe(requisition_id: str) -> Union[MetadataResponse, RequisitionNotFoundResponse]:
    """
    Retrieves metadata including data timeframe, last update date, and the
    number of requisitions analysed.

    The three dates in the response are fixed literals, not values derived
    from the data. Only the requisition count is computed.

    Args:
        requisition_id: The job requisition ID.

    Returns:
        A RequisitionNotFoundResponse when the ID fails validation; otherwise
        a MetadataResponse with the requisition ID, the fixed time frame and
        last-updated dates, and the number of distinct ``requisition_id``
        values among the similar requisitions.
    """
    _log_api_call(f"API call: get_metadata_and_timeframe(requisition_id={requisition_id})")

    # Check if requisition ID is valid
    error = _check_requisition_valid(requisition_id)
    if error:
        return error

    loader = get_data_loader()
    data = loader.get_similar_requisitions(requisition_id)

    # Count unique requisitions
    num_requisitions = data['requisition_id'].nunique()

    # Static dates for reproducible benchmarking: the time frame and the
    # last-updated date are deliberately hard-coded rather than read from the
    # 'applied_at' column, so repeated runs return identical metadata.
    return MetadataResponse(
        job_id=requisition_id,
        time_frame_start="2023-10-09",
        time_frame_end="2025-03-15",
        data_last_updated="2025-04-29",
        total_requisitions_analysed=num_requisitions,
    )
def get_definitions_and_methodology(requisition_id: str) -> Union[DefinitionsResponse, RequisitionNotFoundResponse]:
    """
    Return metric definitions plus a short note on how metrics are computed.

    The requisition count quoted in the note comes from the loader's full
    dataset, while the year span comes from the ``applied_at`` range of the
    requisitions similar to ``requisition_id``.

    Args:
        requisition_id: The specific requisition ID for context.

    Returns:
        A RequisitionNotFoundResponse when the ID fails validation; otherwise
        a DefinitionsResponse with metric definitions, calculation notes,
        and the top metrics considered.
    """
    _log_api_call(f"API call: get_definitions_and_methodology(requisition_id={requisition_id})")

    not_found = _check_requisition_valid(requisition_id)
    if not_found:
        return not_found

    loader = get_data_loader()
    similar = loader.get_similar_requisitions(requisition_id)

    # Report total requisitions in dataset (full analysis framework)
    num_total_requisitions = loader.data['requisition_id'].nunique()

    # Span between the earliest and latest application, in years.
    applied_at = similar['applied_at']
    span = applied_at.max() - applied_at.min()
    years = span.days / 365.25

    definitions = {
        "sla": "Percentage of candidates reviewed within the defined SLA window (e.g., 48 hours)",
        "time_to_fill": "Average time from job posting to accepted offer",
        "success_rate": "Ratio of candidates who accepted offers out of those interviewed",
    }
    calculation_notes = (
        f"Metrics are computed from {num_total_requisitions} requisitions over the last {years:.1f} years. "
        "Funnel stats are based on system timestamps and recruiter actions in ATS."
    )
    top_metrics_considered = [
        "SLA %",
        "First round review %",
        "Offer acceptance rate",
        "Candidate volume",
        "Total hires",
    ]

    return DefinitionsResponse(
        job_id=requisition_id,
        definitions=definitions,
        calculation_notes=calculation_notes,
        top_metrics_considered=top_metrics_considered,
    )
def get_source_recommendation_summary(requisition_id: str) -> Union[SourceRecommendationResponse, RequisitionNotFoundResponse]:
    """
    Returns a high-level summary combining jobs-filled %, review %, offer-accept
    rate, and total hires for each source.

    ``jobs_filled_percentage`` and ``first_round_review_percentage`` are
    whole-number percentages rounded down. ``offer_acceptance_rate`` is
    accepted offers over extended offers, rounded to the nearest whole
    number (0 when no offers were extended).

    Args:
        requisition_id: The job requisition ID.

    Returns:
        A RequisitionNotFoundResponse when the ID fails validation; otherwise
        a SourceRecommendationResponse with the number of distinct
        requisitions analysed and one metrics entry per source, sorted by
        source name.
    """
    _log_api_call(f"API call: get_source_recommendation_summary(requisition_id={requisition_id})")

    # Check if requisition ID is valid
    error = _check_requisition_valid(requisition_id)
    if error:
        return error

    loader = get_data_loader()
    data = loader.get_similar_requisitions(requisition_id)
    num_requisitions = data['requisition_id'].nunique()

    metrics = []
    for source in data['source_name'].unique():
        source_data = data[data['source_name'] == source]
        total = len(source_data)
        if total == 0:
            continue

        # Convert the sums to plain ints once so the percentage math below is
        # exact integer arithmetic.
        reviewed = int(source_data['reviewed'].sum())
        hired = int(source_data['hired'].sum())

        # Jobs filled percentage: what % of requisitions had at least 1 hire from this source
        reqs_with_hires = int(source_data[source_data['hired']]['requisition_id'].nunique())
        # Floor with integer division instead of int(a / b * 100): the float
        # form can land just below the true value (29 / 100 * 100 is
        # 28.999999999999996) and truncate to one less than intended. The
        # zero check covers rows whose requisition_id values are all missing.
        jobs_filled_pct = reqs_with_hires * 100 // num_requisitions if num_requisitions else 0

        # Offer acceptance rate: of those who got offers, how many accepted?
        offers = source_data['offer_extended'].sum()
        accepted = source_data['offer_accepted'].sum()
        # int() normalises round()'s result, which may be a NumPy scalar.
        offer_accept_rate = int(round(accepted / offers * 100)) if offers > 0 else 0

        metrics.append({
            "source_name": source,
            "jobs_filled_percentage": jobs_filled_pct,
            "first_round_review_percentage": reviewed * 100 // total,
            "offer_acceptance_rate": offer_accept_rate,
            "total_hires": hired,
        })

    # Sort by source name
    metrics.sort(key=lambda x: x['source_name'])

    return SourceRecommendationResponse(
        total_requisitions=num_requisitions,
        metrics=metrics,
    )