"""Technical indicators (RSI / stochastic RSI, Ichimoku cloud) for stock data."""

import logging
import os
from datetime import datetime, timedelta
from typing import Optional

import numpy as np
import pandas as pd
import requests
from pymongo import ASCENDING, DESCENDING, MongoClient
from vnstock import longterm_ohlc_data, price_board, stock_historical_data

from utils.config import DATE_FORMAT

logger = logging.getLogger(__name__)

PREFIX = "https://www.hsx.vn/Modules/Chart/StaticChart/"
TIME = {"1m": 2, "3m": 3, "6m": 4, "1y": 5, "2y": 6, "5y": 7}
INDICATORS = {
    "EMA": "GetEmaChart",
    "MACD": "GetMacdChart",
    "RSI": "GetRsiChart",
    "Momentum": "GetMomentumChart",
    "Williams %R": "GetWilliamChart",
    "BollingerBand": "GetBollingerBandChart",
}
# Seconds to wait for the chart endpoint before giving up on a request.
REQUEST_TIMEOUT = 10


class Indicator:
    """Namespace of static helpers that fetch prices and compute indicators."""

    def __init__(self) -> None:
        pass

    @staticmethod
    def get_price(
        symbol: str,
        count_back: int = 100,
        symbol_type: str = "stock",
    ) -> pd.DataFrame:
        """Return daily OHLC data for the last ``count_back`` calendar days.

        Args:
            symbol: Ticker passed through to ``longterm_ohlc_data``.
            count_back: Number of calendar days before today to start from.
            symbol_type: Instrument type passed through to
                ``longterm_ohlc_data``.

        Returns:
            The fetched frame with a fresh 0..n-1 index and the ``open``,
            ``high``, ``low`` and ``close`` columns multiplied by 1000 and
            converted to integers.
        """
        resolution = "D"
        end_date = datetime.now()
        start_date = end_date - timedelta(days=count_back)
        df = longterm_ohlc_data(
            symbol,
            start_date.strftime("%Y-%m-%d"),
            end_date.strftime("%Y-%m-%d"),
            resolution,
            symbol_type,
        ).reset_index(drop=True)

        price_cols = ["open", "high", "low", "close"]
        # Round before casting: a bare astype(int) truncates, so a float
        # product such as 1.005 * 1000 == 1004.9999999999999 would become
        # 1004 instead of 1005.
        df[price_cols] = (df[price_cols] * 1000).round().astype(int)
        return df

    @staticmethod
    def _fetch_latest_rsi(symbol: str, periods: int):
        """Fetch the most recent RSI value for ``symbol`` from the chart API."""
        response = requests.get(
            PREFIX + INDICATORS["RSI"],
            # Let requests build the query string; hand-built continuation
            # lines leaked their indentation into the URL as spaces.
            params={
                "stockSymbol": symbol,
                "rangeSelector": 0,
                "periods": periods,
            },
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        # "SeriesColection" is the key spelling this code has always read
        # from the response; do not "correct" it.
        points = response.json()["SeriesColection"][0]["Points"]
        return points[-1]["Value"][0]

    @staticmethod
    def _refresh_rsi_collection(collection, periods: int) -> bool:
        """Append the latest RSI record to ``collection`` when it is stale.

        Returns:
            ``True`` when a refresh was attempted and failed (the caller
            should fall back to another source for the newest row),
            ``False`` when no refresh was needed or it succeeded.
        """
        # NOTE: datetime.utcnow() is deprecated since Python 3.12; kept so
        # the DATE_FORMAT output stays identical to previous behaviour.
        now = datetime.utcnow()
        # Only refresh on weekdays from 12:00 UTC onwards
        # (presumably after the trading session ends; confirm with owner).
        if now.weekday() >= 5 or now.hour < 12:
            return False

        newest_record = collection.find_one(sort=[("_id", DESCENDING)])
        start_date = (now - timedelta(days=20)).strftime(DATE_FORMAT)
        end_date = now.strftime(DATE_FORMAT)
        # "MBB" is only used to obtain the last available "time" value
        # (presumably the latest trading date).
        tmp_df = stock_historical_data("MBB", start_date, end_date)
        last_date = str(tmp_df["time"].iloc[-1])
        if newest_record["time"] == last_date:
            return False

        try:
            record = {"time": last_date}
            # Every key other than the bookkeeping fields is a ticker column.
            tickers = [k for k in newest_record if k not in ("_id", "time")]
            for ticker in tickers:
                # Query each stored ticker, not the symbol the caller asked
                # for; otherwise every column receives the same RSI value.
                record[ticker] = Indicator._fetch_latest_rsi(ticker, periods)
            # Drop the oldest record so the collection keeps a fixed length.
            collection.find_one_and_delete(sort=[("_id", ASCENDING)])
            collection.insert_one(record)
            print("Updated data")
        except Exception:
            # Best effort: the caller falls back to price_board().
            logger.warning("Could not refresh stored RSI data", exc_info=True)
            return True
        return False

    @staticmethod
    def get_rsi(
        symbol: str,
        periods: int = 14,
        smooth_k: int = 3,
        smooth_d: int = 3,
    ) -> Optional[pd.DataFrame]:
        """Return stored RSI values for ``symbol`` plus stochastic RSI columns.

        Reads the ``rsi`` collection of the ``data`` database at the URI in
        the ``MONGODB_URI`` environment variable, refreshing it first when a
        newer date is available.

        Args:
            symbol: Ticker; upper-cased before use as a column name.
            periods: Period sent to the RSI endpoint and rolling window used
                for the stochastic RSI.
            smooth_k: Rolling-mean window for the %K smoothing.
            smooth_d: Rolling-mean window for the %D smoothing.

        Returns:
            A frame with columns ``time``, ``rsi``, ``stoch_rsi``,
            ``stoch_rsi_smooth_k`` and ``stoch_rsi_smooth_d``, or ``None``
            when any step fails.
        """
        try:
            symbol = symbol.upper()
            # The context manager closes the client's connections on exit.
            with MongoClient(os.environ.get("MONGODB_URI")) as client:
                collection = client.get_database("data").get_collection("rsi")
                refresh_failed = Indicator._refresh_rsi_collection(
                    collection, periods
                )
                records = list(collection.find())

            record_df = pd.DataFrame(records).drop(columns=["_id"])
            record_df = record_df[["time", symbol]].rename(
                columns={symbol: "rsi"}
            )

            if refresh_failed:
                # Stored data is stale: append the live row from price_board.
                new_df = price_board(symbol)
                record_df.loc[len(record_df)] = new_df[["time", "RSI"]].values

            record_df["stoch_rsi"] = Indicator.stoch_rsi(
                record_df["rsi"], periods
            )
            record_df["stoch_rsi_smooth_k"] = Indicator.stoch_rsi_smooth_k(
                record_df["stoch_rsi"], smooth_k
            )
            record_df["stoch_rsi_smooth_d"] = Indicator.stoch_rsi_smooth_d(
                record_df["stoch_rsi_smooth_k"], smooth_d
            )
            return record_df
        except Exception:
            # Callers rely on None as the failure signal; keep it, but make
            # the cause visible instead of swallowing it silently.
            logger.exception("Failed to compute RSI for %s", symbol)
            return None

    @staticmethod
    def stoch_rsi(rsi: pd.Series, periods: int = 14) -> pd.Series:
        """Return the stochastic RSI of ``rsi`` as a percentage.

        Computes ``(rsi - rolling_min) * 100 / (rolling_max - rolling_min)``
        over a window of ``periods`` rows.
        """
        highest = rsi.rolling(window=periods).max()
        lowest = rsi.rolling(window=periods).min()
        return (rsi - lowest) * 100 / (highest - lowest)

    @staticmethod
    def stoch_rsi_smooth_k(stoch_rsi: pd.Series, k: int) -> pd.Series:
        """Return the ``k``-row rolling mean of ``stoch_rsi``."""
        return stoch_rsi.rolling(window=k).mean()

    @staticmethod
    def stoch_rsi_smooth_d(stoch_rsi_k: pd.Series, d: int) -> pd.Series:
        """Return the ``d``-row rolling mean of ``stoch_rsi_k``."""
        return stoch_rsi_k.rolling(window=d).mean()

    @staticmethod
    def get_ichimoku_cloud(
        df: pd.DataFrame,
        conversion_period: int = 9,
        base_period: int = 26,
        span_b_period: int = 52,
        displacement: int = 26,
    ) -> pd.DataFrame:
        """Compute Ichimoku cloud lines from an OHLC frame.

        Args:
            df: Frame with ``time``, ``high``, ``low`` and ``close`` columns.
            conversion_period: Window for the tenkan-sen midpoint.
            base_period: Window for the kijun-sen midpoint.
            span_b_period: Window for the senkou span B midpoint.
            displacement: Rows by which the senkou spans are shifted forward
                and the chikou span is shifted backward.

        Returns:
            A frame with ``len(df) + displacement`` rows. The extra rows at
            the end carry the placeholder time labels ``"1"`` ..
            ``str(displacement)``.
        """

        def midpoint(window: int) -> pd.Series:
            # Midpoint of the highest high and lowest low over the window.
            highest = df["high"].rolling(window=window).max()
            lowest = df["low"].rolling(window=window).min()
            return (highest + lowest) / 2

        padding = np.full(displacement, np.nan)

        tenkan_sen = midpoint(conversion_period)
        kijun_sen = midpoint(base_period)
        senkou_span_a = (tenkan_sen + kijun_sen) / 2
        senkou_span_b = midpoint(span_b_period)
        chikou_span = df["close"].shift(-displacement)

        # Placeholder labels for the projected rows beyond the last date.
        future_labels = np.array(
            list(map(str, np.arange(1, displacement + 1)))
        )
        time = np.concatenate((df["time"], future_labels))

        # Current-period lines are padded at the end; the senkou spans are
        # padded at the front, which shifts them forward by `displacement`.
        tenkan_sen = np.concatenate((tenkan_sen, padding))
        kijun_sen = np.concatenate((kijun_sen, padding))
        senkou_span_a = np.concatenate((padding, senkou_span_a))
        senkou_span_b = np.concatenate((padding, senkou_span_b))
        chikou_span = np.concatenate((chikou_span, padding))

        return pd.DataFrame(
            {
                "time": time,
                "tenkan_sen": tenkan_sen,
                "kijun_sen": kijun_sen,
                "senkou_span_a": senkou_span_a,
                "senkou_span_b": senkou_span_b,
                "chikou_span": chikou_span,
                "tenkan_kijun": tenkan_sen - kijun_sen,
                "kumo_cloud": senkou_span_a - senkou_span_b,
                "signal": senkou_span_a > senkou_span_b,
            }
        )