import datetime
from typing import List

import numpy as np
import pandas as pd
import pytz
import wrds

try:
    import exchange_calendars as tc
except ImportError:
    print(
        "Cannot import exchange_calendars.",
        "If you are using python>=3.7, please install it.",
    )
    import trading_calendars as tc

    print("Use trading_calendars instead for wrds processor.")

# from basic_processor import _Base
from meta.data_processors._base import _Base

# Silences pandas' SettingWithCopyWarning for the whole process, not only for
# this module.
pd.options.mode.chained_assignment = None


class Wrds(_Base):
    """Build intraday OHLCV bars from WRDS TAQ trade tables.

    ``time_interval`` is used as a bar length in seconds: :meth:`download_data`
    turns it into the pandas resample rule ``"<time_interval>S"``.
    """

    # def __init__(self,if_offline=False):
    #     if not if_offline:
    #         self.db = wrds.Connection()

    def __init__(
        self,
        data_source: str,
        start_date: str,
        end_date: str,
        time_interval: str,
        **kwargs,
    ):
        """Initialise the processor and optionally open a WRDS connection.

        A ``wrds.Connection`` is opened and stored as ``self.db`` only when
        ``if_offline`` is passed explicitly with a falsy value. If the keyword
        is omitted, no connection is made, and :meth:`download_data` then
        fails with a ``ValueError``.
        """
        super().__init__(data_source, start_date, end_date, time_interval, **kwargs)
        # A missing key counts as "offline". This is the same condition as
        # `"if_offline" in kwargs and not kwargs["if_offline"]`.
        if not kwargs.get("if_offline", True):
            self.db = wrds.Connection()

    def download_data(
        self,
        ticker_list: List[str],
        if_save_tempfile=False,
        filter_shares=0,
        save_path: str = "./data/dataset.csv",
    ):
        """Fetch trades for every NYSE session in range and store them as OHLCV bars.

        Args:
            ticker_list: Symbols matched against the TAQ ``sym_root`` column.
            if_save_tempfile: If True, rewrite ``./temp.csv`` after each day
                with all bars collected so far.
            filter_shares: Only trades with ``size`` strictly greater than
                this value are fetched.
            save_path: Passed to ``self.save_data`` once all days are done.

        Side effects:
            Sets ``self.dataframe`` to the bars sorted by ``time`` then
            ``tic``, with a fresh RangeIndex, and calls
            ``self.save_data(save_path)``.

        Raises:
            ValueError: If no trading day produced any bars, or if there is
                no WRDS connection (raised from :meth:`data_fetch_wrds`).
        """
        dates = self.get_trading_days(self.start_date, self.end_date)
        print("Trading days: ")
        print(dates)
        stock_set = tuple(ticker_list)
        rule = str(self.time_interval) + "S"
        frames = []
        for day in dates:
            data, if_empty = self.data_fetch_wrds(
                day, stock_set, filter_shares, self.time_interval
            )
            if if_empty:
                continue
            # A day can be fetched successfully and still return zero rows.
            # Skip it rather than carrying a None frame forward.
            if not data.empty:
                frames.append(self.preprocess_to_ohlcv(data, time_interval=rule))
            print("Data for date: " + day + " finished")
            if if_save_tempfile and frames:
                # Checkpoint: the file holds everything collected so far.
                pd.concat(frames).to_csv("./temp.csv")

        if not frames:
            raise ValueError("Empty Data under input parameters!")

        result = pd.concat(frames)
        result = result.sort_values(by=["time", "tic"]).reset_index(drop=True)
        self.dataframe = result
        self.save_data(save_path)
        print(
            f"Download complete! Dataset saved to {save_path}. \nShape of DataFrame: {self.dataframe.shape}"
        )

    def preprocess_to_ohlcv(self, df, time_interval="60S"):
        """Aggregate raw trade rows into per-ticker OHLCV bars.

        Args:
            df: Trade rows with at least the columns ``date``, ``time_m``,
                ``sym_root``, ``size`` and ``price``.
            time_interval: pandas resample rule used for the bar length.

        Returns:
            A DataFrame with the columns ``time, open, high, low, close,
            volume, tic``. Rows are grouped per ticker, with tickers in sorted
            order. Bins with no trades get NaN prices and a volume of 0.
            Returns None if ``df`` has no rows.
        """
        df = df[["date", "time_m", "sym_root", "size", "price"]]
        bars = []
        for tic in np.unique(df["sym_root"].values):
            tic_df = df[df["sym_root"] == tic].copy()
            tic_df["time"] = [
                self._parse_tick_time(date, time_m)
                for date, time_m in zip(tic_df["date"], tic_df["time_m"])
            ]
            tic_df = tic_df.set_index("time")
            data_ohlc = tic_df["price"].resample(time_interval).ohlc()
            data_ohlc["volume"] = tic_df["size"].resample(time_interval).sum()
            data_ohlc["tic"] = tic
            bars.append(data_ohlc.reset_index())
        if not bars:
            return None
        return pd.concat(bars, ignore_index=True)

    @staticmethod
    def _parse_tick_time(date, time_m):
        """Combine a trade's date and time into a datetime, with or without fractional seconds."""
        stamp = str(date) + " " + str(time_m)
        try:
            return datetime.datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S.%f")
        except ValueError:
            return datetime.datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S")

    def clean_data(self):
        """Align all tickers to a common time grid and fill bars that had no trades.

        Applied to ``self.dataframe`` in this order:

        1. Drop bars whose timestamp string ends in ``16:00:00``.
        2. For each ticker, add rows (NaN prices, volume 0) for every
           timestamp that some other ticker has but this ticker lacks.
        3. Within each ticker, every zero-volume bar takes the close of the
           bar before it (already filled if that bar was also empty) as its
           open, high, low and close.

        ``self.dataframe`` is replaced with the columns ``time, open, high,
        low, close, volume, tic``, sorted by tic then time.

        Raises:
            ValueError: If a zero-volume bar has no usable earlier close for
                the same ticker (for example, a ticker's first bar has no
                trades), or if NaNs remain after filling.
        """
        columns = ["time", "open", "high", "low", "close", "volume", "tic"]
        df = self.dataframe[columns]
        tic_list = np.unique(df["tic"].values)

        # remove 16:00 data
        # NOTE(review): the fetch query's time_m upper bound (16:00:00) is
        # inclusive, so this bar presumably holds only trades stamped exactly
        # at that bound. Confirm that this is why it is dropped.
        at_close = np.array(
            [str(t)[-8:] == "16:00:00" for t in df["time"]], dtype=bool
        )
        df = df[~at_close]
        df = df.sort_values(by=["tic", "time"])

        # check missing rows: each ticker should have one bar per timestamp
        normal_time = np.unique(df["time"].values)
        rows_per_tic = df["tic"].value_counts().reindex(tic_list, fill_value=0)
        nan_tics = rows_per_tic.index[rows_per_tic.to_numpy() != len(normal_time)]

        # fill missing rows with NaN prices and zero volume
        filler = []
        for tic in nan_tics:
            present = set(df.loc[df["tic"] == tic, "time"].values)
            missing_time = [t for t in normal_time if t not in present]
            if missing_time:
                filler.append(
                    pd.DataFrame(
                        {
                            "time": missing_time,
                            "open": np.nan,
                            "high": np.nan,
                            "low": np.nan,
                            "close": np.nan,
                            "volume": 0,
                            "tic": tic,
                        }
                    )
                )
        if filler:
            df = pd.concat([df, *filler], ignore_index=True)
        df = df.sort_values(by=["tic", "time"])

        # fill nan data: walk forward so that a run of empty bars all inherit
        # the last available close.
        close = df["close"].to_numpy(dtype=float, copy=True)
        idle = df["volume"].to_numpy(dtype=float) == 0
        tics = df["tic"].to_numpy()
        for i in np.flatnonzero(idle):
            # i == 0 or a change of ticker means there is no earlier bar for
            # this ticker. Never borrow another ticker's price.
            if i == 0 or tics[i] != tics[i - 1] or np.isnan(close[i - 1]):
                raise ValueError("Error nan price")
            close[i] = close[i - 1]
        for col in ("open", "high", "low"):
            df[col] = np.where(idle, close, df[col].to_numpy(dtype=float))
        df["close"] = close

        # check if nan (explicit raise: `assert` is stripped under -O)
        if df[["open", "high", "low", "close", "volume"]].isna().to_numpy().any():
            raise ValueError("NaN values remain after cleaning")

        # final preprocess
        df = df[columns].reset_index(drop=True)
        print("Data clean finished")
        self.dataframe = df

    def get_trading_days(self, start, end):
        """Return NYSE sessions between ``start`` and ``end`` as ``YYYY-MM-DD`` strings."""
        nyse = tc.get_calendar("NYSE")
        try:
            sessions = nyse.sessions_in_range(
                pd.Timestamp(start, tz=pytz.UTC), pd.Timestamp(end, tz=pytz.UTC)
            )
        except ValueError:
            # exchange_calendars>=4 uses timezone-naive session labels and
            # rejects tz-aware arguments with ValueError.
            sessions = nyse.sessions_in_range(pd.Timestamp(start), pd.Timestamp(end))
        return [str(day)[:10] for day in sessions]

    def data_fetch_wrds(
        self,
        date="2021-05-01",
        stock_set=("AAPL",),
        filter_shares=0,
        time_interval=60,
    ):
        """Query one day of trades from table ``taqm_<year>.ctm_<YYYYMMDD>``.

        Args:
            date: Trading day as ``YYYY-MM-DD``. It selects both the yearly
                library and the daily table.
            stock_set: Symbols matched against ``sym_root``. A single str is
                treated as one symbol. Any other iterable is turned into a
                tuple so that it binds to the SQL ``IN`` clause.
            filter_shares: Only trades with ``size`` strictly greater than
                this value are returned.
            time_interval: Unused. Kept for interface compatibility.

        Returns:
            ``(data, False)`` on success, where ``data`` is whatever
            ``self.db.raw_sql`` returns. ``(None, True)`` if the query raised.

        Raises:
            ValueError: If no WRDS connection was opened in ``__init__``.
        """
        if getattr(self, "db", None) is None:
            raise ValueError(
                "No WRDS connection: construct Wrds with if_offline=False to download data."
            )
        # The library and table names are built from a strptime-validated date,
        # so they cannot carry SQL. The symbols and size filter are bound as
        # parameters.
        current_date = datetime.datetime.strptime(date, "%Y-%m-%d")
        lib = "taqm_" + str(current_date.year)  # taqm_2021
        table = "ctm_" + current_date.strftime("%Y%m%d")  # ctm_20210501
        if isinstance(stock_set, str):
            stock_set = (stock_set,)
        else:
            stock_set = tuple(stock_set)
        parm = {"syms": stock_set, "num_shares": filter_shares}
        query = (
            "select * from "
            + lib
            + "."
            + table
            + " where sym_root in %(syms)s and time_m between '9:30:00' and '16:00:00' and size > %(num_shares)s and sym_suffix is null"
        )
        try:
            data = self.db.raw_sql(query, params=parm)
        except Exception:
            # Best effort per day, e.g. a table that does not exist.
            # download_data skips failed days.
            print("Data for date: " + date + " error")
            return None, True
        return data, False

    # def add_technical_indicator(self, df, tech_indicator_list = [
    #             'macd', 'boll_ub', 'boll_lb', 'rsi_30', 'dx_30',
    #             'close_30_sma', 'close_60_sma']):
    #     df = df.rename(columns={'time':'date'})
    #     df = df.copy()
    #     df = df.sort_values(by=['tic', 'date'])
    #     stock = Sdf.retype(df.copy())
    #     unique_ticker = stock.tic.unique()
    #     tech_indicator_list = tech_indicator_list
    #
    #     for indicator in tech_indicator_list:
    #         indicator_df = pd.DataFrame()
    #         for i in range(len(unique_ticker)):
    #             # print(unique_ticker[i], i)
    #             temp_indicator = stock[stock.tic == unique_ticker[i]][indicator]
    #             temp_indicator = pd.DataFrame(temp_indicator)
    #             temp_indicator['tic'] = unique_ticker[i]
    #             # print(len(df[df.tic == unique_ticker[i]]['date'].to_list()))
    #             temp_indicator['date'] = df[df.tic == unique_ticker[i]]['date'].to_list()
    #             indicator_df = indicator_df.append(
    #                 temp_indicator, ignore_index=True
    #             )
    #         df = df.merge(indicator_df[['tic', 'date', indicator]], on=['tic', 'date'], how='left')
    #     df = df.sort_values(by=['date', 'tic'])
    #     print('Succesfully add technical indicators')
    #     return df

    # def calculate_turbulence(self,data, time_period=252):
    #     # can add other market assets
    #     df = data.copy()
    #     df_price_pivot = df.pivot(index="date", columns="tic", values="close")
    #     # use returns to calculate turbulence
    #     df_price_pivot = df_price_pivot.pct_change()
    #
    #     unique_date = df.date.unique()
    #     # start after a fixed time period
    #     start = time_period
    #     turbulence_index = [0] * start
    #     # turbulence_index = [0]
    #     count = 0
    #     for i in range(start, len(unique_date)):
    #         current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]
    #         # use one year rolling window to calcualte covariance
    #         hist_price = df_price_pivot[
    #             (df_price_pivot.index < unique_date[i])
    #             & (df_price_pivot.index >= unique_date[i - time_period])
    #             ]
    #         # Drop tickers which has number missing values more than the "oldest" ticker
    #         filtered_hist_price = hist_price.iloc[hist_price.isna().sum().min():].dropna(axis=1)
    #
    #         cov_temp = filtered_hist_price.cov()
    #         current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(filtered_hist_price, axis=0)
    #         temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(
    #             current_temp.values.T
    #         )
    #         if temp > 0:
    #             count += 1
    #             if count > 2:
    #                 turbulence_temp = temp[0][0]
    #             else:
    #                 # avoid large outlier because of the calculation just begins
    #                 turbulence_temp = 0
    #         else:
    #             turbulence_temp = 0
    #         turbulence_index.append(turbulence_temp)
    #
    #     turbulence_index = pd.DataFrame(
    #         {"date": df_price_pivot.index, "turbulence": turbulence_index}
    #     )
    #     return turbulence_index
    #
    # def add_turbulence(self,data, time_period=252):
    #     """
    #     add turbulence index from a precalcualted dataframe
    #     :param data: (df) pandas dataframe
    #     :return: (df) pandas dataframe
    #     """
    #     df = data.copy()
    #     turbulence_index = self.calculate_turbulence(df, time_period=time_period)
    #     df = df.merge(turbulence_index, on="date")
    #     df = df.sort_values(["date", "tic"]).reset_index(drop=True)
    #     return df

    # def add_vix(self, data):
    #     vix_df = self.download_data(['vix'], self.start, self.end_date, self.time_interval)
    #     cleaned_vix = self.clean_data(vix_df)
    #     vix = cleaned_vix[['date','close']]
    #
    #     df = data.copy()
    #     df = df.merge(vix, on="date")
    #     df = df.sort_values(["date", "tic"]).reset_index(drop=True)
    #
    #     return df

    # def df_to_array(self,df,tech_indicator_list):
    #     unique_ticker = df.tic.unique()
    #     print(unique_ticker)
    #     if_first_time = True
    #     for tic in unique_ticker:
    #         if if_first_time:
    #             price_array = df[df.tic==tic][['close']].values
    #             #price_ary = df[df.tic==tic]['close'].values
    #             tech_array = df[df.tic==tic][tech_indicator_list].values
    #             risk_array = df[df.tic==tic]['turbulence'].values
    #             if_first_time = False
    #         else:
    #             price_array = np.hstack([price_array, df[df.tic==tic][['close']].values])
    #             tech_array = np.hstack([tech_array, df[df.tic==tic][tech_indicator_list].values])
    #     print('Successfully transformed into array')
    #     return price_array, tech_array, risk_array