import requests from datetime import datetime, timedelta import pandas as pd import numpy as np from vnstock import longterm_ohlc_data from pymongo import MongoClient, ASCENDING, DESCENDING from utils.config import DATE_FORMAT, VN100_URL from .utils import Utility import os # from dotenv import load_dotenv # load_dotenv() PREFIX = "https://www.hsx.vn/Modules/Chart/StaticChart/" TIME = {"1m": 2, "3m": 3, "6m": 4, "1y": 5, "2y": 6, "5y": 7} INDICATORS = { "EMA": "GetEmaChart", "MACD": "GetMacdChart", "RSI": "GetRsiChart", "Momentum": "GetMomentumChart", "Williams %R": "GetWilliamChart", "BollingerBand": "GetBollingerBandChart", } updating_price = False updating_rsi = False updating_macd = False class Indicator: def __init__(self) -> None: pass @staticmethod def get_vn100(source: str = "database") -> dict: try: lst_symbols = [] if source == "database": uri = os.environ.get("MONGODB_URI") client = MongoClient(uri) database = client.get_database("data") collection = database.get_collection("rsi") newest_record = collection.find_one(sort=[("_id", ASCENDING)]) lst_symbols = list(newest_record.keys())[2:] elif source == "hsx": data = requests.get(VN100_URL).json() data = data["rows"] for value in data: lst_symbols.append(value["cell"][2].strip()) return {"symbols": lst_symbols} except Exception as e: return {"message": f"Caught error {e}."} @staticmethod async def update_daily_price() -> dict: global updating_price if updating_price: print("The data has been updating.") return updating_price = True datetime_now = datetime.utcnow() hour = datetime_now.hour weekday = datetime_now.weekday() end_date = datetime_now.date() delta = timedelta(days=5) start_date = end_date - delta start_date = start_date.strftime(DATE_FORMAT) end_date = end_date.strftime(DATE_FORMAT) try: if weekday < 5 and hour >= 12: uri = os.environ.get("MONGODB_URI") client = MongoClient(uri) database = client.get_database("data") collection = database.get_collection("price") tmp_df = longterm_ohlc_data("FPT", start_date, end_date, "D", "stock").reset_index(drop=True) if tmp_df.shape[0] == 0: updating_price = False print("There is no data to updated") return tmp_df = tmp_df.iloc[-1] last_date = str(tmp_df["time"]) newest_record = \ collection.find_one(sort=[("_id", DESCENDING)]) ret = None if newest_record["time"] != last_date: rsi_collection = database.get_collection("rsi") tmp_record = \ rsi_collection.find_one(sort=[("_id", DESCENDING)]) lst_symbols = list(tmp_record.keys())[2:] record = {} record["time"] = last_date values = [] for symbol in lst_symbols: df = longterm_ohlc_data(symbol, start_date, end_date, resolution="D", type="stock" ).reset_index(drop=True) df[["open", "high", "low", "close"]] = \ df[["open", "high", "low", "close"]] * 1000 # convert open, high, low, close to int df[["open", "high", "low", "close"]] = \ df[["open", "high", "low", "close"]].astype(int) df = df[["ticker", "open", "high", "low", "close", "volume"]].iloc[-1] values.append(df.to_dict()) record["value"] = values collection.insert_one(record) ret = "Updated data." else: ret = "The data is up to date." updating_price = False print(ret) except Exception as e: updating_price = False print(f"Caught error {e}.") @staticmethod def update_entire_price() -> None: global updating_price if updating_price: return {"message": "The data has been updating."} updating_price = True datetime_now = datetime.utcnow() hour = datetime_now.hour weekday = datetime_now.weekday() try: symbol_dict = Indicator.get_vn100(source="hsx") lst_symbols = symbol_dict["symbols"] if weekday < 5 and hour < 12: updating_price = False return {"message": "The market is in trading."} end_date = datetime_now.date() delta = timedelta(days=350) start_date = end_date - delta start_date = start_date.strftime(DATE_FORMAT) end_date = end_date.strftime(DATE_FORMAT) lst_time = [] lst_values = [] if len(lst_symbols) != 100: updating_price = False return {"message": "Not enough 100 symbols."} for symbol in lst_symbols: df = longterm_ohlc_data(symbol, start_date, end_date, resolution="D", type="stock" ).reset_index(drop=True).tail(205) lst_time = list(df["time"]) df[["open", "high", "low", "close"]] = \ df[["open", "high", "low", "close"]] * 1000 # convert open, high, low, close to int df[["open", "high", "low", "close"]] = \ df[["open", "high", "low", "close"]].astype(int) df = df[["ticker", "open", "high", "low", "close", "volume"]] lst_values.append(df.to_dict(orient="records")) records = [] for i in range(len(lst_time)): record = {} record["time"] = lst_time[i] values = [] for symbol_index in range(100): values.append(lst_values[symbol_index][i]) record["value"] = values records.append(record) uri = os.environ.get("MONGODB_URI") client = MongoClient(uri) database = client.get_database("data") collection = database.get_collection("price") if "price" in database.list_collection_names(): collection.drop() collection.insert_many(records) updating_price = False return {"message": "Updated data."} except Exception as e: updating_price = False return {"message": f"Caught error {e}."} @staticmethod def get_price(symbol: str, count_back: int = 150, source: str = "database") -> pd.DataFrame: try: symbol = symbol.upper() if source == "database": uri = os.environ.get("MONGODB_URI") client = MongoClient(uri) database = client.get_database("data") collection = database.get_collection("price") result = list(collection.find()) tmp_df = pd.DataFrame(result[0]["value"]) lst_symbols = tmp_df["ticker"].values if symbol not in lst_symbols: return pd.DataFrame( [{"message": "The symbol is not existed"}] ) symbol_index = np.argwhere(lst_symbols == symbol)[0][0] lst_values = [] for record in result: value = record["value"][symbol_index] value["time"] = record["time"] lst_values.append(value) return pd.DataFrame(lst_values) elif source == "vnstock": datetime_now = datetime.utcnow() end_date = datetime_now.date() delta = timedelta(days=count_back) start_date = end_date - delta start_date = start_date.strftime(DATE_FORMAT) end_date = end_date.strftime(DATE_FORMAT) df = longterm_ohlc_data(symbol, start_date, end_date, resolution="D", type="stock" ).reset_index(drop=True) df[["open", "high", "low", "close"]] = \ df[["open", "high", "low", "close"]] * 1000 # convert open, high, low, close to int df[["open", "high", "low", "close"]] = \ df[["open", "high", "low", "close"]].astype(int) return df[["time", "ticker", "open", "high", "low", "close"]] except Exception as e: return pd.DataFrame( [{"message": f"Caught error {e}."}] ) @staticmethod async def update_daily_rsi() -> None: global updating_rsi if updating_rsi: print("The data has been updating.") return # return {"message": "The data has been updating."} updating_rsi = True datetime_now = datetime.utcnow() hour = datetime_now.hour weekday = datetime_now.weekday() end_date = datetime_now.date() delta = timedelta(days=5) start_date = end_date - delta start_date = start_date.strftime(DATE_FORMAT) end_date = end_date.strftime(DATE_FORMAT) try: ret = None if weekday < 5 and hour >= 12: uri = os.environ.get("MONGODB_URI") client = MongoClient(uri) database = client.get_database("data") collection = database.get_collection("rsi") tmp_df = longterm_ohlc_data("FPT", start_date, end_date, "D", "stock").reset_index(drop=True) if tmp_df.shape[0] == 0: updating_rsi = False print("The market is in trading.") return # return {"message": "The market is in trading."} tmp_df = tmp_df.iloc[-1] last_date = str(tmp_df["time"]) newest_record = \ collection.find_one(sort=[("_id", DESCENDING)]) if newest_record["time"] != last_date: lst_symbols = list(newest_record.keys())[2:] record = {} record["time"] = last_date for batch_idx in range(10): str_symbols = ','.join( lst_symbols[batch_idx*10:(batch_idx+1)*10]) data = requests.get('https://apipubaws.tcbs.com.vn/stock-insight/v1/stock/second-tc-price?tickers={}'.format(str_symbols)).json() for i in data["data"]: record[i["t"]] = i["rsi"] collection.insert_one(record) ret = "Updated data." else: ret = "The data is up to date." else: ret = "The market is in trading." updating_rsi = False print(ret) except Exception as e: updating_rsi = False print(f"Caught error {e}.") @staticmethod def update_entire_rsi() -> None: global updating_rsi if updating_rsi: return {"message": "The data has been updating."} updating_rsi = True datetime_now = datetime.utcnow() hour = datetime_now.hour weekday = datetime_now.weekday() try: symbol_dict = Indicator.get_vn100(source="hsx") lst_symbols = symbol_dict["symbols"] if weekday < 5 and hour < 12: updating_rsi = False return {"message": "The market is in trading."} get_time = True lst_values = {} if len(lst_symbols) != 100: updating_rsi = False return {"message": "Not enough 100 symbols."} cnt = 0 for symbol in lst_symbols: cnt += 1 url = PREFIX + INDICATORS["RSI"] \ + f"?stockSymbol={symbol}&rangeSelector=3&periods=14" data = requests.get(url).json() rsi_df = pd.DataFrame(data["SeriesColection"][0]["Points"]) if get_time: lst_values["time"] = \ rsi_df["Time"].apply(lambda x: Utility.ts_to_date(x/1000)) get_time = False lst_values[symbol] = rsi_df["Value"].apply(lambda x: x[0]) df = pd.DataFrame(lst_values) records = df.to_dict(orient="records") uri = os.environ.get("MONGODB_URI") client = MongoClient(uri) database = client.get_database("data") collection = database.get_collection("rsi") if "rsi" in database.list_collection_names(): collection.drop() collection.insert_many(records) updating_rsi = False return {"message": "Updated data."} except Exception as e: updating_rsi = False return {"message": f"Caught error {e}."} @staticmethod def get_rsi( symbol: str, periods: int = 14, smooth_k: int = 3, smooth_d: int = 3, ) -> pd.DataFrame: try: symbol = symbol.upper() uri = os.environ.get("MONGODB_URI") client = MongoClient(uri) database = client.get_database("data") collection = database.get_collection("rsi") records = list(collection.find()) record_df = pd.DataFrame(records).drop(columns=["_id"]) record_df = \ record_df[["time", symbol]].rename(columns={symbol: "rsi"}) record_df["stoch_rsi"] = \ Indicator.stoch_rsi(record_df["rsi"], periods) record_df["stoch_rsi_smooth_k"] = \ Indicator.stoch_rsi_smooth_k(record_df["stoch_rsi"], smooth_k) record_df["stoch_rsi_smooth_d"] = Indicator.stoch_rsi_smooth_d( record_df["stoch_rsi_smooth_k"], smooth_d ) return record_df except Exception as e: return pd.DataFrame( [{"message": f"Caught error {e}."}] ) @staticmethod def stoch_rsi(rsi: pd.Series, periods: int = 14) -> pd.Series: ma, mi = ( rsi.rolling(window=periods).max(), rsi.rolling(window=periods).min(), ) return (rsi - mi) * 100 / (ma - mi) @staticmethod def stoch_rsi_smooth_k(stoch_rsi: pd.Series, k: int) -> pd.Series: return stoch_rsi.rolling(window=k).mean() @staticmethod def stoch_rsi_smooth_d(stoch_rsi_k: pd.Series, d: int) -> pd.Series: return stoch_rsi_k.rolling(window=d).mean() @staticmethod async def update_entire_macd() -> None: global updating_macd if updating_macd: print("The data has been updating.") return updating_macd = True datetime_now = datetime.utcnow() hour = datetime_now.hour weekday = datetime_now.weekday() try: symbol_dict = Indicator.get_vn100(source="hsx") lst_symbols = symbol_dict["symbols"] if weekday < 5 and hour < 12: updating_macd = False print("The market is in trading.") return get_time = True lst_values = {} if len(lst_symbols) != 100: updating_macd = False print("Not enough 100 symbols.") return cnt = 0 for symbol in lst_symbols: cnt += 1 url = PREFIX + INDICATORS["MACD"] \ + f"?stockSymbol={symbol}&rangeSelector=4&fastPeriod=12&slowPeriod=26&signalPeriod=9" data = requests.get(url).json() rsi_df = pd.DataFrame( data["SeriesColection"][0]["Points"]).tail(20) if get_time: lst_values["time"] = \ rsi_df["Time"].apply(lambda x: Utility.ts_to_date(x/1000)) get_time = False lst_values[symbol] = rsi_df["Value"].apply(lambda x: int(x[0])) df = pd.DataFrame(lst_values) records = df.to_dict(orient="records") uri = os.environ.get("MONGODB_URI") client = MongoClient(uri) database = client.get_database("data") collection = database.get_collection("macd") if "macd" in database.list_collection_names(): collection.drop() collection.insert_many(records) updating_macd = False print("Updated data.") except Exception as e: updating_macd = False print(f"Caught error {e}.") @staticmethod def get_macd( symbol: str, ) -> pd.DataFrame: try: symbol = symbol.upper() uri = os.environ.get("MONGODB_URI") client = MongoClient(uri) database = client.get_database("data") collection = database.get_collection("macd") records = list(collection.find()) record_df = pd.DataFrame(records).drop(columns=["_id"]) record_df = \ record_df[["time", symbol]].rename(columns={symbol: "macd"}) return record_df except Exception as e: return pd.DataFrame( [{"message": f"Caught error {e}."}] ) @staticmethod def get_ichimoku_cloud( df: pd.DataFrame, conversion_period=9, base_period=26, span_b_period=52, displacement=26, ) -> pd.DataFrame: space_displacement = np.full(displacement, np.nan) tenkan_sen = ( df["high"].rolling(window=conversion_period).max() + df["low"].rolling(window=conversion_period).min() ) / 2 kijun_sen = ( df["high"].rolling(window=base_period).max() + df["low"].rolling(window=base_period).min() ) / 2 senkou_span_a = (tenkan_sen + kijun_sen) / 2 senkou_span_b = ( df["high"].rolling(window=span_b_period).max() + df["low"].rolling(window=span_b_period).min() ) / 2 chikou_span = df["close"].shift(-displacement) last_date = datetime.strptime(df["time"].iloc[-1], DATE_FORMAT) lst_date = Utility.generate_dates(last_date, displacement) time = np.concatenate((df["time"], lst_date)) tenkan_sen = np.concatenate((tenkan_sen, space_displacement)) kijun_sen = np.concatenate((kijun_sen, space_displacement)) senkou_span_a = np.concatenate((space_displacement, senkou_span_a)) senkou_span_b = np.concatenate((space_displacement, senkou_span_b)) chikou_span = np.concatenate((chikou_span, space_displacement)) data_dict = { "time": time, "tenkan_sen": tenkan_sen, "kijun_sen": kijun_sen, "senkou_span_a": senkou_span_a, "senkou_span_b": senkou_span_b, "chikou_span": chikou_span, "tenkan_kijun": tenkan_sen - kijun_sen, "kumo_cloud": senkou_span_a - senkou_span_b } return pd.DataFrame(data_dict)