import numpy as np import pandas as pd import os import pickle import time from contextlib import contextmanager from importlib import reload import re from project_tools import project_config, project_utils, numerapi_utils import glob import matplotlib.pyplot as plt import seaborn as sns from random import randint, random import itertools import scipy from scipy.stats import ks_2samp from sklearn.metrics import log_loss, roc_auc_score, accuracy_score, mean_squared_error from sklearn.preprocessing import MinMaxScaler, StandardScaler from sklearn.pipeline import make_pipeline from sklearn import linear_model import datetime import json from collections import OrderedDict from os import listdir from os.path import isfile, join, isdir import glob import numerapi import itertools import io import requests from pathlib import Path from scipy.stats.mstats import gmean from typing import List, Dict napi = numerapi.NumerAPI() #verbosity="info") def get_time_string(): """ Generate a time string representation of the time of call of this function. :param None :return: a string that represent the time of the functional call. """ now = datetime.datetime.now() now = str(now.strftime('%Y%m%d%H%M')) return now def reload_project(): """ utility function used during experimentation to reload various model when required, useful for quick experiment iteration :return: None """ reload(project_config) reload(project_utils) reload(numerapi_utils) @contextmanager def timer(name): """ utility timer function to check how long a piece of code might take to run. :param name: name of the code fragment to be timed :yield: time taken for the code to run """ t0 = time.time() print('[%s] in progress' % name) yield print('[%s] done in %.6f s' %(name, time.time() - t0)) def load_data(pickle_file): """ load pickle data from file :param pickle_file: path of pickle data :return: data stored in pickle file """ load_file = open(pickle_file, 'rb') data = pickle.load(load_file) return data def pickle_data(path, data, protocol=-1, timestamp=False, verbose=True): """ Pickle data to specified file :param path: full path of file where data will be pickled to :param data: data to be pickled :param protocol: pickle protocol, -1 indicate to use the latest protocol :return: None """ file = path if timestamp: base_file = os.path.splitext(file)[0] time_str = '_' + get_time_string() ext = os.path.splitext(os.path.basename(file))[1] file = base_file + time_str + ext if verbose: print('creating file %s' % file) save_file = open(file, 'wb') pickle.dump(data, save_file, protocol=protocol) save_file.close() def save_json(path, data, timestamp=False, verbose=True, indent=2): """ Save data to Json format :param path: full path of file where data will be pickled to :param data: data to be pickled :param timestamp: if true, the timestamp will be saved as part of the file name :param verbose: if true, print information about file creation :param indent: specify the width of the indent in the resulted Json file :return: None """ file = path if timestamp: base_file = os.path.splitext(file)[0] time_str = '_' + get_time_string() ext = os.path.splitext(os.path.basename(file))[1] file = base_file + time_str + ext if verbose: print('creating file %s' % file) outfile = open(file, 'w') json.dump(data, outfile, indent=indent) outfile.close() def load_json(json_file): """ load data from Json file :param json_file: path of json file :return: data stored in json file as python dictionary """ load_file = open(json_file) data = json.load(load_file) load_file.close() return data def create_folder(path): Path(path).mkdir(parents=True, exist_ok=True) def glob_folder_filelist(path, file_type='', recursive=True): """ utility function that walk through a given directory, and return list of files in the directory :param path: the path of the directory :param file_type: if not '', this function would only consider the file type specified by this parameter :param recursive: if True, perform directory walk-fhrough recursively :return absfile: a list containing absolute path of each file in the directory :return base_files: a list containing base name of each file in the directory """ if path[-1] != '/': path = path +'/' abs_files = [] base_files = [] patrn = '**' if recursive else '*' glob_path = path + patrn matches = glob.glob(glob_path, recursive=recursive) for f in matches: if os.path.isfile(f): include = True if len(file_type)>0: ext = os.path.splitext(f)[1] if ext[1:] != file_type: include = False if include: abs_files.append(f) base_files.append(os.path.basename(f)) return abs_files, base_files def dir_compare(pathl, pathr): files_pathl = set([f for f in listdir(pathl) if isfile(join(pathl, f))]) files_pathr = set([f for f in listdir(pathr) if isfile(join(pathr, f))]) return list(files_pathl-files_pathr), list(files_pathr-files_pathl) def lr_dir_sync(pathl, pathr): files_lrddiff, files_rldiff = project_utils.dir_compare(pathl, pathr) for f in files_lrddiff: scr = pathl + f dst = pathr + f print('copying file %s' % scr) copyfile(scr, dst) def copy_file_with_time(src_file, dst_file_name, des_path): basename = os.path.splitext(os.path.basename(dst_file_name))[0] ext_name = os.path.splitext(os.path.basename(dst_file_name))[1] timestr = get_time_string() des_name = '%s%s_%s%s' % (des_path, basename, timestr, ext_name) # print(des_name) copyfile(src_file, des_name) def find_filesfromfolder(target_dir, containtext): absnames, basenames = glob_folder_filelist(target_dir) result_filelist = [] for absname, basename in zip(absnames, basenames): if containtext in basename: result_filelist.append(absname) # result_filelist = [f for f in total_filelist if containtext in f] return result_filelist def cp_files_with_prefix(src_path, dst_path, prefix, ext): abs_file_list, base_file_list = get_folder_filelist(src_path, file_type=ext) # print(abs_file_list) for src_file, base_file in zip(abs_file_list, base_file_list): dst_file = dst_path + prefix + base_file copyfile(src_file, dst_file) return None def mv_files_with_prefix(src_path, dst_path, prefix, ext): abs_file_list, base_file_list = get_folder_filelist(src_path, file_type=ext) # print(abs_file_list) for src_file, base_file in zip(abs_file_list, base_file_list): dst_file = dst_path + prefix + base_file move(src_file, dst_file) return None def empty_folder(path): if path[-1]!='*': path = path + '*' files = glob.glob(path) for f in files: os.remove(f) def rescale(n, range1, range2): if n>range1[1]: #or n= p).astype(np.uint8) acc = accuracy_score(label, preds) return acc, preds def np_pearson(t,p): vt = t - t.mean() vp = p - p.mean() top = np.sum(vt*vp) bottom = np.sqrt(np.sum(vt**2)) * np.sqrt(np.sum(vp**2)) res = top/bottom return res def df_get_features_with_str(df, ptrn): """ extract list of feature names from a data frame that contain the specified regular expression pattern :param df: the input dataframe of which features name to be analysed :param ptrn: the specified regular expression pattern :return: list of feature names that contained the specified regular expression """ return [col for col in df.columns.tolist() if len(re.findall(ptrn, col)) > 0] def df_fillna_with_other(df, src_feature, dst_feature): """ fill the NA values of a specified feature in a dataframe with values of another feature from the same row. :param df: the input dataframe :param src_feature: the specified feature of which NA value will be filled :param dst_feature: the feature of which values will be used :return: a dataframe with the specified feature's NA value being filled by values from the "dst_feature" """ src_vals = df[src_feature].values dst_vals = df[dst_feature].values argwhere_nan = np.argwhere(np.isnan(dst_vals)).flatten() dst_vals[argwhere_nan] = src_vals[argwhere_nan] df[dst_feature] = dst_vals return df def plot_prediction_prob(y_pred_prob): """ plot probability prediction values using histrogram :param y_pred_prob: the probability prediction values to be plotted :return: None, the plot will be plotted during the operation of the function. """ prob_series = pd.Series(data=y_pred_prob) prob_series.name = 'prediction probability' prob_series.plot(kind='hist', figsize=(15, 5), bins=50) plt.show() print(prob_series.describe()) def df_traintest_split(df, split_var, seed=None, train_ratio=0.75): """ perform train test split on a specified feature on a given dataframe wwith specified train ratio. Unique value of the specified feature will only present on either the resulted train or the test dataframe :param df: the input dataframe to be split :param split_var: the feature to be used as unique value to perform the split :param seed: the random used to facilitate the train test split :param train_ratio: the ratio of data to be split into the resulted train dataframe. :return train_df: the resulted train dataframe after the split :return test_df: the resulted test dataframe after the split """ sv_list = df[split_var].unique().tolist() train_length = int(len(sv_list) * train_ratio) train_siv_list = pd.Series(df[split_var].unique()).sample(train_length, random_state=seed) train_idx = df.loc[df[split_var].isin(train_siv_list)].index.values test_idx = df.iloc[df.index.difference(train_idx)].index.values train_df = df.loc[train_idx].copy().reset_index(drop=True) test_df = df.loc[test_idx].copy().reset_index(drop=True) return train_df, test_df # https://www.kaggle.com/gemartin/load-data-reduce-memory-usage def reduce_mem_usage(df, verbose=True, exceiptions=[]): """ iterate through all the columns of a dataframe and modify the data type to reduce memory usage. """ np_input = False if isinstance(df, np.ndarray): np_input = True df = pd.DataFrame(data=df) start_mem = df.memory_usage().sum() / 1024 ** 2 col_id = 0 print('Memory usage of dataframe is {:.2f} MB'.format(start_mem)) for col in df.columns: if verbose: print('doing %d: %s' % (col_id, col)) col_type = df[col].dtype try: if (col_type != object) & (col not in exceiptions): c_min = df[col].min() c_max = df[col].max() if str(col_type)[:3] == 'int': if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16) elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max: df[col] = df[col].astype(np.int64) else: if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: # df[col] = df[col].astype(np.float16) # elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: df[col] = df[col].astype(np.float32) else: df[col] = df[col].astype(np.float64) # else: # df[col] = df[col].astype('category') # pass except: pass col_id += 1 end_mem = df.memory_usage().sum() / 1024 ** 2 print('Memory usage after optimization is: {:.2f} MB'.format(end_mem)) print('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem)) if np_input: return df.values else: return df def get_xgb_featimp(model): imp_type = ['weight', 'gain', 'cover', 'total_gain', 'total_cover'] imp_dict = {} try: bst = model.get_booster() except: bst = model feature_names = bst.feature_names for impt in imp_type: imp_dict[impt] = [] scores = bst.get_score(importance_type=impt) for feature in feature_names: if feature in scores.keys(): imp_dict[impt].append(scores[feature]) else: imp_dict[impt].append(np.nan) imp_df = pd.DataFrame(index=bst.feature_names, data=imp_dict) return imp_df def get_df_rankavg(df): idx = df.index cols = df.columns.tolist() rankavg_dict = {} for col in cols: rankavg_dict[col]=df[col].rank(pct=True).tolist() rankavg_df = pd.DataFrame(index=idx, columns=cols, data=rankavg_dict) rankavg_df['rankavg'] = rankavg_df.mean(axis=1) return rankavg_df.sort_values(by='rankavg', ascending=False) def get_list_gmean(lists): out = np.zeros((len(lists[0]), len(lists))) for i in range(0, len(lists)): out[:,i] = lists[i] gmean_out = gmean(out, axis=1) return gmean_out def generate_nwise_combination(items, n=2): return list(itertools.combinations(items, n)) def pairwise_feature_generation(df, feature_list, operator='addition', verbose=True): feats_pair = generate_nwise_combination(feature_list, 2) result_df = pd.DataFrame() for pair in feats_pair: if verbose: print('generating %s of %s and %s' % (operator, pair[0], pair[1])) if operator == 'addition': feat_name = pair[0] + '_add_' + pair[1] result_df[feat_name] = df[pair[0]] + df[pair[1]] elif operator == 'multiplication': feat_name = pair[0] + '_mulp_' + pair[1] result_df[feat_name] = df[pair[0]] * df[pair[1]] elif operator == 'division': feat_name = pair[0] + '_div_' + pair[1] result_df[feat_name] = df[pair[0]] / df[pair[1]] return result_df def try_divide(x, y, val=0.0): """ try to perform division between two number, and return a default value if division by zero is detected :param x: the number to be used as dividend :param y: the number to be used as divisor :param val: the default output value :return: the output value, the default value of val will be returned if division by zero is detected """ if y != 0.0: val = float(x) / y return val def series_reverse_cumsum(a): return a.fillna(0).values[::-1].cumsum()[::-1] def get_array_sharpe(values): return values.mean()/values.std() #### NumerDash specific functions ### def calculate_rounddailysharpe_dashboard(df, lastround, earliest_round, score='corr'): if score=='corr': target = 'corr_sharpe' elif score == 'corr_pct': target = 'corr_pct_sharpe' elif score=='mmc': target = 'mmc_sharpe' elif score=='mmc_pct': target = 'mmc_pct_sharpe' elif score=='corrmmc': target = 'corrmmc_sharpe' elif score=='corr2mmc': target = 'corr2mmc_sharpe' elif score=='cmavg_pct': target = 'cmavgpct_sharpe' elif score=='c2mavg_pct': target = 'c2mavcpct_sharpe' mean_feat = 'avg_sharpe' sos_feat = 'sos' df = df[(df['roundNumber'] >= earliest_round) & (df['roundNumber'] <= lastround)] res = df.groupby(['model', 'roundNumber', 'group'])[score].apply( lambda x: get_array_sharpe(x)).reset_index(drop=False) res = res.rename(columns={score: target}).sort_values('roundNumber', ascending=False) res = res.pivot(index=['model', 'group'], columns='roundNumber', values=target) res.columns.name = '' cols = [i for i in res.columns[::-1]] res = res[cols] res[mean_feat] = res[cols].mean(axis=1) res[sos_feat] = res[cols].apply(lambda x: get_array_sharpe(x), axis=1) res = res.drop_duplicates(keep='first').sort_values(by=sos_feat, ascending=False) res.reset_index(drop=False, inplace=True) return res[['model', 'group', sos_feat, mean_feat]+cols] def groupby_agg_execution(agg_recipies, df, verbose=True): result_dfs = dict() for groupby_cols, features, aggs in agg_recipies: group_object = df.groupby(groupby_cols) groupby_key = '_'.join(groupby_cols) if groupby_key not in list(result_dfs.keys()): result_dfs[groupby_key] = pd.DataFrame() for feature in features: rename_col = feature for agg in aggs: if isinstance(agg, dict): agg_name = list(agg.keys())[0] agg_func = agg[agg_name] else: agg_name = agg agg_func = agg if agg_name=='count': groupby_aggregate_name = '{}_{}'.format(groupby_key, agg_name) else: groupby_aggregate_name = '{}_{}_{}'.format(groupby_key, feature, agg_name) verbose and print(f'generating statistic {groupby_aggregate_name}') groupby_res_df = group_object[feature].agg(agg_func).reset_index(drop=False) groupby_res_df = groupby_res_df.rename(columns={rename_col: groupby_aggregate_name}) if len(result_dfs[groupby_key]) == 0: result_dfs[groupby_key] = groupby_res_df else: result_dfs[groupby_key][groupby_aggregate_name] = groupby_res_df[groupby_aggregate_name] return result_dfs def get_latest_round_id(): try: all_competitions = numerapi_utils.get_competitions() latest_comp_id = all_competitions[0]['number'] except: print('calling api unsuccessulf, using downloaded data to get the latest round') local_data = load_data(project_config.DASHBOARD_MODEL_RESULT_FILE) latest_comp_id = local_data['roundNumber'].max() return int(latest_comp_id) # except: latest_round = get_latest_round_id() def update_numerati_data(url=project_config.NUMERATI_URL, save_path=project_config.FEATURE_PATH): content = requests.get(url).content data = pd.read_csv(io.StringIO(content.decode('utf-8'))) save_file = os.path.join(save_path, 'numerati_data.pkl') pickle_data(save_file, data) return data def get_model_group(model_name): cat_name = 'other' if model_name in project_config.MODEL_NAMES+project_config.NEW_MODEL_NAMES: cat_name = 'yx' elif model_name in project_config.TOP_LB: cat_name = 'top_corr' elif model_name in project_config.IAAI_MODELS: cat_name = 'iaai' elif model_name in project_config.ARBITRAGE_MODELS: cat_name = 'arbitrage' elif model_name in project_config.MCV_MODELS: cat_name = 'mcv' # elif model_name in project_config.MM_MODELS: # cat_name = 'mm' elif model_name in project_config.BENCHMARK_MODELS: cat_name = 'benchmark' elif model_name in project_config.TP3M: cat_name = 'top_3m' elif model_name in project_config.TP1Y: cat_name = 'top_1y' return cat_name def get_dashboard_data_status(): dashboard_data_tstr = 'NA' nmtd_tstr = 'NA' try: dashboard_data_t = datetime.datetime.utcfromtimestamp(os.path.getctime(project_config.DASHBOARD_MODEL_RESULT_FILE)) dashboard_data_tstr = dashboard_data_t.strftime(project_config.DATETIME_FORMAT2) except Exception as e: print(e) pass try: nmtd_t = datetime.datetime.utcfromtimestamp(os.path.getctime(project_config.NUMERATI_FILE)) nmtd_tstr = nmtd_t.strftime(project_config.DATETIME_FORMAT2) except Exception as e: print(e) pass return dashboard_data_tstr, nmtd_tstr