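"""Generate synthetic RPC node metrics with injected failure ramps.

Writes one CSV per node plus a combined file under data/raw/, including a
binary `failure_imminent` label that marks the degradation windows.
"""
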
import numpy as np
import pandas as pd
from pathlib import Path
from typing import List

from src.utils import logger


def ar1_process(n: int, mean: float, std: float, phi: float = 0.8, seed_offset: int = 0) -> np.ndarray:
    """Generate a mean-reverting AR(1) time series.

    Args:
        n (int): Number of samples.
        mean (float): Stationary mean of the series.
        std (float): Stationary standard deviation.
        phi (float): Autocorrelation coefficient, |phi| < 1.
        seed_offset (int): Seed offset for reproducibility.

    Returns:
        np.ndarray: Generated series, clipped below at zero.
    """
    np.random.seed(42 + seed_offset)
    x = np.zeros(n)
    x[0] = np.random.normal(mean, std)
    for t in range(1, n):
        # Mean-reverting update: phi pulls the series toward `mean`, and the
        # innovation is scaled by sqrt(1 - phi^2) so the stationary variance
        # stays at std^2.
        x[t] = mean + phi * (x[t - 1] - mean) + np.random.normal(0, std * np.sqrt(1 - phi**2))
    return np.clip(x, 0, None)
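
# Illustrative usage (the values mirror the agave1 cpu parameters below):
#     cpu = ar1_process(n=1000, mean=45.0, std=8.0, phi=0.8)
# With phi = 0.8, each sample inherits phi^2 = 64% of its variance from the
# previous minute, producing the smooth drift typical of real host metrics.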


def generate_data() -> None:
    """Generate synthetic RPC metrics for all nodes."""
    try:
        np.random.seed(42)
        n_samples = 1000
        start_time = pd.Timestamp("2025-10-01 00:00:00")
        timestamps = pd.date_range(start_time, periods=n_samples, freq="1min")

        # Create the output directory up front so the CSV writes cannot fail.
        Path("data/raw").mkdir(parents=True, exist_ok=True)

        nodes = ["agave1", "agave2", "firedancer1", "firedancer2"]
        # Note: the *_var entries are used as standard deviations. The
        # latency_* entries are descriptive only; latency itself is derived
        # from cpu and error rate further down.
        node_params = {
            "agave1": {"cpu_mean": 45, "cpu_var": 8, "latency_mean": 60, "latency_var": 10, "error_var": 0.005},
            "agave2": {"cpu_mean": 48, "cpu_var": 9, "latency_mean": 65, "latency_var": 12, "error_var": 0.006},
            "firedancer1": {"cpu_mean": 55, "cpu_var": 15, "latency_mean": 50, "latency_var": 20, "error_var": 0.01},
            "firedancer2": {"cpu_mean": 52, "cpu_var": 12, "latency_mean": 55, "latency_var": 18, "error_var": 0.009},
        }

        phi = 0.8
        n_ramps_per_node = 8
        ramp_length = 20
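        # 8 ramps x 20 one-minute samples = up to 160 labeled minutes per
        # node, i.e. at most ~16% positive class (less where ramps overlap).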

        all_data: List[pd.DataFrame] = []
        for idx, node in enumerate(nodes):
            params = node_params[node]
            # Distinct seed offsets per node and metric keep each series
            # reproducible yet decorrelated from the others.
            cpu_base = ar1_process(n_samples, params["cpu_mean"], params["cpu_var"], phi, idx * 10)
            error_base = np.abs(ar1_process(n_samples, 0.02, params["error_var"], phi, idx * 30))
            mem_base = ar1_process(n_samples, 50, 10, phi, idx * 40)
            disk_base = ar1_process(n_samples, 40, 12, phi, idx * 50)
            # Healthy nodes still show a 1-block gap on ~25% of minutes.
            block_gap_base = np.random.choice([0, 1], size=n_samples, p=[0.75, 0.25])

            ramp_starts = sorted(np.random.choice(n_samples - ramp_length, n_ramps_per_node, replace=False))

            cpu = cpu_base.copy()
            error_rate = error_base.copy()
            mem = mem_base.copy()
            disk = disk_base.copy()
            block_gap = block_gap_base.copy()
            labels = np.zeros(n_samples, dtype=int)

            # Each incident is a linear degradation ramp over `ramp_length`
            # minutes; latency gets no ramp of its own because it is derived
            # from cpu and error_rate after this loop.
            ramp_cpu = np.linspace(0, 45, ramp_length)
            ramp_error = np.linspace(0, 0.4, ramp_length)
            ramp_mem = np.linspace(0, 30, ramp_length)
            ramp_disk = np.linspace(0, 150, ramp_length)
            ramp_gap = np.full(ramp_length, 5)
            for start in ramp_starts:
                end = start + ramp_length
                cpu[start:end] += ramp_cpu
                error_rate[start:end] += ramp_error
                mem[start:end] += ramp_mem
                disk[start:end] += ramp_disk
                block_gap[start:end] = np.maximum(block_gap[start:end], ramp_gap)
                labels[start:end] = 1
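
            # Latency is modeled structurally rather than as an independent
            # AR(1) series: it rises with cpu load and error rate, so the
            # injected ramps surface in latency through those two drivers.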
            latency = 40 + cpu * 1.5 + error_rate * 200 + np.random.normal(0, 5, n_samples)
            latency = np.clip(latency, 20, 1000)

            node_df = pd.DataFrame({
                "timestamp": timestamps,
                "node": node,
                "cpu_usage": np.round(cpu, 2),
                "memory_usage": np.round(mem, 2),
                "disk_io": np.round(disk, 2),
                "rpc_latency_ms": np.round(latency, 2),
                "rpc_error_rate": np.round(error_rate, 3),
                "block_height_gap": block_gap.astype(int),
                "failure_imminent": labels,
            })
            node_df.to_csv(f"data/raw/{node}_metrics.csv", index=False)
            all_data.append(node_df)

        combined_df = pd.concat(all_data, ignore_index=True)
        combined_df = combined_df.sort_values(["timestamp", "node"]).reset_index(drop=True)
        combined_df.to_csv("data/raw/synthetic_rpc_metrics_realistic.csv", index=False)
        logger.info("Synthetic data generated.")
    except Exception as e:
        logger.error(f"Error generating data: {e}")
        raise


if __name__ == "__main__":
    generate_data()
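
    # Quick sanity check on the combined CSV just written: with 8 ramps of
    # 20 minutes out of 1000 samples per node, the failure_imminent rate
    # should sit at or just below 160/1000 = 0.16 (exactly 0.16 only when
    # no ramps overlap).
    df = pd.read_csv("data/raw/synthetic_rpc_metrics_realistic.csv")
    logger.info(f"failure_imminent rate: {df['failure_imminent'].mean():.3f}")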