from typing import List

import rqdatac as ricequant

from meta.data_processors._base import _Base


class Ricequant(_Base):
    """Data processor that fetches price data through the RiceQuant ``rqdatac`` client."""

    def __init__(
        self,
        data_source: str,
        start_date: str,
        end_date: str,
        time_interval: str,
        **kwargs,
    ) -> None:
        """Initialise the base processor and open the ``rqdatac`` session.

        Args:
            data_source: Forwarded unchanged to ``_Base.__init__``.
            start_date: Forwarded unchanged to ``_Base.__init__``.
            end_date: Forwarded unchanged to ``_Base.__init__``.
            time_interval: Forwarded unchanged to ``_Base.__init__``.
            **kwargs: Forwarded to ``_Base.__init__``. The optional keys
                ``username`` and ``password`` are additionally used to
                authenticate with RiceQuant.
        """
        super().__init__(data_source, start_date, end_date, time_interval, **kwargs)

        # Use .get() so that omitting the credentials entirely behaves the same
        # as passing None, instead of raising KeyError.
        username = kwargs.get("username")
        password = kwargs.get("password")

        if username is None or password is None:
            # if the license is already set, you can init without username and password
            ricequant.init()
        else:
            # init with username and password
            ricequant.init(username, password)

    def download_data(
        self, ticker_list: List[str], save_path: str = "./data/dataset.csv"
    ) -> None:
        """Download prices for ``ticker_list`` and persist them via ``save_data``.

        The result of ``ricequant.get_price`` is stored on ``self.dataframe``
        before ``self.save_data(save_path)`` is called.

        Args:
            ticker_list: Identifiers passed as the first argument to
                ``ricequant.get_price``.
            save_path: Destination handed to ``self.save_data``.
        """
        # download data by calling RiceQuant API
        # NOTE(review): self.time_interval / start_date / end_date are presumably
        # set by _Base.__init__ -- confirm against the base class.
        self.dataframe = ricequant.get_price(
            ticker_list,
            frequency=self.time_interval,
            start_date=self.start_date,
            end_date=self.end_date,
        )

        self.save_data(save_path)

        print(
            f"Download complete! Dataset saved to {save_path}. \nShape of DataFrame: {self.dataframe.shape}"
        )

    # def clean_data(self, df) -> pd.DataFrame:
    #     ''' RiceQuant data is already cleaned, we only need to transform data format here.
    #         No need for filling NaN data'''
    #     df = df.copy()
    #     # raw df uses multi-index (tic,time), reset it to single index (time)
    #     df = df.reset_index(level=[0,1])
    #     # rename column order_book_id to tic
    #     df = df.rename(columns={'order_book_id':'tic', 'datetime':'time'})
    #     # reserve columns needed
    #     df = df[['tic','time','open','high','low','close','volume']]
    #     # check if there is NaN values
    #     assert not df.isnull().values.any()
    #     return df
    # def add_vix(self, data):
    #     print('VIX is NOT applicable to China A-shares')
    #     return data
    # def calculate_turbulence(self, data, time_period=252):
    #     # can add other market assets
    #     df = data.copy()
    #     df_price_pivot = df.pivot(index="date", columns="tic", values="close")
    #     # use returns to calculate turbulence
    #     df_price_pivot = df_price_pivot.pct_change()
    #
    #     unique_date = df.date.unique()
    #     # start after a fixed time period
    #     start = time_period
    #     turbulence_index = [0] * start
    #     # turbulence_index = [0]
    #     count = 0
    #     for i in range(start, len(unique_date)):
    #         current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]
    #         # use one year rolling window to calculate covariance
    #         hist_price = df_price_pivot[
    #             (df_price_pivot.index < unique_date[i])
    #             & (df_price_pivot.index >= unique_date[i - time_period])
    #         ]
    #         # Drop tickers which has number missing values more than the "oldest" ticker
    #         filtered_hist_price = hist_price.iloc[hist_price.isna().sum().min():].dropna(axis=1)
    #
    #         cov_temp = filtered_hist_price.cov()
    #         current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(filtered_hist_price, axis=0)
    #         temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(
    #             current_temp.values.T
    #         )
    #         if temp > 0:
    #             count += 1
    #             if count > 2:
    #                 turbulence_temp = temp[0][0]
    #             else:
    #                 # avoid large outlier because of the calculation just begins
    #                 turbulence_temp = 0
    #         else:
    #             turbulence_temp = 0
    #         turbulence_index.append(turbulence_temp)
    #
    #     turbulence_index = pd.DataFrame(
    #         {"date": df_price_pivot.index, "turbulence": turbulence_index}
    #     )
    #     return turbulence_index
    #
    # def add_turbulence(self, data, time_period=252):
    #     """
    #     add turbulence index from a precalculated dataframe
    #     :param data: (df) pandas dataframe
    #     :return: (df) pandas dataframe
    #     """
    #     df = data.copy()
    #     turbulence_index = self.calculate_turbulence(df, time_period=time_period)
    #     df = df.merge(turbulence_index, on="date")
    #     df = df.sort_values(["date", "tic"]).reset_index(drop=True)
    #     return df
    # def df_to_array(self, df, tech_indicator_list, if_vix):
    #     df = df.copy()
    #     unique_ticker = df.tic.unique()
    #     if_first_time = True
    #     for tic in unique_ticker:
    #         if if_first_time:
    #             price_array = df[df.tic==tic][['close']].values
    #             tech_array = df[df.tic==tic][tech_indicator_list].values
    #             #risk_array = df[df.tic==tic]['turbulence'].values
    #             if_first_time = False
    #         else:
    #             price_array = np.hstack([price_array, df[df.tic==tic][['close']].values])
    #             tech_array = np.hstack([tech_array, df[df.tic==tic][tech_indicator_list].values])
    #     print('Successfully transformed into array')
    #     return price_array, tech_array, None