# Spaces: Running
# (Page-header residue captured along with the source; not part of the module.)
import logging
import os
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import requests
from pymongo import MongoClient, ASCENDING, DESCENDING
from vnstock import longterm_ohlc_data, stock_historical_data, price_board

from utils.config import DATE_FORMAT
# Base URL of the HSX static-chart endpoints; an INDICATORS value is appended
# to it to form the full request URL.
PREFIX = "https://www.hsx.vn/Modules/Chart/StaticChart/"

# Time-range label -> integer code.
# NOTE(review): presumably the `rangeSelector` values accepted by the HSX
# chart endpoints -- confirm against the API before relying on it.
TIME = {"1m": 2, "3m": 3, "6m": 4, "1y": 5, "2y": 6, "5y": 7}

# Indicator display name -> endpoint name appended to PREFIX.
INDICATORS = {
    "EMA": "GetEmaChart",
    "MACD": "GetMacdChart",
    "RSI": "GetRsiChart",
    "Momentum": "GetMomentumChart",
    "Williams %R": "GetWilliamChart",
    "BollingerBand": "GetBollingerBandChart",
}
class Indicator:
    """Stateless helpers that fetch prices and compute technical indicators.

    Every public helper is a static method, so it can be called either on the
    class (``Indicator.get_rsi("MBB")``) or on an instance.
    """

    def __init__(self) -> None:
        pass

    @staticmethod
    def get_price(symbol: str,
                  count_back: int = 100,
                  symbol_type: str = "stock",) -> pd.DataFrame:
        """Fetch daily OHLC data for ``symbol`` with integer prices.

        Args:
            symbol: Ticker passed through to ``longterm_ohlc_data``.
            count_back: Size of the look-back window in *calendar* days
                (not trading sessions), counted back from now.
            symbol_type: Instrument type passed through to
                ``longterm_ohlc_data``.

        Returns:
            The frame returned by ``longterm_ohlc_data`` with a fresh
            ``RangeIndex`` and ``open``/``high``/``low``/``close`` multiplied
            by 1000 and converted to ``int``.
        """
        resolution = "D"
        end_date = datetime.now()
        start_date = end_date - timedelta(days=count_back)
        df = longterm_ohlc_data(
            symbol,
            start_date.strftime('%Y-%m-%d'),
            end_date.strftime('%Y-%m-%d'),
            resolution,
            symbol_type,
        ).reset_index(drop=True)
        price_cols = ['open', 'high', 'low', 'close']
        # Round before casting: ``astype(int)`` truncates, so a float product
        # such as 23.45 * 1000 == 23449.999... would otherwise become 23449.
        df[price_cols] = (df[price_cols] * 1000).round().astype(int)
        return df

    @staticmethod
    def _refresh_rsi_collection(collection, datetime_now, periods) -> bool:
        """Add the latest trading day's RSI row to ``collection`` if missing.

        Args:
            collection: MongoDB collection holding one document per day with
                a ``time`` key plus one key per tracked symbol.
            datetime_now: Reference "now" used to build the look-back window.
            periods: RSI period forwarded to the chart endpoint.

        Returns:
            ``True`` when the collection was stale and the refresh failed,
            ``False`` when it was already up to date or was refreshed.

        Raises:
            Exception: Anything raised while determining the latest trading
                date (before the refresh itself starts) propagates to the
                caller.
        """
        newest_record = collection.find_one(sort=[("_id", DESCENDING)])
        start_date = (datetime_now - timedelta(days=20)).strftime(DATE_FORMAT)
        end_date = datetime_now.strftime(DATE_FORMAT)
        # "MBB" is only used as a reference ticker to discover the most
        # recent date for which market data exists.
        tmp_df = stock_historical_data("MBB", start_date, end_date)
        last_date = str(tmp_df["time"].iloc[-1])
        if newest_record["time"] == last_date:
            return False
        try:
            # Every key except the bookkeeping ones is a tracked symbol.
            tracked = [
                key for key in newest_record if key not in ("_id", "time")
            ]
            record = {"time": last_date}
            url = PREFIX + INDICATORS["RSI"]
            for s in tracked:
                # Query the loop symbol ``s`` (not the caller's symbol) and
                # let requests build the query string so no stray whitespace
                # ends up in the URL.
                response = requests.get(
                    url,
                    params={
                        "stockSymbol": s,
                        "rangeSelector": 0,
                        "periods": periods,
                    },
                    timeout=30,
                )
                response.raise_for_status()
                lst_rsi = response.json()["SeriesColection"][0]["Points"]
                record[s] = lst_rsi[-1]["Value"][0]
            # ``filter`` is a required argument; ``{}`` matches any document,
            # so this drops the oldest row and keeps the collection size
            # constant once the new row is inserted.
            collection.find_one_and_delete({}, sort=[("_id", ASCENDING)])
            collection.insert_one(record)
            print("Updated data")
        except Exception:
            logging.getLogger(__name__).exception(
                "Failed to refresh the RSI collection")
            return True
        return False

    @staticmethod
    def get_rsi(
        symbol: str,
        periods: int = 14,
        smooth_k: int = 3,
        smooth_d: int = 3,
    ) -> pd.DataFrame:
        """Return RSI and stochastic-RSI series for ``symbol``.

        RSI values are read from the ``rsi`` collection of the ``data``
        database at ``MONGODB_URI``. On weekdays from 12:00 UTC onwards the
        collection is first refreshed if its newest row is not the latest
        trading date. If that refresh fails, one row taken from
        ``price_board`` is appended instead.

        Args:
            symbol: Ticker; upper-cased before use.
            periods: RSI period and stochastic-RSI rolling window.
            smooth_k: Rolling-mean window for ``stoch_rsi_smooth_k``.
            smooth_d: Rolling-mean window for ``stoch_rsi_smooth_d``.

        Returns:
            A frame with columns ``time``, ``rsi``, ``stoch_rsi``,
            ``stoch_rsi_smooth_k`` and ``stoch_rsi_smooth_d``; ``None`` if
            anything fails (the error is logged).
        """
        try:
            symbol = symbol.upper()
            uri = os.environ.get("MONGODB_URI")
            # The context manager closes the client's connections on exit.
            with MongoClient(uri) as client:
                database = client.get_database("data")
                collection = database.get_collection("rsi")
                datetime_now = datetime.utcnow()
                error = False
                # Check if data is updated
                if datetime_now.weekday() < 5 and datetime_now.hour >= 12:
                    error = Indicator._refresh_rsi_collection(
                        collection, datetime_now, periods)
                records = list(collection.find())
            record_df = pd.DataFrame(records).drop(columns=["_id"])
            record_df = \
                record_df[["time", symbol]].rename(columns={symbol: "rsi"})
            if error:
                new_df = price_board(symbol)
                # Append a flat (time, rsi) row; the 2-D ``.values`` array is
                # rejected by pandas as a row of mismatched length.
                # NOTE(review): assumes price_board returns a single row with
                # "time" and "RSI" columns -- confirm.
                record_df.loc[len(record_df)] = \
                    new_df[["time", "RSI"]].iloc[0].tolist()
            record_df["stoch_rsi"] = \
                Indicator.stoch_rsi(record_df["rsi"], periods)
            record_df["stoch_rsi_smooth_k"] = \
                Indicator.stoch_rsi_smooth_k(record_df["stoch_rsi"], smooth_k)
            record_df["stoch_rsi_smooth_d"] = Indicator.stoch_rsi_smooth_d(
                record_df["stoch_rsi_smooth_k"], smooth_d
            )
            return record_df
        except Exception:
            # Best-effort API: callers get None on failure, but the cause is
            # logged instead of being discarded.
            logging.getLogger(__name__).exception(
                "get_rsi failed for symbol %r", symbol)
            return None

    @staticmethod
    def stoch_rsi(rsi: pd.Series, periods: int = 14) -> pd.Series:
        """Return the stochastic RSI of ``rsi`` on a 0-100 scale.

        Each value is ``(rsi - min) * 100 / (max - min)`` where ``min`` and
        ``max`` are taken over a rolling window of ``periods`` rows. The
        first ``periods - 1`` rows are NaN, as is any row whose window is
        flat (``max == min`` gives 0/0).
        """
        ma, mi = (
            rsi.rolling(window=periods).max(),
            rsi.rolling(window=periods).min(),
        )
        return (rsi - mi) * 100 / (ma - mi)

    @staticmethod
    def stoch_rsi_smooth_k(stoch_rsi: pd.Series, k: int) -> pd.Series:
        """Return the ``k``-row rolling mean of ``stoch_rsi``."""
        return stoch_rsi.rolling(window=k).mean()

    @staticmethod
    def stoch_rsi_smooth_d(stoch_rsi_k: pd.Series, d: int) -> pd.Series:
        """Return the ``d``-row rolling mean of ``stoch_rsi_k``."""
        return stoch_rsi_k.rolling(window=d).mean()

    @staticmethod
    def get_ichimoku_cloud(
        df: pd.DataFrame,
        conversion_period=9,
        base_period=26,
        span_b_period=52,
        displacement=26,
    ) -> pd.DataFrame:
        """Compute Ichimoku cloud lines from an OHLC frame.

        Args:
            df: Frame with ``time``, ``high``, ``low`` and ``close`` columns.
            conversion_period: Rolling window for ``tenkan_sen``.
            base_period: Rolling window for ``kijun_sen``.
            span_b_period: Rolling window for ``senkou_span_b``.
            displacement: Rows by which the senkou spans are shifted forward
                and ``chikou_span`` is shifted back.

        Returns:
            A frame of ``len(df) + displacement`` rows. The extra trailing
            rows hold the projected senkou spans; their ``time`` values are
            the placeholder strings ``"1"`` .. ``str(displacement)``.
            ``signal`` is True where ``senkou_span_a > senkou_span_b``
            (False wherever either side is NaN).
        """
        space_displacement = np.full(displacement, np.nan)
        tenkan_sen = (
            df["high"].rolling(window=conversion_period).max()
            + df["low"].rolling(window=conversion_period).min()
        ) / 2
        kijun_sen = (
            df["high"].rolling(window=base_period).max()
            + df["low"].rolling(window=base_period).min()
        ) / 2
        senkou_span_a = (tenkan_sen + kijun_sen) / 2
        senkou_span_b = (
            df["high"].rolling(window=span_b_period).max()
            + df["low"].rolling(window=span_b_period).min()
        ) / 2
        chikou_span = df["close"].shift(-displacement)
        # Placeholder labels for the projected rows beyond the last date.
        date_displacement = np.array(
            list(map(str, np.arange(1, displacement+1))))
        time = np.concatenate((df["time"], date_displacement))
        # Pad at the end to keep a line aligned with its own date; pad at the
        # start to push a line ``displacement`` rows into the future.
        tenkan_sen = np.concatenate((tenkan_sen, space_displacement))
        kijun_sen = np.concatenate((kijun_sen, space_displacement))
        senkou_span_a = np.concatenate((space_displacement, senkou_span_a))
        senkou_span_b = np.concatenate((space_displacement, senkou_span_b))
        chikou_span = np.concatenate((chikou_span, space_displacement))
        data_dict = {
            "time": time,
            "tenkan_sen": tenkan_sen,
            "kijun_sen": kijun_sen,
            "senkou_span_a": senkou_span_a,
            "senkou_span_b": senkou_span_b,
            "chikou_span": chikou_span,
            "tenkan_kijun": tenkan_sen - kijun_sen,
            "kumo_cloud": senkou_span_a - senkou_span_b,
            "signal": senkou_span_a > senkou_span_b,
        }
        return pd.DataFrame(data_dict)