data-api / services /indicator.py
camphong24032002
feat: get vn100 symbols
be1eeae
raw
history blame
19.5 kB
import requests
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from vnstock import longterm_ohlc_data
from pymongo import MongoClient, ASCENDING, DESCENDING
from utils.config import DATE_FORMAT, VN100_URL
from .utils import Utility
import os
# from dotenv import load_dotenv
# load_dotenv()
# Base URL of the HSX static-chart endpoints; an endpoint name from
# INDICATORS is appended to it to form the request URL.
PREFIX = "https://www.hsx.vn/Modules/Chart/StaticChart/"

# Period label -> integer code.
# NOTE(review): presumably the HSX ``rangeSelector`` query value (the
# request URLs in this module use rangeSelector=3 and =4) - confirm.
TIME = {
    "1m": 2,
    "3m": 3,
    "6m": 4,
    "1y": 5,
    "2y": 6,
    "5y": 7,
}

# Indicator display name -> HSX static-chart endpoint name.
INDICATORS = {
    "EMA": "GetEmaChart",
    "MACD": "GetMacdChart",
    "RSI": "GetRsiChart",
    "Momentum": "GetMomentumChart",
    "Williams %R": "GetWilliamChart",
    "BollingerBand": "GetBollingerBandChart",
}

# Module-level guards: each update routine returns immediately while the
# matching flag is True and resets it when it finishes.
updating_price = updating_rsi = updating_macd = False
class Indicator:
    """Price and technical-indicator service for the VN100 symbols.

    All methods are static. The ``update_*`` methods refresh MongoDB
    collections in the ``data`` database (``price``, ``rsi``, ``macd``)
    from remote sources; the ``get_*`` methods read those collections
    (or a remote source) and return the result.

    Stored record layouts, as written by the ``update_entire_*`` methods:

    * ``price``: ``{"time": ..., "value": [{"ticker", "open", "high",
      "low", "close"}, ...]}`` - one record per date, one row per symbol.
    * ``rsi`` / ``macd``: ``{"time": ..., "<SYMBOL>": <value>, ...}`` -
      one record per date, one key per symbol.

    The module-level ``updating_price`` / ``updating_rsi`` /
    ``updating_macd`` flags make each update routine a no-op while a
    previous run of the same kind is still in progress. The check is a
    plain read-then-write on a global, i.e. a best-effort guard only.
    """

    # Seconds before an outbound HTTP request is abandoned. A request
    # that never returns would otherwise leave the ``updating_*`` guard
    # set, blocking every later update.
    _HTTP_TIMEOUT = 30
    # Columns that are multiplied by 1000 and stored as integers.
    _PRICE_COLUMNS = ("open", "high", "low", "close")
    # A full refresh only runs when the listing has exactly this many symbols.
    _VN100_SIZE = 100

    def __init__(self) -> None:
        pass

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _symbols_from_record(record: dict) -> list:
        """Return the ticker symbols contained in one stored record.

        Handles the flat indicator layout (every key except ``_id`` and
        ``time`` is a symbol), the price layout (``value`` is a list of
        rows carrying a ``ticker``) and a ``value`` mapping keyed by
        symbol.
        """
        value = record.get("value")
        if isinstance(value, dict):
            return list(value)
        if isinstance(value, list):
            return [row["ticker"] for row in value]
        return [key for key in record if key not in ("_id", "time")]

    @staticmethod
    def _scale_prices(df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of *df* with the OHLC columns * 1000 as integers.

        Values are rounded before the integer cast: a bare ``astype(int)``
        truncates, so a float product such as ``1.005 * 1000``
        (``1004.9999999999999``) would be stored as 1004.
        """
        columns = list(Indicator._PRICE_COLUMNS)
        scaled = df.copy()
        scaled[columns] = (scaled[columns] * 1000).round().astype(int)
        return scaled

    @staticmethod
    def _fetch_ohlc(symbol: str, start_date: str, end_date: str) -> pd.DataFrame:
        """Download daily OHLC rows for *symbol* and scale the prices."""
        df = longterm_ohlc_data(symbol,
                                start_date,
                                end_date,
                                resolution="D",
                                type="stock").reset_index(drop=True)
        return Indicator._scale_prices(df)

    @staticmethod
    def _latest_trading_date(start_date: str, end_date: str):
        """Return the last ``time`` value reported for MBB, or ``None``.

        MBB is used as the probe symbol to find out whether the source
        has any row in the given range.
        """
        probe = longterm_ohlc_data("MBB",
                                   start_date,
                                   end_date,
                                   "D",
                                   "stock").reset_index(drop=True)
        if probe.empty:
            return None
        # The last element, not str(Series): the latter is the repr of
        # the whole column and never equals a stored "time" value.
        return probe["time"].iloc[-1]

    @staticmethod
    def _fetch_chart_points(indicator: str, query: str) -> pd.DataFrame:
        """Request one HSX static chart and return its first series' points."""
        url = PREFIX + INDICATORS[indicator] + query
        data = requests.get(url, timeout=Indicator._HTTP_TIMEOUT).json()
        # "SeriesColection" (sic) is the key used by the remote payload.
        return pd.DataFrame(data["SeriesColection"][0]["Points"])

    @staticmethod
    def _replace_collection(name: str, records: list) -> None:
        """Drop collection *name* (if present) and insert *records*."""
        with MongoClient(os.environ.get("MONGODB_URI")) as client:
            database = client.get_database("data")
            collection = database.get_collection(name)
            if name in database.list_collection_names():
                collection.drop()
            collection.insert_many(records)

    @staticmethod
    def _load_frame(name: str) -> pd.DataFrame:
        """Load every record of collection *name* without its ``_id``."""
        with MongoClient(os.environ.get("MONGODB_URI")) as client:
            collection = client.get_database("data").get_collection(name)
            records = list(collection.find())
        return pd.DataFrame(records).drop(columns=["_id"])

    # ------------------------------------------------------------------
    # Symbols
    # ------------------------------------------------------------------
    @staticmethod
    def get_vn100(source: str = "database") -> dict:
        """Return the VN100 symbol list.

        Args:
            source: ``"database"`` reads the symbols from the newest
                record of the ``rsi`` collection; ``"hsx"`` downloads
                the listing from ``VN100_URL``. Any other value yields
                an empty list.

        Returns:
            ``{"symbols": [...]}`` on success, or
            ``{"message": "Caught error ..."}`` if anything failed.
        """
        try:
            lst_symbols = []
            if source == "database":
                with MongoClient(os.environ.get("MONGODB_URI")) as client:
                    collection = \
                        client.get_database("data").get_collection("rsi")
                    newest_record = \
                        collection.find_one(sort=[("_id", DESCENDING)])
                if newest_record is None:
                    raise LookupError("rsi collection is empty")
                lst_symbols = Indicator._symbols_from_record(newest_record)
            elif source == "hsx":
                payload = requests.get(
                    VN100_URL, timeout=Indicator._HTTP_TIMEOUT
                ).json()
                # The symbol is read from the third cell of every row.
                lst_symbols = [row["cell"][2].strip()
                               for row in payload["rows"]]
            return {"symbols": lst_symbols}
        except Exception as e:
            return {"message": f"Caught error {e}"}

    # ------------------------------------------------------------------
    # Price
    # ------------------------------------------------------------------
    @staticmethod
    def update_daily_price() -> None:
        """Append today's prices to ``price`` and drop the oldest record.

        Only runs on weekdays from 12:00 UTC onwards, and only when the
        source reports a date different from the newest stored record.
        Errors are printed, never raised.
        """
        global updating_price
        if updating_price:
            return
        updating_price = True
        try:
            now = datetime.utcnow()
            if now.weekday() >= 5 or now.hour < 12:
                return
            today = now.date()
            start_date = today.strftime(DATE_FORMAT)
            end_date = (today + timedelta(days=1)).strftime(DATE_FORMAT)

            last_date = Indicator._latest_trading_date(start_date, end_date)
            if last_date is None:
                return

            with MongoClient(os.environ.get("MONGODB_URI")) as client:
                collection = \
                    client.get_database("data").get_collection("price")
                newest_record = \
                    collection.find_one(sort=[("_id", DESCENDING)])
                if newest_record is None \
                        or newest_record["time"] == last_date:
                    return
                # Price records keep "value" as a list of rows, so the
                # symbols are the rows' tickers (helper handles this).
                lst_symbols = Indicator._symbols_from_record(newest_record)
                columns = ["ticker", *Indicator._PRICE_COLUMNS]
                values = [
                    Indicator._fetch_ohlc(symbol, start_date, end_date)
                    [columns].iloc[-1].to_dict()
                    for symbol in lst_symbols
                ]
                collection.insert_one({"time": last_date, "value": values})
                # Keep the number of stored days constant.
                collection.find_one_and_delete({}, sort=[("_id", ASCENDING)])
                print("Updated price")
        except Exception as e:
            print(f"Error: {e}")
        finally:
            updating_price = False

    @staticmethod
    def update_entire_price() -> None:
        """Rebuild the whole ``price`` collection from the price source.

        Skipped on weekdays before 12:00 UTC and whenever the HSX listing
        does not contain exactly 100 symbols. For every symbol the last
        150 daily rows of the past 300 calendar days are stored. Errors
        are printed, never raised.
        """
        global updating_price
        if updating_price:
            return
        updating_price = True
        try:
            now = datetime.utcnow()
            # Checked first so that no listing is downloaded needlessly.
            if now.weekday() < 5 and now.hour < 12:
                return
            lst_symbols = Indicator.get_vn100(source="hsx")["symbols"]
            if len(lst_symbols) != Indicator._VN100_SIZE:
                return

            end = now.date()
            start_date = (end - timedelta(days=300)).strftime(DATE_FORMAT)
            end_date = end.strftime(DATE_FORMAT)

            columns = ["ticker", *Indicator._PRICE_COLUMNS]
            lst_time = []
            lst_values = []
            for symbol in lst_symbols:
                df = Indicator._fetch_ohlc(
                    symbol, start_date, end_date).tail(150)
                # The timeline of the last symbol processed is used for
                # all records.
                lst_time = list(df["time"])
                lst_values.append(df[columns].to_dict(orient="records"))

            # NOTE(review): rows are combined by position, which assumes
            # every symbol returned the same dates; a shorter history
            # raises IndexError here and aborts the refresh.
            records = [
                {"time": time, "value": [rows[i] for rows in lst_values]}
                for i, time in enumerate(lst_time)
            ]
            Indicator._replace_collection("price", records)
            print("Updated entire price")
        except Exception as e:
            print(f"Error: {e}")
        finally:
            updating_price = False

    @staticmethod
    def get_price(symbol: str,
                  count_back: int = 150,
                  source: str = "database") -> pd.DataFrame:
        """Return the price history of *symbol*.

        Args:
            symbol: Ticker, case-insensitive.
            count_back: Number of calendar days to look back. Only used
                with ``source="vnstock"``; the database source returns
                every stored record.
            source: ``"database"`` (``price`` collection) or
                ``"vnstock"`` (remote download).

        Returns:
            A frame with ``ticker``, ``open``, ``high``, ``low``,
            ``close`` and ``time`` columns, or a one-row frame with a
            ``message`` column when the symbol is unknown, the source is
            unsupported or an error occurred.
        """
        try:
            symbol = symbol.upper()
            if source == "database":
                with MongoClient(os.environ.get("MONGODB_URI")) as client:
                    collection = \
                        client.get_database("data").get_collection("price")
                    result = list(collection.find())
                # Symbols keep the same position in every record, so the
                # position is looked up once in the first one.
                tickers = [row["ticker"] for row in result[0]["value"]]
                if symbol not in tickers:
                    return pd.DataFrame(
                        [{"message": "The symbol is not existed"}]
                    )
                symbol_index = tickers.index(symbol)
                return pd.DataFrame([
                    {**record["value"][symbol_index],
                     "time": record["time"]}
                    for record in result
                ])
            if source == "vnstock":
                end = datetime.utcnow().date()
                start_date = \
                    (end - timedelta(days=count_back)).strftime(DATE_FORMAT)
                end_date = end.strftime(DATE_FORMAT)
                df = Indicator._fetch_ohlc(symbol, start_date, end_date)
                return df[["time", "ticker", *Indicator._PRICE_COLUMNS]]
            # Previously fell through and returned None, contradicting
            # the declared return type.
            return pd.DataFrame(
                [{"message": f"Caught error unsupported source {source}"}]
            )
        except Exception as e:
            return pd.DataFrame(
                [{"message": f"Caught error {e}"}]
            )

    # ------------------------------------------------------------------
    # RSI
    # ------------------------------------------------------------------
    @staticmethod
    def update_daily_rsi() -> None:
        """Append today's RSI values to ``rsi`` and drop the oldest record.

        Only runs on weekdays from 12:00 UTC onwards, and only when the
        price source reports a date different from the newest stored
        record. Errors are printed, never raised.
        """
        global updating_rsi
        if updating_rsi:
            return
        updating_rsi = True
        try:
            now = datetime.utcnow()
            if now.weekday() >= 5 or now.hour < 12:
                return
            today = now.date()
            start_date = today.strftime(DATE_FORMAT)
            end_date = (today + timedelta(days=1)).strftime(DATE_FORMAT)

            last_date = Indicator._latest_trading_date(start_date, end_date)
            if last_date is None:
                return

            with MongoClient(os.environ.get("MONGODB_URI")) as client:
                # The record built below has the flat layout that
                # get_rsi reads from "rsi", so that is the collection
                # that must be updated (not "price").
                collection = \
                    client.get_database("data").get_collection("rsi")
                newest_record = \
                    collection.find_one(sort=[("_id", DESCENDING)])
                # NOTE(review): assumes the stored "time" values use the
                # same representation as the price source - confirm.
                if newest_record is None \
                        or newest_record["time"] == last_date:
                    return
                lst_symbols = Indicator._symbols_from_record(newest_record)
                record = {"time": last_date}
                str_symbols = ','.join(lst_symbols)
                data = requests.get(
                    'https://apipubaws.tcbs.com.vn/stock-insight/v1/stock/second-tc-price?tickers={}'.format(str_symbols),
                    timeout=Indicator._HTTP_TIMEOUT,
                ).json()
                for item in data["data"]:
                    record[item["t"]] = item["rsi"]
                collection.insert_one(record)
                # Keep the number of stored days constant.
                collection.find_one_and_delete({}, sort=[("_id", ASCENDING)])
                print("Updated daily rsi")
        except Exception as e:
            print(f"Error: {e}")
        finally:
            updating_rsi = False

    @staticmethod
    def update_entire_rsi() -> None:
        """Rebuild the whole ``rsi`` collection from the HSX RSI charts.

        Skipped on weekdays before 12:00 UTC and whenever the HSX listing
        does not contain exactly 100 symbols. Errors are printed, never
        raised.
        """
        global updating_rsi
        if updating_rsi:
            return
        updating_rsi = True
        try:
            now = datetime.utcnow()
            # Checked first so that no listing is downloaded needlessly.
            if now.weekday() < 5 and now.hour < 12:
                return
            lst_symbols = Indicator.get_vn100(source="hsx")["symbols"]
            if len(lst_symbols) != Indicator._VN100_SIZE:
                return

            lst_values = {}
            for symbol in lst_symbols:
                points = Indicator._fetch_chart_points(
                    "RSI",
                    f"?stockSymbol={symbol}&rangeSelector=3&periods=14",
                )
                # The timeline is taken from the first symbol only.
                if "time" not in lst_values:
                    lst_values["time"] = \
                        points["Time"].apply(lambda x:
                                             Utility.ts_to_date(x/1000))
                lst_values[symbol] = points["Value"].apply(lambda x: x[0])

            records = pd.DataFrame(lst_values).to_dict(orient="records")
            Indicator._replace_collection("rsi", records)
            print("Updated entire RSI")
        except Exception as e:
            print(f"Error: {e}")
        finally:
            updating_rsi = False

    @staticmethod
    def get_rsi(
        symbol: str,
        periods: int = 14,
        smooth_k: int = 3,
        smooth_d: int = 3,
    ) -> pd.DataFrame:
        """Return the stored RSI of *symbol* plus stochastic-RSI columns.

        Args:
            symbol: Ticker, case-insensitive.
            periods: Rolling window of the stochastic RSI.
            smooth_k: Rolling-mean window applied to the stochastic RSI.
            smooth_d: Rolling-mean window applied to the smoothed K line.

        Returns:
            A frame with ``time``, ``rsi``, ``stoch_rsi``,
            ``stoch_rsi_smooth_k`` and ``stoch_rsi_smooth_d`` columns,
            or a one-row frame with a ``message`` column on error
            (including an unknown symbol).
        """
        try:
            symbol = symbol.upper()
            record_df = Indicator._load_frame("rsi")
            record_df = \
                record_df[["time", symbol]].rename(columns={symbol: "rsi"})
            record_df["stoch_rsi"] = \
                Indicator.stoch_rsi(record_df["rsi"], periods)
            record_df["stoch_rsi_smooth_k"] = \
                Indicator.stoch_rsi_smooth_k(record_df["stoch_rsi"], smooth_k)
            record_df["stoch_rsi_smooth_d"] = Indicator.stoch_rsi_smooth_d(
                record_df["stoch_rsi_smooth_k"], smooth_d
            )
            return record_df
        except Exception as e:
            return pd.DataFrame(
                [{"message": f"Caught error {e}"}]
            )

    @staticmethod
    def stoch_rsi(rsi: pd.Series, periods: int = 14) -> pd.Series:
        """Return the position of *rsi* inside its rolling range, 0-100.

        The first ``periods - 1`` values are NaN; so is any value whose
        rolling maximum equals its rolling minimum (0 / 0).
        """
        window = rsi.rolling(window=periods)
        highest = window.max()
        lowest = window.min()
        return (rsi - lowest) * 100 / (highest - lowest)

    @staticmethod
    def stoch_rsi_smooth_k(stoch_rsi: pd.Series, k: int) -> pd.Series:
        """Return the *k*-row rolling mean of the stochastic RSI."""
        return stoch_rsi.rolling(window=k).mean()

    @staticmethod
    def stoch_rsi_smooth_d(stoch_rsi_k: pd.Series, d: int) -> pd.Series:
        """Return the *d*-row rolling mean of the smoothed K line."""
        return stoch_rsi_k.rolling(window=d).mean()

    # ------------------------------------------------------------------
    # MACD
    # ------------------------------------------------------------------
    @staticmethod
    def update_entire_macd() -> None:
        """Rebuild the whole ``macd`` collection from the HSX MACD charts.

        Skipped on weekdays before 12:00 UTC and whenever the HSX listing
        does not contain exactly 100 symbols. Only the last 20 points of
        every chart are stored, each truncated to an integer. Errors are
        printed, never raised.
        """
        global updating_macd
        if updating_macd:
            return
        updating_macd = True
        try:
            now = datetime.utcnow()
            # Checked first so that no listing is downloaded needlessly.
            if now.weekday() < 5 and now.hour < 12:
                return
            lst_symbols = Indicator.get_vn100(source="hsx")["symbols"]
            if len(lst_symbols) != Indicator._VN100_SIZE:
                return

            lst_values = {}
            for symbol in lst_symbols:
                points = Indicator._fetch_chart_points(
                    "MACD",
                    f"?stockSymbol={symbol}&rangeSelector=4&fastPeriod=12&slowPeriod=26&signalPeriod=9",
                ).tail(20)
                # The timeline is taken from the first symbol only.
                if "time" not in lst_values:
                    lst_values["time"] = \
                        points["Time"].apply(lambda x:
                                             Utility.ts_to_date(x/1000))
                lst_values[symbol] = \
                    points["Value"].apply(lambda x: int(x[0]))

            records = pd.DataFrame(lst_values).to_dict(orient="records")
            Indicator._replace_collection("macd", records)
            print("Updated entire MACD")
        except Exception as e:
            print(f"Error: {e}")
        finally:
            updating_macd = False

    @staticmethod
    def get_macd(
        symbol: str,
    ) -> pd.DataFrame:
        """Return the stored MACD values of *symbol*.

        Returns:
            A frame with ``time`` and ``macd`` columns, or a one-row
            frame with a ``message`` column on error (including an
            unknown symbol).
        """
        try:
            symbol = symbol.upper()
            record_df = Indicator._load_frame("macd")
            return record_df[["time", symbol]].rename(
                columns={symbol: "macd"})
        except Exception as e:
            return pd.DataFrame(
                [{"message": f"Caught error {e}"}]
            )

    # ------------------------------------------------------------------
    # Ichimoku
    # ------------------------------------------------------------------
    @staticmethod
    def get_ichimoku_cloud(
        df: pd.DataFrame,
        conversion_period: int = 9,
        base_period: int = 26,
        span_b_period: int = 52,
        displacement: int = 26,
    ) -> pd.DataFrame:
        """Compute the Ichimoku lines for a price frame.

        Args:
            df: Frame with ``time``, ``high``, ``low`` and ``close``
                columns; ``time`` values are parsed with ``DATE_FORMAT``.
            conversion_period: Window of the ``tenkan_sen`` line.
            base_period: Window of the ``kijun_sen`` line.
            span_b_period: Window of ``senkou_span_b``.
            displacement: Number of rows the leading spans are pushed
                forward and the ``chikou_span`` is pulled back.

        Returns:
            A frame that is ``displacement`` rows longer than *df*, with
            ``time``, ``tenkan_sen``, ``kijun_sen``, ``senkou_span_a``,
            ``senkou_span_b``, ``chikou_span``, ``tenkan_kijun``
            (tenkan - kijun) and ``kumo_cloud`` (span A - span B).

        Raises:
            IndexError: If *df* is empty.
        """
        def midpoint(window: int) -> pd.Series:
            # Mean of the rolling highest high and lowest low.
            highest = df["high"].rolling(window=window).max()
            lowest = df["low"].rolling(window=window).min()
            return (highest + lowest) / 2

        padding = np.full(displacement, np.nan)

        tenkan_sen = midpoint(conversion_period)
        kijun_sen = midpoint(base_period)
        senkou_span_a = (tenkan_sen + kijun_sen) / 2
        senkou_span_b = midpoint(span_b_period)
        chikou_span = df["close"].shift(-displacement)

        # Extend the time axis by `displacement` generated dates.
        last_date = datetime.strptime(df["time"].iloc[-1], DATE_FORMAT)
        future_dates = Utility.generate_dates(last_date, displacement)
        time = np.concatenate((df["time"], future_dates))

        # Lines on the original timeline are padded at the end; the
        # leading spans are padded at the start, which shifts them forward.
        tenkan_sen = np.concatenate((tenkan_sen, padding))
        kijun_sen = np.concatenate((kijun_sen, padding))
        senkou_span_a = np.concatenate((padding, senkou_span_a))
        senkou_span_b = np.concatenate((padding, senkou_span_b))
        chikou_span = np.concatenate((chikou_span, padding))

        return pd.DataFrame({
            "time": time,
            "tenkan_sen": tenkan_sen,
            "kijun_sen": kijun_sen,
            "senkou_span_a": senkou_span_a,
            "senkou_span_b": senkou_span_b,
            "chikou_span": chikou_span,
            "tenkan_kijun": tenkan_sen - kijun_sen,
            "kumo_cloud": senkou_span_a - senkou_span_b,
        })