from collections import Counter, defaultdict

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import vnstock as vns
from sklearn.cluster import AgglomerativeClustering


class ResSupCluster():
    """Derive support/resistance levels from clustered opening prices.

    The "open" column of ``data`` is clustered with Ward-linkage
    agglomerative clustering. Every run of consecutive rows that share a
    cluster label then produces one entry in ``level_dict``::

        (start_time, end_time) -> {
            "cluster":    label of the run,
            "support":    lowest open price in the run,
            "resistance": highest open price in the run,
            "index":      (start, end) row positions in ``self.data``,
            "average":    mean open price of the run,
        }

    Attributes:
        n_clusters: Number of clusters requested from the model.
        data: Private copy of the input frame with an added "Clusters" column.
        model: The fitted ``AgglomerativeClustering`` instance.
        level_dict: ``defaultdict(dict)`` holding the entries described above.
    """

    def __init__(
        self,
        data: pd.DataFrame,
        n_clusters: int = 2,
        is_visualize: bool = False,
        is_delete_cluster_outlier: bool = True,
        error_period: int = 10,
    ):
        """Cluster the prices, optionally smooth the labels, compute levels.

        Args:
            data: Price history; the "open" and "time" columns are read.
            n_clusters: Number of clusters to fit.
            is_visualize: When true, plot the levels once they are computed.
            is_delete_cluster_outlier: When true, relabel short runs of a
                cluster so they merge with a neighbouring run.
            error_period: Minimum number of consecutive rows a run must span
                to be kept when ``is_delete_cluster_outlier`` is true.
        """
        self.n_clusters = n_clusters
        # Work on a copy so the caller's frame does not silently gain a
        # "Clusters" column.
        self.data = data.copy()

        # Train the clustering model on the opening prices.
        self.model, self.data["Clusters"] = self.train_AgglomerativeClustering(
            self.data["open"], n_clusters
        )

        # Smooth out short-lived clusters so the levels fit the data better.
        if is_delete_cluster_outlier:
            self.__delete_cluster_outlier(error_period=error_period)

        # Compute resistance and support for every run of one cluster.
        self.__get_res_sup()

        if is_visualize:
            self.get_viz()

    def __get_res_sup(self):
        """Populate ``self.level_dict`` with one entry per contiguous run.

        A run is a maximal stretch of consecutive rows carrying the same
        cluster label. Leaves ``level_dict`` empty when there are no rows.
        """
        self.level_dict = defaultdict(dict)
        prices = self.data["open"].to_numpy()
        labels = self.data["Clusters"].to_numpy()
        # Kept as a Series so the period keys retain their original type.
        times = self.data["time"]

        n_rows = len(labels)
        if n_rows == 0:
            return

        run_start = 0
        for pos in range(1, n_rows + 1):
            if pos < n_rows and labels[pos] == labels[run_start]:
                continue

            # Rows [run_start, pos) form one complete run.
            run_prices = prices[run_start:pos]
            # A run followed by another one is closed by the first row of
            # that next run; the last run is closed by the final row.
            end = min(pos, n_rows - 1)
            period = (times.iloc[run_start], times.iloc[end])
            self.level_dict[period] = {
                "cluster": labels[run_start],
                "support": run_prices.min(),
                "resistance": run_prices.max(),
                "index": (run_start, end),
                # Mean over every row of the run, including its first one.
                "average": run_prices.mean(),
            }
            run_start = pos

    def __delete_cluster_outlier(self, error_period: int = 5):
        """Relabel short runs of a cluster so they merge with a neighbour.

        Runs are detected on the labels as produced by the model. A run
        shorter than ``error_period`` rows takes the (already smoothed) label
        of the row just before it; a short run at the very start has no
        predecessor and takes the label of the run that follows it. The
        trailing run is left untouched because nothing follows it.

        Args:
            error_period: Minimum run length, in rows, that is kept as is.
        """
        # Positional access: the frame's index need not start at 0.
        labels = self.data["Clusters"].to_numpy()
        smoothed = list(labels)

        run_start = 0
        for pos in range(1, len(labels)):
            if labels[pos] == labels[run_start]:
                continue

            run_length = pos - run_start
            if run_length < error_period:
                if run_start > 0:
                    replacement = smoothed[run_start - 1]
                else:
                    # Avoid index -1, which would pick the last row's label.
                    replacement = labels[pos]
                smoothed[run_start:pos] = [replacement] * run_length
            run_start = pos

        self.data["Clusters"] = smoothed
        # NOTE(review): the first two rows are always dropped here; the
        # reason is not visible in this file - confirm it is intended.
        self.data = self.data.iloc[2:]

    def get_viz(self):
        """Plot the open prices coloured by cluster with their levels.

        Support is drawn in red, resistance in blue and the average in
        black. The x axis is the row position within ``self.data``, which is
        the coordinate system used by ``level_dict[...]["index"]``.
        """
        series = (
            ("support", "r", "Support"),
            ("resistance", "b", "Resistance"),
            ("average", "k", "Average"),
        )
        for position, levels in enumerate(self.level_dict.values()):
            x_start, x_end = levels["index"]
            for key, colour, name in series:
                # Only the first segment of each kind carries a label so the
                # legend shows one entry per level type.
                plt.hlines(
                    levels[key],
                    x_start,
                    x_end,
                    colors=colour,
                    label=name if position == 0 else "",
                )

        # Actual prices, plotted by row position so they line up with the
        # level segments even when the frame's index does not start at 0.
        plt.scatter(
            np.arange(len(self.data)),
            self.data["open"],
            c=self.data["Clusters"],
            cmap='viridis',
            label='Clusters',
        )
        plt.xlabel("Index")
        plt.ylabel("Prices")
        plt.grid()
        plt.legend()
        plt.show()

    def train_AgglomerativeClustering(
        self, data: "pd.Series | np.ndarray", n_clusters: int = 3
    ) -> "tuple[AgglomerativeClustering, np.ndarray]":
        """Fit Ward-linkage agglomerative clustering on a price series.

        Inputs:
            data: one-dimensional prices (pandas Series or array-like)
            n_clusters: number of clusters, default = 3
        Outputs:
            model: the fitted AgglomerativeClustering instance
            clusters: cluster label of every input row, in input order
        """
        # scikit-learn expects a 2-D (n_samples, n_features) array.
        prices = np.asarray(data).reshape(-1, 1)

        model = AgglomerativeClustering(n_clusters=n_clusters, linkage="ward")
        clusters = model.fit_predict(prices)
        return model, clusters


# Example usage
if __name__ == "__main__":
    data = vns.stock_historical_data(
        symbol="CEO",
        start_date="2023-01-31",
        end_date='2024-01-31',
        resolution='1D',
        type='stock',
        beautify=True,
        decor=False,
        source='DNSE',
    )
    res_sup = ResSupCluster(
        data=data,
        is_visualize=True,
        is_delete_cluster_outlier=True,
        n_clusters=3,
    )

    # The computed levels are available in level_dict.
    print(res_sup.level_dict)