Spaces:
Runtime error
Runtime error
| """ | |
| Parallel Agent Executor | |
| Implements async parallel execution of agents for faster processing | |
| Based on the parallel agent pattern for improved performance | |
| """ | |
| import asyncio | |
| import time | |
| import logging | |
| from typing import List, Dict, Any, Tuple, Optional | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| import nest_asyncio | |
| import matplotlib.pyplot as plt | |
| from concurrent.futures import ThreadPoolExecutor | |
| from models.schemas import JobPosting, ResumeDraft, CoverLetterDraft, OrchestrationResult | |
| # Apply nest_asyncio to allow nested event loops (useful in Jupyter/Gradio) | |
| try: | |
| nest_asyncio.apply() | |
| except: | |
| pass | |
| logger = logging.getLogger(__name__) | |
| class AgentResult: | |
| """Result from an agent execution""" | |
| agent_name: str | |
| output: Any | |
| start_time: float | |
| end_time: float | |
| duration: float | |
| success: bool | |
| error: Optional[str] = None | |
| class ParallelAgentExecutor: | |
| """Execute multiple agents in parallel for faster processing""" | |
| def __init__(self, max_workers: int = 4): | |
| self.max_workers = max_workers | |
| self.executor = ThreadPoolExecutor(max_workers=max_workers) | |
| self.execution_history: List[Tuple[str, float, float]] = [] | |
| async def run_agent_async( | |
| self, | |
| agent_func: callable, | |
| agent_name: str, | |
| *args, | |
| **kwargs | |
| ) -> AgentResult: | |
| """Run a single agent asynchronously""" | |
| start_time = time.time() | |
| try: | |
| # Log start | |
| logger.info(f"Starting {agent_name} at {datetime.now()}") | |
| # Run the agent function | |
| if asyncio.iscoroutinefunction(agent_func): | |
| result = await agent_func(*args, **kwargs) | |
| else: | |
| # Run sync function in executor | |
| loop = asyncio.get_event_loop() | |
| result = await loop.run_in_executor( | |
| self.executor, | |
| agent_func, | |
| *args | |
| ) | |
| end_time = time.time() | |
| duration = end_time - start_time | |
| # Track execution | |
| self.execution_history.append((agent_name, start_time, end_time)) | |
| logger.info(f"Completed {agent_name} in {duration:.2f}s") | |
| return AgentResult( | |
| agent_name=agent_name, | |
| output=result, | |
| start_time=start_time, | |
| end_time=end_time, | |
| duration=duration, | |
| success=True | |
| ) | |
| except Exception as e: | |
| end_time = time.time() | |
| duration = end_time - start_time | |
| logger.error(f"Error in {agent_name}: {str(e)}") | |
| return AgentResult( | |
| agent_name=agent_name, | |
| output=None, | |
| start_time=start_time, | |
| end_time=end_time, | |
| duration=duration, | |
| success=False, | |
| error=str(e) | |
| ) | |
| async def run_parallel_agents( | |
| self, | |
| agents: List[Dict[str, Any]] | |
| ) -> Dict[str, AgentResult]: | |
| """ | |
| Run multiple agents in parallel | |
| Args: | |
| agents: List of dicts with 'name', 'func', 'args', 'kwargs' | |
| Returns: | |
| Dict mapping agent names to results | |
| """ | |
| tasks = [] | |
| for agent in agents: | |
| task = self.run_agent_async( | |
| agent['func'], | |
| agent['name'], | |
| *agent.get('args', []), | |
| **agent.get('kwargs', {}) | |
| ) | |
| tasks.append(task) | |
| # Run all agents in parallel | |
| results = await asyncio.gather(*tasks, return_exceptions=True) | |
| # Map results by name | |
| result_map = {} | |
| for i, agent in enumerate(agents): | |
| if isinstance(results[i], Exception): | |
| result_map[agent['name']] = AgentResult( | |
| agent_name=agent['name'], | |
| output=None, | |
| start_time=time.time(), | |
| end_time=time.time(), | |
| duration=0, | |
| success=False, | |
| error=str(results[i]) | |
| ) | |
| else: | |
| result_map[agent['name']] = results[i] | |
| return result_map | |
| def plot_timeline(self, save_path: Optional[str] = None): | |
| """Plot execution timeline of agents""" | |
| if not self.execution_history: | |
| logger.warning("No execution history to plot") | |
| return | |
| # Normalize times to zero | |
| base = min(start for _, start, _ in self.execution_history) | |
| # Prepare data | |
| labels = [] | |
| start_offsets = [] | |
| durations = [] | |
| for name, start, end in self.execution_history: | |
| labels.append(name) | |
| start_offsets.append(start - base) | |
| durations.append(end - start) | |
| # Create plot | |
| plt.figure(figsize=(10, 6)) | |
| plt.barh(labels, durations, left=start_offsets, height=0.5) | |
| plt.xlabel("Seconds since start") | |
| plt.title("Agent Execution Timeline") | |
| plt.grid(True, alpha=0.3) | |
| # Add duration labels | |
| for i, (offset, duration) in enumerate(zip(start_offsets, durations)): | |
| plt.text(offset + duration/2, i, f'{duration:.2f}s', | |
| ha='center', va='center', color='white', fontsize=8) | |
| plt.tight_layout() | |
| if save_path: | |
| plt.savefig(save_path) | |
| logger.info(f"Timeline saved to {save_path}") | |
| else: | |
| plt.show() | |
| return plt.gcf() | |
| class ParallelJobProcessor: | |
| """Process multiple jobs in parallel using agent parallelization""" | |
| def __init__(self): | |
| self.executor = ParallelAgentExecutor(max_workers=4) | |
| async def process_jobs_parallel( | |
| self, | |
| jobs: List[JobPosting], | |
| cv_agent_func: callable, | |
| cover_agent_func: callable, | |
| research_func: callable = None, | |
| **kwargs | |
| ) -> List[OrchestrationResult]: | |
| """ | |
| Process multiple jobs in parallel | |
| Each job gets: | |
| 1. Resume generation | |
| 2. Cover letter generation | |
| 3. Optional web research | |
| All running in parallel per job | |
| """ | |
| all_results = [] | |
| for job in jobs: | |
| # Define agents for this job | |
| agents = [ | |
| { | |
| 'name': f'Resume_{job.company}', | |
| 'func': cv_agent_func, | |
| 'args': [job], | |
| 'kwargs': kwargs | |
| }, | |
| { | |
| 'name': f'CoverLetter_{job.company}', | |
| 'func': cover_agent_func, | |
| 'args': [job], | |
| 'kwargs': kwargs | |
| } | |
| ] | |
| # Add research if available | |
| if research_func: | |
| agents.append({ | |
| 'name': f'Research_{job.company}', | |
| 'func': research_func, | |
| 'args': [job.company], | |
| 'kwargs': {} | |
| }) | |
| # Run agents in parallel for this job | |
| results = await self.executor.run_parallel_agents(agents) | |
| # Combine results | |
| orchestration_result = OrchestrationResult( | |
| job=job, | |
| resume=results[f'Resume_{job.company}'].output, | |
| cover_letter=results[f'CoverLetter_{job.company}'].output, | |
| keywords=[], # Would be extracted | |
| research=results.get(f'Research_{job.company}', {}).output if research_func else None | |
| ) | |
| all_results.append(orchestration_result) | |
| # Generate timeline | |
| self.executor.plot_timeline(save_path="parallel_execution_timeline.png") | |
| return all_results | |
| class MetaAgent: | |
| """ | |
| Meta-agent that combines outputs from multiple specialized agents | |
| Similar to the article's pattern of combining summaries | |
| """ | |
| def __init__(self): | |
| self.executor = ParallelAgentExecutor() | |
| async def analyze_job_fit( | |
| self, | |
| job: JobPosting, | |
| resume: ResumeDraft | |
| ) -> Dict[str, Any]: | |
| """ | |
| Run multiple analysis agents in parallel and combine results | |
| """ | |
| # Define specialized analysis agents | |
| agents = [ | |
| { | |
| 'name': 'SkillsMatcher', | |
| 'func': self._match_skills, | |
| 'args': [job, resume] | |
| }, | |
| { | |
| 'name': 'ExperienceAnalyzer', | |
| 'func': self._analyze_experience, | |
| 'args': [job, resume] | |
| }, | |
| { | |
| 'name': 'CultureFit', | |
| 'func': self._assess_culture_fit, | |
| 'args': [job, resume] | |
| }, | |
| { | |
| 'name': 'SalaryEstimator', | |
| 'func': self._estimate_salary_fit, | |
| 'args': [job, resume] | |
| } | |
| ] | |
| # Run all agents in parallel | |
| results = await self.executor.run_parallel_agents(agents) | |
| # Combine into executive summary | |
| summary = self._combine_analyses(results) | |
| return summary | |
| def _match_skills(self, job: JobPosting, resume: ResumeDraft) -> Dict: | |
| """Match skills between job and resume""" | |
| job_skills = set(job.description.lower().split()) | |
| resume_skills = set(resume.text.lower().split()) | |
| matched = job_skills & resume_skills | |
| missing = job_skills - resume_skills | |
| return { | |
| 'matched_skills': len(matched), | |
| 'missing_skills': len(missing), | |
| 'match_percentage': len(matched) / len(job_skills) * 100 if job_skills else 0, | |
| 'top_matches': list(matched)[:10] | |
| } | |
| def _analyze_experience(self, job: JobPosting, resume: ResumeDraft) -> Dict: | |
| """Analyze experience relevance""" | |
| # Simplified analysis | |
| return { | |
| 'years_experience': 5, # Would extract from resume | |
| 'relevant_roles': 3, | |
| 'industry_match': True | |
| } | |
| def _assess_culture_fit(self, job: JobPosting, resume: ResumeDraft) -> Dict: | |
| """Assess cultural fit""" | |
| return { | |
| 'remote_preference': 'remote' in job.location.lower() if job.location else False, | |
| 'company_size_fit': True, | |
| 'values_alignment': 0.8 | |
| } | |
| def _estimate_salary_fit(self, job: JobPosting, resume: ResumeDraft) -> Dict: | |
| """Estimate salary fit""" | |
| return { | |
| 'estimated_range': '$100k-$150k', | |
| 'market_rate': True, | |
| 'negotiation_room': 'moderate' | |
| } | |
| def _combine_analyses(self, results: Dict[str, AgentResult]) -> Dict: | |
| """Combine all analyses into executive summary""" | |
| summary = { | |
| 'overall_fit_score': 0, | |
| 'strengths': [], | |
| 'gaps': [], | |
| 'recommendations': [], | |
| 'detailed_analysis': {} | |
| } | |
| # Extract successful results | |
| for name, result in results.items(): | |
| if result.success and result.output: | |
| summary['detailed_analysis'][name] = result.output | |
| # Calculate overall score | |
| if 'SkillsMatcher' in summary['detailed_analysis']: | |
| skills_score = summary['detailed_analysis']['SkillsMatcher'].get('match_percentage', 0) | |
| summary['overall_fit_score'] = skills_score | |
| # Generate recommendations | |
| if summary['overall_fit_score'] > 70: | |
| summary['recommendations'].append("Strong candidate - proceed with application") | |
| elif summary['overall_fit_score'] > 50: | |
| summary['recommendations'].append("Moderate fit - customize resume for better match") | |
| else: | |
| summary['recommendations'].append("Low fit - consider if this role aligns with goals") | |
| return summary | |
| # Usage example | |
| async def demo_parallel_execution(): | |
| """Demonstrate parallel agent execution""" | |
| # Create executor | |
| executor = ParallelAgentExecutor() | |
| # Define sample agents | |
| async def agent1(): | |
| await asyncio.sleep(2) | |
| return "Agent 1 result" | |
| async def agent2(): | |
| await asyncio.sleep(1) | |
| return "Agent 2 result" | |
| async def agent3(): | |
| await asyncio.sleep(3) | |
| return "Agent 3 result" | |
| agents = [ | |
| {'name': 'FastAgent', 'func': agent2}, | |
| {'name': 'MediumAgent', 'func': agent1}, | |
| {'name': 'SlowAgent', 'func': agent3} | |
| ] | |
| # Run in parallel | |
| results = await executor.run_parallel_agents(agents) | |
| # Show results | |
| for name, result in results.items(): | |
| print(f"{name}: {result.output} (took {result.duration:.2f}s)") | |
| # Plot timeline | |
| executor.plot_timeline() | |
| if __name__ == "__main__": | |
| # Run demo | |
| asyncio.run(demo_parallel_execution()) |