Spaces:
Running
Running
| from core.pipeline.step import PipelineStep | |
| from observability import logger as obs_logger | |
| from observability import components as obs_components | |
| from datetime import datetime | |
| import logging | |
| logger = logging.getLogger(__name__) | |
class IntelligenceStep(PipelineStep):
    """Pipeline step that generates, or reuses, insights, plan and weekly brief.

    Fresh content is produced through the injected agents/services when the
    runner positioning changed (and a comparison is available), while the
    positioning status is still a baseline one, or when nothing is cached on
    the context yet. Otherwise the values cached on the context
    (``last_insights``, ``last_plan``, ``last_brief``, ``last_focus``) are
    applied again.
    """

    name = "intelligence_generation"

    # Positioning statuses for which intelligence is always regenerated.
    _BASELINE_STATUSES = (None, "UNKNOWN", "BASELINE_BUILDING")

    def __init__(
        self,
        insights_agent,
        plan_agent,
        brief_service,
        weekly_repo,
        guardrail_service,
        positioning_change_service,
        goal_progress_service,
        goal_service,
    ):
        """Store the collaborators used by the step.

        ``brief_service`` and ``weekly_repo`` are treated as optional: the
        methods below check them for truthiness before use.
        """
        self.insights_agent = insights_agent
        self.plan_agent = plan_agent
        self.brief_service = brief_service
        self.weekly_repo = weekly_repo
        self.guardrail_service = guardrail_service
        self.positioning_change_service = positioning_change_service
        self.goal_progress_service = goal_progress_service
        self.goal_service = goal_service

    def _generation_decision(self, context):
        """Decide whether fresh intelligence must be generated.

        Returns a ``(should_generate, fields)`` pair where ``fields`` is the
        dict of individual signals that is attached to the log event.
        Shared by ``run`` and ``run_parallel_agents`` so both apply exactly
        the same rule.
        """
        state_changed = self.positioning_change_service.has_changed(
            context.last_runner_positioning,
            context.runner_positioning,
        )
        positioning = context.runner_positioning
        has_comparison = getattr(positioning, "comparison_available", False)
        positioning_status = getattr(positioning, "position_status", None)
        is_baseline = positioning_status in self._BASELINE_STATUSES
        # Truthiness instead of len(): last_insights may be None when nothing
        # has been cached yet, and len(None) would raise TypeError.
        has_existing = bool(context.last_insights) or context.last_plan is not None

        should_generate = bool(
            (state_changed and has_comparison)  # normal case
            or is_baseline                      # baseline case
            or not has_existing                 # safety fallback
        )
        fields = {
            "state_changed": state_changed,
            "comparison_available": has_comparison,
            "baseline": is_baseline,
            "has_existing": has_existing,
        }
        return should_generate, fields

    async def run(self, context):
        """Generate or reuse intelligence for the week held by ``context``.

        Does nothing (besides logging) when there is no weekly snapshot.
        When ``context.enable_intelligence`` is present and falsy, the cached
        insights and plan are copied onto the context and no agent is called.
        """
        if context.weekly_snapshot is None:
            obs_logger.log_event(
                "info",
                "Skipping insights generation: no run data available",
                event="insights_skipped_no_data",
                component=obs_components.PIPELINE,
            )
            return

        # Gate by intelligence flag if present in context (defaults to on).
        if not getattr(context, "enable_intelligence", True):
            obs_logger.log_event(
                "info",
                "Skipping intelligence generation: historical week (LLM disabled)",
                event="insights_skipped_historical",
                component=obs_components.PIPELINE,
                fields={"week_start": str(context.weekly_snapshot.week_start_date)},
            )
            context.insights = context.last_insights
            context.plan = context.last_plan
            return

        should_generate, decision_fields = self._generation_decision(context)

        if should_generate:
            obs_logger.log_event(
                "info",
                "Intelligence generation triggered",
                event="intelligence_generated",
                component=obs_components.ORCHESTRATOR,
                fields=decision_fields,
            )
            # Insights/plan may already be set on the context (for example by
            # the coroutines returned from run_parallel_agents); only fill
            # in what is still missing.
            if context.insights is None:
                await self._run_insights(context)
            if context.plan is None:
                await self._run_plan(context)

            # Brief is always sequential as it depends on trends/snapshot
            # being stable.
            res = await self._run_brief(context)
            if isinstance(res, tuple):
                brief, focus = res
            else:
                # NOTE(review): _run_brief returns a tuple; this branch only
                # matters if it is replaced by something that hands back an
                # awaitable (presumably a test double) - confirm.
                brief, focus = await res

            # Update cache in context
            self._update_cache(context, brief, focus)
            return

        obs_logger.log_event(
            "info",
            f"Reuse intelligence for state: {context.runner_positioning.position_status}",
            event="intelligence_reused",
            component=obs_components.ORCHESTRATOR,
            positioning_state=context.runner_positioning.position_status,
        )
        context.insights = context.last_insights
        context.plan = context.last_plan

        # Apply cached brief to current snapshot
        if context.weekly_snapshot:
            context.weekly_snapshot.performance_brief = context.last_brief
            context.weekly_snapshot.performance_focus = context.last_focus
            if self.weekly_repo:
                self.weekly_repo.save(context.weekly_snapshot)

    def run_parallel_agents(self, context):
        """Extraction point for parallel execution.

        Returns a list of awaitables (the insights and plan coroutines) for
        the caller to schedule, or an empty list when no generation is
        needed. The gating mirrors ``run``: no snapshot, a disabled
        ``enable_intelligence`` flag, or a "reuse" decision all yield ``[]``
        so that no agent call is started whose result ``run`` would then
        replace with the cached values.
        """
        if context.weekly_snapshot is None:
            return []
        if not getattr(context, "enable_intelligence", True):
            return []

        should_generate, _ = self._generation_decision(context)
        if not should_generate:
            return []

        return [
            self._run_insights(context),
            self._run_plan(context),
        ]

    async def _run_insights(self, context):
        """Run the insights agent on the latest run and store the result.

        Skips (with a log event) when there is no weekly snapshot or no runs.
        """
        if context.weekly_snapshot is None or not context.runs:
            obs_logger.log_event(
                "info",
                "Skipping insights generation: no run data available",
                event="insights_skipped_no_data",
                component=obs_components.PIPELINE,
            )
            return

        latest_run = context.runs[-1]
        if isinstance(latest_run, dict):
            # Imported lazily, as in the original code (presumably to avoid
            # an import cycle - confirm before moving to module level).
            from domain.training.run import Run

            latest_run = Run(**latest_run)

        context.insights = await self.insights_agent.run(
            latest_run,
            context.trends,
            risk_level=context.risk_assessment.risk_level,
            language=context.language,
            profile=context.runner_profile,
            goal=context.active_goal,
        )

    async def _run_plan(self, context):
        """Run the plan agent and store the guardrailed plan on the context."""
        plan_result = await self.plan_agent.run(
            context.summary,
            language=context.language,
            profile=context.runner_profile,
            goal=context.active_goal,
        )
        draft_plan = plan_result.get("plan", "")
        context.plan = self.guardrail_service.apply(draft_plan, context.risk_assessment)

    async def _run_brief(self, context):
        """Generate the weekly brief and persist it on the snapshot.

        Returns ``(brief, focus)``; both are empty strings when the brief
        service, the weekly snapshot or the weekly trend is missing.
        """
        brief, focus = "", ""
        if self.brief_service and context.weekly_snapshot and context.weekly_trend:
            snapshot = context.weekly_snapshot
            goal_progress = self.goal_progress_service.compute(
                self.goal_service,
                context.active_goal,
                snapshot,
                context.weekly_trend,
            )
            brief, focus = await self.brief_service.generate_brief(
                snapshot,
                context.weekly_trend,
                goal_progress=goal_progress,
                language=context.language,
            )
            snapshot.performance_brief = brief
            snapshot.performance_focus = focus
            # Hash is computed after the brief fields are assigned, keeping
            # the original statement order.
            snapshot.brief_source_hash = self.brief_service.compute_brief_hash(
                snapshot,
                context.weekly_trend,
                goal_progress=goal_progress,
                language=context.language,
            )
            snapshot.brief_generated_at = datetime.now()
            if self.weekly_repo:
                self.weekly_repo.save(snapshot)
        return brief, focus

    def _update_cache(self, context, brief, focus):
        """Remember the current positioning and generated content on the context."""
        context.last_runner_positioning = context.runner_positioning
        context.last_insights = context.insights
        context.last_plan = context.plan
        context.last_brief = brief
        context.last_focus = focus