Spaces:
Running
Running
File size: 6,754 Bytes
884dad1 048da70 884dad1 048da70 884dad1 048da70 884dad1 048da70 884dad1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
import requests
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from vnstock import longterm_ohlc_data, stock_historical_data, price_board
from pymongo import MongoClient, ASCENDING, DESCENDING
from utils.config import DATE_FORMAT
import os
PREFIX = "https://www.hsx.vn/Modules/Chart/StaticChart/"
TIME = {"1m": 2, "3m": 3, "6m": 4, "1y": 5, "2y": 6, "5y": 7}
INDICATORS = {
"EMA": "GetEmaChart",
"MACD": "GetMacdChart",
"RSI": "GetRsiChart",
"Momentum": "GetMomentumChart",
"Williams %R": "GetWilliamChart",
"BollingerBand": "GetBollingerBandChart",
}
class Indicator:
def __init__(self) -> None:
pass
@staticmethod
def get_price(symbol: str,
count_back: int = 100,
symbol_type: str = "stock",) -> pd.DataFrame:
resolution = "D"
end_date = datetime.now()
delta = timedelta(days=count_back)
start_date = end_date - delta
start_date = datetime.strftime(start_date, '%Y-%m-%d')
end_date = datetime.strftime(end_date, '%Y-%m-%d')
df = longterm_ohlc_data(symbol, start_date, end_date,
resolution, symbol_type).reset_index(drop=True)
df[['open', 'high', 'low', 'close']] = \
df[['open', 'high', 'low', 'close']] * 1000
# convert open, high, low, close to int
df[['open', 'high', 'low', 'close']] = \
df[['open', 'high', 'low', 'close']].astype(int)
return df
@staticmethod
def get_rsi(
symbol: str,
periods: int = 14,
smooth_k: int = 3,
smooth_d: int = 3,
) -> pd.DataFrame:
try:
symbol = symbol.upper()
uri = os.environ.get("MONGODB_URI")
client = MongoClient(uri)
database = client.get_database("data")
collection = database.get_collection("rsi")
datetime_now = datetime.utcnow()
hour = datetime_now.hour
weekday = datetime_now.weekday()
error = False
if weekday < 5 and hour >= 12: # Check if data is updated
newest_record = collection.find_one(sort=[("_id", DESCENDING)])
delta = timedelta(days=20)
start_date = datetime_now - delta
start_date = start_date.strftime(DATE_FORMAT)
end_date = datetime_now.strftime(DATE_FORMAT)
tmp_df = stock_historical_data("MBB", start_date, end_date)
last_date = str(tmp_df["time"].iloc[-1])
if newest_record["time"] != last_date:
try:
lst_symbols = list(newest_record.keys())[2:]
record = {}
record["time"] = last_date
for s in lst_symbols:
url = PREFIX + INDICATORS["RSI"] + \
f"?stockSymbol={symbol}\
&rangeSelector=0\
&periods={periods}"
data = requests.get(url).json()
lst_rsi = data["SeriesColection"][0]["Points"]
record[s] = lst_rsi[-1]["Value"][0]
collection.find_one_and_delete(
sort=[("_id", ASCENDING)])
collection.insert_one(record)
print("Updated data")
except Exception:
error = True
records = list(collection.find())
record_df = pd.DataFrame(records).drop(columns=["_id"])
record_df = \
record_df[["time", symbol]].rename(columns={symbol: "rsi"})
if error:
new_df = price_board(symbol)
record_df.loc[len(record_df)] = new_df[["time", "RSI"]].values
record_df["stoch_rsi"] = \
Indicator.stoch_rsi(record_df["rsi"], periods)
record_df["stoch_rsi_smooth_k"] = \
Indicator.stoch_rsi_smooth_k(record_df["stoch_rsi"], smooth_k)
record_df["stoch_rsi_smooth_d"] = Indicator.stoch_rsi_smooth_d(
record_df["stoch_rsi_smooth_k"], smooth_d
)
return record_df
except Exception:
return None
@staticmethod
def stoch_rsi(rsi: pd.Series, periods: int = 14) -> pd.Series:
ma, mi = (
rsi.rolling(window=periods).max(),
rsi.rolling(window=periods).min(),
)
return (rsi - mi) * 100 / (ma - mi)
@staticmethod
def stoch_rsi_smooth_k(stoch_rsi: pd.Series, k: int) -> pd.Series:
return stoch_rsi.rolling(window=k).mean()
@staticmethod
def stoch_rsi_smooth_d(stoch_rsi_k: pd.Series, d: int) -> pd.Series:
return stoch_rsi_k.rolling(window=d).mean()
@staticmethod
def get_ichimoku_cloud(
df: pd.DataFrame,
conversion_period=9,
base_period=26,
span_b_period=52,
displacement=26,
) -> pd.DataFrame:
space_displacement = np.full(displacement, np.nan)
tenkan_sen = (
df["high"].rolling(window=conversion_period).max()
+ df["low"].rolling(window=conversion_period).min()
) / 2
kijun_sen = (
df["high"].rolling(window=base_period).max()
+ df["low"].rolling(window=base_period).min()
) / 2
senkou_span_a = (tenkan_sen + kijun_sen) / 2
senkou_span_b = (
df["high"].rolling(window=span_b_period).max()
+ df["low"].rolling(window=span_b_period).min()
) / 2
chikou_span = df["close"].shift(-displacement)
date_displacement = np.array(
list(map(str, np.arange(1, displacement+1))))
time = np.concatenate((df["time"], date_displacement))
tenkan_sen = np.concatenate((tenkan_sen, space_displacement))
kijun_sen = np.concatenate((kijun_sen, space_displacement))
senkou_span_a = np.concatenate((space_displacement, senkou_span_a))
senkou_span_b = np.concatenate((space_displacement, senkou_span_b))
chikou_span = np.concatenate((chikou_span, space_displacement))
data_dict = {
"time": time,
"tenkan_sen": tenkan_sen,
"kijun_sen": kijun_sen,
"senkou_span_a": senkou_span_a,
"senkou_span_b": senkou_span_b,
"chikou_span": chikou_span,
"tenkan_kijun": tenkan_sen - kijun_sen,
"kumo_cloud": senkou_span_a - senkou_span_b,
"signal": senkou_span_a > senkou_span_b,
}
return pd.DataFrame(data_dict)
|