from typing import List
import alpaca_trade_api as tradeapi
import numpy as np
import pandas as pd
import pytz
try:
import exchange_calendars as tc
except ImportError:
print(
"Cannot import exchange_calendars.",
"If you are using python>=3.7, please install it.",
)
import trading_calendars as tc
print("Use trading_calendars instead for alpaca processor.")
# from basic_processor import _Base
from meta.data_processors._base import _Base
from meta.data_processors._base import calc_time_zone
from meta.config import (
TIME_ZONE_SHANGHAI,
TIME_ZONE_USEASTERN,
TIME_ZONE_PARIS,
TIME_ZONE_BERLIN,
TIME_ZONE_JAKARTA,
TIME_ZONE_SELFDEFINED,
USE_TIME_ZONE_SELFDEFINED,
BINANCE_BASE_URL,
)
class Alpaca(_Base):
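    """Alpaca data processor for FinRL-Meta.

    Wraps the alpaca_trade_api REST client to download, clean, and post-process
    intraday bar data. Credentials are taken from the keyword arguments
    API_KEY / API_SECRET / API_BASE_URL, or an existing REST client can be
    passed via the API keyword argument.
    """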
# def __init__(self, API_KEY=None, API_SECRET=None, API_BASE_URL=None, api=None):
# if api is None:
# try:
# self.api = tradeapi.REST(API_KEY, API_SECRET, API_BASE_URL, "v2")
# except BaseException:
# raise ValueError("Wrong Account Info!")
# else:
# self.api = api
def __init__(
self,
data_source: str,
start_date: str,
end_date: str,
time_interval: str,
**kwargs,
):
super().__init__(data_source, start_date, end_date, time_interval, **kwargs)
        if kwargs.get("API") is None:
try:
                self.api = tradeapi.REST(
                    kwargs.get("API_KEY"),
                    kwargs.get("API_SECRET"),
                    kwargs.get("API_BASE_URL"),
                    "v2",
                )
except BaseException:
raise ValueError("Wrong Account Info!")
else:
self.api = kwargs["API"]
def download_data(
self,
ticker_list,
start_date,
end_date,
time_interval,
save_path: str = "./data/dataset.csv",
    ):
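        """Download intraday bars from Alpaca for every ticker in ticker_list.

        Bars are requested day by day over the regular trading session,
        collected into self.dataframe, and saved to save_path.
        """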
self.time_zone = calc_time_zone(
ticker_list, TIME_ZONE_SELFDEFINED, USE_TIME_ZONE_SELFDEFINED
)
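        # the start_date/end_date arguments are superseded by self.start_date
        # and self.end_date stored at construction time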
start_date = pd.Timestamp(self.start_date, tz=self.time_zone)
end_date = pd.Timestamp(self.end_date, tz=self.time_zone) + pd.Timedelta(days=1)
self.time_interval = time_interval
date = start_date
data_df = pd.DataFrame()
while date != end_date:
start_time = (date + pd.Timedelta("09:30:00")).isoformat()
end_time = (date + pd.Timedelta("15:59:00")).isoformat()
for tic in ticker_list:
barset = self.api.get_bars(
tic,
time_interval,
start=start_time,
end=end_time,
limit=500,
).df
barset["tic"] = tic
barset = barset.reset_index()
                data_df = pd.concat([data_df, barset], ignore_index=True)
            print(f"Data before {end_time} is successfully fetched")
# print(data_df.head())
date = date + pd.Timedelta(days=1)
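            # adding one calendar day across a daylight-saving transition shifts the
            # timestamp by an hour; snap the date pointer back to midnight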
if date.isoformat()[-14:-6] == "01:00:00":
date = date - pd.Timedelta("01:00:00")
elif date.isoformat()[-14:-6] == "23:00:00":
date = date + pd.Timedelta("01:00:00")
if date.isoformat()[-14:-6] != "00:00:00":
raise ValueError("Timezone Error")
data_df["time"] = data_df["timestamp"].apply(
lambda x: x.strftime("%Y-%m-%d %H:%M:%S")
)
self.dataframe = data_df
self.save_data(save_path)
print(
f"Download complete! Dataset saved to {save_path}. \nShape of DataFrame: {self.dataframe.shape}"
)
def clean_data(self):
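        """Align the downloaded bars to a full minute-level index of trading times.

        For each ticker, missing minutes are filled with the previous close
        (volume 0); leading gaps take the first valid price, and tickers with
        no data at all are filled with 0.
        """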
df = self.dataframe.copy()
tic_list = np.unique(df.tic.values)
        trading_days = self.get_trading_days(start=self.start_date, end=self.end_date)
# produce full time index
times = []
for day in trading_days:
current_time = pd.Timestamp(day + " 09:30:00").tz_localize(self.time_zone)
for _ in range(390):
times.append(current_time)
current_time += pd.Timedelta(minutes=1)
# create a new dataframe with full time series
new_df = pd.DataFrame()
for tic in tic_list:
tmp_df = pd.DataFrame(
columns=["open", "high", "low", "close", "volume"], index=times
)
tic_df = df[df.tic == tic]
for i in range(tic_df.shape[0]):
tmp_df.loc[tic_df.iloc[i]["time"]] = tic_df.iloc[i][
["open", "high", "low", "close", "volume"]
]
# if the close price of the first row is NaN
if str(tmp_df.iloc[0]["close"]) == "nan":
                print(
                    "The price of the first row for ticker ",
                    tic,
                    " is NaN. ",
                    "It will be filled with the first valid price.",
                )
for i in range(tmp_df.shape[0]):
if str(tmp_df.iloc[i]["close"]) != "nan":
first_valid_price = tmp_df.iloc[i]["close"]
tmp_df.iloc[0] = [
first_valid_price,
first_valid_price,
first_valid_price,
first_valid_price,
0.0,
]
break
# if the close price of the first row is still NaN (All the prices are NaN in this case)
if str(tmp_df.iloc[0]["close"]) == "nan":
                print(
                    "Missing data for ticker: ",
                    tic,
                    ". The prices are all NaN. Filling with 0.",
                )
tmp_df.iloc[0] = [
0.0,
0.0,
0.0,
0.0,
0.0,
]
# forward filling row by row
for i in range(tmp_df.shape[0]):
if str(tmp_df.iloc[i]["close"]) == "nan":
previous_close = tmp_df.iloc[i - 1]["close"]
if str(previous_close) == "nan":
raise ValueError
tmp_df.iloc[i] = [
previous_close,
previous_close,
previous_close,
previous_close,
0.0,
]
tmp_df = tmp_df.astype(float)
tmp_df["tic"] = tic
            new_df = pd.concat([new_df, tmp_df])
new_df = new_df.reset_index()
new_df = new_df.rename(columns={"index": "time"})
        print("Data cleaning finished!")
self.dataframe = new_df
# def add_technical_indicator(
# self,
# df,
# tech_indicator_list=[
# "macd",
# "boll_ub",
# "boll_lb",
# "rsi_30",
# "dx_30",
# "close_30_sma",
# "close_60_sma",
# ],
# ):
# df = df.rename(columns={"time": "date"})
# df = df.copy()
# df = df.sort_values(by=["tic", "date"])
# stock = Sdf.retype(df.copy())
# unique_ticker = stock.tic.unique()
# tech_indicator_list = tech_indicator_list
#
# for indicator in tech_indicator_list:
# indicator_df = pd.DataFrame()
# for i in range(len(unique_ticker)):
# # print(unique_ticker[i], i)
# temp_indicator = stock[stock.tic == unique_ticker[i]][indicator]
# temp_indicator = pd.DataFrame(temp_indicator)
# temp_indicator["tic"] = unique_ticker[i]
# # print(len(df[df.tic == unique_ticker[i]]['date'].to_list()))
# temp_indicator["date"] = df[df.tic == unique_ticker[i]][
# "date"
# ].to_list()
# indicator_df = indicator_df.append(temp_indicator, ignore_index=True)
# df = df.merge(
# indicator_df[["tic", "date", indicator]], on=["tic", "date"], how="left"
# )
# df = df.sort_values(by=["date", "tic"])
# df = df.rename(columns={"date": "time"})
    #     print("Successfully added technical indicators")
# return df
# def add_vix(self, data):
# vix_df = self.download_data(["VIXY"], self.start, self.end, self.time_interval)
# cleaned_vix = self.clean_data(vix_df)
# vix = cleaned_vix[["time", "close"]]
# vix = vix.rename(columns={"close": "VIXY"})
#
# df = data.copy()
# df = df.merge(vix, on="time")
# df = df.sort_values(["time", "tic"]).reset_index(drop=True)
# return df
# def calculate_turbulence(self, data, time_period=252):
# # can add other market assets
# df = data.copy()
# df_price_pivot = df.pivot(index="date", columns="tic", values="close")
# # use returns to calculate turbulence
# df_price_pivot = df_price_pivot.pct_change()
#
# unique_date = df.date.unique()
# # start after a fixed time period
# start = time_period
# turbulence_index = [0] * start
# # turbulence_index = [0]
# count = 0
# for i in range(start, len(unique_date)):
# current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]
    #         # use one year rolling window to calculate covariance
# hist_price = df_price_pivot[
# (df_price_pivot.index < unique_date[i])
# & (df_price_pivot.index >= unique_date[i - time_period])
# ]
    #         # Drop tickers which have more missing values than the "oldest" ticker
# filtered_hist_price = hist_price.iloc[
# hist_price.isna().sum().min() :
# ].dropna(axis=1)
#
# cov_temp = filtered_hist_price.cov()
# current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(
# filtered_hist_price, axis=0
# )
# temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(
# current_temp.values.T
# )
# if temp > 0:
# count += 1
# if count > 2:
# turbulence_temp = temp[0][0]
# else:
# # avoid large outlier because of the calculation just begins
# turbulence_temp = 0
# else:
# turbulence_temp = 0
# turbulence_index.append(turbulence_temp)
#
# turbulence_index = pd.DataFrame(
# {"date": df_price_pivot.index, "turbulence": turbulence_index}
# )
# return turbulence_index
#
# def add_turbulence(self, data, time_period=252):
# """
    #     add turbulence index from a precalculated dataframe
# :param data: (df) pandas dataframe
# :return: (df) pandas dataframe
# """
# df = data.copy()
# turbulence_index = self.calculate_turbulence(df, time_period=time_period)
# df = df.merge(turbulence_index, on="date")
# df = df.sort_values(["date", "tic"]).reset_index(drop=True)
# return df
# def df_to_array(self, df, tech_indicator_list, if_vix):
# df = df.copy()
# unique_ticker = df.tic.unique()
# if_first_time = True
# for tic in unique_ticker:
# if if_first_time:
# price_array = df[df.tic == tic][["close"]].values
# tech_array = df[df.tic == tic][tech_indicator_list].values
# if if_vix:
# turbulence_array = df[df.tic == tic]["VIXY"].values
# else:
# turbulence_array = df[df.tic == tic]["turbulence"].values
# if_first_time = False
# else:
# price_array = np.hstack(
# [price_array, df[df.tic == tic][["close"]].values]
# )
# tech_array = np.hstack(
# [tech_array, df[df.tic == tic][tech_indicator_list].values]
# )
# print("Successfully transformed into array")
# return price_array, tech_array, turbulence_array
def get_trading_days(self, start, end):
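        """Return the NYSE trading sessions between start and end as 'YYYY-MM-DD' strings."""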
nyse = tc.get_calendar("NYSE")
df = nyse.sessions_in_range(
pd.Timestamp(start, tz=pytz.UTC), pd.Timestamp(end, tz=pytz.UTC)
)
return [str(day)[:10] for day in df]
def fetch_latest_data(
self, ticker_list, time_interval, tech_indicator_list, limit=100
    ):
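        """Fetch the most recent bars for ticker_list and return the latest price
        array, technical-indicator array, and turbulence (VIXY close) value."""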
data_df = pd.DataFrame()
for tic in ticker_list:
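            # NOTE: get_barset belongs to the older Alpaca data API; on recent
            # versions of alpaca_trade_api it may need to be replaced with get_bars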
barset = self.api.get_barset([tic], time_interval, limit=limit).df[tic]
barset["tic"] = tic
barset = barset.reset_index()
            data_df = pd.concat([data_df, barset])
data_df = data_df.reset_index(drop=True)
start_time = data_df.time.min()
end_time = data_df.time.max()
times = []
current_time = start_time
end = end_time + pd.Timedelta(minutes=1)
while current_time != end:
times.append(current_time)
current_time += pd.Timedelta(minutes=1)
df = data_df.copy()
new_df = pd.DataFrame()
for tic in ticker_list:
tmp_df = pd.DataFrame(
columns=["open", "high", "low", "close", "volume"], index=times
)
tic_df = df[df.tic == tic]
for i in range(tic_df.shape[0]):
tmp_df.loc[tic_df.iloc[i]["time"]] = tic_df.iloc[i][
["open", "high", "low", "close", "volume"]
]
if str(tmp_df.iloc[0]["close"]) == "nan":
for i in range(tmp_df.shape[0]):
if str(tmp_df.iloc[i]["close"]) != "nan":
first_valid_close = tmp_df.iloc[i]["close"]
tmp_df.iloc[0] = [
first_valid_close,
first_valid_close,
first_valid_close,
first_valid_close,
0.0,
]
break
if str(tmp_df.iloc[0]["close"]) == "nan":
                print(
                    "Missing data for ticker: ",
                    tic,
                    ". The prices are all NaN. Filling with 0.",
                )
tmp_df.iloc[0] = [
0.0,
0.0,
0.0,
0.0,
0.0,
]
for i in range(tmp_df.shape[0]):
if str(tmp_df.iloc[i]["close"]) == "nan":
previous_close = tmp_df.iloc[i - 1]["close"]
if str(previous_close) == "nan":
raise ValueError
tmp_df.iloc[i] = [
previous_close,
previous_close,
previous_close,
previous_close,
0.0,
]
tmp_df = tmp_df.astype(float)
tmp_df["tic"] = tic
            new_df = pd.concat([new_df, tmp_df])
new_df = new_df.reset_index()
new_df = new_df.rename(columns={"index": "time"})
df = self.add_technical_indicator(new_df, tech_indicator_list)
df["VIXY"] = 0
price_array, tech_array, turbulence_array = self.df_to_array(
df, tech_indicator_list, if_vix=True
)
latest_price = price_array[-1]
latest_tech = tech_array[-1]
turb_df = self.api.get_barset(["VIXY"], time_interval, limit=1).df["VIXY"]
latest_turb = turb_df["close"].values
return latest_price, latest_tech, latest_turb
def get_portfolio_history(self, start, end):
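        """Concatenate Alpaca 5-minute portfolio-history snapshots over the trading
        days between start and end and return the cumulative return series."""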
trading_days = self.get_trading_days(start, end)
df = pd.DataFrame()
for day in trading_days:
            df = pd.concat(
                [
                    df,
                    self.api.get_portfolio_history(
                        date_start=day, timeframe="5Min"
                    ).df.iloc[:79],
                ]
            )
equities = df.equity.values
cumu_returns = equities / equities[0]
cumu_returns = cumu_returns[~np.isnan(cumu_returns)]
return cumu_returns
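
# Minimal usage sketch (illustrative only): the key, secret, and tickers below are
# placeholders, and the paper-trading endpoint is just one possible base URL.
# Adjust everything to your own Alpaca account before running.
if __name__ == "__main__":
    processor = Alpaca(
        data_source="alpaca",
        start_date="2021-01-04",
        end_date="2021-01-08",
        time_interval="1Min",
        API_KEY="YOUR_API_KEY",  # placeholder
        API_SECRET="YOUR_API_SECRET",  # placeholder
        API_BASE_URL="https://paper-api.alpaca.markets",
        API=None,
    )
    processor.download_data(
        ticker_list=["AAPL", "MSFT"],  # example tickers
        start_date="2021-01-04",
        end_date="2021-01-08",
        time_interval="1Min",
        save_path="./data/dataset.csv",
    )
    processor.clean_data()
    print(processor.dataframe.head())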