# Origin: commit 6b95d78 ("Test") by camphong24032002
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.cluster import AgglomerativeClustering
# import vnstock as vns
class ResSupCluster:
    """Derive support / resistance levels by clustering a price series.

    The prices found in the column named by ``feature_map["open"]`` are
    grouped with Ward-linkage agglomerative clustering.  For every cluster
    the minimum price is recorded as a support level, the maximum as a
    resistance level and the mean as an average level.

    Attributes set by the constructor:
        n_clusters: number of clusters requested.
        data: working copy of the input frame (outliers optionally removed)
            with an added ``"Clusters"`` column holding the cluster labels.
        feature_map: mapping of logical names to column names; only the
            ``"open"`` key is read by this class.
        model: the fitted ``AgglomerativeClustering`` estimator.
        support, resistance, average: per-cluster levels, one entry per
            cluster label in ``range(n_clusters)``.
    """

    def __init__(
        self,
        data: pd.DataFrame,
        feature_map: dict,
        n_clusters: int = 2,
        is_visualize: bool = False,
        is_delete_outlier: bool = True,
    ):
        """Cluster the prices and compute the levels.

        Args:
            data: price history, one row per observation; the last row is
                treated as "today" and is never removed as an outlier.
            feature_map: must contain the key ``"open"`` whose value is the
                name of the price column in ``data``.
            n_clusters: number of clusters to fit.
            is_visualize: when true, show a matplotlib chart of the result.
            is_delete_outlier: when true, drop historical rows whose price
                is more than two standard deviations from the mean.
        """
        self.n_clusters = n_clusters
        # Work on a copy so that adding the "Clusters" column below never
        # mutates the caller's frame, whichever options are selected.
        self.data = data.copy()
        self.feature_map = feature_map

        # Removing outliers first lets the clusters fit the bulk of the data.
        if is_delete_outlier:
            self.delete_outlier()

        self.model, self.data["Clusters"] = self.train_AgglomerativeClustering(
            self.data[feature_map["open"]], n_clusters
        )

        self.get_res_sup()
        if is_visualize:
            self.get_viz()

    def get_res_sup(self):
        """Fill ``support``, ``resistance`` and ``average`` from the clusters.

        Entry ``i`` of each list is the min / max / mean price of the rows
        labelled ``i`` in the ``"Clusters"`` column.  A label with no rows
        yields NaN entries.
        """
        prices = self.data[self.feature_map["open"]]
        labels = self.data["Clusters"]

        self.support = []
        self.resistance = []
        self.average = []
        for label in range(self.n_clusters):
            members = prices[labels == label]
            self.resistance.append(members.max())
            self.support.append(members.min())
            self.average.append(members.mean())

    def delete_outlier(self):
        """Drop historical rows more than 2 std away from the mean price.

        The last row ("today") is excluded from the statistics and is always
        kept.  Original index labels are preserved.  Frames with fewer than
        two rows are left untouched because there is no history to filter.
        """
        if len(self.data) < 2:
            return

        column = self.feature_map["open"]
        history = self.data.iloc[:-1]
        # Double brackets keep today's row as a one-row frame so it can be
        # concatenated back as a ROW (not assigned as a column).
        today = self.data.iloc[[-1]]

        prices = history[column]
        mean = prices.mean()
        std = prices.std(ddof=0)  # population std, as np.std computes

        # Inclusive bounds: with a constant history (std == 0) every row sits
        # exactly on the bound and must be kept rather than discarded.
        within = prices.between(mean - 2 * std, mean + 2 * std)

        self.data = pd.concat([history[within], today])

    def get_viz(self):
        """Plot the prices coloured by cluster with the computed levels."""
        index = self.data.index
        prices = self.data[self.feature_map["open"]]

        plt.scatter(
            index,
            prices,
            c=self.data["Clusters"],
            cmap="viridis",
            label="Clusters",
        )
        # One horizontal line per cluster for each family of levels, spanning
        # the whole index range.
        for levels, color, label in (
            (self.support, "r", "Support"),
            (self.resistance, "b", "Resistance"),
            (self.average, "k", "Average"),
        ):
            plt.hlines(levels, index[0], index[-1], colors=color, label=label)
        # Distinct label so the legend does not list "Clusters" twice.
        plt.plot(index, prices, label="Price")
        plt.grid()
        plt.legend()
        plt.show()

    def train_AgglomerativeClustering(
        self, data: "pd.Series | np.ndarray", n_clusters: int = 3
    ) -> "tuple[AgglomerativeClustering, np.ndarray]":
        """Fit Ward-linkage agglomerative clustering on a 1-D price series.

        Args:
            data: one-dimensional prices (pandas Series or array-like).
            n_clusters: number of clusters, default 3.

        Returns:
            ``(model, clusters)`` where ``model`` is the fitted estimator and
            ``clusters`` is an array of labels in the same order as ``data``.
        """
        # The estimator expects a 2-D (n_samples, n_features) array.
        prices = np.asarray(data).reshape(-1, 1)
        model = AgglomerativeClustering(n_clusters=n_clusters, linkage="ward")
        clusters = model.fit_predict(prices)
        return model, clusters

    def extract_all_lines(self):
        """Return the computed levels as a dict of lists keyed by kind."""
        return {
            'support': self.support,
            'resistance': self.resistance,
            'average': self.average,
        }
# Example usage
# if __name__ == "__main__":
# data = vns.stock_historical_data(
# symbol="CEO",
# start_date="2023-01-31",
# end_date="2024-01-31",
# resolution="1D",
# type="stock",
# beautify=True,
# decor=False,
# source="DNSE",
# )
# Cluster = ResSupCluster(
# data=data,
# feature_map={"open": "open"},
# is_visualize=True,
# is_delete_outlier=True,
# n_clusters=3,
# )