File size: 9,340 Bytes
6b95d78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
#%%
import pandas as pd
import numpy as np
import vnstock as vns
import matplotlib.pyplot as plt
from datetime import datetime


def get_candle_plot(df: pd.DataFrame, buy_sell_df: pd.DataFrame) -> None:
    """Draw a candlestick chart of ``df`` with vertical buy/sell markers.

    Inputs:
        df: pd.DataFrame, price data; the columns ``time``, ``open``,
            ``high``, ``low`` and ``close`` are read.
        buy_sell_df: pd.DataFrame, one row per signal; the columns ``date``
            (x position of the marker) and ``signal`` are read. Rows whose
            signal is ``"buy"`` are drawn in red, every other row in blue.
    Outputs:
        None. The chart is shown with ``plt.show()``.
    """
    # Candles that closed at or above their open ("rising") and candles that
    # closed below their open ("falling") are drawn in separate colours.
    rising = df[df.close >= df.open]
    falling = df[df.close < df.open]

    # NOTE: in this chart rising candles are red and falling candles are green.
    rising_color = 'red'
    falling_color = 'green'

    # Bar widths of the candle body and of the thin high/low wicks.
    body_width = .3
    wick_width = .03

    # Always start a fresh figure so the chart never lands on an existing one.
    plt.figure()

    # Rising candles: body from open to close, upper wick from close to high,
    # lower wick from open down to low (negative bar height).
    plt.bar(rising.time, rising.close - rising.open, body_width,
            bottom=rising.open, color=rising_color)
    plt.bar(rising.time, rising.high - rising.close, wick_width,
            bottom=rising.close, color=rising_color)
    plt.bar(rising.time, rising.low - rising.open, wick_width,
            bottom=rising.open, color=rising_color)

    # Falling candles: body from open down to close, upper wick from open to
    # high, lower wick from close down to low.
    plt.bar(falling.time, falling.close - falling.open, body_width,
            bottom=falling.open, color=falling_color)
    plt.bar(falling.time, falling.high - falling.open, wick_width,
            bottom=falling.open, color=falling_color)
    plt.bar(falling.time, falling.low - falling.close, wick_width,
            bottom=falling.close, color=falling_color)

    # x and y labeling
    plt.xlabel("Index")
    plt.ylabel("Prices (VND)")

    # One dashed vertical line per signal. Walking the two columns in
    # lockstep avoids label-based lookups, which return a Series (and break
    # the comparison) when the index holds duplicate labels.
    for signal, date in zip(buy_sell_df["signal"], buy_sell_df["date"]):
        marker_color = "red" if signal == "buy" else "blue"
        plt.axvline(x=date, color=marker_color, linestyle='--')

    # Display the finished candlestick chart.
    plt.show()

class CandleBlow():
    """Flag a buy/sell point from the last candle of a price window.

    The last row of ``df`` is tested for a hammer / inverse-hammer shape.
    When either shape is found, a degree-1 polynomial is fitted to the open
    or close prices against ``df.index`` and its slope at the last row is
    compared with ``slope_thres``.

    Attributes:
        df: the DataFrame passed in; the columns ``open``, ``high``, ``low``
            and ``close`` are read.
        is_hammer / is_reverse_hammer: results of the two shape tests.
        is_buy: True when a shape was found and the last slope is below
            ``-abs(slope_thres)``.
        is_sell: True when a shape was found and the last slope is above
            ``abs(slope_thres)``.
        fit_function, fit_values, deriv_function, deriv_value: the fitted
            polynomial, its values, its derivative and the derivative's
            values; all ``None`` when no shape was found.
    """

    def __init__(self, df: pd.DataFrame,
                 is_viz: bool = True, use_close_price: bool = True,
                 slope_thres=55) -> None:
        """
        Inputs:
            df: pd.DataFrame, price window whose last row is the candle
                under test
            is_viz: bool, plot the window when the slope threshold is
                exceeded
            use_close_price: bool, fit on close prices (True) or open
                prices (False)
            slope_thres: slope magnitude that must be exceeded for a signal;
                its sign is ignored
        """
        # init variables
        self.df = df
        self.is_buy = False
        self.is_sell = False

        # Defined up front so these attributes exist even when no hammer is
        # detected and the fit is skipped.
        self.fit_function = None
        self.fit_values = None
        self.deriv_function = None
        self.deriv_value = None

        # ensure the slope threshold is positive
        slope_thres = abs(slope_thres)

        # detect hammer shapes on the last candle
        self.is_hammer = self.detect_hammer(tail_ratio=2, tol_pct=0.1 / 100)
        self.is_reverse_hammer = self.detect_inverse_hammer(tail_ratio=2,
                                                            tol_pct=0.1 / 100)

        # Without a hammer-like candle there is no change point to evaluate.
        if not (self.is_hammer or self.is_reverse_hammer):
            return

        # linear fit of the price and its derivative (the trend slope)
        self.fit_function, self.fit_values = self.__get_fit(
            degree=1, use_close_price=use_close_price)
        self.deriv_function, self.deriv_value = self.__get_derivative(
            self.fit_function)

        last_slope = self.deriv_value[-1]

        # only plot windows whose slope is steep enough to yield a signal
        if is_viz and abs(last_slope) > slope_thres:
            self.__get_viz(self.fit_values, self.deriv_value)

        # falling trend into the change point -> buy, rising trend -> sell
        self.is_buy = bool(last_slope < -slope_thres)
        self.is_sell = bool(last_slope > slope_thres)

    def __get_viz(self, fit_values=None, deriv_value=None) -> None:
        """
        Plot the candles of self.df (top axes) and the derivative (bottom
        axes).
        Inputs:
            fit_values: optional sequence, fitted prices drawn over the
                candles; skipped when None or empty
            deriv_value: optional sequence, slope values drawn on the lower
                axes; skipped when None or empty
        """
        # Candles that closed at or above their open vs. below their open.
        rising = self.df[self.df["close"] >= self.df["open"]]
        falling = self.df[self.df["close"] < self.df["open"]]

        # NOTE: rising candles are drawn red and falling candles green here.
        rising_color = 'red'
        falling_color = 'green'

        # Bar widths of the candle body and of the thin high/low wicks.
        body_width = .3
        wick_width = .03

        fig, axs = plt.subplots(2, 1, sharex=True)
        price_ax, slope_ax = axs[0], axs[1]

        # Rising candles: body, upper wick (close -> high), lower wick
        # (open -> low, negative height).
        price_ax.bar(rising.index, rising["close"] - rising["open"],
                     body_width, bottom=rising["open"], color=rising_color)
        price_ax.bar(rising.index, rising["high"] - rising["close"],
                     wick_width, bottom=rising["close"], color=rising_color)
        price_ax.bar(rising.index, rising["low"] - rising["open"],
                     wick_width, bottom=rising["open"], color=rising_color)

        # Falling candles: body, upper wick (open -> high), lower wick
        # (close -> low, negative height).
        price_ax.bar(falling.index, falling["close"] - falling["open"],
                     body_width, bottom=falling["open"], color=falling_color)
        price_ax.bar(falling.index, falling["high"] - falling["open"],
                     wick_width, bottom=falling["open"], color=falling_color)
        price_ax.bar(falling.index, falling["low"] - falling["close"],
                     wick_width, bottom=falling["close"], color=falling_color)

        # x and y labeling
        slope_ax.set_xlabel("Index")
        price_ax.set_ylabel("Prices (VND)")

        # Both arguments default to None, so guard before calling len().
        if fit_values is not None and len(fit_values) > 0:
            price_ax.plot(self.df.index, fit_values, label="Fit Line")
            price_ax.legend()

        if deriv_value is not None and len(deriv_value) > 0:
            slope_ax.plot(self.df.index, deriv_value, label="Derivative Line")
            slope_ax.legend()

        price_ax.grid()
        slope_ax.grid()

        # display the figure
        fig.show()

    def __get_fit(self, degree: int = 5,
                  use_close_price: bool = True) -> tuple:
        """
        Get the polynomial fit function and its estimated values for the
        price data:
        Inputs:
            degree: int, degree of the fitted polynomial
            use_close_price: bool, fit the close price (True) or the open
                price (False)
        Outputs:
            fit_function: np.poly1d object of the fitted polynomial, with
                self.df.index as its x coordinate
            est_value: np.ndarray, fit_function evaluated at every index
                value of self.df
        """
        price = self.df["close"] if use_close_price else self.df["open"]

        # The polynomial is fitted against the index values, so it must be
        # evaluated at those same values; evaluating at 0..n-1 instead would
        # shift the estimate for any window whose index does not start at 0.
        x_values = np.asarray(self.df.index, dtype=float)

        # Perform polynomial fitting
        coefficients = np.polyfit(x_values, price, degree)
        fit_function = np.poly1d(coefficients)
        est_value = fit_function(x_values)

        return fit_function, est_value

    def __get_derivative(self, fit_function: np.poly1d) -> tuple:
        """
        Find the derivative function of the fit function
        Inputs:
            fit_function: np.poly1d object of the fit function
        Outputs:
            deriv_function: np.poly1d object of the derivative function
            deriv_value: np.ndarray, deriv_function evaluated at every index
                value of self.df
        """
        # Same x coordinate as the one the fit was made against.
        x_values = np.asarray(self.df.index, dtype=float)
        deriv_function = fit_function.deriv()
        deriv_value = deriv_function(x_values)
        return deriv_function, deriv_value

    def detect_hammer(self,
                      tol_pct=0.1 / 100,
                      tail_ratio=2.5) -> bool:
        """
        Test whether the last row of self.df has a hammer shape: the open or
        the close lies within tol_pct of the high, and the high-low range is
        at least tail_ratio times the open-close body.
        Inputs:
            tol_pct: fraction of the high that open/close may sit below it
            tail_ratio: minimum ratio of the candle range to the body size
        Outputs:
            bool, True when both conditions hold; False for an empty frame
        """
        if self.df.empty:
            return False

        today_price = self.df.iloc[-1]
        close_price = today_price["close"]
        open_price = today_price["open"]
        high = today_price["high"]
        low = today_price["low"]

        # lowest price still counted as "at the high"
        tol_price = high - tol_pct * high

        body_near_high = close_price >= tol_price or open_price >= tol_price
        long_range = high - low >= tail_ratio * abs(close_price - open_price)
        return bool(body_near_high and long_range)

    def detect_inverse_hammer(self,
                              tol_pct=0.1 / 100,
                              tail_ratio=2.5) -> bool:
        """
        Test whether the last row of self.df has an inverse-hammer shape:
        the open or the close lies within tol_pct * high of the low, and the
        high-low range is at least tail_ratio times the open-close body.
        Inputs:
            tol_pct: fraction of the high that open/close may sit above the
                low
            tail_ratio: minimum ratio of the candle range to the body size
        Outputs:
            bool, True when both conditions hold; False for an empty frame
        """
        if self.df.empty:
            return False

        today_price = self.df.iloc[-1]
        close_price = today_price["close"]
        open_price = today_price["open"]
        high = today_price["high"]
        low = today_price["low"]

        # highest price still counted as "at the low"
        tol_price = low + tol_pct * high

        body_near_low = close_price <= tol_price or open_price <= tol_price
        long_range = high - low >= tail_ratio * abs(close_price - open_price)
        return bool(body_near_low and long_range)
    


#%%
    
# Sample Use
if __name__ == "__main__":
    df = vns.stock_historical_data(symbol="ACB", start_date="2023-01-15",
                                   end_date='2024-01-15', resolution='1D',
                                   type='stock', beautify=True, decor=False)

    # use trend from 1.5 trading week
    window = 7

    # Collect one record per detected signal and build the frame once at the
    # end, instead of seeding it with a dummy row and appending row by row.
    signals = []

    # `end` is exclusive, so it must run up to and including df.shape[0];
    # otherwise the final row is never the candle under test.
    for end in range(window, df.shape[0] + 1):
        train_data = df.iloc[end - window:end]
        candle = CandleBlow(df=train_data,
                            use_close_price=False,
                            is_viz=False)

        if candle.is_buy:
            signal = "buy"
        elif candle.is_sell:
            signal = "sell"
        else:
            continue

        signals.append({"date": train_data.time.iloc[-1],
                        "hammer": candle.is_hammer,
                        "reverse_hammer": candle.is_reverse_hammer,
                        "signal": signal})

    # Explicit columns keep the frame well-formed when no signal was found.
    buy_sell_df = pd.DataFrame(signals,
                               columns=["date", "hammer",
                                        "reverse_hammer", "signal"])

    # plot result
    get_candle_plot(df, buy_sell_df)