logic-center / logic /res_sup.py
camphong24032002
Test
6b95d78
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.cluster import AgglomerativeClustering
import vnstock as vns
from collections import Counter, defaultdict
class ResSupCluster:
    """Find support and resistance levels by clustering opening prices.

    The ``"open"`` column of ``data`` is clustered with scikit-learn's
    ``AgglomerativeClustering``.  Every run of consecutive rows sharing one
    cluster label becomes a period whose minimum, maximum and mean opening
    price are stored as its support, resistance and average.

    Attributes:
        n_clusters: Number of clusters requested from the model.
        data: The analysed frame, including the added ``"Clusters"`` column.
        model: The fitted ``AgglomerativeClustering`` instance.
        level_dict: Maps ``(start_time, end_time)`` to a dict with the keys
            ``"cluster"``, ``"support"``, ``"resistance"``, ``"index"`` and
            ``"average"``.
    """

    def __init__(self, data: pd.DataFrame, n_clusters: int = 2,
                 is_visualize: bool = False, is_delete_cluster_outlier: bool = True,
                 error_period: int = 10):
        """Cluster the prices, optionally smooth the labels, compute levels.

        Args:
            data: Frame with at least the columns ``"open"`` and ``"time"``.
                The ``"Clusters"`` column is written into this same object
                (no copy is taken).
            n_clusters: Number of clusters to fit.
            is_visualize: When true, ``get_viz`` is called at the end.
            is_delete_cluster_outlier: When true, short runs of a cluster
                label are merged into a neighbouring run before the levels
                are computed.
            error_period: Runs shorter than this many rows count as outliers.
                The default of 10 matches the threshold that used to be
                hard-coded in the outlier step.
        """
        self.n_clusters = n_clusters
        self.data = data
        # Train the clustering model and attach one label per row.
        self.model, self.data["Clusters"] = self.train_AgglomerativeClustering(
            self.data["open"], n_clusters
        )
        # Remove short-lived label flips so the periods fit the data better.
        if is_delete_cluster_outlier:
            self.__delete_cluster_outlier(error_period=error_period)
        # Compute resistance and support for every period.
        self.__get_res_sup()
        if is_visualize:
            self.get_viz()

    def __get_res_sup(self):
        """Fill ``self.level_dict`` with one entry per run of equal labels.

        For a run covering row positions ``start .. stop - 1`` the entry holds
        the cluster label, the minimum (``"support"``), maximum
        (``"resistance"``) and mean (``"average"``) of the opening prices of
        exactly those rows.  ``"index"`` is ``(start, stop)`` and the key is
        the pair of ``"time"`` values at those two positions; for the final
        run the end marker is the last row instead of one past it.
        """
        self.level_dict = defaultdict(dict)
        prices = self.data["open"].to_numpy()
        labels = self.data["Clusters"].to_numpy()
        times = self.data["time"]
        n_rows = len(labels)
        if n_rows == 0:
            # Nothing to summarise; leave the mapping empty.
            return

        # Row positions at which the label differs from the previous row.
        change_points = np.flatnonzero(labels[1:] != labels[:-1]) + 1
        starts = [0, *change_points.tolist()]
        stops = starts[1:] + [n_rows]

        for start, stop in zip(starts, stops):
            # The last run has no following row, so it ends on the last row.
            end_marker = stop if stop < n_rows else n_rows - 1
            segment = prices[start:stop]
            period = (times.iloc[start], times.iloc[end_marker])
            self.level_dict[period]["cluster"] = labels[start]
            self.level_dict[period]["support"] = segment.min()
            self.level_dict[period]["resistance"] = segment.max()
            self.level_dict[period]["index"] = (start, end_marker)
            # Mean over every row of the run (sum divided by the row count).
            self.level_dict[period]["average"] = segment.mean()

    def __delete_cluster_outlier(self, error_period: int = 5):
        """Merge short runs of a cluster label into a neighbouring run.

        A run of consecutive equal labels that is shorter than
        ``error_period`` rows takes the (already cleaned) label of the row
        just before it.  A short run at the very start has no predecessor and
        takes the label of the run that follows it.  Run boundaries are
        decided on the labels as they were before any relabelling.  The final
        run is never relabelled because no later label change closes it.

        Args:
            error_period: Minimum number of rows a run needs to be kept.
        """
        original = self.data["Clusters"].tolist()
        cleaned = list(original)
        run_start = 0
        for pos in range(1, len(original)):
            if original[pos] == original[run_start]:
                continue
            run_length = pos - run_start
            if run_length < error_period:
                if run_start > 0:
                    fill = cleaned[run_start - 1]
                else:
                    # Leading run: borrow the label of the run that follows.
                    fill = original[pos]
                cleaned[run_start:pos] = [fill] * run_length
            run_start = pos
        self.data["Clusters"] = cleaned
        # NOTE(review): the first two rows are dropped here, exactly as the
        # earlier implementation did; the reason is not documented - confirm
        # it is still wanted.
        self.data = self.data.iloc[2:]

    def get_viz(self):
        """Plot the opening prices with each period's three levels.

        Support is drawn in red, resistance in blue and the average in black.
        Prices are plotted against their row position so that they line up
        with the positional ``"index"`` bounds stored in ``level_dict``.
        """
        styles = (
            ("support", "r", "Support"),
            ("resistance", "b", "Resistance"),
            ("average", "k", "Average"),
        )
        last_level = None
        for last_level in self.level_dict.values():
            first_pos, last_pos = last_level["index"]
            for key, colour, _ in styles:
                plt.hlines(last_level[key], first_pos, last_pos, colors=colour)
        # Zero-length lines whose only purpose is one legend entry per level.
        if last_level is not None:
            for key, colour, legend_name in styles:
                plt.hlines(last_level[key], 0, 0, colors=colour, label=legend_name)
        # Actual prices, coloured by cluster label.
        plt.scatter(np.arange(len(self.data)), self.data["open"],
                    c=self.data["Clusters"], cmap='viridis', label='Clusters')
        plt.xlabel("Index")
        plt.ylabel("Prices")
        plt.grid()
        plt.legend()
        plt.show()

    def train_AgglomerativeClustering(
        self, data: "pd.Series | np.ndarray", n_clusters: int = 3
    ) -> "tuple[AgglomerativeClustering, np.ndarray]":
        """Fit Agglomerative Clustering (ward linkage) on one price series.

        Inputs:
            data: one-dimensional prices (pandas Series or array-like)
            n_clusters: number of clusters, default = 3
        Outputs:
            model: the fitted AgglomerativeClustering instance
            clusters: cluster label of every input value, in input order
        """
        # scikit-learn expects a 2-D (n_samples, n_features) array.
        prices = np.asarray(data).reshape(-1, 1)
        model = AgglomerativeClustering(n_clusters=n_clusters, linkage="ward")
        clusters = model.fit_predict(prices)
        return model, clusters
# Example usage
if __name__ == "__main__":
    # Request the "CEO" price history for 2023-01-31 .. 2024-01-31 at '1D'
    # resolution from the 'DNSE' source through vnstock.
    price_history = vns.stock_historical_data(
        symbol="CEO",
        start_date="2023-01-31",
        end_date='2024-01-31',
        resolution='1D',
        type='stock',
        beautify=True,
        decor=False,
        source='DNSE',
    )
    # Cluster into three groups, smooth outlier runs and show the plot.
    level_finder = ResSupCluster(
        data=price_history,
        n_clusters=3,
        is_delete_cluster_outlier=True,
        is_visualize=True,
    )
    # The computed levels are stored on the instance as ``level_dict``.
    print(level_finder.level_dict)