import sys
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from functools import partial

import numpy as np
import torch
from scipy.signal import argrelextrema
from statsmodels.tsa.stattools import acf
from tqdm import tqdm

from .basic_metrics import basic_metricor, generate_curve

# ============== Parallel metric helpers ==============
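# The helpers below wrap individual metrics so they can be submitted to an
# executor from get_metrics_optimized. Each one builds its own basic_metricor
# and returns a neutral default on failure, so a single failing metric never
# aborts the whole evaluation.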
def _compute_auc_roc(labels, score):
grader = basic_metricor()
try:
return grader.metric_ROC(labels, score)
except Exception:
return 0.0
def _compute_auc_pr(labels, score):
grader = basic_metricor()
try:
return grader.metric_PR(labels, score)
except Exception:
return 0.0
def _compute_vus(labels, score, slidingWindow, version):
try:
_, _, _, _, _, _, VUS_ROC, VUS_PR = generate_curve(labels.astype(int), score, slidingWindow, version)
return VUS_ROC, VUS_PR
except Exception:
return 0.0, 0.0
def _compute_pointf1(labels, score):
# print("Evaluating F1 standard...")
grader = basic_metricor()
try:
# print("Using chunked parallel F1 computation...")
return grader.metric_standard_F1_chunked(
true_labels=labels,
anomaly_scores=score,
chunk_size=25, # Process 25 thresholds per chunk
num_workers=4 # Use 4 parallel workers
)
except Exception:
# print("F1 standard computation failed, returning zeros.")
return {'F1': 0.0, 'Precision': 0.0, 'Recall': 0.0}
def _compute_pointf1pa(labels, score):
grader = basic_metricor()
try:
return grader.metric_PointF1PA_chunked(
label=labels,
score=score,
chunk_size=30, # Process 30 quantiles per chunk
num_workers=6 # Use 6 parallel workers
)
except Exception:
return {'F1_PA': 0.0, 'P_PA': 0.0, 'R_PA': 0.0}
def _compute_affiliation(labels, score):
grader = basic_metricor()
try:
return grader.metric_Affiliation(labels, score)
except Exception:
return 0.0, 0.0, 0.0
def _compute_t_score(labels, score):
grader = basic_metricor()
try:
return grader.metric_F1_T(labels, score)
except Exception:
return {'F1_T': 0.0, 'P_T': 0.0, 'R_T': 0.0}
def _compute_f1_t(labels, score):
    grader = basic_metricor()
    try:
        # metric_F1_T manages its own parallelism; switch to use_parallel=False
        # if pickling issues appear when this is called from worker threads.
        return grader.metric_F1_T(labels, score, use_parallel=True)
    except Exception:
        # Always return a dict to keep downstream code consistent
        return {'F1_T': 0.0, 'P_T': 0.0, 'R_T': 0.0}
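# Generic dispatch helper: unpacks a (func, args) pair, e.g. for executor.submit.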
def _run_task(func, args):
return func(*args)
def get_metrics_optimized(score, labels, slidingWindow=100, pred=None, version='opt', thre=250):
"""
Fully optimized metrics computation with proper parallelization
"""
metrics = {}
start_total = time.time()
# Ensure proper data types to avoid float/integer issues
labels = np.asarray(labels, dtype=int)
score = np.asarray(score, dtype=float)
# Determine optimal number of workers based on CPU count and workload
n_cores = multiprocessing.cpu_count()
    # Threshold-iterating metrics (Affiliation and F1_T) loop heavily, so give
    # them more workers while leaving a couple of cores for the system.
    heavy_workers = max(1, min(n_cores - 2, 8))
    # Lighter metrics share a smaller pool
    light_workers = max(1, min(n_cores // 2, 8))
print(f"Using {heavy_workers} workers for heavy metrics, {light_workers} for light metrics")
# Start the heavy computations first (they take longest)
print("Starting heavy computations (Affiliation and F1_T)...")
heavy_start = time.time()
grader = basic_metricor()
with ProcessPoolExecutor(max_workers=2) as main_executor:
# Launch the two heaviest computations with their own internal parallelization
affiliation_future = main_executor.submit(
grader._compute_affiliation_parallel,
labels,
score,
num_workers=heavy_workers
)
# t_score_future = main_executor.submit(
# grader.metric_F1_T_fast,
# labels,
# score,
# num_workers=heavy_workers*2
# )
#
# While heavy computations are running, compute light metrics
print("Computing light metrics in parallel...")
light_start = time.time()
with ThreadPoolExecutor(max_workers=light_workers) as light_executor:
light_futures = {
'auc_roc': light_executor.submit(_compute_auc_roc, labels, score),
'auc_pr': light_executor.submit(_compute_auc_pr, labels, score),
'vus': light_executor.submit(_compute_vus, labels, score, slidingWindow, version),
'pointf1': light_executor.submit(_compute_pointf1, labels, score),
'pointf1pa': light_executor.submit(_compute_pointf1pa, labels, score),
'f1_t': light_executor.submit(_compute_f1_t, labels, score)
}
# Collect light metric results as they complete
light_results = {}
for name, future in light_futures.items():
try:
light_results[name] = future.result()
print(f" ✓ {name} completed")
except Exception as e:
print(f" ✗ {name} failed: {e}")
light_results[name] = None
print(f"Light metrics completed in {time.time() - light_start:.2f}s")
# Wait for heavy computations to complete
print("Waiting for heavy computations...")
try:
Affiliation_F, Affiliation_P, Affiliation_R = affiliation_future.result()
print(f" ✓ Affiliation completed")
except Exception as e:
print(f" ✗ Affiliation failed: {e}")
Affiliation_F, Affiliation_P, Affiliation_R = 0.0, 0.0, 0.0
# try:
# T_score = t_score_future.result()
# print(f" ✓ F1_T completed")
# except Exception as e:
# print(f" ✗ F1_T failed: {e}")
# T_score = {'F1_T': 0.0, 'P_T': 0.0, 'R_T': 0.0}
print(f"Heavy metrics completed in {time.time() - heavy_start:.2f}s")
    # Unpack light results (a failed future was stored as None above)
    AUC_ROC = light_results.get('auc_roc') or 0.0
    AUC_PR = light_results.get('auc_pr') or 0.0
    VUS_result = light_results.get('vus') or (0.0, 0.0)
    if isinstance(VUS_result, tuple):
        VUS_ROC, VUS_PR = VUS_result
    else:
        VUS_ROC, VUS_PR = 0.0, 0.0
# print("HERE IS POINTF1: ")
# print(light_results.get('pointf1',))
# sys.exit()
    PointF1 = light_results.get('pointf1') or {'F1': 0.0, 'Precision': 0.0, 'Recall': 0.0}
    PointF1PA = light_results.get('pointf1pa') or {'F1_PA': 0.0, 'P_PA': 0.0, 'R_PA': 0.0}
    T_score = light_results.get('f1_t') or {'F1_T': 0.0, 'P_T': 0.0, 'R_T': 0.0}
# Safeguard: if upstream returned a tuple (e.g., from an older fallback), coerce to dict
if isinstance(T_score, tuple):
try:
T_score = {'F1_T': T_score[0], 'P_T': T_score[1], 'R_T': T_score[2]}
except Exception:
T_score = {'F1_T': 0.0, 'P_T': 0.0, 'R_T': 0.0}
# Build final metrics dictionary
metrics['AUC-PR'] = AUC_PR
metrics['AUC-ROC'] = AUC_ROC
metrics['VUS-PR'] = VUS_PR
metrics['VUS-ROC'] = VUS_ROC
metrics['Standard-F1'] = PointF1.get('F1', 0.0)
metrics['Standard-Precision'] = PointF1.get('Precision', 0.0)
metrics['Standard-Recall'] = PointF1.get('Recall', 0.0)
metrics['PA-F1'] = PointF1PA.get('F1_PA', 0.0)
metrics['PA-Precision'] = PointF1PA.get('P_PA', 0.0)
metrics['PA-Recall'] = PointF1PA.get('R_PA', 0.0)
metrics['Affiliation-F'] = Affiliation_F
metrics['Affiliation-P'] = Affiliation_P
metrics['Affiliation-R'] = Affiliation_R
metrics['F1_T'] = T_score.get('F1_T', 0.0)
metrics['Precision_T'] = T_score.get('P_T', 0.0)
metrics['Recall_T'] = T_score.get('R_T', 0.0)
print(f"\nTotal computation time: {time.time() - start_total:.2f}s")
return metrics
def get_metrics(score, labels, slidingWindow=100, pred=None, version='opt', thre=250):
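    """
    Sequential (single-process) computation of the metric suite:
    threshold-independent AUC/VUS scores plus threshold-dependent Standard-F1,
    PA-F1, Affiliation, and F1_T, using the oracle threshold when pred is None.
    """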
metrics = {}
# Ensure proper data types to avoid float/integer issues
labels = np.asarray(labels, dtype=int)
score = np.asarray(score, dtype=float)
'''
Threshold Independent
'''
grader = basic_metricor()
# AUC_ROC, Precision, Recall, PointF1, PointF1PA, Rrecall, ExistenceReward, OverlapReward, Rprecision, RF, Precision_at_k = grader.metric_new(labels, score, pred, plot_ROC=False)
try:
AUC_ROC = grader.metric_ROC(labels, score)
except Exception:
AUC_ROC = 0.0
try:
AUC_PR = grader.metric_PR(labels, score)
except Exception:
AUC_PR = 0.0
# R_AUC_ROC, R_AUC_PR, _, _, _ = grader.RangeAUC(labels=labels, score=score, window=slidingWindow, plot_ROC=True)
try:
        _, _, _, _, _, _, VUS_ROC, VUS_PR = generate_curve(labels.astype(int), score, slidingWindow, version)
except Exception:
VUS_ROC, VUS_PR = 0.0, 0.0
'''
Threshold Dependent
if pred is None --> use the oracle threshold
'''
    PointF1 = grader.metric_standard_F1(labels, score)
    PointF1PA = grader.metric_PointF1PA(labels, score)
# EventF1PA = grader.metric_EventF1PA(labels, score,)
# RF1 = grader.metric_RF1(labels, score,)
try:
Affiliation_F, Affiliation_P, Affiliation_R = grader.metric_Affiliation(labels, score)
except Exception:
Affiliation_F, Affiliation_P, Affiliation_R = 0.0, 0.0, 0.0
    try:
        T_score = grader.metric_F1_T(labels, score)
    except Exception:
        T_score = {'F1_T': 0.0, 'P_T': 0.0, 'R_T': 0.0}
metrics['AUC-PR'] = AUC_PR
metrics['AUC-ROC'] = AUC_ROC
metrics['VUS-PR'] = VUS_PR
metrics['VUS-ROC'] = VUS_ROC
metrics['Standard-F1'] = PointF1['F1']
metrics['Standard-Precision'] = PointF1['Precision']
metrics['Standard-Recall'] = PointF1['Recall']
metrics['PA-F1'] = PointF1PA['F1_PA']
metrics['PA-Precision'] = PointF1PA['P_PA']
metrics['PA-Recall'] = PointF1PA['R_PA']
# metrics['Event-based-F1'] = EventF1PA
# metrics['R-based-F1'] = RF1
metrics['Affiliation-F'] = Affiliation_F
metrics['Affiliation-P'] = Affiliation_P
metrics['Affiliation-R'] = Affiliation_R
metrics['F1_T'] = T_score['F1_T']
metrics['Precision_T'] = T_score['P_T']
metrics['Recall_T'] = T_score['R_T']
return metrics
def get_metrics_pred(score, labels, pred, slidingWindow=100):
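    """
    Compute metrics from an explicit binary prediction vector `pred`
    instead of thresholding the anomaly score internally.
    """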
metrics = {}
# Ensure proper data types to avoid float/integer issues
labels = np.asarray(labels, dtype=int)
score = np.asarray(score, dtype=float)
pred = np.asarray(pred, dtype=int)
grader = basic_metricor()
PointF1 = grader.standard_F1(labels, score, preds=pred)
PointF1PA = grader.metric_PointF1PA(labels, score, preds=pred)
EventF1PA = grader.metric_EventF1PA(labels, score, preds=pred)
RF1 = grader.metric_RF1(labels, score, preds=pred)
Affiliation_F, Affiliation_P, Affiliation_R = grader.metric_Affiliation(labels, score, preds=pred)
VUS_R, VUS_P, VUS_F = grader.metric_VUS_pred(labels, preds=pred, windowSize=slidingWindow)
metrics['Standard-F1'] = PointF1['F1']
metrics['Standard-Precision'] = PointF1['Precision']
metrics['Standard-Recall'] = PointF1['Recall']
metrics['PA-F1'] = PointF1PA
metrics['Event-based-F1'] = EventF1PA
metrics['R-based-F1'] = RF1
metrics['Affiliation-F'] = Affiliation_F
metrics['Affiliation-P'] = Affiliation_P
metrics['Affiliation-R'] = Affiliation_R
metrics['VUS-Recall'] = VUS_R
metrics['VUS-Precision'] = VUS_P
metrics['VUS-F'] = VUS_F
return metrics
def find_length_rank(data, rank=1):
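    """
    Estimate a dominant period of `data` from the local maxima of its ACF
    (computed on at most the first 20000 points). `rank` picks the 1st, 2nd,
    or 3rd candidate peak (rank=0 returns 1; multi-dimensional input returns 0);
    falls back to 125 when no valid peak lies between lags 3 and 300.
    """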
data = data.squeeze()
if len(data.shape) > 1:
return 0
if rank == 0:
return 1
data = data[: min(20000, len(data))]
base = 3
auto_corr = acf(data, nlags=400, fft=True)[base:]
# plot_acf(data, lags=400, fft=True)
# plt.xlabel('Lags')
# plt.ylabel('Autocorrelation')
# plt.title('Autocorrelation Function (ACF)')
# plt.savefig('/data/liuqinghua/code/ts/TSAD-AutoML/AutoAD_Solution/candidate_pool/cd_diagram/ts_acf.png')
local_max = argrelextrema(auto_corr, np.greater)[0]
# print('auto_corr: ', auto_corr)
# print('local_max: ', local_max)
try:
# max_local_max = np.argmax([auto_corr[lcm] for lcm in local_max])
        sorted_local_max = np.argsort([auto_corr[lcm] for lcm in local_max])[::-1]  # Descending order
max_local_max = sorted_local_max[0] # Default
if rank == 1:
max_local_max = sorted_local_max[0]
if rank == 2:
for i in sorted_local_max[1:]:
if i > sorted_local_max[0]:
max_local_max = i
break
if rank == 3:
id_tmp = 1
for i in sorted_local_max[1:]:
if i > sorted_local_max[0]:
id_tmp = i
break
for i in sorted_local_max[id_tmp:]:
if i > sorted_local_max[id_tmp]:
max_local_max = i
break
# print('sorted_local_max: ', sorted_local_max)
# print('max_local_max: ', max_local_max)
if local_max[max_local_max] < 3 or local_max[max_local_max] > 300:
return 125
return local_max[max_local_max] + base
except Exception:
return 125
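
# --- Minimal usage sketch (illustration only; assumes this module is run as
# part of the package, e.g. `python -m Time_RCD.evaluation.metrics`, because of
# the relative import above). The score/label arrays are synthetic stand-ins
# for real model output; swap in get_metrics_optimized for the parallel path. ---
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    demo_score = rng.random(2000)                         # anomaly scores in [0, 1)
    demo_labels = (rng.random(2000) > 0.97).astype(int)   # ~3% synthetic anomalies
    window = find_length_rank(demo_score, rank=1)         # estimated period / sliding window
    print(get_metrics(demo_score, demo_labels, slidingWindow=window))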