#%% import pandas as pd import numpy as np import vnstock as vns import matplotlib.pyplot as plt from datetime import datetime def get_candle_plot(df: pd.DataFrame, buy_sell_df: pd.DataFrame ) -> None: # "up" dataframe will store the self.df # when the closing stock price is greater # than or equal to the opening stock prices up = df[df.close >= df.open] # "down" dataframe will store the df # when the closing stock price is # lesser than the opening stock prices down = df[df.close < df.open] # When the stock prices have decreased, then it # will be represented by blue color candlestick col1 = 'red' # When the stock prices have increased, then it # will be represented by green color candlestick col2 = 'green' # Setting width of candlestick elements width = .3 width2 = .03 fig = plt.figure() # Plotting up prices of the stock plt.bar(up.time, up.close-up.open, width, bottom=up.open, color=col1) plt.bar(up.time, up.high-up.close, width2, bottom=up.close, color=col1) plt.bar(up.time, up.low-up.open, width2, bottom=up.open, color=col1) # Plotting down prices of the stock plt.bar(down.time, down.close-down.open, width, bottom=down.open, color=col2) plt.bar(down.time, down.high-down.open, width2, bottom=down.open, color=col2) plt.bar(down.time, down.low-down.close, width2, bottom=down.close, color=col2) # x and y labeling plt.xlabel("Index") plt.ylabel("Prices (VND)") # plot buy point for i in buy_sell_df.index: if buy_sell_df.signal.loc[i] == "buy": color = "red" else: color = "blue" plt.axvline(x = buy_sell_df.date.loc[i], color = color, linestyle='--' ) # displaying candlestick chart of stock data # of a week plt.show() class CandleBlow(): def __init__(self, df: pd.DataFrame, is_viz: bool=True, use_close_price: bool=True, slope_thres = 55) -> None: # init variables self.df = df self.is_buy = False self.is_sell = False # ensure slope thres is positbve slope_thres = abs(slope_thres) # detect hammer value self.is_hammer = self.detect_hammer(tail_ratio=2, tol_pct=0.1/100) self.is_reverse_hammer = self.detect_inverse_hammer(tail_ratio=2, tol_pct=0.1/100) # change point is_change_point = self.is_hammer or self.is_reverse_hammer if is_change_point: # get fit self.fit_function, self.fit_values = self.__get_fit(degree=1, use_close_price=use_close_price) # find derivative self.deriv_function, self.deriv_value = self.__get_derivative(self.fit_function) # indentify buy point if abs(self.deriv_value[-1]) > slope_thres and is_viz: self.__get_viz(self.fit_values, self.deriv_value) is_buy = self.deriv_value[-1] < -slope_thres is_sell = self.deriv_value[-1] > slope_thres self.is_buy = is_change_point and is_buy self.is_sell = is_change_point and is_sell def __get_viz(self, fit_values=None, deriv_value=None) -> None: # "up" dataframe will store the self.df # when the closing stock price is greater # than or equal to the opening stock prices up = self.df[self.df.close >= self.df.open] # "down" dataframe will store the self.df # when the closing stock price is # lesser than the opening stock prices down = self.df[self.df.close < self.df.open] # When the stock prices have decreased, then it # will be represented by blue color candlestick col1 = 'red' # When the stock prices have increased, then it # will be represented by green color candlestick col2 = 'green' # Setting width of candlestick elements width = .3 width2 = .03 fig, axs = plt.subplots(2,1, sharex=True) # Plotting up prices of the stock axs[0].bar(up.index, up.close-up.open, width, bottom=up.open, color=col1) axs[0].bar(up.index, up.high-up.close, width2, bottom=up.close, color=col1) axs[0].bar(up.index, up.low-up.open, width2, bottom=up.open, color=col1) # Plotting down prices of the stock axs[0].bar(down.index, down.close-down.open, width, bottom=down.open, color=col2) axs[0].bar(down.index, down.high-down.open, width2, bottom=down.open, color=col2) axs[0].bar(down.index, down.low-down.close, width2, bottom=down.close, color=col2) # x and y labeling axs[1].set_xlabel("Index") axs[0].set_ylabel("Prices (VND)") if len(fit_values) > 0: axs[0].plot(self.df.index, fit_values, label = "Fit Line") if len(deriv_value) > 0: axs[1].plot(self.df.index, deriv_value, label = "Derivative Line") axs[0].grid() axs[1].grid() # displaying candlestick chart of stock data # of a week fig.show() def __get_fit(self, degree: int = 5, use_close_price: bool = True) -> np.ndarray: """ Get poly fit coef and value estimation for stock data: Inputs: self.df: pd.DataFrame, stock price from vnstock degree: int, how tight is the fit use_close: bool, use close or open price Outputs: est_value: fit estimate value var: np.poly1d object function of the polyfit """ if use_close_price: price = self.df.close else: price = self.df.open data_len = self.df.shape[0] # Perform polynomial fitting coefficients = np.polyfit(self.df.index, price, degree) # funciton fit_function = np.poly1d(coefficients) index = np.arange(data_len) est_value = fit_function(index) # # get y_axis value # est_value = self.__get_fit_value(coefficients ,self.df.shape[0]) return fit_function, est_value def __get_derivative(self, fit_function: np.poly1d) -> np.poly1d: """ Find derivative function of the fit function Inputs: fit_function: np.poly1d object of the fit function, produced by np.polyfit outputs: deriv_function: np.poly1d objects of the derivative function deriv_value: np.ndarray of the output value from deriv function """ data_len = self.df.shape[0] deriv_function = fit_function.deriv() deriv_value = deriv_function(np.arange(data_len)) return deriv_function, deriv_value def detect_hammer(self, tol_pct = 0.1 / 100, tail_ratio = 2.5) -> bool: today_price = self.df.iloc[-1] close = today_price.close open = today_price.open high = today_price.high low = today_price.low tol_price = high - tol_pct * high return ((close >= tol_price or open >= tol_price) and high - low >= tail_ratio * abs(close - open)) def detect_inverse_hammer(self, tol_pct = 0.1 / 100, tail_ratio = 2.5) -> bool: today_price = self.df.iloc[-1] close = today_price.close open = today_price.open high = today_price.high low = today_price.low tol_price = low + tol_pct * high return ((close <= tol_price or open <= tol_price) and high - low >= tail_ratio * abs(close - open)) #%% # Sample Use if __name__ == "__main__": df = vns.stock_historical_data(symbol="ACB", start_date="2023-01-15", end_date='2024-01-15', resolution='1D', type='stock', beautify=True, decor=False) buy_sell_df = pd.DataFrame({"date": [df.time.iloc[0]], "hammer": [True], "reverse_hammer": [True], "signal":["buy"]}) # use trend from 1.5 trading week for i in range(7, df.shape[0]): train_data = df.iloc[i-7 : i] candle = CandleBlow(df=train_data, use_close_price=False, is_viz=False) if candle.is_buy: buy_sell_df.loc[buy_sell_df.shape[0]] = {"date": train_data.time.iloc[-1], "hammer": candle.is_hammer, "reverse_hammer": candle.is_reverse_hammer, "signal":"buy"} if candle.is_sell: buy_sell_df.loc[buy_sell_df.shape[0]] = {"date": train_data.time.iloc[-1], "hammer": candle.is_hammer, "reverse_hammer": candle.is_reverse_hammer, "signal":"sell"} # plot result buy_sell_df = buy_sell_df.iloc[1:] get_candle_plot(df, buy_sell_df)