Spaces:
Runtime error
Runtime error
from typing import List | |
import alpaca_trade_api as tradeapi | |
import numpy as np | |
import pandas as pd | |
import pytz | |
# Prefer exchange_calendars; fall back to the deprecated trading_calendars
# package for older Python environments.
try:
    import exchange_calendars as tc
except ImportError:  # narrow: a bare except would also swallow SystemExit etc.
    print(
        "Cannot import exchange_calendars.",
        "If you are using python>=3.7, please install it.",
    )
    import trading_calendars as tc

    print("Use trading_calendars instead for alpaca processor.")
# from basic_processor import _Base | |
from meta.data_processors._base import _Base | |
from meta.data_processors._base import calc_time_zone | |
from meta.config import ( | |
TIME_ZONE_SHANGHAI, | |
TIME_ZONE_USEASTERN, | |
TIME_ZONE_PARIS, | |
TIME_ZONE_BERLIN, | |
TIME_ZONE_JAKARTA, | |
TIME_ZONE_SELFDEFINED, | |
USE_TIME_ZONE_SELFDEFINED, | |
BINANCE_BASE_URL, | |
) | |
class Alpaca(_Base):
    """Data processor backed by the Alpaca trading API.

    Downloads intraday bars with ``alpaca_trade_api``, aligns them onto a
    complete minute-by-minute trading-session index, and provides helpers
    for NYSE trading-day lookup and account portfolio history.
    """

    def __init__(
        self,
        data_source: str,
        start_date: str,
        end_date: str,
        time_interval: str,
        **kwargs,
    ):
        """Initialize the processor and an authenticated REST client.

        Expected kwargs:
            API: an already-constructed ``tradeapi.REST`` client, or None.
            API_KEY, API_SECRET, API_BASE_URL: credentials used to build a
                new client when ``API`` is not supplied.

        Raises:
            ValueError: if a client cannot be constructed from the
                supplied credentials.
        """
        super().__init__(data_source, start_date, end_date, time_interval, **kwargs)
        # .get() avoids a KeyError when the caller never passes "API" at all;
        # the original kwargs["API"] required the key to be present.
        if kwargs.get("API") is None:
            try:
                self.api = tradeapi.REST(
                    kwargs["API_KEY"],
                    kwargs["API_SECRET"],
                    kwargs["API_BASE_URL"],
                    "v2",
                )
            except BaseException:
                raise ValueError("Wrong Account Info!")
        else:
            self.api = kwargs["API"]

    def download_data(
        self,
        ticker_list,
        start_date,
        end_date,
        time_interval,
        save_path: str = "./data/dataset.csv",
    ) -> pd.DataFrame:
        """Download minute bars for ``ticker_list`` and save them to CSV.

        Note: the ``start_date``/``end_date`` parameters are kept for
        interface compatibility, but the range actually used is
        ``self.start_date``/``self.end_date`` set at construction time.

        Returns:
            The downloaded DataFrame (also stored on ``self.dataframe``).
        """
        self.time_zone = calc_time_zone(
            ticker_list, TIME_ZONE_SELFDEFINED, USE_TIME_ZONE_SELFDEFINED
        )
        start_date = pd.Timestamp(self.start_date, tz=self.time_zone)
        end_date = pd.Timestamp(self.end_date, tz=self.time_zone) + pd.Timedelta(days=1)
        self.time_interval = time_interval

        date = start_date
        # Collect per-day/per-ticker frames and concatenate once at the end:
        # DataFrame.append was removed in pandas 2.0 and was O(n^2) anyway.
        frames = []
        while date != end_date:
            # Regular NYSE session: 09:30 to 15:59 local time.
            start_time = (date + pd.Timedelta("09:30:00")).isoformat()
            end_time = (date + pd.Timedelta("15:59:00")).isoformat()
            for tic in ticker_list:
                barset = self.api.get_bars(
                    tic,
                    time_interval,
                    start=start_time,
                    end=end_time,
                    limit=500,
                ).df
                barset["tic"] = tic
                barset = barset.reset_index()
                frames.append(barset)
            print(("Data before ") + end_time + " is successfully fetched")
            date = date + pd.Timedelta(days=1)
            # Re-align to midnight across daylight-saving transitions so the
            # equality check against end_date still terminates the loop.
            if date.isoformat()[-14:-6] == "01:00:00":
                date = date - pd.Timedelta("01:00:00")
            elif date.isoformat()[-14:-6] == "23:00:00":
                date = date + pd.Timedelta("01:00:00")
            if date.isoformat()[-14:-6] != "00:00:00":
                raise ValueError("Timezone Error")

        data_df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        data_df["time"] = data_df["timestamp"].apply(
            lambda x: x.strftime("%Y-%m-%d %H:%M:%S")
        )
        self.dataframe = data_df

        self.save_data(save_path)

        print(
            f"Download complete! Dataset saved to {save_path}. \nShape of DataFrame: {self.dataframe.shape}"
        )
        return self.dataframe

    def clean_data(self):
        """Align raw bars onto a complete minute index per ticker.

        Builds a 390-minute (09:30-15:59) index for every trading day,
        seeds a missing first row from the first valid close (all zeros if
        the ticker has no data at all), then forward-fills remaining gaps
        with the previous close and zero volume.
        """
        df = self.dataframe.copy()
        tic_list = np.unique(df.tic.values)
        # NOTE(review): relies on self.start / self.end being provided by the
        # base class — confirm against _Base (other methods here use
        # self.start_date / self.end_date).
        trading_days = self.get_trading_days(start=self.start, end=self.end)

        # Full time index: 390 one-minute bars per trading session.
        times = []
        for day in trading_days:
            current_time = pd.Timestamp(day + " 09:30:00").tz_localize(self.time_zone)
            for _ in range(390):
                times.append(current_time)
                current_time += pd.Timedelta(minutes=1)

        # Re-index each ticker onto the full time series.
        cleaned = []
        for tic in tic_list:
            tmp_df = pd.DataFrame(
                columns=["open", "high", "low", "close", "volume"], index=times
            )
            tic_df = df[df.tic == tic]
            for i in range(tic_df.shape[0]):
                tmp_df.loc[tic_df.iloc[i]["time"]] = tic_df.iloc[i][
                    ["open", "high", "low", "close", "volume"]
                ]

            # If the first close is NaN, seed it with the first valid price
            # so forward-filling below has a starting value.
            if str(tmp_df.iloc[0]["close"]) == "nan":
                print(
                    "The price of the first row for ticker ",
                    tic,
                    " is NaN. ",
                    "It will filled with the first valid price.",
                )
                for i in range(tmp_df.shape[0]):
                    if str(tmp_df.iloc[i]["close"]) != "nan":
                        first_valid_price = tmp_df.iloc[i]["close"]
                        tmp_df.iloc[0] = [
                            first_valid_price,
                            first_valid_price,
                            first_valid_price,
                            first_valid_price,
                            0.0,
                        ]
                        break

            # Still NaN here means the ticker had no data at all.
            if str(tmp_df.iloc[0]["close"]) == "nan":
                print(
                    "Missing data for ticker: ",
                    tic,
                    " . The prices are all NaN. Fill with 0.",
                )
                tmp_df.iloc[0] = [
                    0.0,
                    0.0,
                    0.0,
                    0.0,
                    0.0,
                ]

            # Forward-fill row by row: carry the previous close into all
            # four price columns, with zero volume for the synthetic bar.
            for i in range(tmp_df.shape[0]):
                if str(tmp_df.iloc[i]["close"]) == "nan":
                    previous_close = tmp_df.iloc[i - 1]["close"]
                    if str(previous_close) == "nan":
                        raise ValueError
                    tmp_df.iloc[i] = [
                        previous_close,
                        previous_close,
                        previous_close,
                        previous_close,
                        0.0,
                    ]

            tmp_df = tmp_df.astype(float)
            tmp_df["tic"] = tic
            cleaned.append(tmp_df)

        # pd.concat replaces DataFrame.append (removed in pandas 2.0).
        new_df = pd.concat(cleaned) if cleaned else pd.DataFrame()
        new_df = new_df.reset_index()
        new_df = new_df.rename(columns={"index": "time"})

        print("Data clean finished!")
        self.dataframe = new_df

    def get_trading_days(self, start, end):
        """Return NYSE trading sessions in [start, end] as 'YYYY-MM-DD' strings."""
        nyse = tc.get_calendar("NYSE")
        df = nyse.sessions_in_range(
            pd.Timestamp(start, tz=pytz.UTC), pd.Timestamp(end, tz=pytz.UTC)
        )
        return [str(day)[:10] for day in df]

    def fetch_latest_data(
        self, ticker_list, time_interval, tech_indicator_list, limit=100
    ) -> pd.DataFrame:
        """Fetch the most recent ``limit`` bars per ticker and return the
        latest (price, tech-indicator, turbulence) arrays.

        Applies the same NaN-fill rules as ``clean_data``, then relies on
        ``add_technical_indicator`` / ``df_to_array`` inherited from _Base.
        """
        # NOTE(review): get_barset is the legacy (v1) Alpaca endpoint —
        # confirm the installed alpaca_trade_api version still supports it.
        frames = []
        for tic in ticker_list:
            barset = self.api.get_barset([tic], time_interval, limit=limit).df[tic]
            barset["tic"] = tic
            barset = barset.reset_index()
            frames.append(barset)
        # pd.concat replaces DataFrame.append (removed in pandas 2.0).
        data_df = pd.concat(frames, ignore_index=True)

        start_time = data_df.time.min()
        end_time = data_df.time.max()
        times = []
        current_time = start_time
        end = end_time + pd.Timedelta(minutes=1)
        while current_time != end:
            times.append(current_time)
            current_time += pd.Timedelta(minutes=1)

        df = data_df.copy()
        cleaned = []
        for tic in ticker_list:
            tmp_df = pd.DataFrame(
                columns=["open", "high", "low", "close", "volume"], index=times
            )
            tic_df = df[df.tic == tic]
            for i in range(tic_df.shape[0]):
                tmp_df.loc[tic_df.iloc[i]["time"]] = tic_df.iloc[i][
                    ["open", "high", "low", "close", "volume"]
                ]

            # Seed a NaN first row from the first valid close (see clean_data).
            if str(tmp_df.iloc[0]["close"]) == "nan":
                for i in range(tmp_df.shape[0]):
                    if str(tmp_df.iloc[i]["close"]) != "nan":
                        first_valid_close = tmp_df.iloc[i]["close"]
                        tmp_df.iloc[0] = [
                            first_valid_close,
                            first_valid_close,
                            first_valid_close,
                            first_valid_close,
                            0.0,
                        ]
                        break
            if str(tmp_df.iloc[0]["close"]) == "nan":
                print(
                    "Missing data for ticker: ",
                    tic,
                    " . The prices are all NaN. Fill with 0.",
                )
                tmp_df.iloc[0] = [
                    0.0,
                    0.0,
                    0.0,
                    0.0,
                    0.0,
                ]

            # Forward-fill remaining gaps with the previous close, volume 0.
            for i in range(tmp_df.shape[0]):
                if str(tmp_df.iloc[i]["close"]) == "nan":
                    previous_close = tmp_df.iloc[i - 1]["close"]
                    if str(previous_close) == "nan":
                        raise ValueError
                    tmp_df.iloc[i] = [
                        previous_close,
                        previous_close,
                        previous_close,
                        previous_close,
                        0.0,
                    ]
            tmp_df = tmp_df.astype(float)
            tmp_df["tic"] = tic
            cleaned.append(tmp_df)

        new_df = pd.concat(cleaned)
        new_df = new_df.reset_index()
        new_df = new_df.rename(columns={"index": "time"})

        df = self.add_technical_indicator(new_df, tech_indicator_list)
        df["VIXY"] = 0

        price_array, tech_array, turbulence_array = self.df_to_array(
            df, tech_indicator_list, if_vix=True
        )
        latest_price = price_array[-1]
        latest_tech = tech_array[-1]
        turb_df = self.api.get_barset(["VIXY"], time_interval, limit=1).df["VIXY"]
        latest_turb = turb_df["close"].values
        return latest_price, latest_tech, latest_turb

    def get_portfolio_history(self, start, end):
        """Return cumulative returns of the account equity between two dates.

        Samples each trading day at 5-minute intervals (first 79 bars cover
        the regular 09:30-16:00 session), normalizes by the first equity
        value, and drops NaN entries.
        """
        trading_days = self.get_trading_days(start, end)
        daily = [
            self.api.get_portfolio_history(
                date_start=day, timeframe="5Min"
            ).df.iloc[:79]
            for day in trading_days
        ]
        # pd.concat replaces DataFrame.append (removed in pandas 2.0).
        df = pd.concat(daily) if daily else pd.DataFrame()
        equities = df.equity.values
        cumu_returns = equities / equities[0]
        cumu_returns = cumu_returns[~np.isnan(cumu_returns)]
        return cumu_returns