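# Residue from the hosting web page (uploader: kristada673; commit message:
# "Upload 19 files"; revision: de6e775). Kept as a comment so the module parses.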
import datetime
from typing import List
import numpy as np
import pandas as pd
import pytz
import wrds
# Prefer exchange_calendars; fall back to the older trading_calendars package
# only when the import itself fails, so unrelated errors and Ctrl-C are not
# silently swallowed.
try:
    import exchange_calendars as tc
except ImportError:
    print(
        "Cannot import exchange_calendars.",
        "If you are using python>=3.7, please install it.",
    )
    import trading_calendars as tc

    print("Use trading_calendars instead for wrds processor.")
# from basic_processor import _Base
from meta.data_processors._base import _Base
pd.options.mode.chained_assignment = None
class Wrds(_Base):
    """Intraday data processor backed by a ``wrds`` database connection.

    Trades are fetched one trading day at a time from the table
    ``taqm_<year>.ctm_<yyyymmdd>``, aggregated into per-ticker OHLCV bars and
    stored on ``self.dataframe``.
    """

    def __init__(
        self,
        data_source: str,
        start_date: str,
        end_date: str,
        time_interval: str,
        **kwargs,
    ):
        """Initialise the base processor and optionally open a WRDS connection.

        ``wrds.Connection()`` is opened only when the keyword ``if_offline`` is
        passed explicitly with a falsy value. When the keyword is absent or
        truthy, no ``self.db`` attribute is created here, so
        :meth:`data_fetch_wrds` will report every date as failed.
        """
        super().__init__(data_source, start_date, end_date, time_interval, **kwargs)
        if "if_offline" in kwargs and not kwargs["if_offline"]:
            self.db = wrds.Connection()

    def download_data(
        self,
        ticker_list: List[str],
        if_save_tempfile=False,
        filter_shares=0,
        save_path: str = "./data/dataset.csv",
    ):
        """Fetch and aggregate trades for every trading day in the date range.

        Args:
            ticker_list: Symbols to query.
            if_save_tempfile: When true, the bars accumulated so far are
                written to ``./temp.csv`` after each successful day.
            filter_shares: Only trades with ``size`` greater than this value
                are requested.
            save_path: Path handed to ``self.save_data`` once the download
                is complete.

        Raises:
            ValueError: If no trading day produced any bars.
        """
        dates = self.get_trading_days(self.start_date, self.end_date)
        print("Trading days: ")
        print(dates)
        stock_set = tuple(ticker_list)
        bar_interval = str(self.time_interval) + "S"

        daily_frames = []
        for day in dates:
            raw, if_empty = self.data_fetch_wrds(
                day, stock_set, filter_shares, self.time_interval
            )
            if if_empty:
                continue
            bars = self.preprocess_to_ohlcv(raw, time_interval=bar_interval)
            print("Data for date: " + day + " finished")
            # A query can succeed yet return zero rows; skip such days instead
            # of carrying ``None`` into the concatenation below.
            if bars is None or bars.empty:
                continue
            daily_frames.append(bars)
            if if_save_tempfile:
                pd.concat(daily_frames).to_csv("./temp.csv")

        if not daily_frames:
            raise ValueError("Empty Data under input parameters!")

        result = pd.concat(daily_frames)
        result = result.sort_values(by=["time", "tic"]).reset_index(drop=True)
        self.dataframe = result
        self.save_data(save_path)
        print(
            f"Download complete! Dataset saved to {save_path}. \nShape of DataFrame: {self.dataframe.shape}"
        )

    @staticmethod
    def _parse_trade_time(date, time_m):
        """Combine a date and a time-of-day value into a ``datetime``.

        The time may or may not carry fractional seconds, so both layouts are
        tried.
        """
        stamp = str(date) + " " + str(time_m)
        try:
            return datetime.datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S.%f")
        except ValueError:
            return datetime.datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S")

    def preprocess_to_ohlcv(self, df, time_interval="60S"):
        """Resample raw trades into OHLCV bars, one ticker at a time.

        Args:
            df: Frame containing at least the columns ``date``, ``time_m``,
                ``sym_root``, ``size`` and ``price``.
            time_interval: Resampling rule passed to ``Series.resample``.

        Returns:
            A frame with columns ``time``, ``open``, ``high``, ``low``,
            ``close``, ``volume`` and ``tic``, or ``None`` when ``df``
            contains no symbols.
        """
        df = df[["date", "time_m", "sym_root", "size", "price"]]
        per_ticker = []
        for tic in np.unique(df["sym_root"].values):
            # Copy so that adding the ``time`` column never touches ``df``.
            trades = df[df["sym_root"] == tic].copy()
            trades["time"] = [
                self._parse_trade_time(date, time_m)
                for date, time_m in zip(trades["date"], trades["time_m"])
            ]
            trades = trades.set_index("time")
            bars = trades["price"].resample(time_interval).ohlc()
            bars["volume"] = trades["size"].resample(time_interval).sum().values
            bars["tic"] = tic
            per_ticker.append(bars.reset_index())
        if not per_ticker:
            return None
        # ``DataFrame.append`` no longer exists in pandas 2.x; concatenate once.
        return pd.concat(per_ticker, ignore_index=True)

    def clean_data(self):
        """Drop 16:00 bars, add missing bars and fill empty ones in place.

        Steps applied to ``self.dataframe``:

        1. Rows whose timestamp text ends in ``16:00:00`` are removed.
        2. For every ticker, a zero-volume row is added for each timestamp
           that other tickers have but this one lacks.
        3. Zero-volume rows get open/high/low/close set to the most recent
           close of the *same* ticker.

        Raises:
            ValueError: If a zero-volume row has no earlier price within its
                own ticker, or if NaNs remain after filling.
        """
        price_cols = ["open", "high", "low", "close"]
        columns = ["time", *price_cols, "volume", "tic"]
        df = self.dataframe[columns]
        tic_list = np.unique(df["tic"].values)

        # remove 16:00 data (mask-based, so it does not depend on the index)
        is_close_bar = (
            df["time"].map(lambda stamp: str(stamp)[-8:] == "16:00:00").astype(bool)
        )
        df = df[~is_close_bar].sort_values(by=["tic", "time"])

        # fill missing rows
        all_times = np.unique(df["time"].values)
        fillers = []
        for tic in tic_list:
            tic_times = df.loc[df["tic"] == tic, "time"].values
            if tic_times.shape[0] == all_times.shape[0]:
                continue
            missing = np.setdiff1d(all_times, tic_times)
            if missing.shape[0] == 0:
                continue
            fillers.append(
                pd.DataFrame(
                    {
                        "time": missing,
                        "open": np.nan,
                        "high": np.nan,
                        "low": np.nan,
                        "close": np.nan,
                        "volume": 0,
                        "tic": tic,
                    },
                    columns=columns,
                )
            )
        if fillers:
            df = pd.concat([df, *fillers], ignore_index=True)
        df = df.sort_values(by=["tic", "time"]).reset_index(drop=True)

        # fill nan data: carry the last close forward within each ticker only,
        # so one ticker's price never leaks into the next ticker's first bar
        empty_bar = df["volume"].astype(float) == 0
        df.loc[empty_bar, price_cols] = np.nan
        carried_close = df.groupby("tic")["close"].ffill()
        if carried_close[empty_bar].isna().any():
            raise ValueError("Error nan price")
        for col in price_cols:
            df[col] = df[col].where(~empty_bar, carried_close)

        # check if nan (explicit raise: ``assert`` is stripped under -O)
        if df[[*price_cols, "volume"]].isna().values.any():
            raise ValueError("NaN values remain after cleaning")

        # final preprocess
        df = df[columns].reset_index(drop=True)
        print("Data clean finished")
        self.dataframe = df

    def get_trading_days(self, start, end):
        """Return NYSE sessions between ``start`` and ``end`` as ``YYYY-MM-DD``.

        The calendar is first queried with UTC-aware timestamps. If the
        calendar library rejects those (newer ``exchange_calendars`` releases
        appear to require tz-naive session labels), the query is retried with
        tz-naive timestamps.
        """
        nyse = tc.get_calendar("NYSE")
        try:
            sessions = nyse.sessions_in_range(
                pd.Timestamp(start, tz=pytz.UTC), pd.Timestamp(end, tz=pytz.UTC)
            )
        except (TypeError, ValueError):
            sessions = nyse.sessions_in_range(pd.Timestamp(start), pd.Timestamp(end))
        return [str(day)[:10] for day in sessions]

    def data_fetch_wrds(
        self,
        date="2021-05-01",
        stock_set=("AAPL",),
        filter_shares=0,
        time_interval=60,
    ):
        """Query one day's trades for the given symbols.

        Args:
            date: Day to fetch, formatted ``YYYY-MM-DD``.
            stock_set: Symbols to match against ``sym_root``. A single string
                is treated as one symbol.
            filter_shares: Only rows with ``size`` greater than this value
                are returned.
            time_interval: Unused here; kept for interface compatibility.

        Returns:
            ``(data, False)`` on success, or ``(None, True)`` when the query
            fails for any reason (the reason is printed).
        """
        if isinstance(stock_set, str):
            stock_set = (stock_set,)
        else:
            stock_set = tuple(stock_set)

        current_date = datetime.datetime.strptime(date, "%Y-%m-%d")
        lib = "taqm_" + str(current_date.year)  # taqm_2021
        table = "ctm_" + current_date.strftime("%Y%m%d")  # ctm_20210501
        # lib/table are derived from the parsed date, so only digits are
        # interpolated; user-supplied values go through bound parameters.
        query = (
            "select * from "
            + lib
            + "."
            + table
            + " where sym_root in %(syms)s and time_m between '9:30:00' and '16:00:00' and size > %(num_shares)s and sym_suffix is null"
        )
        parm = {"syms": stock_set, "num_shares": filter_shares}
        try:
            data = self.db.raw_sql(query, params=parm)
        except Exception as err:
            print("Data for date: " + date + " error")
            print("Reason: " + repr(err))
            return None, True
        return data, False
# def add_technical_indicator(self, df, tech_indicator_list = [
# 'macd', 'boll_ub', 'boll_lb', 'rsi_30', 'dx_30',
# 'close_30_sma', 'close_60_sma']):
# df = df.rename(columns={'time':'date'})
# df = df.copy()
# df = df.sort_values(by=['tic', 'date'])
# stock = Sdf.retype(df.copy())
# unique_ticker = stock.tic.unique()
# tech_indicator_list = tech_indicator_list
#
# for indicator in tech_indicator_list:
# indicator_df = pd.DataFrame()
# for i in range(len(unique_ticker)):
# # print(unique_ticker[i], i)
# temp_indicator = stock[stock.tic == unique_ticker[i]][indicator]
# temp_indicator = pd.DataFrame(temp_indicator)
# temp_indicator['tic'] = unique_ticker[i]
# # print(len(df[df.tic == unique_ticker[i]]['date'].to_list()))
# temp_indicator['date'] = df[df.tic == unique_ticker[i]]['date'].to_list()
# indicator_df = indicator_df.append(
# temp_indicator, ignore_index=True
# )
# df = df.merge(indicator_df[['tic', 'date', indicator]], on=['tic', 'date'], how='left')
# df = df.sort_values(by=['date', 'tic'])
# print('Succesfully add technical indicators')
# return df
# def calculate_turbulence(self,data, time_period=252):
# # can add other market assets
# df = data.copy()
# df_price_pivot = df.pivot(index="date", columns="tic", values="close")
# # use returns to calculate turbulence
# df_price_pivot = df_price_pivot.pct_change()
#
# unique_date = df.date.unique()
# # start after a fixed time period
# start = time_period
# turbulence_index = [0] * start
# # turbulence_index = [0]
# count = 0
# for i in range(start, len(unique_date)):
# current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]
# # use one year rolling window to calcualte covariance
# hist_price = df_price_pivot[
# (df_price_pivot.index < unique_date[i])
# & (df_price_pivot.index >= unique_date[i - time_period])
# ]
# # Drop tickers which has number missing values more than the "oldest" ticker
# filtered_hist_price = hist_price.iloc[hist_price.isna().sum().min():].dropna(axis=1)
#
# cov_temp = filtered_hist_price.cov()
# current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(filtered_hist_price, axis=0)
# temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(
# current_temp.values.T
# )
# if temp > 0:
# count += 1
# if count > 2:
# turbulence_temp = temp[0][0]
# else:
# # avoid large outlier because of the calculation just begins
# turbulence_temp = 0
# else:
# turbulence_temp = 0
# turbulence_index.append(turbulence_temp)
#
# turbulence_index = pd.DataFrame(
# {"date": df_price_pivot.index, "turbulence": turbulence_index}
# )
# return turbulence_index
#
# def add_turbulence(self,data, time_period=252):
# """
# add turbulence index from a precalcualted dataframe
# :param data: (df) pandas dataframe
# :return: (df) pandas dataframe
# """
# df = data.copy()
# turbulence_index = self.calculate_turbulence(df, time_period=time_period)
# df = df.merge(turbulence_index, on="date")
# df = df.sort_values(["date", "tic"]).reset_index(drop=True)
# return df
# def add_vix(self, data):
# vix_df = self.download_data(['vix'], self.start, self.end_date, self.time_interval)
# cleaned_vix = self.clean_data(vix_df)
# vix = cleaned_vix[['date','close']]
#
# df = data.copy()
# df = df.merge(vix, on="date")
# df = df.sort_values(["date", "tic"]).reset_index(drop=True)
#
# return df
# def df_to_array(self,df,tech_indicator_list):
# unique_ticker = df.tic.unique()
# print(unique_ticker)
# if_first_time = True
# for tic in unique_ticker:
# if if_first_time:
# price_array = df[df.tic==tic][['close']].values
# #price_ary = df[df.tic==tic]['close'].values
# tech_array = df[df.tic==tic][tech_indicator_list].values
# risk_array = df[df.tic==tic]['turbulence'].values
# if_first_time = False
# else:
# price_array = np.hstack([price_array, df[df.tic==tic][['close']].values])
# tech_array = np.hstack([tech_array, df[df.tic==tic][tech_indicator_list].values])
# print('Successfully transformed into array')
# return price_array, tech_array, risk_array