Spaces:
Runtime error
Runtime error
import datetime | |
from typing import List | |
import numpy as np | |
import pandas as pd | |
import pytz | |
import wrds | |
try: | |
import exchange_calendars as tc | |
except: | |
print( | |
"Cannot import exchange_calendars.", | |
"If you are using python>=3.7, please install it.", | |
) | |
import trading_calendars as tc | |
print("Use trading_calendars instead for wrds processor.") | |
# from basic_processor import _Base | |
from meta.data_processors._base import _Base | |
pd.options.mode.chained_assignment = None | |
class Wrds(_Base): | |
# def __init__(self,if_offline=False): | |
# if not if_offline: | |
# self.db = wrds.Connection() | |
def __init__( | |
self, | |
data_source: str, | |
start_date: str, | |
end_date: str, | |
time_interval: str, | |
**kwargs, | |
): | |
super().__init__(data_source, start_date, end_date, time_interval, **kwargs) | |
if "if_offline" in kwargs.keys() and not kwargs["if_offline"]: | |
self.db = wrds.Connection() | |
def download_data( | |
self, | |
ticker_list: List[str], | |
if_save_tempfile=False, | |
filter_shares=0, | |
save_path: str = "./data/dataset.csv", | |
): | |
dates = self.get_trading_days(self.start_date, self.end_date) | |
print("Trading days: ") | |
print(dates) | |
first_time = True | |
empty = True | |
stock_set = tuple(ticker_list) | |
for i in dates: | |
x = self.data_fetch_wrds(i, stock_set, filter_shares, self.time_interval) | |
if not x[1]: | |
empty = False | |
dataset = x[0] | |
dataset = self.preprocess_to_ohlcv( | |
dataset, time_interval=(str(self.time_interval) + "S") | |
) | |
print("Data for date: " + i + " finished") | |
if first_time: | |
temp = dataset | |
first_time = False | |
else: | |
temp = pd.concat([temp, dataset]) | |
if if_save_tempfile: | |
temp.to_csv("./temp.csv") | |
if empty: | |
raise ValueError("Empty Data under input parameters!") | |
result = temp | |
result = result.sort_values(by=["time", "tic"]) | |
result = result.reset_index(drop=True) | |
self.dataframe = result | |
self.save_data(save_path) | |
print( | |
f"Download complete! Dataset saved to {save_path}. \nShape of DataFrame: {self.dataframe.shape}" | |
) | |
def preprocess_to_ohlcv(self, df, time_interval="60S"): | |
df = df[["date", "time_m", "sym_root", "size", "price"]] | |
tic_list = np.unique(df["sym_root"].values) | |
final_df = None | |
first_time = True | |
for i in range(len(tic_list)): | |
tic = tic_list[i] | |
time_list = [] | |
temp_df = df[df["sym_root"] == tic] | |
for i in range(temp_df.shape[0]): | |
date = temp_df["date"].iloc[i] | |
time_m = temp_df["time_m"].iloc[i] | |
time = str(date) + " " + str(time_m) | |
try: | |
time = datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S.%f") | |
except: | |
time = datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S") | |
time_list.append(time) | |
temp_df["time"] = time_list | |
temp_df = temp_df.set_index("time") | |
data_ohlc = temp_df["price"].resample(time_interval).ohlc() | |
data_v = temp_df["size"].resample(time_interval).agg({"size": "sum"}) | |
volume = data_v["size"].values | |
data_ohlc["volume"] = volume | |
data_ohlc["tic"] = tic | |
if first_time: | |
final_df = data_ohlc.reset_index() | |
first_time = False | |
else: | |
final_df = final_df.append(data_ohlc.reset_index(), ignore_index=True) | |
return final_df | |
def clean_data(self): | |
df = self.dataframe[["time", "open", "high", "low", "close", "volume", "tic"]] | |
# remove 16:00 data | |
tic_list = np.unique(df["tic"].values) | |
ary = df.values | |
rows_1600 = [] | |
for i in range(ary.shape[0]): | |
row = ary[i] | |
time = row[0] | |
if str(time)[-8:] == "16:00:00": | |
rows_1600.append(i) | |
df = df.drop(rows_1600) | |
df = df.sort_values(by=["tic", "time"]) | |
# check missing rows | |
tic_dic = {tic: [0, 0] for tic in tic_list} | |
ary = df.values | |
for i in range(ary.shape[0]): | |
row = ary[i] | |
volume = row[5] | |
tic = row[6] | |
if volume != 0: | |
tic_dic[tic][0] += 1 | |
tic_dic[tic][1] += 1 | |
constant = np.unique(df["time"].values).shape[0] | |
nan_tics = [tic for tic, value in tic_dic.items() if value[1] != constant] | |
# fill missing rows | |
normal_time = np.unique(df["time"].values) | |
df2 = df.copy() | |
for tic in nan_tics: | |
tic_time = df[df["tic"] == tic]["time"].values | |
missing_time = [i for i in normal_time if i not in tic_time] | |
for time in missing_time: | |
temp_df = pd.DataFrame( | |
[[time, np.nan, np.nan, np.nan, np.nan, 0, tic]], | |
columns=[ | |
"time", | |
"open", | |
"high", | |
"low", | |
"close", | |
"volume", | |
"tic", | |
], | |
) | |
df2 = df2.append(temp_df, ignore_index=True) | |
# fill nan data | |
df = df2.sort_values(by=["tic", "time"]) | |
for i in range(df.shape[0]): | |
if float(df.iloc[i]["volume"]) == 0: | |
previous_close = df.iloc[i - 1]["close"] | |
if str(previous_close) == "nan": | |
raise ValueError("Error nan price") | |
df.iloc[i, 1] = previous_close | |
df.iloc[i, 2] = previous_close | |
df.iloc[i, 3] = previous_close | |
df.iloc[i, 4] = previous_close | |
# check if nan | |
ary = df[["open", "high", "low", "close", "volume"]].values | |
assert np.isnan(np.min(ary)) == False | |
# final preprocess | |
df = df[["time", "open", "high", "low", "close", "volume", "tic"]] | |
df = df.reset_index(drop=True) | |
print("Data clean finished") | |
self.dataframe = df | |
def get_trading_days(self, start, end): | |
nyse = tc.get_calendar("NYSE") | |
df = nyse.sessions_in_range( | |
pd.Timestamp(start, tz=pytz.UTC), pd.Timestamp(end, tz=pytz.UTC) | |
) | |
return [str(day)[:10] for day in df] | |
def data_fetch_wrds( | |
self, | |
date="2021-05-01", | |
stock_set=("AAPL"), | |
filter_shares=0, | |
time_interval=60, | |
): | |
# start_date, end_date should be in the same year | |
current_date = datetime.datetime.strptime(date, "%Y-%m-%d") | |
lib = "taqm_" + str(current_date.year) # taqm_2021 | |
table = "ctm_" + current_date.strftime("%Y%m%d") # ctm_20210501 | |
parm = {"syms": stock_set, "num_shares": filter_shares} | |
try: | |
data = self.db.raw_sql( | |
"select * from " | |
+ lib | |
+ "." | |
+ table | |
+ " where sym_root in %(syms)s and time_m between '9:30:00' and '16:00:00' and size > %(num_shares)s and sym_suffix is null", | |
params=parm, | |
) | |
if_empty = False | |
return data, if_empty | |
except: | |
print("Data for date: " + date + " error") | |
if_empty = True | |
return None, if_empty | |
# def add_technical_indicator(self, df, tech_indicator_list = [ | |
# 'macd', 'boll_ub', 'boll_lb', 'rsi_30', 'dx_30', | |
# 'close_30_sma', 'close_60_sma']): | |
# df = df.rename(columns={'time':'date'}) | |
# df = df.copy() | |
# df = df.sort_values(by=['tic', 'date']) | |
# stock = Sdf.retype(df.copy()) | |
# unique_ticker = stock.tic.unique() | |
# tech_indicator_list = tech_indicator_list | |
# | |
# for indicator in tech_indicator_list: | |
# indicator_df = pd.DataFrame() | |
# for i in range(len(unique_ticker)): | |
# # print(unique_ticker[i], i) | |
# temp_indicator = stock[stock.tic == unique_ticker[i]][indicator] | |
# temp_indicator = pd.DataFrame(temp_indicator) | |
# temp_indicator['tic'] = unique_ticker[i] | |
# # print(len(df[df.tic == unique_ticker[i]]['date'].to_list())) | |
# temp_indicator['date'] = df[df.tic == unique_ticker[i]]['date'].to_list() | |
# indicator_df = indicator_df.append( | |
# temp_indicator, ignore_index=True | |
# ) | |
# df = df.merge(indicator_df[['tic', 'date', indicator]], on=['tic', 'date'], how='left') | |
# df = df.sort_values(by=['date', 'tic']) | |
# print('Succesfully add technical indicators') | |
# return df | |
# def calculate_turbulence(self,data, time_period=252): | |
# # can add other market assets | |
# df = data.copy() | |
# df_price_pivot = df.pivot(index="date", columns="tic", values="close") | |
# # use returns to calculate turbulence | |
# df_price_pivot = df_price_pivot.pct_change() | |
# | |
# unique_date = df.date.unique() | |
# # start after a fixed time period | |
# start = time_period | |
# turbulence_index = [0] * start | |
# # turbulence_index = [0] | |
# count = 0 | |
# for i in range(start, len(unique_date)): | |
# current_price = df_price_pivot[df_price_pivot.index == unique_date[i]] | |
# # use one year rolling window to calcualte covariance | |
# hist_price = df_price_pivot[ | |
# (df_price_pivot.index < unique_date[i]) | |
# & (df_price_pivot.index >= unique_date[i - time_period]) | |
# ] | |
# # Drop tickers which has number missing values more than the "oldest" ticker | |
# filtered_hist_price = hist_price.iloc[hist_price.isna().sum().min():].dropna(axis=1) | |
# | |
# cov_temp = filtered_hist_price.cov() | |
# current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(filtered_hist_price, axis=0) | |
# temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot( | |
# current_temp.values.T | |
# ) | |
# if temp > 0: | |
# count += 1 | |
# if count > 2: | |
# turbulence_temp = temp[0][0] | |
# else: | |
# # avoid large outlier because of the calculation just begins | |
# turbulence_temp = 0 | |
# else: | |
# turbulence_temp = 0 | |
# turbulence_index.append(turbulence_temp) | |
# | |
# turbulence_index = pd.DataFrame( | |
# {"date": df_price_pivot.index, "turbulence": turbulence_index} | |
# ) | |
# return turbulence_index | |
# | |
# def add_turbulence(self,data, time_period=252): | |
# """ | |
# add turbulence index from a precalcualted dataframe | |
# :param data: (df) pandas dataframe | |
# :return: (df) pandas dataframe | |
# """ | |
# df = data.copy() | |
# turbulence_index = self.calculate_turbulence(df, time_period=time_period) | |
# df = df.merge(turbulence_index, on="date") | |
# df = df.sort_values(["date", "tic"]).reset_index(drop=True) | |
# return df | |
# def add_vix(self, data): | |
# vix_df = self.download_data(['vix'], self.start, self.end_date, self.time_interval) | |
# cleaned_vix = self.clean_data(vix_df) | |
# vix = cleaned_vix[['date','close']] | |
# | |
# df = data.copy() | |
# df = df.merge(vix, on="date") | |
# df = df.sort_values(["date", "tic"]).reset_index(drop=True) | |
# | |
# return df | |
# def df_to_array(self,df,tech_indicator_list): | |
# unique_ticker = df.tic.unique() | |
# print(unique_ticker) | |
# if_first_time = True | |
# for tic in unique_ticker: | |
# if if_first_time: | |
# price_array = df[df.tic==tic][['close']].values | |
# #price_ary = df[df.tic==tic]['close'].values | |
# tech_array = df[df.tic==tic][tech_indicator_list].values | |
# risk_array = df[df.tic==tic]['turbulence'].values | |
# if_first_time = False | |
# else: | |
# price_array = np.hstack([price_array, df[df.tic==tic][['close']].values]) | |
# tech_array = np.hstack([tech_array, df[df.tic==tic][tech_indicator_list].values]) | |
# print('Successfully transformed into array') | |
# return price_array, tech_array, risk_array | |