AFML / afml /sampling /concurrent.py
akshayboora's picture
Upload 940 files
669d6a1 verified
"""
Logic regarding concurrent labels from chapter 4.
"""
import pandas as pd
from afml.util.multiprocess import mp_pandas_obj
def num_concurrent_events(close_series_index, label_endtime, molecule):
    """
    Advances in Financial Machine Learning, Snippet 4.1, page 60.

    Estimating the Uniqueness of a Label

    This function uses close series prices and label endtime (when the first barrier is touched)
    to compute the number of concurrent events per bar.

    :param close_series_index: (pd.Index) Close prices index (assumed sorted ascending)
    :param label_endtime: (pd.Series) Label endtime series (t1 for triple barrier events),
        indexed by event start time
    :param molecule: (an array) A set of datetime index values (event start times) for processing
    :return: (pd.Series) Number of concurrent labels for each bar from molecule[0] up to the
        latest end time among the molecule's events
    """
    # Unclosed events (missing end time) are treated as lasting until the last bar,
    # so they still impact other weights.
    label_endtime = label_endtime.fillna(close_series_index[-1])

    # Find events that span the period [molecule[0], molecule[-1]]:
    # first, events that end at or after molecule[0] ...
    label_endtime = label_endtime[label_endtime >= molecule[0]]
    # ... then events that start at or before the latest end time of the molecule's events.
    last_end = label_endtime.loc[molecule].max()
    label_endtime = label_endtime.loc[:last_end]

    # Build a zero-initialised counter covering every bar from the earliest remaining
    # event start to the latest remaining event end.
    nearest_index = close_series_index.searchsorted(
        pd.DatetimeIndex([label_endtime.index[0], label_endtime.max()])
    )
    count = pd.Series(
        0, index=close_series_index[nearest_index[0] : nearest_index[1] + 1]
    )

    # Each event adds one to every bar in its (inclusive) lifespan.
    for t_in, t_out in label_endtime.items():
        count.loc[t_in:t_out] += 1

    return count.loc[molecule[0] : last_end]
def _get_average_uniqueness(label_endtime, num_conc_events, molecule):
"""
Advances in Financial Machine Learning, Snippet 4.2, page 62.
Estimating the Average Uniqueness of a Label
This function uses close series prices and label endtime (when the first barrier is touched)
to compute the number of concurrent events per bar.
:param label_endtime: (pd.Series) Label endtime series (t1 for triple barrier events)
:param num_conc_events: (pd.Series) Number of concurrent labels (output from num_concurrent_events function).
:param molecule: (an array) A set of datetime index values for processing.
:return: (pd.Series) Average uniqueness over event's lifespan.
"""
wght = {}
for t_in, t_out in label_endtime.loc[molecule].items():
wght[t_in] = (1.0 / num_conc_events.loc[t_in:t_out]).mean()
wght = pd.Series(wght)
return wght
def get_num_conc_events(events, close, num_threads=4, verbose=True):
    """
    Count concurrent events per bar by running num_concurrent_events over event start
    times through mp_pandas_obj.

    :param events: (pd.DataFrame) Events whose 't1' column holds each label's end time,
        indexed by event start time.
    :param close: (pd.Series) Close prices; only its index is used.
    :param num_threads: (int) Thread count handed to mp_pandas_obj.
    :param verbose: (bool) Forwarded to mp_pandas_obj.
    :return: Whatever mp_pandas_obj assembles from the num_concurrent_events results
        (presumably a pd.Series of counts indexed by bar — confirm against mp_pandas_obj).
    """
    start_times = events.index
    bar_index = close.index
    end_times = events["t1"]
    return mp_pandas_obj(
        num_concurrent_events,
        ("molecule", start_times),
        num_threads,
        close_series_index=bar_index,
        label_endtime=end_times,
        verbose=verbose,
    )
def get_av_uniqueness_from_triple_barrier(
    triple_barrier_events, close_series, num_threads, num_conc_events=None, verbose=True
):
    """
    This function is the orchestrator to derive average sample uniqueness from a dataset
    labeled by the triple barrier method.

    :param triple_barrier_events: (pd.DataFrame) Events from labeling.get_events(); the 't1'
        column holds each label's end time.
    :param close_series: (pd.Series) Close prices. Its index defines the bars.
    :param num_threads: (int) The number of threads concurrently used by the function.
    :param num_conc_events: (pd.Series) Optional precomputed number of concurrent labels for
        each datetime index (e.g. from get_num_conc_events). Computed when None. A
        single-column pd.DataFrame is also accepted.
    :param verbose: (bool) Flag to report progress on asynch jobs
    :return: (pd.DataFrame) Column 'tW' holding the average uniqueness over each event's
        lifespan, for each index in triple_barrier_events
    :raises ValueError: If num_conc_events has index values that are not in close_series.
    """
    if num_conc_events is None:
        num_conc_events = get_num_conc_events(
            triple_barrier_events, close_series, num_threads, verbose
        )

    # Keep the counts as a Series: _get_average_uniqueness averages 1 / count per event,
    # which only yields a scalar per event when the counts are a Series.
    if isinstance(num_conc_events, pd.DataFrame):
        num_conc_events = num_conc_events.squeeze(axis=1)

    # Per-molecule results can cover overlapping bar ranges (each extends to its
    # molecule's latest end time), so keep a single value per bar. Boolean .loc
    # returns a new object, so the caller's input is never mutated.
    num_conc_events = num_conc_events.loc[~num_conc_events.index.duplicated(keep="last")]

    # Verify index compatibility BEFORE reindexing: reindex silently drops labels
    # that are not in close_series, so a check afterwards could never fail.
    missing_in_close = num_conc_events.index.difference(close_series.index)
    if not missing_in_close.empty:
        raise ValueError(
            f"num_conc_events contains {len(missing_in_close)} indices not in close_series"
        )

    # Align to every bar of close_series; bars not covered by any event get 0.
    num_conc_events = num_conc_events.reindex(close_series.index).fillna(0)

    out = pd.DataFrame()
    out["tW"] = mp_pandas_obj(
        _get_average_uniqueness,
        ("molecule", triple_barrier_events.index),
        num_threads,
        label_endtime=triple_barrier_events["t1"],
        num_conc_events=num_conc_events,
        verbose=verbose,
    )
    return out