import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.cluster import AgglomerativeClustering
# import vnstock as vns
class ResSupCluster:
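    """
    Estimate support and resistance price levels by grouping prices with
    agglomerative clustering: one support (min), resistance (max) and
    average level per cluster.
    """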
    def __init__(
        self,
        data: pd.DataFrame,
        feature_map: dict,
        n_clusters: int = 2,
        is_visualize: bool = False,
        is_delete_outlier: bool = True,
    ):
        self.n_clusters = n_clusters
        self.data = data
        self.feature_map = feature_map

        # Delete outliers to better fit the data
        if is_delete_outlier:
            self.delete_outlier()

        # Train the clustering model
        self.model, self.data["Clusters"] = self.train_AgglomerativeClustering(
            self.data[feature_map["open"]], n_clusters
        )

        # Get resistance and support levels
        self.get_res_sup()

        if is_visualize:
            self.get_viz()

    def get_res_sup(self):
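        """
        Compute support (min), resistance (max) and average price for each cluster.
        """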
        # Calculate cluster centroids
        self.support = []
        self.resistance = []
        self.average = []

        prices = self.data[self.feature_map["open"]]
        clusters = self.data["Clusters"]

        for cluster_label in range(self.n_clusters):
            # Get prices in same cluster
            cluster_points = prices[clusters == cluster_label]

            # Get support, resistance and average
            self.resistance.append(max(cluster_points))
            self.support.append(min(cluster_points))
            self.average.append(np.mean(cluster_points))

    def delete_outlier(self):
        """
        Delete outliers that are more than 2 std away from the mean
        """
        # keep today's row aside so it is never dropped
        today_data = self.data.iloc[[-1]]
        data = self.data.iloc[:-1]

        # extract mean and std
        mean = np.mean(data[self.feature_map["open"]])
        std = np.std(data[self.feature_map["open"]])

        # drop outliers
        data = data[
            (mean - 2 * std < data[self.feature_map["open"]])
            & (data[self.feature_map["open"]] < mean + 2 * std)
        ]

        # update self.data, appending today's row back
        self.data = pd.concat([data, today_data])

    def get_viz(self):
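        """
        Plot the clustered prices together with the support, resistance
        and average lines.
        """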
        plt.scatter(
            self.data.index,
            self.data[self.feature_map["open"]],
            c=self.data["Clusters"],
            cmap="viridis",
            label="Clusters",
        )
        plt.hlines(
            self.support,
            self.data.index[0],
            self.data.index[-1],
            colors="r",
            label="Support",
        )
        plt.hlines(
            self.resistance,
            self.data.index[0],
            self.data.index[-1],
            colors="b",
            label="Resistance",
        )
        plt.hlines(
            self.average,
            self.data.index[0],
            self.data.index[-1],
            colors="k",
            label="Average",
        )
        plt.plot(
            self.data.index,
            self.data[self.feature_map["open"]],
            label="Price",
        )
        plt.grid()
        plt.legend()
        plt.show()

    def train_AgglomerativeClustering(
        self, data: pd.Series, n_clusters: int = 3
    ) -> tuple:
        """
        Fit Agglomerative Clustering
        Inputs:
            data: pd.Series of prices
            n_clusters: number of clusters, default = 3
        Outputs:
            model: the fitted AgglomerativeClustering model
            clusters: np.ndarray of cluster labels, aligned with data
        """
        prices = data.values.reshape(-1, 1)

        # Fit Agglomerative Clustering with Ward linkage
        model = AgglomerativeClustering(n_clusters=n_clusters, linkage="ward")
        clusters = model.fit_predict(prices)

        return model, clusters

    def extract_all_lines(self):
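        """
        Return the computed support, resistance and average levels as a dict.
        """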
        return {
            "support": self.support,
            "resistance": self.resistance,
            "average": self.average,
        }

# Example Usage
# if __name__ == "__main__":
# data = vns.stock_historical_data(
# symbol="CEO",
# start_date="2023-01-31",
# end_date="2024-01-31",
# resolution="1D",
# type="stock",
# beautify=True,
# decor=False,
# source="DNSE",
# )
#     Cluster = ResSupCluster(
#         data=data,
#         feature_map={"open": "open"},  # assumes the price column is named "open"
#         is_visualize=True,
#         is_delete_outlier=True,
#         n_clusters=3,
#     )
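
# Minimal runnable sketch (an assumption, not part of the original example):
# it builds a synthetic random-walk price series instead of fetching data
# with vnstock, then fits ResSupCluster on it. The column name "open" and
# the feature_map keys below are illustrative only.
# if __name__ == "__main__":
#     rng = np.random.default_rng(42)
#     # random-walk prices starting around 100
#     prices = 100 + np.cumsum(rng.normal(0, 1, size=250))
#     data = pd.DataFrame({"open": prices})
#     cluster = ResSupCluster(
#         data=data,
#         feature_map={"open": "open"},
#         n_clusters=3,
#         is_visualize=False,
#         is_delete_outlier=True,
#     )
#     print(cluster.extract_all_lines())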