Spaces:
Runtime error
Runtime error
| from typing import List | |
| import rqdatac as ricequant | |
| from meta.data_processors._base import _Base | |
| class Ricequant(_Base): | |
| def __init__( | |
| self, | |
| data_source: str, | |
| start_date: str, | |
| end_date: str, | |
| time_interval: str, | |
| **kwargs, | |
| ): | |
| super().__init__(data_source, start_date, end_date, time_interval, **kwargs) | |
| if kwargs["username"] is None or kwargs["password"] is None: | |
| ricequant.init() # if the lisence is already set, you can init without username and password | |
| else: | |
| ricequant.init( | |
| kwargs["username"], kwargs["password"] | |
| ) # init with username and password | |
| def download_data( | |
| self, ticker_list: List[str], save_path: str = "./data/dataset.csv" | |
| ): | |
| # download data by calling RiceQuant API | |
| dataframe = ricequant.get_price( | |
| ticker_list, | |
| frequency=self.time_interval, | |
| start_date=self.start_date, | |
| end_date=self.end_date, | |
| ) | |
| self.dataframe = dataframe | |
| self.save_data(save_path) | |
| print( | |
| f"Download complete! Dataset saved to {save_path}. \nShape of DataFrame: {self.dataframe.shape}" | |
| ) | |
| # def clean_data(self, df) -> pd.DataFrame: | |
| # ''' RiceQuant data is already cleaned, we only need to transform data format here. | |
| # No need for filling NaN data''' | |
| # df = df.copy() | |
| # # raw df uses multi-index (tic,time), reset it to single index (time) | |
| # df = df.reset_index(level=[0,1]) | |
| # # rename column order_book_id to tic | |
| # df = df.rename(columns={'order_book_id':'tic', 'datetime':'time'}) | |
| # # reserve columns needed | |
| # df = df[['tic','time','open','high','low','close','volume']] | |
| # # check if there is NaN values | |
| # assert not df.isnull().values.any() | |
| # return df | |
| # def add_vix(self, data): | |
| # print('VIX is NOT applicable to China A-shares') | |
| # return data | |
| # def calculate_turbulence(self, data, time_period=252): | |
| # # can add other market assets | |
| # df = data.copy() | |
| # df_price_pivot = df.pivot(index="date", columns="tic", values="close") | |
| # # use returns to calculate turbulence | |
| # df_price_pivot = df_price_pivot.pct_change() | |
| # | |
| # unique_date = df.date.unique() | |
| # # start after a fixed time period | |
| # start = time_period | |
| # turbulence_index = [0] * start | |
| # # turbulence_index = [0] | |
| # count = 0 | |
| # for i in range(start, len(unique_date)): | |
| # current_price = df_price_pivot[df_price_pivot.index == unique_date[i]] | |
| # # use one year rolling window to calcualte covariance | |
| # hist_price = df_price_pivot[ | |
| # (df_price_pivot.index < unique_date[i]) | |
| # & (df_price_pivot.index >= unique_date[i - time_period]) | |
| # ] | |
| # # Drop tickers which has number missing values more than the "oldest" ticker | |
| # filtered_hist_price = hist_price.iloc[hist_price.isna().sum().min():].dropna(axis=1) | |
| # | |
| # cov_temp = filtered_hist_price.cov() | |
| # current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(filtered_hist_price, axis=0) | |
| # temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot( | |
| # current_temp.values.T | |
| # ) | |
| # if temp > 0: | |
| # count += 1 | |
| # if count > 2: | |
| # turbulence_temp = temp[0][0] | |
| # else: | |
| # # avoid large outlier because of the calculation just begins | |
| # turbulence_temp = 0 | |
| # else: | |
| # turbulence_temp = 0 | |
| # turbulence_index.append(turbulence_temp) | |
| # | |
| # turbulence_index = pd.DataFrame( | |
| # {"date": df_price_pivot.index, "turbulence": turbulence_index} | |
| # ) | |
| # return turbulence_index | |
| # | |
| # def add_turbulence(self, data, time_period=252): | |
| # """ | |
| # add turbulence index from a precalcualted dataframe | |
| # :param data: (df) pandas dataframe | |
| # :return: (df) pandas dataframe | |
| # """ | |
| # df = data.copy() | |
| # turbulence_index = self.calculate_turbulence(df, time_period=time_period) | |
| # df = df.merge(turbulence_index, on="date") | |
| # df = df.sort_values(["date", "tic"]).reset_index(drop=True) | |
| # return df | |
| # def df_to_array(self, df, tech_indicator_list, if_vix): | |
| # df = df.copy() | |
| # unique_ticker = df.tic.unique() | |
| # if_first_time = True | |
| # for tic in unique_ticker: | |
| # if if_first_time: | |
| # price_array = df[df.tic==tic][['close']].values | |
| # tech_array = df[df.tic==tic][tech_indicator_list].values | |
| # #risk_array = df[df.tic==tic]['turbulence'].values | |
| # if_first_time = False | |
| # else: | |
| # price_array = np.hstack([price_array, df[df.tic==tic][['close']].values]) | |
| # tech_array = np.hstack([tech_array, df[df.tic==tic][tech_indicator_list].values]) | |
| # print('Successfully transformed into array') | |
| # return price_array, tech_array, None | |