kristada673's picture
Upload 19 files
de6e775
from typing import List
import rqdatac as ricequant
from meta.data_processors._base import _Base
class Ricequant(_Base):
    """Data processor that fetches price data through the RiceQuant ``rqdatac`` API."""

    def __init__(
        self,
        data_source: str,
        start_date: str,
        end_date: str,
        time_interval: str,
        **kwargs,
    ):
        """Initialise the base processor and open a RiceQuant session.

        Args:
            data_source: Forwarded unchanged to ``_Base.__init__``.
            start_date: Forwarded unchanged to ``_Base.__init__``.
            end_date: Forwarded unchanged to ``_Base.__init__``.
            time_interval: Forwarded unchanged to ``_Base.__init__``.
            **kwargs: Forwarded to ``_Base.__init__``. The optional keys
                ``username`` and ``password`` are also read here to log in
                to RiceQuant.
        """
        super().__init__(data_source, start_date, end_date, time_interval, **kwargs)

        # Use .get() so that omitting the credentials entirely behaves the
        # same as passing None, instead of raising KeyError.
        username = kwargs.get("username")
        password = kwargs.get("password")

        if username is None or password is None:
            # If the license is already set, rqdatac can be initialised
            # without a username and password.
            ricequant.init()
        else:
            # Initialise with an explicit username and password.
            ricequant.init(username, password)

    def download_data(
        self, ticker_list: List[str], save_path: str = "./data/dataset.csv"
    ):
        """Download prices for ``ticker_list`` and hand them to ``save_data``.

        The result of ``rqdatac.get_price`` is stored on ``self.dataframe``
        before ``self.save_data(save_path)`` is called.

        Args:
            ticker_list: Identifiers passed as the first argument to
                ``rqdatac.get_price``.
            save_path: Path passed to ``self.save_data``.
        """
        # NOTE(review): time_interval / start_date / end_date and save_data
        # are presumably provided by _Base -- confirm against the base class.
        self.dataframe = ricequant.get_price(
            ticker_list,
            frequency=self.time_interval,
            start_date=self.start_date,
            end_date=self.end_date,
        )

        self.save_data(save_path)

        print(
            f"Download complete! Dataset saved to {save_path}. \nShape of DataFrame: {self.dataframe.shape}"
        )
# def clean_data(self, df) -> pd.DataFrame:
# ''' RiceQuant data is already cleaned, we only need to transform data format here.
# No need for filling NaN data'''
# df = df.copy()
# # raw df uses multi-index (tic,time), reset it to single index (time)
# df = df.reset_index(level=[0,1])
# # rename column order_book_id to tic
# df = df.rename(columns={'order_book_id':'tic', 'datetime':'time'})
# # reserve columns needed
# df = df[['tic','time','open','high','low','close','volume']]
# # check if there is NaN values
# assert not df.isnull().values.any()
# return df
# def add_vix(self, data):
# print('VIX is NOT applicable to China A-shares')
# return data
# def calculate_turbulence(self, data, time_period=252):
# # can add other market assets
# df = data.copy()
# df_price_pivot = df.pivot(index="date", columns="tic", values="close")
# # use returns to calculate turbulence
# df_price_pivot = df_price_pivot.pct_change()
#
# unique_date = df.date.unique()
# # start after a fixed time period
# start = time_period
# turbulence_index = [0] * start
# # turbulence_index = [0]
# count = 0
# for i in range(start, len(unique_date)):
# current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]
# # use one year rolling window to calculate covariance
# hist_price = df_price_pivot[
# (df_price_pivot.index < unique_date[i])
# & (df_price_pivot.index >= unique_date[i - time_period])
# ]
# # Drop tickers that have more missing values than the "oldest" ticker
# filtered_hist_price = hist_price.iloc[hist_price.isna().sum().min():].dropna(axis=1)
#
# cov_temp = filtered_hist_price.cov()
# current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(filtered_hist_price, axis=0)
# temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(
# current_temp.values.T
# )
# if temp > 0:
# count += 1
# if count > 2:
# turbulence_temp = temp[0][0]
# else:
# # avoid large outlier because of the calculation just begins
# turbulence_temp = 0
# else:
# turbulence_temp = 0
# turbulence_index.append(turbulence_temp)
#
# turbulence_index = pd.DataFrame(
# {"date": df_price_pivot.index, "turbulence": turbulence_index}
# )
# return turbulence_index
#
# def add_turbulence(self, data, time_period=252):
# """
# add turbulence index from a precalculated dataframe
# :param data: (df) pandas dataframe
# :return: (df) pandas dataframe
# """
# df = data.copy()
# turbulence_index = self.calculate_turbulence(df, time_period=time_period)
# df = df.merge(turbulence_index, on="date")
# df = df.sort_values(["date", "tic"]).reset_index(drop=True)
# return df
# def df_to_array(self, df, tech_indicator_list, if_vix):
# df = df.copy()
# unique_ticker = df.tic.unique()
# if_first_time = True
# for tic in unique_ticker:
# if if_first_time:
# price_array = df[df.tic==tic][['close']].values
# tech_array = df[df.tic==tic][tech_indicator_list].values
# #risk_array = df[df.tic==tic]['turbulence'].values
# if_first_time = False
# else:
# price_array = np.hstack([price_array, df[df.tic==tic][['close']].values])
# tech_array = np.hstack([tech_array, df[df.tic==tic][tech_indicator_list].values])
# print('Successfully transformed into array')
# return price_array, tech_array, None