# runner-ai-intelligence / src/pipeline_steps/intelligence_step.py
# HF Space deploy snapshot (minimal allow-list) — commit d64fd55 (avfranco)
from core.pipeline.step import PipelineStep
from observability import logger as obs_logger
from observability import components as obs_components
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class IntelligenceStep(PipelineStep):
    """Generate or reuse weekly runner intelligence (insights, plan, brief).

    Regeneration is triggered when the runner's positioning state changed
    and a comparison is available, when the runner is still in a
    baseline-building phase, or when no cached intelligence exists yet.
    Otherwise the previously cached artifacts are reused and re-applied to
    the current weekly snapshot.
    """

    name = "intelligence_generation"

    def __init__(
        self,
        insights_agent,
        plan_agent,
        brief_service,
        weekly_repo,
        guardrail_service,
        positioning_change_service,
        goal_progress_service,
        goal_service
    ):
        # LLM-backed agents
        self.insights_agent = insights_agent
        self.plan_agent = plan_agent
        # Supporting services and persistence
        self.brief_service = brief_service
        self.weekly_repo = weekly_repo
        self.guardrail_service = guardrail_service
        self.positioning_change_service = positioning_change_service
        self.goal_progress_service = goal_progress_service
        self.goal_service = goal_service

    def _decision_signals(self, context):
        """Return (state_changed, has_comparison, is_baseline, has_existing).

        These four booleans drive the regenerate-vs-reuse decision and are
        also emitted as observability log fields.
        """
        state_changed = self.positioning_change_service.has_changed(
            context.last_runner_positioning,
            context.runner_positioning,
        )
        has_comparison = getattr(context.runner_positioning, "comparison_available", False)
        positioning_status = getattr(context.runner_positioning, "position_status", None)
        is_baseline = positioning_status in (None, "UNKNOWN", "BASELINE_BUILDING")
        # FIX: the original used len(context.last_insights), which raises
        # TypeError when last_insights is None; truthiness covers None,
        # empty, and populated collections alike.
        has_existing = bool(context.last_insights) or context.last_plan is not None
        return state_changed, has_comparison, is_baseline, has_existing

    def _should_generate_intelligence(self, context):
        """Single source of truth for the regenerate-vs-reuse decision."""
        state_changed, has_comparison, is_baseline, has_existing = self._decision_signals(context)
        return (
            (state_changed and has_comparison)  # normal case
            or is_baseline                      # baseline case
            or not has_existing                 # safety fallback: nothing cached yet
        )

    async def run(self, context):
        """Execute the step: decide, then generate or reuse intelligence."""
        if context.weekly_snapshot is None:
            obs_logger.log_event(
                "info",
                "Skipping insights generation: no run data available",
                event="insights_skipped_no_data",
                component=obs_components.PIPELINE,
            )
            return
        # Gate by intelligence flag if present in context: historical weeks
        # are replayed with the LLM disabled and reuse cached artifacts.
        if not getattr(context, "enable_intelligence", True):
            obs_logger.log_event(
                "info",
                "Skipping intelligence generation: historical week (LLM disabled)",
                event="insights_skipped_historical",
                component=obs_components.PIPELINE,
                fields={"week_start": str(context.weekly_snapshot.week_start_date)}
            )
            context.insights = context.last_insights
            context.plan = context.last_plan
            return
        state_changed, has_comparison, is_baseline, has_existing = self._decision_signals(context)
        if self._should_generate_intelligence(context):
            obs_logger.log_event(
                "info",
                "Intelligence generation triggered",
                event="intelligence_generated",
                component=obs_components.ORCHESTRATOR,
                fields={
                    "state_changed": state_changed,
                    "comparison_available": has_comparison,
                    "baseline": is_baseline,
                    "has_existing": has_existing
                }
            )
            # Insights/plan may already have been produced via
            # run_parallel_agents(); only fill in whatever is still missing.
            if context.insights is None:
                await self._run_insights(context)
            if context.plan is None:
                await self._run_plan(context)
            # Brief is always sequential as it depends on trends/snapshot
            # being stable.
            # FIX: _run_brief is an async def that always returns a
            # (brief, focus) tuple once awaited; the original's else-branch
            # re-awaited the already-awaited result, which would raise
            # TypeError on a tuple. A single await is correct.
            brief, focus = await self._run_brief(context)
            # Update cache in context so the next week can reuse artifacts.
            self._update_cache(context, brief, focus)
        else:
            obs_logger.log_event(
                "info",
                f"Reuse intelligence for state: {context.runner_positioning.position_status}",
                event="intelligence_reused",
                component=obs_components.ORCHESTRATOR,
                positioning_state=context.runner_positioning.position_status
            )
            context.insights = context.last_insights
            context.plan = context.last_plan
            # Apply cached brief to current snapshot and persist it.
            if context.weekly_snapshot:
                context.weekly_snapshot.performance_brief = context.last_brief
                context.weekly_snapshot.performance_focus = context.last_focus
                if self.weekly_repo:
                    self.weekly_repo.save(context.weekly_snapshot)

    def run_parallel_agents(self, context):
        """
        Extraction point for parallel execution.
        Returns a list of awaitable tasks for independent agents.
        """
        if context.weekly_snapshot is None:
            return []
        # CONSISTENCY FIX: this previously gated only on state_changed and
        # so skipped LLM calls in the baseline / no-cache cases where run()
        # would generate. Both paths now share _should_generate_intelligence,
        # so no needed work is skipped and no unneeded LLM call is fired.
        if not self._should_generate_intelligence(context):
            return []
        return [
            self._run_insights(context),
            self._run_plan(context),
        ]

    async def _run_insights(self, context):
        """Generate insights from the latest run; no-op when data is missing."""
        # FIX: guard first — the original computed latest_run (and possibly
        # hydrated a Run model) before checking whether any data exists.
        if context.weekly_snapshot is None or not context.runs:
            obs_logger.log_event(
                "info",
                "Skipping insights generation: no run data available",
                event="insights_skipped_no_data",
                component=obs_components.PIPELINE,
            )
            return
        latest_run = context.runs[-1]
        if isinstance(latest_run, dict):
            # Runs may arrive as raw dicts (e.g. deserialized payloads);
            # hydrate into the domain model the agent expects.
            from domain.training.run import Run
            latest_run = Run(**latest_run)
        context.insights = await self.insights_agent.run(
            latest_run,
            context.trends,
            risk_level=context.risk_assessment.risk_level,
            language=context.language,
            profile=context.runner_profile,
            goal=context.active_goal,
        )

    async def _run_plan(self, context):
        """Generate a training plan and pass it through the guardrail service."""
        plan_result = await self.plan_agent.run(
            context.summary, language=context.language, profile=context.runner_profile, goal=context.active_goal
        )
        draft_plan = plan_result.get("plan", "")
        # Guardrails may rewrite/constrain the draft based on risk assessment.
        context.plan = self.guardrail_service.apply(draft_plan, context.risk_assessment)

    async def _run_brief(self, context):
        """Generate the weekly performance brief and persist it on the snapshot.

        Returns a (brief, focus) tuple; both empty strings when the brief
        service or required weekly data is unavailable.
        """
        brief, focus = "", ""
        if self.brief_service and context.weekly_snapshot and context.weekly_trend:
            goal_progress = self.goal_progress_service.compute(
                self.goal_service, context.active_goal, context.weekly_snapshot, context.weekly_trend
            )
            brief, focus = await self.brief_service.generate_brief(
                context.weekly_snapshot, context.weekly_trend, goal_progress=goal_progress, language=context.language
            )
            context.weekly_snapshot.performance_brief = brief
            context.weekly_snapshot.performance_focus = focus
            # Hash of the brief's inputs, used to detect staleness later.
            context.weekly_snapshot.brief_source_hash = self.brief_service.compute_brief_hash(
                context.weekly_snapshot, context.weekly_trend, goal_progress=goal_progress, language=context.language
            )
            # NOTE(review): naive local timestamp, matching original behavior —
            # confirm whether storage expects UTC before switching.
            context.weekly_snapshot.brief_generated_at = datetime.now()
            if self.weekly_repo:
                self.weekly_repo.save(context.weekly_snapshot)
        return brief, focus

    def _update_cache(self, context, brief, focus):
        """Cache current artifacts on the context for reuse next week."""
        context.last_runner_positioning = context.runner_positioning
        context.last_insights = context.insights
        context.last_plan = context.plan
        context.last_brief = brief
        context.last_focus = focus