# data-api/services/indicator.py
# Last commit: "Update rsi and ichimoku api" (048da70) by camphong24032002
# File size: 6.75 kB
import logging
import os
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import requests
from pymongo import MongoClient, ASCENDING, DESCENDING
from vnstock import longterm_ohlc_data, stock_historical_data, price_board

from utils.config import DATE_FORMAT
# Base URL of the HSX static-chart endpoints; an INDICATORS value is appended
# to it to form the full request URL.
PREFIX = "https://www.hsx.vn/Modules/Chart/StaticChart/"

# Look-back window label -> integer code.
# NOTE(review): presumably the HSX "rangeSelector" values - confirm.
TIME = {
    "1m": 2,
    "3m": 3,
    "6m": 4,
    "1y": 5,
    "2y": 6,
    "5y": 7,
}

# Indicator display name -> endpoint name under PREFIX.
INDICATORS = {
    "EMA": "GetEmaChart",
    "MACD": "GetMacdChart",
    "RSI": "GetRsiChart",
    "Momentum": "GetMomentumChart",
    "Williams %R": "GetWilliamChart",
    "BollingerBand": "GetBollingerBandChart",
}
class Indicator:
    """Static helpers that fetch price data and derive technical indicators.

    Covers daily OHLC retrieval, RSI with Stochastic RSI smoothing, and the
    Ichimoku cloud.
    """

    # Seconds to wait for the HSX chart endpoint before giving up.
    _REQUEST_TIMEOUT = 10
    _logger = logging.getLogger(__name__)

    def __init__(self) -> None:
        pass

    @staticmethod
    def get_price(symbol: str,
                  count_back: int = 100,
                  symbol_type: str = "stock",) -> pd.DataFrame:
        """Return daily OHLC data for the last ``count_back`` calendar days.

        Args:
            symbol: Ticker passed to ``longterm_ohlc_data``.
            count_back: Number of calendar days to look back from now.
            symbol_type: Instrument type passed to ``longterm_ohlc_data``.

        Returns:
            The frame returned by ``longterm_ohlc_data`` with a fresh
            0-based index and the ``open``/``high``/``low``/``close``
            columns multiplied by 1000 and converted to integers.
        """
        price_columns = ["open", "high", "low", "close"]
        date_format = "%Y-%m-%d"
        end_date = datetime.now()
        start_date = end_date - timedelta(days=count_back)
        df = longterm_ohlc_data(
            symbol,
            start_date.strftime(date_format),
            end_date.strftime(date_format),
            "D",  # daily resolution
            symbol_type,
        ).reset_index(drop=True)
        # Round before casting: a plain astype(int) truncates, so float noise
        # such as 1.005 * 1000 == 1004.9999999999999 would lose one unit.
        df[price_columns] = (df[price_columns] * 1000).round().astype(int)
        return df

    @staticmethod
    def get_rsi(
        symbol: str,
        periods: int = 14,
        smooth_k: int = 3,
        smooth_d: int = 3,
    ) -> pd.DataFrame:
        """Return the stored RSI history of ``symbol`` plus Stochastic RSI.

        RSI values are read from the ``rsi`` collection of the ``data``
        database at ``MONGODB_URI``. On weekdays from 12:00 UTC onwards the
        collection is first refreshed if its newest record is out of date.
        If that refresh fails, one row taken from ``price_board`` is
        appended to the result instead.

        Args:
            symbol: Ticker (case-insensitive); must be a column of the
                stored records.
            periods: Window for the RSI request and the Stochastic RSI.
            smooth_k: Rolling-mean window applied to the Stochastic RSI.
            smooth_d: Rolling-mean window applied to the smoothed %K.

        Returns:
            A frame with columns ``time``, ``rsi``, ``stoch_rsi``,
            ``stoch_rsi_smooth_k`` and ``stoch_rsi_smooth_d``, or ``None``
            when any step fails (the failure is logged).
        """
        try:
            symbol = symbol.upper()
            refresh_failed = False
            # The context manager closes the client's connections on exit.
            with MongoClient(os.environ.get("MONGODB_URI")) as client:
                collection = \
                    client.get_database("data").get_collection("rsi")
                now = datetime.utcnow()
                # Check if data is updated
                if now.weekday() < 5 and now.hour >= 12:
                    refresh_failed = not Indicator._refresh_rsi_records(
                        collection, now, periods)
                records = list(collection.find())

            record_df = pd.DataFrame(records).drop(columns=["_id"])
            record_df = \
                record_df[["time", symbol]].rename(columns={symbol: "rsi"})
            if refresh_failed:
                # NOTE(review): assumes price_board returns "time" and "RSI"
                # columns with the symbol in its first row - confirm.
                new_df = price_board(symbol)
                # Assign a flat [time, rsi] list; a 2-D array is rejected as
                # a row with mismatched columns.
                record_df.loc[len(record_df)] = \
                    new_df[["time", "RSI"]].iloc[0].to_list()

            record_df["stoch_rsi"] = \
                Indicator.stoch_rsi(record_df["rsi"], periods)
            record_df["stoch_rsi_smooth_k"] = \
                Indicator.stoch_rsi_smooth_k(record_df["stoch_rsi"], smooth_k)
            record_df["stoch_rsi_smooth_d"] = Indicator.stoch_rsi_smooth_d(
                record_df["stoch_rsi_smooth_k"], smooth_d
            )
            return record_df
        except Exception:
            # Callers rely on None for failure; log instead of hiding it.
            Indicator._logger.exception(
                "Failed to build RSI data for %s", symbol)
            return None

    @staticmethod
    def _refresh_rsi_records(collection, now: datetime, periods: int) -> bool:
        """Append the latest RSI record to ``collection`` if it is stale.

        The newest stored record is compared with the last trading date
        reported by ``stock_historical_data`` for the reference symbol
        "MBB". When they differ, the latest RSI of every stored symbol is
        fetched and inserted as a new record, then the oldest is removed.

        Returns:
            ``True`` if the collection was already current or was
            refreshed, ``False`` if the refresh attempt failed. Errors
            raised while checking staleness propagate to the caller.
        """
        newest_record = collection.find_one(sort=[("_id", DESCENDING)])
        start_date = (now - timedelta(days=20)).strftime(DATE_FORMAT)
        end_date = now.strftime(DATE_FORMAT)
        tmp_df = stock_historical_data("MBB", start_date, end_date)
        last_date = str(tmp_df["time"].iloc[-1])
        if newest_record["time"] == last_date:
            return True

        try:
            record = {"time": last_date}
            # Every key except the bookkeeping ones is a symbol column.
            for s in newest_record:
                if s in ("_id", "time"):
                    continue
                # Query the RSI of the symbol being stored (s), and let
                # requests build the query string.
                response = requests.get(
                    PREFIX + INDICATORS["RSI"],
                    params={
                        "stockSymbol": s,
                        "rangeSelector": 0,
                        "periods": periods,
                    },
                    timeout=Indicator._REQUEST_TIMEOUT,
                )
                response.raise_for_status()
                lst_rsi = response.json()["SeriesColection"][0]["Points"]
                record[s] = lst_rsi[-1]["Value"][0]
            # Insert first so a failed insert cannot shrink the window, then
            # drop the oldest record (find_one_and_delete needs a filter).
            collection.insert_one(record)
            collection.find_one_and_delete({}, sort=[("_id", ASCENDING)])
            print("Updated data")
        except Exception:
            Indicator._logger.exception("Failed to refresh stored RSI data")
            return False
        return True

    @staticmethod
    def stoch_rsi(rsi: pd.Series, periods: int = 14) -> pd.Series:
        """Return the Stochastic RSI of ``rsi`` on a 0-100 scale.

        Each value is the position of the current RSI inside the min/max
        range of the trailing ``periods`` values. The result is NaN until
        a full window is available and where the window is flat (0/0).
        """
        ma, mi = (
            rsi.rolling(window=periods).max(),
            rsi.rolling(window=periods).min(),
        )
        return (rsi - mi) * 100 / (ma - mi)

    @staticmethod
    def stoch_rsi_smooth_k(stoch_rsi: pd.Series, k: int) -> pd.Series:
        """Return the ``k``-period rolling mean of ``stoch_rsi`` (%K)."""
        return stoch_rsi.rolling(window=k).mean()

    @staticmethod
    def stoch_rsi_smooth_d(stoch_rsi_k: pd.Series, d: int) -> pd.Series:
        """Return the ``d``-period rolling mean of ``stoch_rsi_k`` (%D)."""
        return stoch_rsi_k.rolling(window=d).mean()

    @staticmethod
    def _rolling_midpoint(df: pd.DataFrame, window: int) -> pd.Series:
        """Return (rolling max of high + rolling min of low) / 2."""
        highest = df["high"].rolling(window=window).max()
        lowest = df["low"].rolling(window=window).min()
        return (highest + lowest) / 2

    @staticmethod
    def get_ichimoku_cloud(
        df: pd.DataFrame,
        conversion_period=9,
        base_period=26,
        span_b_period=52,
        displacement=26,
    ) -> pd.DataFrame:
        """Compute the Ichimoku cloud lines for ``df``.

        Args:
            df: Frame with ``time``, ``high``, ``low`` and ``close``.
            conversion_period: Window of the tenkan-sen line.
            base_period: Window of the kijun-sen line.
            span_b_period: Window of senkou span B.
            displacement: Rows by which the senkou spans are moved forward
                and the chikou span is moved backward.

        Returns:
            A frame of ``len(df) + displacement`` rows. The extra trailing
            rows carry the projected senkou spans; their ``time`` values
            are the strings "1" .. str(displacement). ``signal`` is True
            where senkou span A is above senkou span B.
        """
        space_displacement = np.full(displacement, np.nan)

        tenkan_sen = Indicator._rolling_midpoint(df, conversion_period)
        kijun_sen = Indicator._rolling_midpoint(df, base_period)
        senkou_span_a = (tenkan_sen + kijun_sen) / 2
        senkou_span_b = Indicator._rolling_midpoint(df, span_b_period)
        chikou_span = df["close"].shift(-displacement)

        # Placeholder labels for the projected rows beyond the last date.
        date_displacement = np.array(
            list(map(str, np.arange(1, displacement+1))))
        timeline = np.concatenate((df["time"], date_displacement))

        # Lines in current time are padded at the end; the senkou spans are
        # padded at the front, which moves them `displacement` rows forward.
        tenkan_sen = np.concatenate((tenkan_sen, space_displacement))
        kijun_sen = np.concatenate((kijun_sen, space_displacement))
        senkou_span_a = np.concatenate((space_displacement, senkou_span_a))
        senkou_span_b = np.concatenate((space_displacement, senkou_span_b))
        chikou_span = np.concatenate((chikou_span, space_displacement))

        data_dict = {
            "time": timeline,
            "tenkan_sen": tenkan_sen,
            "kijun_sen": kijun_sen,
            "senkou_span_a": senkou_span_a,
            "senkou_span_b": senkou_span_b,
            "chikou_span": chikou_span,
            "tenkan_kijun": tenkan_sen - kijun_sen,
            "kumo_cloud": senkou_span_a - senkou_span_b,
            "signal": senkou_span_a > senkou_span_b,
        }
        return pd.DataFrame(data_dict)