Spaces:
Running
Running
import numpy as np | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
from sklearn.cluster import AgglomerativeClustering | |
# import vnstock as vns | |
class ResSupCluster:
    """Derive support / resistance levels by clustering a price column.

    The price column named by ``feature_map["open"]`` is grouped into
    ``n_clusters`` clusters with Ward-linkage agglomerative clustering.
    For every cluster the minimum price is reported as a support level,
    the maximum as a resistance level and the mean as an average level.

    Attributes set by the constructor:
        data: private copy of the input frame (outliers optionally removed)
            with an extra ``"Clusters"`` column holding the cluster labels.
        model: the fitted ``AgglomerativeClustering`` instance.
        support, resistance, average: lists with one value per cluster
            label, ordered by label ``0 .. n_clusters - 1``.
    """

    def __init__(
        self,
        data: pd.DataFrame,
        feature_map: dict,
        n_clusters: int = 2,
        is_visualize: bool = False,
        is_delete_outlier: bool = True,
    ):
        """Fit the clustering and compute the price levels.

        Args:
            data: price history, one row per bar.
            feature_map: mapping whose ``"open"`` entry is the name of the
                price column in ``data`` to cluster on.
            n_clusters: number of clusters (price zones) to form.
            is_visualize: when true, show a matplotlib chart of the result.
            is_delete_outlier: when true, drop rows whose price is far from
                the mean before clustering (the last row is always kept).
        """
        self.n_clusters = n_clusters
        # Work on a private copy: a "Clusters" column is added below and the
        # caller's frame must not be modified behind its back.
        self.data = data.copy()
        self.feature_map = feature_map

        # Delete outliers to better fit the data
        if is_delete_outlier:
            self.delete_outlier()

        # Train the clustering model
        self.model, self.data["Clusters"] = self.train_AgglomerativeClustering(
            self.data[feature_map["open"]], n_clusters
        )

        # Get resistance and support levels
        self.get_res_sup()

        if is_visualize:
            self.get_viz()

    def get_res_sup(self):
        """Compute support, resistance and average price for each cluster.

        Reads ``self.data["Clusters"]`` and the price column, and fills
        ``self.support`` (cluster minimum), ``self.resistance`` (cluster
        maximum) and ``self.average`` (cluster mean), indexed by label.
        """
        self.support = []
        self.resistance = []
        self.average = []

        prices = self.data[self.feature_map["open"]]
        clusters = self.data["Clusters"]

        for cluster_label in range(self.n_clusters):
            # Prices that were assigned to this cluster
            cluster_points = prices[clusters == cluster_label]

            # Vectorised reductions instead of Python-level max()/min()
            self.resistance.append(float(cluster_points.max()))
            self.support.append(float(cluster_points.min()))
            self.average.append(float(cluster_points.mean()))

    def delete_outlier(self, n_std: float = 2.0):
        """Drop rows whose price is more than ``n_std`` std away from the mean.

        The mean and (population) standard deviation are computed from every
        row except the last one, and the last row (today's data) is always
        kept regardless of its value. Row labels of the surviving rows are
        preserved.

        Args:
            n_std: half-width of the accepted band, in standard deviations.
        """
        # Nothing to filter without at least one historical row
        if len(self.data) < 2:
            return

        prices = self.data[self.feature_map["open"]]

        # Statistics come from history only, so today's bar cannot skew them
        history = prices.iloc[:-1]
        mean = history.mean()
        std = history.std(ddof=0)

        # Inclusive bounds: with constant prices (std == 0) nothing is dropped
        keep = np.abs(prices.to_numpy(dtype=float) - mean) <= n_std * std
        # Make sure we never drop today's data
        keep[-1] = True

        # .copy() so later column assignments do not target a slice
        self.data = self.data.loc[keep].copy()

    def get_viz(self):
        """Plot prices coloured by cluster together with all price levels."""
        x_values = self.data.index
        prices = self.data[self.feature_map["open"]]

        plt.scatter(
            x_values,
            prices,
            c=self.data["Clusters"],
            cmap="viridis",
            label="Clusters",
        )

        plt.hlines(
            self.support,
            x_values[0],
            x_values[-1],
            colors="r",
            label="Support",
        )

        plt.hlines(
            self.resistance,
            x_values[0],
            x_values[-1],
            colors="b",
            label="Resistance",
        )

        plt.hlines(
            self.average,
            x_values[0],
            x_values[-1],
            colors="k",
            label="Average",
        )

        # Distinct legend entry; the scatter above already uses "Clusters"
        plt.plot(
            x_values,
            prices,
            label="Price",
        )

        plt.grid()
        plt.legend()
        plt.show()

    def train_AgglomerativeClustering(
        self, data: "pd.Series | np.ndarray", n_clusters: int = 3
    ) -> "tuple[AgglomerativeClustering, np.ndarray]":
        """Fit Ward-linkage agglomerative clustering on a 1-D price series.

        Args:
            data: one-dimensional prices (pandas Series or array-like).
            n_clusters: number of clusters, default = 3.

        Returns:
            A ``(model, clusters)`` tuple: the fitted model and the array of
            cluster labels, in the same order as ``data``.
        """
        # The estimator expects a 2-D (n_samples, n_features) array
        prices = np.asarray(data).reshape(-1, 1)

        # Fit Agglomerative Clustering
        model = AgglomerativeClustering(n_clusters=n_clusters, linkage="ward")
        clusters = model.fit_predict(prices)
        return model, clusters

    def extract_all_lines(self):
        """Return the computed levels as a dict of per-cluster lists."""
        return {
            'support': self.support,
            'resistance': self.resistance,
            'average': self.average
        }
# Example usage
# if __name__ == "__main__": | |
# data = vns.stock_historical_data( | |
# symbol="CEO", | |
# start_date="2023-01-31", | |
# end_date="2024-01-31", | |
# resolution="1D", | |
# type="stock", | |
# beautify=True, | |
# decor=False, | |
# source="DNSE", | |
# ) | |
# Cluster = ResSupCluster( | |
# data=data, is_visualize=True, is_delete_outlier=True, n_clusters=3 | |
# ) | |