Spaces:
Running
Running
import numpy as np | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
from sklearn.cluster import AgglomerativeClustering | |
import vnstock as vns | |
from collections import Counter, defaultdict | |
class ResSupCluster():
    """Derive support / resistance levels from clustered opening prices.

    The ``open`` column is clustered with Ward-linkage agglomerative
    clustering. Every run of consecutive rows that share a cluster label
    becomes one period, and the minimum / maximum / mean opening price of
    that run is stored as its support / resistance / average in
    ``level_dict``.

    Note: the ``data`` frame handed to the constructor gains a ``Clusters``
    column in place.
    """

    def __init__(self, data: pd.DataFrame, n_clusters: int = 2,
                 is_visualize: bool = False, is_delete_cluster_outlier: bool = True):
        """Cluster the prices, optionally smooth the labels, compute levels.

        Args:
            data: Price history; must provide ``open`` and ``time`` columns.
            n_clusters: Number of clusters requested from the model.
            is_visualize: Plot the levels right after computing them.
            is_delete_cluster_outlier: Merge short label runs into the
                preceding run before the levels are computed.
        """
        self.n_clusters = n_clusters
        self.data = data
        # train cluster
        self.model, self.data["Clusters"] = self.train_AgglomerativeClustering(
            self.data["open"], n_clusters
        )
        # Delete outlier to better fit data
        if is_delete_cluster_outlier:
            # The helper used to ignore its argument and always apply a
            # threshold of 10; pass that value explicitly now that the
            # argument is honoured, so results stay the same.
            self.__delete_cluster_outlier(error_period=10)
        # get resistance and support
        self.__get_res_sup()
        if is_visualize:
            self.get_viz()

    def __get_res_sup(self):
        """Populate ``self.level_dict`` with one entry per label run.

        Keys are ``(start_time, end_time)`` tuples taken from the ``time``
        column. Each value is a dict with:
            cluster:    label shared by the run
            support:    lowest opening price in the run
            resistance: highest opening price in the run
            average:    mean opening price in the run
            index:      ``(start, end)`` row positions; ``end`` is the first
                        row of the next run, or the last row for the final run
        Empty data leaves ``level_dict`` empty.
        """
        self.level_dict = defaultdict(dict)
        prices = self.data["open"].to_numpy()
        labels = self.data["Clusters"].to_numpy()
        times = self.data["time"]
        n_rows = len(labels)
        if n_rows == 0:
            return

        start = 0
        # `stop` is the exclusive end of the current run; iterating up to
        # n_rows inclusive closes the trailing run inside the same loop.
        for stop in range(1, n_rows + 1):
            if stop < n_rows and labels[stop] == labels[start]:
                continue
            run_prices = prices[start:stop]
            # The trailing run has no "next row", so it ends on the last row.
            end = stop if stop < n_rows else n_rows - 1
            period = (times.iloc[start], times.iloc[end])
            self.level_dict[period] = {
                "cluster": labels[start],
                "support": run_prices.min(),
                "resistance": run_prices.max(),
                "index": (start, end),
                # Mean over every row of the run (the run is never empty).
                "average": run_prices.mean(),
            }
            start = stop

    def __delete_cluster_outlier(self, error_period: int = 5):
        """Merge short runs of a label into the run that precedes them.

        A run of identical consecutive labels shorter than ``error_period``
        rows is relabelled with the (already cleaned) label just before it.
        The leading run has no predecessor and the trailing run is never
        examined, so both are left as they are.

        NOTE(review): the first two rows of ``self.data`` are dropped
        afterwards, exactly as before; the reason is not evident from this
        file - confirm it is still wanted.

        Args:
            error_period: Minimum run length (in rows) that is kept.
        """
        # Work on plain lists so positions are used regardless of the
        # DataFrame's index labels.
        original = self.data["Clusters"].tolist()
        cleaned = list(original)
        left = 0
        for right in range(1, len(original)):
            if original[right] == original[left]:
                continue
            run_length = right - left
            # left == 0 would make `left - 1` wrap around to the last row.
            if run_length < error_period and left > 0:
                cleaned[left:right] = [cleaned[left - 1]] * run_length
            left = right
        self.data["Clusters"] = cleaned
        self.data = self.data.iloc[2:]

    def get_viz(self):
        """Plot the opening prices with each period's levels.

        Support lines are red, resistance lines blue, average lines black;
        the prices are scattered on top, coloured by cluster label.
        """
        for levels in self.level_dict.values():
            x_start, x_end = levels["index"]
            plt.hlines(levels["support"], x_start, x_end, colors="r")
            plt.hlines(levels["resistance"], x_start, x_end, colors="b")
            plt.hlines(levels["average"], x_start, x_end, colors="k")
        # labeling: empty proxy lines give one legend entry per level type,
        # with the same colours as the lines drawn above.
        plt.plot([], [], color="r", label="Support")
        plt.plot([], [], color="b", label="Resistance")
        plt.plot([], [], color="k", label="Average")
        # actual price, plotted by row position so the points line up with
        # the positional x-range of the level lines
        plt.scatter(np.arange(len(self.data)), self.data["open"],
                    c=self.data["Clusters"], cmap='viridis', label='Clusters')
        plt.xlabel("Index")
        plt.ylabel("Prices")
        plt.grid()
        plt.legend()
        plt.show()

    def train_AgglomerativeClustering(
        self, data: "pd.Series | np.ndarray", n_clusters: int = 3
    ) -> "tuple[AgglomerativeClustering, np.ndarray]":
        """Fit Ward-linkage agglomerative clustering on one price series.

        Args:
            data: One-dimensional prices (pandas Series or array-like).
            n_clusters: Number of clusters, default 3.

        Returns:
            ``(model, clusters)``: the fitted model and the label of every
            input value, in input order.
        """
        # The estimator expects a 2-D (n_samples, n_features) array.
        prices = np.asarray(data).reshape(-1, 1)
        # Fit Agglomerative Clustering
        model = AgglomerativeClustering(n_clusters=n_clusters, linkage="ward")
        clusters = model.fit_predict(prices)
        return model, clusters
# Example usage
if __name__ == "__main__":
    # Fetch one year of daily history for the CEO ticker.
    history = vns.stock_historical_data(
        symbol="CEO",
        start_date="2023-01-31",
        end_date="2024-01-31",
        resolution="1D",
        type="stock",
        beautify=True,
        decor=False,
        source="DNSE",
    )
    # Cluster into three groups, smooth short runs and plot the result.
    levels = ResSupCluster(
        data=history,
        n_clusters=3,
        is_delete_cluster_outlier=True,
        is_visualize=True,
    )
    # The computed levels are stored on the instance as level_dict.
    print(levels.level_dict)