import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.cluster import AgglomerativeClustering

# import vnstock as vns


class ResSupCluster:
    """Derive support/resistance levels by clustering a price column.

    The price column named by ``feature_map["open"]`` is clustered with
    Ward-linkage agglomerative clustering.  For every cluster the minimum
    (support), maximum (resistance) and mean (average) price are stored in
    ``self.support``, ``self.resistance`` and ``self.average``; position
    ``i`` of each list belongs to cluster label ``i``.
    """

    def __init__(
        self,
        data: pd.DataFrame,
        feature_map: dict,
        n_clusters: int = 2,
        is_visualize: bool = False,
        is_delete_outlier: bool = True,
    ):
        """Cluster the price column and compute the levels.

        Args:
            data: Price history.  The last row is treated as the most
                recent observation and is never removed as an outlier.
            feature_map: Mapping that must contain the key ``"open"``
                whose value is the name of the price column in ``data``.
            n_clusters: Number of clusters (and thus of level triples).
            is_visualize: When True, show a matplotlib chart of the result.
            is_delete_outlier: When True, drop historical rows whose price
                is 2 or more standard deviations away from the mean before
                clustering.

        Note:
            With ``is_delete_outlier=False`` the ``"Clusters"`` column is
            added to the passed ``data`` object itself; otherwise it is
            added to the filtered copy kept in ``self.data``.
        """
        self.n_clusters = n_clusters
        self.data = data
        self.feature_map = feature_map

        # Delete outlier to better fit data
        if is_delete_outlier:
            self.delete_outlier()

        # train cluster
        self.model, self.data["Clusters"] = self.train_AgglomerativeClustering(
            self.data[feature_map["open"]], n_clusters
        )

        # get resistance and support
        self.get_res_sup()

        if is_visualize:
            self.get_viz()

    def get_res_sup(self):
        """Fill ``support``, ``resistance`` and ``average`` per cluster.

        Reads the price column and the ``"Clusters"`` column of
        ``self.data`` and iterates over labels ``0 .. n_clusters - 1``.
        """
        self.support = []
        self.resistance = []
        self.average = []

        prices = self.data[self.feature_map["open"]]
        labels = self.data["Clusters"]

        for label in range(self.n_clusters):
            # Prices that were assigned to this cluster
            members = prices[labels == label]

            # Builtin min/max keep the results plain Python scalars.
            self.resistance.append(max(members))
            self.support.append(min(members))
            self.average.append(np.mean(members))

    def delete_outlier(self):
        """Drop historical rows priced 2 or more std away from the mean.

        Mean and (population) standard deviation are computed on every row
        except the last one; the last row is always kept, whatever its
        price, so the latest observation is never discarded.  The result
        replaces ``self.data``; original index labels are preserved.
        """
        # With fewer than two rows there is no history to filter.
        if len(self.data) < 2:
            return

        column = self.feature_map["open"]

        # to make sure we dont drop today data
        history = self.data.iloc[:-1]
        latest = self.data.iloc[[-1]]  # one-row DataFrame, not a Series

        prices = history[column]
        mean = prices.mean()
        std = prices.std(ddof=0)

        inside = (mean - 2 * std < prices) & (prices < mean + 2 * std)

        # Re-attach the latest row as a ROW.  (Assigning it with
        # ``frame[key] = series`` would create a new, all-NaN column.)
        self.data = pd.concat([history[inside], latest])

    def get_viz(self):
        """Show the clustered prices with support/resistance/average lines."""
        index = self.data.index
        prices = self.data[self.feature_map["open"]]
        start, end = index[0], index[-1]

        plt.scatter(
            index,
            prices,
            c=self.data["Clusters"],
            cmap="viridis",
            label="Clusters",
        )

        plt.hlines(self.support, start, end, colors="r", label="Support")
        plt.hlines(self.resistance, start, end, colors="b", label="Resistance")
        plt.hlines(self.average, start, end, colors="k", label="Average")

        plt.plot(index, prices, label="Clusters")

        plt.grid()
        plt.legend()
        plt.show()

    def train_AgglomerativeClustering(
        self, data: pd.Series, n_clusters: int = 3
    ) -> tuple:
        """Fit Ward-linkage agglomerative clustering on one price series.

        Args:
            data: One-dimensional prices (pandas Series or array-like).
            n_clusters: Number of clusters, default 3.

        Returns:
            A ``(model, clusters)`` tuple: the fitted
            ``AgglomerativeClustering`` instance and an array with one
            cluster label per input value, in input order.
        """
        # scikit-learn expects a 2-D (n_samples, n_features) array.
        prices = np.asarray(data).reshape(-1, 1)

        # Fit Agglomerative Clustering
        model = AgglomerativeClustering(n_clusters=n_clusters, linkage="ward")
        clusters = model.fit_predict(prices)
        return model, clusters

    def extract_all_lines(self):
        """Return the computed levels as a dict of three lists."""
        return {
            'support': self.support,
            'resistance': self.resistance,
            'average': self.average
        }


# Example Usage
# if __name__ == "__main__":
#     data = vns.stock_historical_data(
#         symbol="CEO",
#         start_date="2023-01-31",
#         end_date="2024-01-31",
#         resolution="1D",
#         type="stock",
#         beautify=True,
#         decor=False,
#         source="DNSE",
#     )
#     # feature_map is a required argument; "open" below is a placeholder
#     # for the actual price column name in `data` - TODO confirm.
#     Cluster = ResSupCluster(
#         data=data,
#         feature_map={"open": "open"},
#         is_visualize=True,
#         is_delete_outlier=True,
#         n_clusters=3,
#     )