Spaces:
Sleeping
Sleeping
| """Synthetic multi-anomaly data generator for the metric tracker RL environment.""" | |
| from __future__ import annotations | |
| import random | |
| from dataclasses import dataclass, field | |
| from datetime import date, timedelta | |
| from statistics import median | |
| try: | |
| from ..analysis_tools import COUNT_METRICS, FUNNEL_STEPS, SharedAnalysisToolkit, AnalysisContext | |
| from ..models import ( | |
| ConversionMetricDefinition, | |
| MethodSpec, | |
| MetricRecord, | |
| MetricSubmissionRow, | |
| SyntheticAnomalyGenerator, | |
| SyntheticGeneratorApplication, | |
| ) | |
| except ImportError: | |
| from analysis_tools import COUNT_METRICS, FUNNEL_STEPS, SharedAnalysisToolkit, AnalysisContext | |
| from models import ( | |
| ConversionMetricDefinition, | |
| MethodSpec, | |
| MetricRecord, | |
| MetricSubmissionRow, | |
| SyntheticAnomalyGenerator, | |
| SyntheticGeneratorApplication, | |
| ) | |
# Every scenario family an episode can be configured with. "mixed" draws
# anomalies from all of the other families in this tuple.
ALL_SCENARIO_FAMILIES: tuple[str, ...] = (
    "mixed",
    "rate_drop_from_median",
    "rate_spike_from_median",
    "absolute_drop_in_event_count",
    "absolute_spike_in_event_count",
    "funnel_break",
    "hourly_traffic_mix_shift",
    "instrumentation_data_quality_issue",
)
# Reset-time synthetic generator methods exposed to callers. Currently only
# one method: shift a metric to median +/- stddev_factor * std_dev_from_median
# on one or more dates.
SYNTHETIC_GENERATOR_METHOD_SPECS: tuple[MethodSpec, ...] = (
    MethodSpec(
        name="metric_stddev_shift",
        description=(
            "Inject a count or conversion anomaly on specific dates by setting the metric to "
            "median +/- stddev_factor * std_dev_from_median."
        ),
        parameters=["metric_name", "metric_names", "date", "dates", "stddev_factor", "direction"],
    ),
)
def available_synthetic_generator_methods() -> list[MethodSpec]:
    """Return the reset-time synthetic generator methods as a fresh list."""
    return [*SYNTHETIC_GENERATOR_METHOD_SPECS]
@dataclass
class GeneratorConfig:
    """Configurable parameters for synthetic metric generation.

    Declared as a dataclass: `baseline_rates` uses `field(default_factory=...)`,
    which is only honored on dataclasses, and callers are expected to be able
    to override any subset of the defaults by keyword.
    """

    # Conversion-rate metrics derived from the raw event counts; each is
    # numerator / denominator * 100.
    conversion_definitions: tuple[ConversionMetricDefinition, ...] = (
        ConversionMetricDefinition(
            name="app_open_to_menu_open",
            numerator="menu_opens",
            denominator="app_opens",
            description="menu_opens / app_opens * 100",
        ),
        ConversionMetricDefinition(
            name="menu_open_to_product_added_to_cart",
            numerator="product_added_to_cart",
            denominator="menu_opens",
            description="product_added_to_cart / menu_opens * 100",
        ),
        ConversionMetricDefinition(
            name="product_added_to_cart_to_order_placed",
            numerator="orders_placed",
            denominator="product_added_to_cart",
            description="orders_placed / product_added_to_cart * 100",
        ),
        ConversionMetricDefinition(
            name="order_placed_to_payment_successful",
            numerator="payment_successful",
            denominator="orders_placed",
            description="payment_successful / orders_placed * 100",
        ),
        ConversionMetricDefinition(
            name="app_open_to_order_placed",
            numerator="orders_placed",
            denominator="app_opens",
            description="orders_placed / app_opens * 100",
        ),
        ConversionMetricDefinition(
            name="app_open_to_payment_successful",
            numerator="payment_successful",
            denominator="app_opens",
            description="payment_successful / app_opens * 100",
        ),
    )
    # Number of calendar weeks of history to generate.
    num_weeks: int = 4
    # The series ends this many days before "today".
    end_date_offset_days: int = 1
    # Baseline daily app_opens before weekday/trend/noise factors.
    base_daily_app_opens: int = 18000
    # Multiplicative day-of-week factors indexed by date.weekday() (Mon..Sun).
    weekday_factors: tuple[float, ...] = (0.95, 1.0, 1.02, 1.04, 1.06, 1.12, 1.08)
    # Share of a day's app_opens assigned to each of the 24 hours.
    hourly_weights: tuple[float, ...] = (
        0.010,
        0.008,
        0.007,
        0.007,
        0.010,
        0.018,
        0.028,
        0.040,
        0.050,
        0.055,
        0.058,
        0.060,
        0.058,
        0.056,
        0.054,
        0.052,
        0.054,
        0.060,
        0.072,
        0.078,
        0.075,
        0.060,
        0.038,
        0.025,
    )
    # Baseline per-step conversion rates used to derive downstream counts.
    baseline_rates: dict[str, float] = field(
        default_factory=lambda: {
            "menu_opens": 0.63,
            "product_added_to_cart": 0.29,
            "orders_placed": 0.44,
            "payment_successful": 0.91,
        }
    )

    @property
    def num_days(self) -> int:
        """Total number of generated days; a property because call sites read
        it as an attribute (`self.config.num_days`)."""
        return self.num_weeks * 7
@dataclass
class EpisodeConfig:
    """Per-episode configuration (seed, scenario family, difficulty, density).

    Declared as a dataclass so keyword construction in ``normalized()`` works.
    """

    seed: int = 0
    scenario_family: str = "mixed"
    difficulty: str = "medium"
    anomaly_density: str = "medium"
    anomaly_count: int = 3
    # Optional explicit reset-time generators; when set they override planning.
    anomaly_generators: tuple[SyntheticAnomalyGenerator, ...] = ()

    def normalized(self) -> "EpisodeConfig":
        """Return a copy with every field coerced to a supported value."""
        family = self.scenario_family if self.scenario_family in ALL_SCENARIO_FAMILIES else "mixed"
        difficulty = self.difficulty if self.difficulty in {"easy", "medium", "hard"} else "medium"
        density = self.anomaly_density if self.anomaly_density in {"low", "medium", "high"} else "medium"
        return EpisodeConfig(
            seed=int(self.seed),
            scenario_family=family,
            difficulty=difficulty,
            anomaly_density=density,
            # Falsy/zero counts fall back to 3; always inject at least one.
            anomaly_count=max(1, int(self.anomaly_count or 3)),
            anomaly_generators=tuple(self.anomaly_generators or ()),
        )
@dataclass
class PlannedAnomaly:
    """Internal anomaly schedule item (one injected anomaly on one date).

    Declared as a dataclass: instances are built by keyword in the planners.
    """

    date: str                 # ISO date the anomaly is injected on
    anomaly_type: str         # one of ALL_SCENARIO_FAMILIES (never "mixed")
    entity_type: str          # e.g. conversion_rate / event_count / funnel_step
    entity_name: str
    detection_method: str     # toolkit method expected to surface this anomaly
    details: dict[str, str]   # applier-specific parameters (metric/step names)
@dataclass
class EpisodeData:
    """Synthetic dataset and ground truth used for one episode.

    Declared as a dataclass: ``generate_episode`` constructs it by keyword.
    """

    config: EpisodeConfig
    scenario_label: str
    daily_metrics: list[MetricRecord]                 # one roll-up row per date
    hourly_metrics: list[MetricRecord]                # 24 rows per date
    expected_rows: list[MetricSubmissionRow]          # ground-truth submission
    anomaly_schedule: list[dict[str, str]]            # public plan (no details)
    applied_synthetic_generators: list[SyntheticGeneratorApplication]
| class MetricDataGenerator: | |
| """Reusable synthetic data generator used by the env and custom UI.""" | |
| def __init__(self, config: GeneratorConfig | None = None, seed: int | None = None) -> None: | |
| self.config = config or GeneratorConfig() | |
| self._default_seed = int(seed or 0) | |
    def generate_episode(self, episode_config: EpisodeConfig | None = None) -> EpisodeData:
        """Generate one seeded episode.

        Builds an anomaly-free hourly baseline, injects anomalies (either via
        synthetic metric generators or the scenario planner), then materializes
        the daily/hourly records, expected submission rows, and the
        ground-truth anomaly schedule.
        """
        # Default to a config seeded with this generator's seed; normalized()
        # coerces every field to a supported value.
        config = (episode_config or EpisodeConfig(seed=self._default_seed)).normalized()
        rng = random.Random(config.seed)
        # Series ends `end_date_offset_days` before today and spans num_days.
        end_date = date.today() - timedelta(days=self.config.end_date_offset_days)
        start_date = end_date - timedelta(days=self.config.num_days - 1)
        base_hourly = self._generate_base_hourly_metrics(start_date, rng, config)
        applied_synthetic_generators: list[SyntheticGeneratorApplication] = []
        if self._use_synthetic_metric_generators(config):
            # Median/stddev-based generators produce their own plan.
            anomaly_plan, applied_synthetic_generators = self._apply_metric_generators(
                base_hourly,
                rng,
                config,
            )
        else:
            anomaly_plan = self._plan_anomalies(base_hourly, rng, config)
            self._apply_anomalies(base_hourly, anomaly_plan, rng, config)
        daily_metrics, hourly_metrics = self._materialize_metrics(base_hourly)
        if applied_synthetic_generators:
            # Record the values actually realized after hourly rounding/rebalance.
            self._refresh_applied_generator_actuals(
                applied_synthetic_generators,
                daily_metrics,
            )
        expected_rows = self._build_expected_rows(daily_metrics, hourly_metrics, anomaly_plan, config)
        # Public schedule exposed to the env; internal `details` are withheld.
        anomaly_schedule = [
            {
                "date": item.date,
                "anomaly_type": item.anomaly_type,
                "entity_type": item.entity_type,
                "entity_name": item.entity_name,
                "detection_method": item.detection_method,
            }
            for item in anomaly_plan
        ]
        return EpisodeData(
            config=config,
            scenario_label=config.scenario_family,
            daily_metrics=daily_metrics,
            hourly_metrics=hourly_metrics,
            expected_rows=expected_rows,
            anomaly_schedule=anomaly_schedule,
            applied_synthetic_generators=applied_synthetic_generators,
        )
| def _use_synthetic_metric_generators(self, episode_config: EpisodeConfig) -> bool: | |
| if episode_config.anomaly_generators: | |
| return True | |
| return episode_config.scenario_family in { | |
| "mixed", | |
| "rate_drop_from_median", | |
| "rate_spike_from_median", | |
| "absolute_drop_in_event_count", | |
| "absolute_spike_in_event_count", | |
| } | |
    def _generate_base_hourly_metrics(
        self,
        start_date: date,
        rng: random.Random,
        episode_config: EpisodeConfig,
    ) -> dict[str, list[MetricRecord]]:
        """Build the anomaly-free hourly baseline, keyed by ISO date.

        Daily app_opens = base volume * weekday factor * gentle upward trend
        * day-level noise; hourly counts split that total by the hour weights,
        and each downstream funnel count is derived from the previous step
        with a noisy per-hour conversion rate.
        """
        hourly: dict[str, list[MetricRecord]] = {}
        # Harder episodes carry more rate noise, making anomalies subtler.
        difficulty_noise = {"easy": 0.015, "medium": 0.025, "hard": 0.035}[episode_config.difficulty]
        for day_index in range(self.config.num_days):
            current_date = start_date + timedelta(days=day_index)
            date_key = current_date.isoformat()
            weekday_factor = self.config.weekday_factors[current_date.weekday()]
            # Slow linear growth (~0.25%/day) plus +/-2% day-level noise.
            trend_factor = 1.0 + day_index * 0.0025
            noise_factor = 1.0 + rng.uniform(-0.02, 0.02)
            total_app_opens = round(
                self.config.base_daily_app_opens * weekday_factor * trend_factor * noise_factor
            )
            weights = self._hour_weights(current_date.weekday(), rng)
            hourly_app_opens = self._allocate_total(total_app_opens, weights, rng)
            day_rows: list[MetricRecord] = []
            for hour, app_opens in enumerate(hourly_app_opens):
                # Per-hour conversion rates: baseline jittered by difficulty
                # noise, clamped to a plausible band per funnel step.
                menu_rate = self._bounded(
                    self.config.baseline_rates["menu_opens"] * (1.0 + rng.uniform(-difficulty_noise, difficulty_noise)),
                    0.50,
                    0.80,
                )
                cart_rate = self._bounded(
                    self.config.baseline_rates["product_added_to_cart"]
                    * (1.0 + rng.uniform(-difficulty_noise * 1.2, difficulty_noise * 1.2)),
                    0.18,
                    0.42,
                )
                order_rate = self._bounded(
                    self.config.baseline_rates["orders_placed"]
                    * (1.0 + rng.uniform(-difficulty_noise * 1.2, difficulty_noise * 1.2)),
                    0.28,
                    0.62,
                )
                payment_rate = self._bounded(
                    self.config.baseline_rates["payment_successful"]
                    * (1.0 + rng.uniform(-difficulty_noise, difficulty_noise)),
                    0.76,
                    0.99,
                )
                # Derive each funnel count from the previous step's count.
                menu_opens = round(app_opens * menu_rate)
                carts = round(menu_opens * cart_rate)
                orders = round(carts * order_rate)
                payments = round(orders * payment_rate)
                day_rows.append(
                    MetricRecord(
                        date=date_key,
                        hour=hour,
                        app_opens=app_opens,
                        menu_opens=menu_opens,
                        product_added_to_cart=carts,
                        orders_placed=orders,
                        payment_successful=payments,
                    )
                )
            hourly[date_key] = day_rows
        return hourly
    def _plan_anomalies(
        self,
        base_hourly: dict[str, list[MetricRecord]],
        rng: random.Random,
        episode_config: EpisodeConfig,
    ) -> list[PlannedAnomaly]:
        """Decide which anomaly lands on which date/entity for the episode.

        Families are drawn round-robin from a shuffled pool (all non-"mixed"
        families when the episode is "mixed"), and (date, entity_type,
        entity_name) triples are deduplicated so the same entity is never hit
        twice on one date.
        """
        dates = sorted(base_hourly)
        # Avoid the first/last few days so baselines exist on both sides.
        candidate_dates = dates[3:-2] if len(dates) > 8 else dates
        family_pool = (
            list(ALL_SCENARIO_FAMILIES[1:])
            if episode_config.scenario_family == "mixed"
            else [episode_config.scenario_family]
        )
        # An explicit anomaly_count wins; otherwise density picks the target.
        target_count = max(
            1,
            int(
                episode_config.anomaly_count
                or {"low": 3, "medium": 5, "high": 7}[episode_config.anomaly_density]
            ),
        )
        plan: list[PlannedAnomaly] = []
        used_pairs: set[tuple[str, str, str]] = set()
        family_order = family_pool[:]
        rng.shuffle(family_order)
        family_index = 0
        while len(plan) < target_count:
            # Reshuffle and restart the family rotation once exhausted.
            if family_index >= len(family_order):
                family_order = family_pool[:]
                rng.shuffle(family_order)
                family_index = 0
            anomaly_type = family_order[family_index]
            family_index += 1
            date_key = rng.choice(candidate_dates)
            entity_type, entity_name, detection_method, details = self._pick_entity_for_family(
                anomaly_type,
                rng,
            )
            dedupe_key = (date_key, entity_type, entity_name)
            if dedupe_key in used_pairs:
                # Collision: retry with the next family draw.
                continue
            used_pairs.add(dedupe_key)
            plan.append(
                PlannedAnomaly(
                    date=date_key,
                    anomaly_type=anomaly_type,
                    entity_type=entity_type,
                    entity_name=entity_name,
                    detection_method=detection_method,
                    details=details,
                )
            )
        plan.sort(key=lambda item: (item.date, item.entity_type, item.entity_name))
        return plan
| def _pick_entity_for_family( | |
| self, | |
| anomaly_type: str, | |
| rng: random.Random, | |
| ) -> tuple[str, str, str, dict[str, str]]: | |
| if anomaly_type in {"rate_drop_from_median", "rate_spike_from_median"}: | |
| definition = rng.choice(list(self.config.conversion_definitions)) | |
| return ( | |
| "conversion_rate", | |
| definition.name, | |
| "compare_rate_to_median", | |
| {"conversion_name": definition.name}, | |
| ) | |
| if anomaly_type in {"absolute_drop_in_event_count", "absolute_spike_in_event_count"}: | |
| metric_name = rng.choice(list(COUNT_METRICS)) | |
| return ( | |
| "event_count", | |
| metric_name, | |
| "compare_count_to_median", | |
| {"metric_name": metric_name}, | |
| ) | |
| if anomaly_type == "funnel_break": | |
| numerator, denominator = rng.choice(list(FUNNEL_STEPS)) | |
| return ( | |
| "funnel_step", | |
| f"{numerator}_from_{denominator}", | |
| "detect_funnel_break", | |
| {"numerator": numerator, "denominator": denominator}, | |
| ) | |
| if anomaly_type == "hourly_traffic_mix_shift": | |
| return ( | |
| "hourly_mix", | |
| "app_opens:daytime_share", | |
| "hourly_rows_for_date", | |
| {}, | |
| ) | |
| numerator, denominator = rng.choice(list(FUNNEL_STEPS)) | |
| return ( | |
| "data_quality", | |
| f"{numerator}_lte_{denominator}", | |
| "check_impossible_counts", | |
| {"numerator": numerator, "denominator": denominator}, | |
| ) | |
| def _apply_anomalies( | |
| self, | |
| hourly: dict[str, list[MetricRecord]], | |
| plan: list[PlannedAnomaly], | |
| rng: random.Random, | |
| episode_config: EpisodeConfig, | |
| ) -> None: | |
| difficulty = episode_config.difficulty | |
| for item in plan: | |
| rows = hourly[item.date] | |
| if item.anomaly_type == "rate_drop_from_median": | |
| self._apply_rate_change(rows, item.details["conversion_name"], rng, difficulty, direction="down") | |
| elif item.anomaly_type == "rate_spike_from_median": | |
| self._apply_rate_change(rows, item.details["conversion_name"], rng, difficulty, direction="up") | |
| elif item.anomaly_type == "absolute_drop_in_event_count": | |
| self._apply_count_change(rows, item.details["metric_name"], rng, difficulty, direction="down") | |
| elif item.anomaly_type == "absolute_spike_in_event_count": | |
| self._apply_count_change(rows, item.details["metric_name"], rng, difficulty, direction="up") | |
| elif item.anomaly_type == "funnel_break": | |
| self._apply_funnel_break(rows, item.details["numerator"], item.details["denominator"], rng, difficulty) | |
| elif item.anomaly_type == "hourly_traffic_mix_shift": | |
| self._apply_hourly_mix_shift(rows, rng, difficulty) | |
| elif item.anomaly_type == "instrumentation_data_quality_issue": | |
| self._apply_data_quality_issue(rows, item.details["numerator"], item.details["denominator"], rng, difficulty) | |
| def _apply_rate_change( | |
| self, | |
| rows: list[MetricRecord], | |
| conversion_name: str, | |
| rng: random.Random, | |
| difficulty: str, | |
| *, | |
| direction: str, | |
| ) -> None: | |
| definition = next(item for item in self.config.conversion_definitions if item.name == conversion_name) | |
| multipliers = { | |
| "easy": (0.74, 1.32), | |
| "medium": (0.82, 1.22), | |
| "hard": (0.88, 1.15), | |
| }[difficulty] | |
| multiplier = multipliers[0] if direction == "down" else multipliers[1] | |
| for row in rows: | |
| denominator_value = getattr(row, definition.denominator) | |
| observed = round(denominator_value * multiplier * self._base_rate_from_metric(definition.numerator)) | |
| setattr_value = min(max(observed, 0), denominator_value) | |
| self._set_metric_and_rebalance(row, definition.numerator, setattr_value) | |
| def _apply_count_change( | |
| self, | |
| rows: list[MetricRecord], | |
| metric_name: str, | |
| rng: random.Random, | |
| difficulty: str, | |
| *, | |
| direction: str, | |
| ) -> None: | |
| multipliers = { | |
| "easy": (0.58, 1.42), | |
| "medium": (0.72, 1.28), | |
| "hard": (0.82, 1.18), | |
| }[difficulty] | |
| multiplier = multipliers[0] if direction == "down" else multipliers[1] | |
| for row in rows: | |
| original = getattr(row, metric_name) | |
| updated = max(0, round(original * multiplier)) | |
| self._set_metric_and_rebalance(row, metric_name, updated) | |
| def _apply_funnel_break( | |
| self, | |
| rows: list[MetricRecord], | |
| numerator: str, | |
| denominator: str, | |
| rng: random.Random, | |
| difficulty: str, | |
| ) -> None: | |
| if numerator == "menu_opens": | |
| return | |
| drop = {"easy": 0.45, "medium": 0.58, "hard": 0.7}[difficulty] | |
| for row in rows: | |
| denominator_value = getattr(row, denominator) | |
| broken_value = max(0, round(denominator_value * drop)) | |
| self._set_metric_and_rebalance(row, numerator, broken_value) | |
| def _apply_hourly_mix_shift( | |
| self, | |
| rows: list[MetricRecord], | |
| rng: random.Random, | |
| difficulty: str, | |
| ) -> None: | |
| total = sum(row.app_opens for row in rows) | |
| if total <= 0: | |
| return | |
| shift = {"easy": 0.28, "medium": 0.20, "hard": 0.14}[difficulty] | |
| boosted_hours = {0, 1, 2, 3, 4, 21, 22, 23} | |
| weights = [] | |
| for row in rows: | |
| base = row.app_opens / total | |
| if row.hour in boosted_hours: | |
| base *= 1.0 + shift | |
| elif 9 <= (row.hour or 0) <= 18: | |
| base *= max(0.2, 1.0 - shift) | |
| weights.append(base) | |
| normalized = [value / sum(weights) for value in weights] | |
| redistributed = self._allocate_total(total, normalized, rng) | |
| for row, app_opens in zip(rows, redistributed, strict=False): | |
| row.app_opens = app_opens | |
| menu_rate = self._ratio(row.menu_opens, max(row.app_opens, 1)) | |
| row.menu_opens = min(row.app_opens, round(app_opens * menu_rate)) | |
| cart_rate = self._ratio(row.product_added_to_cart, max(row.menu_opens, 1)) | |
| row.product_added_to_cart = min(row.menu_opens, round(row.menu_opens * cart_rate)) | |
| order_rate = self._ratio(row.orders_placed, max(row.product_added_to_cart, 1)) | |
| row.orders_placed = min(row.product_added_to_cart, round(row.product_added_to_cart * order_rate)) | |
| payment_rate = self._ratio(row.payment_successful, max(row.orders_placed, 1)) | |
| row.payment_successful = min(row.orders_placed, round(row.orders_placed * payment_rate)) | |
| def _apply_data_quality_issue( | |
| self, | |
| rows: list[MetricRecord], | |
| numerator: str, | |
| denominator: str, | |
| rng: random.Random, | |
| difficulty: str, | |
| ) -> None: | |
| affected_hours = {"easy": 5, "medium": 4, "hard": 3}[difficulty] | |
| for row in rng.sample(rows, k=min(affected_hours, len(rows))): | |
| denominator_value = getattr(row, denominator) | |
| violation = max(1, round(denominator_value * {"easy": 0.12, "medium": 0.08, "hard": 0.05}[difficulty])) | |
| setattr(row, numerator, denominator_value + violation) | |
| self._rebalance_downstream_from(row, numerator) | |
    def _apply_metric_generators(
        self,
        hourly: dict[str, list[MetricRecord]],
        rng: random.Random,
        episode_config: EpisodeConfig,
    ) -> tuple[list[PlannedAnomaly], list[SyntheticGeneratorApplication]]:
        """Apply median/stddev-based metric generators and return the plan.

        The analysis toolkit is built over the pre-anomaly data so baselines
        and thresholds reflect clean metrics. Each (date, metric) pair is
        applied at most once; both outputs are sorted for determinism.
        """
        generator_specs = self._resolve_metric_generators(hourly, rng, episode_config)
        if not generator_specs:
            return [], []
        # Snapshot the clean series: baselines are computed before mutation.
        daily_metrics, hourly_metrics = self._materialize_metrics(hourly)
        toolkit = SharedAnalysisToolkit(
            AnalysisContext(
                daily_metrics=daily_metrics,
                hourly_metrics=hourly_metrics,
                conversion_definitions=list(self.config.conversion_definitions),
                config=episode_config.__dict__,
            )
        )
        anomaly_plan: list[PlannedAnomaly] = []
        applications: list[SyntheticGeneratorApplication] = []
        seen_pairs: set[tuple[str, str]] = set()
        for spec in generator_specs:
            for date_key in self._resolve_generator_dates(spec, hourly, rng):
                for metric_name in self._resolve_generator_metrics(spec):
                    dedupe_key = (date_key, metric_name)
                    if dedupe_key in seen_pairs:
                        continue
                    seen_pairs.add(dedupe_key)
                    application = self._build_metric_generator_application(
                        toolkit=toolkit,
                        date_key=date_key,
                        metric_name=metric_name,
                        spec=spec,
                        rng=rng,
                    )
                    # Mutates the hourly rows for this date in place.
                    self._apply_metric_generator_application(hourly[date_key], application)
                    applications.append(application)
                    anomaly_plan.append(
                        PlannedAnomaly(
                            date=date_key,
                            anomaly_type=application.anomaly_type,
                            entity_type=application.metric_type,
                            entity_name=metric_name,
                            detection_method=application.detection_method,
                            details={"metric_name": metric_name},
                        )
                    )
        applications.sort(key=lambda item: (item.date, item.metric_name))
        anomaly_plan.sort(key=lambda item: (item.date, item.entity_type, item.entity_name))
        return anomaly_plan, applications
    def _resolve_metric_generators(
        self,
        hourly: dict[str, list[MetricRecord]],
        rng: random.Random,
        episode_config: EpisodeConfig,
    ) -> list[SyntheticAnomalyGenerator]:
        """Return explicit generator specs, or synthesize them for the family.

        When the episode supplies no generators, random (date, metric) pairs
        are drawn without replacement until `anomaly_count` specs exist.
        NOTE(review): assumes enough unique date/metric pairs exist for the
        requested count; otherwise this loop would not terminate — confirm.
        """
        if episode_config.anomaly_generators:
            return list(episode_config.anomaly_generators)
        dates = sorted(hourly)
        # Avoid the first/last few days so baselines exist on both sides.
        candidate_dates = dates[3:-2] if len(dates) > 8 else dates
        metric_pool = self._metric_pool_for_family(episode_config.scenario_family)
        if not metric_pool:
            return []
        used_pairs: set[tuple[str, str]] = set()
        generated: list[SyntheticAnomalyGenerator] = []
        # Harder episodes use smaller shifts (fewer stddevs from the median).
        default_stddev = {"easy": 2.6, "medium": 2.2, "hard": 1.8}[episode_config.difficulty]
        while len(generated) < max(1, episode_config.anomaly_count):
            date_key = rng.choice(candidate_dates)
            metric_name = rng.choice(metric_pool)
            if (date_key, metric_name) in used_pairs:
                continue
            used_pairs.add((date_key, metric_name))
            generated.append(
                SyntheticAnomalyGenerator(
                    method_name="metric_stddev_shift",
                    metric_name=metric_name,
                    date=date_key,
                    stddev_factor=default_stddev,
                    direction=self._default_direction_for_family(episode_config.scenario_family, rng),
                )
            )
        return generated
| def _metric_pool_for_family(self, scenario_family: str) -> list[str]: | |
| conversion_metrics = [item.name for item in self.config.conversion_definitions] | |
| if scenario_family in {"rate_drop_from_median", "rate_spike_from_median"}: | |
| return conversion_metrics | |
| if scenario_family in {"absolute_drop_in_event_count", "absolute_spike_in_event_count"}: | |
| return list(COUNT_METRICS) | |
| if scenario_family == "mixed": | |
| return list(COUNT_METRICS) + conversion_metrics | |
| return [] | |
| def _default_direction_for_family(scenario_family: str, rng: random.Random) -> str: | |
| if scenario_family in {"rate_drop_from_median", "absolute_drop_in_event_count"}: | |
| return "down" | |
| if scenario_family in {"rate_spike_from_median", "absolute_spike_in_event_count"}: | |
| return "up" | |
| return "down" if rng.random() < 0.5 else "up" | |
| def _resolve_generator_dates( | |
| self, | |
| spec: SyntheticAnomalyGenerator, | |
| hourly: dict[str, list[MetricRecord]], | |
| rng: random.Random, | |
| ) -> list[str]: | |
| dates = [item for item in spec.dates if item in hourly] | |
| if spec.date and spec.date in hourly: | |
| dates.append(spec.date) | |
| if not dates: | |
| dates = [rng.choice(sorted(hourly))] | |
| seen = set() | |
| deduped = [] | |
| for item in dates: | |
| if item in seen: | |
| continue | |
| seen.add(item) | |
| deduped.append(item) | |
| return deduped | |
| def _resolve_generator_metrics(self, spec: SyntheticAnomalyGenerator) -> list[str]: | |
| metrics = [item for item in spec.metric_names if item] | |
| if spec.metric_name: | |
| metrics.append(spec.metric_name) | |
| if not metrics: | |
| metrics = list(COUNT_METRICS) + [item.name for item in self.config.conversion_definitions] | |
| seen = set() | |
| deduped = [] | |
| for item in metrics: | |
| if item in seen: | |
| continue | |
| seen.add(item) | |
| deduped.append(item) | |
| return deduped | |
    def _build_metric_generator_application(
        self,
        *,
        toolkit: SharedAnalysisToolkit,
        date_key: str,
        metric_name: str,
        spec: SyntheticAnomalyGenerator,
        rng: random.Random,
    ) -> SyntheticGeneratorApplication:
        """Compute the target value for one (date, metric) generator application.

        The applied shift is max(stddev_factor * std_dev_from_median,
        detector threshold * 1.05), so the injected anomaly is intended to
        clear the detector's own threshold.
        """
        stats = toolkit.get_metric_std_dev_from_median(metric_name)
        # NOTE(review): reaches into toolkit privates (_metric_descriptor,
        # _count_threshold_fraction, _rate_threshold) — coupling to confirm.
        descriptor = toolkit._metric_descriptor(metric_name)
        baseline_value = float(stats["median_value"])
        std_dev_from_median = float(stats["std_dev_from_median"])
        pre_applied_value = float(descriptor["per_date_values"][date_key])
        # "auto" resolves to a coin flip; otherwise the spec's direction wins.
        direction = spec.direction if spec.direction != "auto" else ("down" if rng.random() < 0.5 else "up")
        sign = -1.0 if direction == "down" else 1.0
        threshold_value = round(std_dev_from_median * float(spec.stddev_factor), 4)
        metric_type = "event_count" if metric_name in COUNT_METRICS else "conversion_rate"
        if metric_type == "event_count":
            # Counts get an absolute floor so tiny baselines still move visibly.
            minimum_shift = max(50.0, baseline_value * toolkit._count_threshold_fraction()) * 1.05
            applied_shift = max(threshold_value, round(minimum_shift, 4))
            target_value = max(0.0, baseline_value + sign * applied_shift)
            anomaly_type = "absolute_spike_in_event_count" if sign > 0 else "absolute_drop_in_event_count"
            detection_method = "compare_count_to_median"
        else:
            applied_shift = max(threshold_value, round(toolkit._rate_threshold() * 1.05, 4))
            # Rates are percentages, so the target is clamped to [0, 100].
            target_value = self._bounded(baseline_value + sign * applied_shift, 0.0, 100.0)
            anomaly_type = "rate_spike_from_median" if sign > 0 else "rate_drop_from_median"
            detection_method = "compare_rate_to_median"
        return SyntheticGeneratorApplication(
            method_name=spec.method_name,
            date=date_key,
            metric_name=metric_name,
            metric_type=metric_type,
            direction="up" if sign > 0 else "down",
            anomaly_type=anomaly_type,
            detection_method=detection_method,
            baseline_value=round(baseline_value, 4),
            pre_applied_value=round(pre_applied_value, 4),
            std_dev_from_median=round(std_dev_from_median, 4),
            stddev_factor=round(float(spec.stddev_factor), 4),
            threshold_value=threshold_value,
            target_value=round(target_value, 4),
            # actual_value is refreshed later from the materialized daily data.
            actual_value=round(target_value, 4),
            formula=(
                f"{metric_name} = median {'+' if sign > 0 else '-'} "
                "max(stddev_factor * std_dev_from_median, detector_threshold)"
            ),
        )
| def _apply_metric_generator_application( | |
| self, | |
| rows: list[MetricRecord], | |
| application: SyntheticGeneratorApplication, | |
| ) -> None: | |
| if application.metric_type == "event_count": | |
| self._apply_daily_count_target( | |
| rows, | |
| application.metric_name, | |
| int(round(application.target_value)), | |
| ) | |
| return | |
| self._apply_daily_conversion_target( | |
| rows, | |
| application.metric_name, | |
| float(application.target_value), | |
| ) | |
| def _apply_daily_count_target( | |
| self, | |
| rows: list[MetricRecord], | |
| metric_name: str, | |
| target_total: int, | |
| ) -> None: | |
| target_total = max(0, target_total) | |
| current_values = [max(0, getattr(row, metric_name)) for row in rows] | |
| current_total = sum(current_values) | |
| if current_total > 0: | |
| weights = [value / current_total for value in current_values] | |
| else: | |
| app_total = sum(max(0, row.app_opens) for row in rows) or len(rows) | |
| weights = [max(0, row.app_opens) / app_total for row in rows] | |
| allocated = self._allocate_total(target_total, weights, random.Random(target_total + len(rows))) | |
| for row, value in zip(rows, allocated, strict=False): | |
| self._set_metric_and_rebalance(row, metric_name, value) | |
| def _apply_daily_conversion_target( | |
| self, | |
| rows: list[MetricRecord], | |
| conversion_name: str, | |
| target_rate_pct: float, | |
| ) -> None: | |
| definition = next(item for item in self.config.conversion_definitions if item.name == conversion_name) | |
| bounded_rate = self._bounded(target_rate_pct / 100.0, 0.0, 1.0) | |
| for row in rows: | |
| denominator_value = getattr(row, definition.denominator) | |
| numerator_target = round(denominator_value * bounded_rate) | |
| self._set_metric_and_rebalance(row, definition.numerator, numerator_target) | |
| def _refresh_applied_generator_actuals( | |
| self, | |
| applications: list[SyntheticGeneratorApplication], | |
| daily_metrics: list[MetricRecord], | |
| ) -> None: | |
| by_date = {row.date: row for row in daily_metrics} | |
| conversion_map = {item.name: item for item in self.config.conversion_definitions} | |
| for application in applications: | |
| record = by_date.get(application.date) | |
| if record is None: | |
| continue | |
| if application.metric_type == "event_count": | |
| actual_value = float(getattr(record, application.metric_name)) | |
| else: | |
| definition = conversion_map[application.metric_name] | |
| denominator = getattr(record, definition.denominator) | |
| actual_value = round( | |
| (getattr(record, definition.numerator) / denominator * 100.0) | |
| if denominator > 0 | |
| else 0.0, | |
| 4, | |
| ) | |
| application.actual_value = round(actual_value, 4) | |
    def _build_expected_rows(
        self,
        daily_metrics: list[MetricRecord],
        hourly_metrics: list[MetricRecord],
        plan: list[PlannedAnomaly],
        episode_config: EpisodeConfig,
    ) -> list[MetricSubmissionRow]:
        """Compute ground-truth submission rows by re-running the detectors.

        Each planned anomaly is measured with the same toolkit the agent
        uses, so expected baselines/deltas match tool outputs exactly; rows
        are deduplicated per (date, entity) and sorted for determinism.
        """
        toolkit = SharedAnalysisToolkit(
            AnalysisContext(
                daily_metrics=daily_metrics,
                hourly_metrics=hourly_metrics,
                conversion_definitions=list(self.config.conversion_definitions),
                config={
                    "seed": episode_config.seed,
                    "scenario_family": episode_config.scenario_family,
                    "difficulty": episode_config.difficulty,
                    "anomaly_density": episode_config.anomaly_density,
                    "anomaly_count": episode_config.anomaly_count,
                },
            )
        )
        rows: list[MetricSubmissionRow] = []
        for item in plan:
            if item.detection_method == "compare_rate_to_median":
                result = toolkit.compare_rate_to_median(item.date, item.entity_name)
            elif item.detection_method == "compare_count_to_median":
                result = toolkit.compare_count_to_median(item.date, item.entity_name)
            elif item.detection_method == "detect_funnel_break":
                candidates = toolkit.detect_funnel_break(item.date)["candidates"]
                result = next((row for row in candidates if row["entity_name"] == item.entity_name), None)
                if result is None:
                    # The detector did not flag this step: fall back to
                    # computing the step rate vs. the all-days median directly.
                    numerator = item.details["numerator"]
                    denominator = item.details["denominator"]
                    daily_row = next(row for row in daily_metrics if row.date == item.date)
                    baseline_series = [
                        (getattr(row, numerator) / getattr(row, denominator) * 100.0)
                        if getattr(row, denominator) > 0
                        else 0.0
                        for row in daily_metrics
                    ]
                    baseline = round(median(baseline_series), 4)
                    observed = round(
                        (getattr(daily_row, numerator) / getattr(daily_row, denominator) * 100.0)
                        if getattr(daily_row, denominator) > 0
                        else 0.0,
                        4,
                    )
                    delta = round(observed - baseline, 4)
                    result = {
                        "entity_type": item.entity_type,
                        "entity_name": item.entity_name,
                        "baseline_value": baseline,
                        "observed_value": observed,
                        "delta_value": delta,
                        "severity": self._severity_from_ratio(abs(delta), 5.0, 10.0, 15.0),
                    }
            elif item.detection_method == "check_impossible_counts":
                # Data-quality rows report the day's total impossible excess.
                impossible = toolkit.check_impossible_counts(item.date)
                result = {
                    "date": item.date,
                    "entity_type": item.entity_type,
                    "entity_name": item.entity_name,
                    "anomaly_type": item.anomaly_type,
                    "detection_method": item.detection_method,
                    "baseline_value": 0.0,
                    "observed_value": round(impossible["total_excess"], 4),
                    "delta_value": round(impossible["total_excess"], 4),
                    "severity": self._severity_from_ratio(impossible["total_excess"], 20.0, 60.0, 120.0),
                }
            else:
                # Remaining method is the hourly mix shift: compare the day's
                # daytime traffic share against the median daytime share.
                observed_share = toolkit.hourly_rows_for_date(item.date)["summary"]["daytime_share"]
                baseline_share = toolkit._median_daytime_share()
                delta = round(observed_share - baseline_share, 4)
                result = {
                    "date": item.date,
                    "entity_type": item.entity_type,
                    "entity_name": item.entity_name,
                    "anomaly_type": item.anomaly_type,
                    "detection_method": item.detection_method,
                    "baseline_value": round(baseline_share, 4),
                    "observed_value": round(observed_share, 4),
                    "delta_value": delta,
                    "severity": self._severity_from_ratio(abs(delta) * 100.0, 10.0, 18.0, 25.0),
                }
            if not result:
                # Detector produced nothing usable for this planned anomaly.
                continue
            normalized = dict(result)
            normalized["date"] = item.date
            normalized["anomaly_type"] = item.anomaly_type
            normalized["detection_method"] = item.detection_method
            rows.append(MetricSubmissionRow(**normalized))
        # Keep the last row per (date, entity_type, entity_name) key.
        deduped = {f"{row.date}|{row.entity_type}|{row.entity_name}": row for row in rows}
        return sorted(deduped.values(), key=lambda row: (row.date, row.entity_type, row.entity_name))
def _materialize_metrics(
    self,
    base_hourly: dict[str, list[MetricRecord]],
) -> tuple[list[MetricRecord], list[MetricRecord]]:
    """Flatten per-date hourly rows and roll them up into daily totals.

    Args:
        base_hourly: Mapping of ISO date string -> hourly MetricRecord rows.

    Returns:
        ``(daily_metrics, hourly_metrics)`` — one summed daily record per
        date (``hour=None``) plus the flattened hourly rows, both in
        ascending date order.
    """
    daily_metrics: list[MetricRecord] = []
    hourly_metrics: list[MetricRecord] = []
    # Iterating sorted items keeps daily rollups aligned with hourly order.
    for date_key, rows in sorted(base_hourly.items()):
        hourly_metrics += rows
        totals = {
            metric: sum(getattr(item, metric) for item in rows)
            for metric in (
                "app_opens",
                "menu_opens",
                "product_added_to_cart",
                "orders_placed",
                "payment_successful",
            )
        }
        daily_metrics.append(MetricRecord(date=date_key, hour=None, **totals))
    return daily_metrics, hourly_metrics
def _set_metric_and_rebalance(self, row: MetricRecord, metric_name: str, value: int) -> None:
    """Assign *value* to *metric_name* on *row*, keeping the funnel monotone.

    The value is clipped to be non-negative and no larger than the count of
    the parent funnel step (``app_opens`` has no parent cap), then the
    downstream and upstream steps are re-clamped to restore ordering.
    """
    # Each step's ceiling is its immediate funnel parent; app_opens and
    # unknown names fall through to "no cap".
    parent_cap = {
        "menu_opens": row.app_opens,
        "product_added_to_cart": row.menu_opens,
        "orders_placed": row.product_added_to_cart,
        "payment_successful": row.orders_placed,
    }.get(metric_name)
    if parent_cap is not None:
        value = min(value, parent_cap)
    setattr(row, metric_name, max(0, value))
    self._rebalance_downstream_from(row, metric_name)
    self._rebalance_upstream_to(row, metric_name)
def _rebalance_downstream_from(self, row: MetricRecord, metric_name: str) -> None:
    """Clamp every funnel step after *metric_name* to at most its parent step."""
    steps = list(COUNT_METRICS)
    position = steps.index(metric_name)
    # Walk parent/child pairs from the edited step toward the funnel tail;
    # each child is re-read after its parent may have been clamped.
    for parent_name, child_name in zip(steps[position:], steps[position + 1:]):
        ceiling = getattr(row, parent_name)
        clamped = min(getattr(row, child_name), ceiling)
        setattr(row, child_name, max(0, clamped))
def _rebalance_upstream_to(self, row: MetricRecord, metric_name: str) -> None:
    """Raise every funnel step before *metric_name* to at least its child step."""
    steps = list(COUNT_METRICS)
    position = steps.index(metric_name)
    # Walk ancestors from the edited step back to the funnel head so each
    # parent absorbs any increase already propagated to its child.
    for ancestor in reversed(range(position)):
        child_value = getattr(row, steps[ancestor + 1])
        name = steps[ancestor]
        setattr(row, name, max(getattr(row, name), child_value))
| def _base_rate_from_metric(self, metric_name: str) -> float: | |
| if metric_name == "menu_opens": | |
| return self.config.baseline_rates["menu_opens"] | |
| if metric_name == "product_added_to_cart": | |
| return self.config.baseline_rates["product_added_to_cart"] | |
| if metric_name == "orders_placed": | |
| return self.config.baseline_rates["orders_placed"] | |
| if metric_name == "payment_successful": | |
| return self.config.baseline_rates["payment_successful"] | |
| return 1.0 | |
| def _hour_weights(self, weekday: int, rng: random.Random) -> list[float]: | |
| weekend_multiplier = 1.12 if weekday >= 5 else 1.0 | |
| weights = [ | |
| max(0.001, value * weekend_multiplier * (1.0 + rng.uniform(-0.08, 0.08))) | |
| for value in self.config.hourly_weights | |
| ] | |
| total = sum(weights) | |
| return [value / total for value in weights] | |
| def _allocate_total(total: int, weights: list[float], rng: random.Random) -> list[int]: | |
| raw = [total * weight for weight in weights] | |
| integers = [int(value) for value in raw] | |
| remainder = total - sum(integers) | |
| ranked = sorted( | |
| range(len(weights)), | |
| key=lambda index: (raw[index] - integers[index], rng.random()), | |
| reverse=True, | |
| ) | |
| for index in ranked[:remainder]: | |
| integers[index] += 1 | |
| return integers | |
| def _ratio(numerator: int, denominator: int) -> float: | |
| if denominator <= 0: | |
| return 0.0 | |
| return numerator / denominator | |
| def _bounded(value: float, lower: float, upper: float) -> float: | |
| return min(max(value, lower), upper) | |
| def _severity_from_ratio(value: float, medium: float, high: float, critical: float) -> str: | |
| if value >= critical: | |
| return "critical" | |
| if value >= high: | |
| return "high" | |
| if value >= medium: | |
| return "medium" | |
| return "low" | |