Spaces:
Runtime error
Runtime error
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| from sklearn.cluster import AgglomerativeClustering | |
| import vnstock as vns | |
| from collections import Counter, defaultdict | |
class ResSupCluster():
    """Estimate support and resistance levels by clustering opening prices.

    The ``open`` column of ``data`` is clustered with agglomerative (Ward)
    clustering.  Consecutive rows that share a cluster label form one run,
    and every run is summarised by its minimum (support), maximum
    (resistance) and mean open price.  The summaries are stored in
    ``level_dict``.
    """

    def __init__(self, data: pd.DataFrame, n_clusters: int = 2,
                 is_visualize: bool = False, is_delete_cluster_outlier: bool = True):
        """Cluster the prices and compute the per-run levels.

        Inputs:
            data: frame with at least the columns ``open`` and ``time``.
                A ``Clusters`` column is added to this frame in place.
            n_clusters: number of clusters requested from the model.
            is_visualize: when True, plot the levels right away.
            is_delete_cluster_outlier: when True, merge short cluster runs
                into their neighbours before computing the levels.
        """
        self.n_clusters = n_clusters
        self.data = data
        # train cluster
        model, labels = self.train_AgglomerativeClustering(self.data["open"], n_clusters)
        self.model = model
        self.data["Clusters"] = labels
        # Delete outlier to better fit data
        if is_delete_cluster_outlier:
            self.__delete_cluster_outlier(error_period=5)
        # get resistance and support
        self.__get_res_sup()
        if is_visualize:
            self.get_viz()

    def __get_res_sup(self):
        """Summarise every run of identical consecutive cluster labels.

        Fills ``self.level_dict``.  Each key is a ``(start_time, end_time)``
        tuple and each value is a dict with the keys ``cluster``,
        ``support`` (run minimum), ``resistance`` (run maximum), ``average``
        (run mean) and ``index`` (``(start, end)`` row positions).

        For every run but the last, ``end`` is the position of the first row
        of the next run; for the last run it is the position of the last
        row, because there is no following row to point at.
        """
        self.level_dict = defaultdict(dict)
        n_rows = len(self.data)
        if n_rows == 0:
            # Nothing to summarise; leave the dictionary empty.
            return

        prices = self.data["open"].to_numpy()
        clusters = self.data["Clusters"].to_numpy()
        times = self.data["time"]

        start = 0
        # ``stop`` walks one past the end so the final run is flushed by the
        # same code path as every other run.
        for stop in range(1, n_rows + 1):
            if stop < n_rows and clusters[stop] == clusters[start]:
                continue
            # The run occupies rows [start, stop).
            run_prices = prices[start:stop]
            end = min(stop, n_rows - 1)
            period = (times.iloc[start], times.iloc[end])
            self.level_dict[period]["cluster"] = clusters[start]
            self.level_dict[period]["support"] = run_prices.min()
            self.level_dict[period]["resistance"] = run_prices.max()
            self.level_dict[period]["index"] = (start, end)
            # Mean over exactly the rows of the run (every row counted once).
            self.level_dict[period]["average"] = run_prices.mean()
            start = stop

    def __delete_cluster_outlier(self, error_period: int = 5):
        """Merge cluster runs shorter than ``error_period`` into a neighbour.

        Run boundaries are detected on the labels as they were before any
        merging.  A short run takes the (possibly already merged) label of
        the row just before it; a short run at the very start has no
        predecessor and takes the label of the run that follows it.  The
        trailing run is never merged.

        Inputs:
            error_period: runs with fewer rows than this are merged.
        """
        original = list(self.data["Clusters"])
        merged = list(original)

        left = 0
        for right in range(1, len(original)):
            if original[right] == original[left]:
                continue
            run_length = right - left
            if run_length < error_period:
                # Avoid index -1 (the last row) when the short run is first.
                replacement = merged[left - 1] if left > 0 else original[right]
                merged[left:right] = [replacement] * run_length
            left = right

        self.data["Clusters"] = merged
        # NOTE(review): the first two rows are dropped unconditionally; the
        # reason is not evident from this code - confirm before changing.
        self.data = self.data.iloc[2:]

    def get_viz(self):
        """Plot the open prices coloured by cluster together with the levels.

        Support is drawn in red, resistance in blue and the average in
        black.  The x axis is the row position within ``self.data``, which is
        the same coordinate the ``index`` entries of ``level_dict`` use.
        """
        for levels in self.level_dict.values():
            x_start, x_end = levels["index"]
            plt.hlines(levels["support"], x_start, x_end, colors="r")
            plt.hlines(levels["resistance"], x_start, x_end, colors="b")
            plt.hlines(levels["average"], x_start, x_end, colors="k")
        # labeling: empty proxy lines give one legend entry per level type
        plt.plot([], [], color="r", label="Support")
        plt.plot([], [], color="b", label="Resistance")
        plt.plot([], [], color="k", label="Average")
        # actual price, plotted by row position so it lines up with the levels
        plt.scatter(np.arange(len(self.data)), self.data["open"],
                    c=self.data["Clusters"], cmap='viridis', label='Clusters')
        plt.xlabel("Index")
        plt.ylabel("Prices")
        plt.grid()
        plt.legend()
        plt.show()

    def train_AgglomerativeClustering(
        self, data: "pd.Series | np.ndarray", n_clusters: int = 3
    ) -> "tuple[AgglomerativeClustering, np.ndarray]":
        """
        Fit Agglomerative Clustering
        Inputs:
            data: one-dimensional prices (pandas Series or array-like)
            n_clusters: numbers of clusters, default = 3
        Outputs:
            model: the fitted AgglomerativeClustering instance
            clusters: cluster label for every input value, in input order
        """
        # The estimator expects a 2-D (n_samples, n_features) array.
        prices = np.asarray(data).reshape(-1, 1)
        # Fit Agglomerative Clustering
        model = AgglomerativeClustering(n_clusters=n_clusters, linkage="ward")
        clusters = model.fit_predict(prices)
        return model, clusters
# Example usage
if __name__ == "__main__":
    # Request settings for the vnstock price-history call.
    request = dict(
        symbol="CEO",
        start_date="2023-01-31",
        end_date="2024-01-31",
        resolution="1D",
        type="stock",
        beautify=True,
        decor=False,
        source="DNSE",
    )
    price_history = vns.stock_historical_data(**request)

    # Cluster into three groups, merge short runs and plot the result.
    levels_model = ResSupCluster(
        data=price_history,
        n_clusters=3,
        is_delete_cluster_outlier=True,
        is_visualize=True,
    )

    # The computed levels are exposed through the level_dict attribute.
    print(levels_model.level_dict)