# Stock-indicator data pipeline: fetches VN100 OHLC/RSI/MACD data from HSX /
# TCBS / vnstock and maintains it in MongoDB.
# (The original "Spaces: Running" lines were Hugging Face page-scrape
# artifacts, not part of the module source.)
import requests
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from vnstock import longterm_ohlc_data
from pymongo import MongoClient, ASCENDING, DESCENDING
from utils.config import DATE_FORMAT, VN100_URL
from .utils import Utility
import os
# from dotenv import load_dotenv
# load_dotenv()

# Base URL of the HSX static-chart indicator endpoints.
PREFIX = "https://www.hsx.vn/Modules/Chart/StaticChart/"
# Human-readable range label -> HSX ``rangeSelector`` query value.
TIME = {"1m": 2, "3m": 3, "6m": 4, "1y": 5, "2y": 6, "5y": 7}
# Indicator name -> HSX chart endpoint name.
INDICATORS = {
    "EMA": "GetEmaChart",
    "MACD": "GetMacdChart",
    "RSI": "GetRsiChart",
    "Momentum": "GetMomentumChart",
    "Williams %R": "GetWilliamChart",
    "BollingerBand": "GetBollingerBandChart",
}

# Module-level re-entrancy guards: a second trigger of an update job is a
# no-op while the first run is still in flight.
updating_price = False
updating_rsi = False
updating_macd = False
class Indicator:
    """Namespace for stock-indicator helpers (price, RSI, MACD, Ichimoku).

    Every method is invoked on the class itself (e.g.
    ``Indicator.get_vn100()``); instances carry no state.
    """

    def __init__(self) -> None:
        # Stateless by design: nothing to initialise.
        pass
def get_vn100(source: str = "database") -> dict: | |
try: | |
lst_symbols = [] | |
if source == "database": | |
uri = os.environ.get("MONGODB_URI") | |
client = MongoClient(uri) | |
database = client.get_database("data") | |
collection = database.get_collection("rsi") | |
newest_record = collection.find_one() | |
lst_symbols = list(newest_record["value"].keys()) | |
elif source == "hsx": | |
data = requests.get(VN100_URL).json() | |
data = data["rows"] | |
for value in data: | |
lst_symbols.append(value["cell"][2].strip()) | |
return {"symbols": lst_symbols} | |
except Exception as e: | |
return {"message": f"Caught error {e}"} | |
def update_daily_price() -> None: | |
global updating_price | |
if updating_price: | |
return | |
updating_price = True | |
datetime_now = datetime.utcnow() | |
hour = datetime_now.hour | |
weekday = datetime_now.weekday() | |
start_date = datetime_now.date() | |
delta = timedelta(days=1) | |
end_date = start_date + delta | |
start_date = start_date.strftime(DATE_FORMAT) | |
end_date = end_date.strftime(DATE_FORMAT) | |
try: | |
if weekday < 5 and hour >= 12: | |
uri = os.environ.get("MONGODB_URI") | |
client = MongoClient(uri) | |
database = client.get_database("data") | |
collection = database.get_collection("price") | |
tmp_df = longterm_ohlc_data("MBB", | |
start_date, | |
end_date, | |
"D", | |
"stock").reset_index(drop=True) | |
if tmp_df.shape[0] == 0: | |
updating_price = False | |
return | |
last_date = str(tmp_df["time"]) | |
newest_record = \ | |
collection.find_one(sort=[("_id", DESCENDING)]) | |
if newest_record["time"] != last_date: | |
lst_symbols = list(newest_record["value"].keys()) | |
record = {} | |
record["time"] = last_date | |
values = [] | |
for symbol in lst_symbols: | |
df = longterm_ohlc_data(symbol, | |
start_date, | |
end_date, | |
resolution="D", | |
type="stock" | |
).reset_index(drop=True) | |
df[["open", "high", "low", "close"]] = \ | |
df[["open", "high", "low", "close"]] * 1000 | |
# convert open, high, low, close to int | |
df[["open", "high", "low", "close"]] = \ | |
df[["open", "high", "low", "close"]].astype(int) | |
df = df[["ticker", "open", | |
"high", "low", "close"]].iloc[-1] | |
values.append(df.to_dict()) | |
record["value"] = values | |
collection.insert_one(record) | |
collection.find_one_and_delete({}, sort=[("_id", ASCENDING)]) | |
print("Updated price") | |
updating_price = False | |
except Exception as e: | |
print(f"Error: {e}") | |
updating_price = False | |
def update_entire_price() -> None: | |
global updating_price | |
if updating_price: | |
return | |
updating_price = True | |
datetime_now = datetime.utcnow() | |
hour = datetime_now.hour | |
weekday = datetime_now.weekday() | |
try: | |
symbol_dict = Indicator.get_vn100(source="hsx") | |
lst_symbols = symbol_dict["symbols"] | |
if weekday < 5 and hour < 12: | |
updating_price = False | |
return | |
end_date = datetime_now.date() | |
delta = timedelta(days=300) | |
start_date = end_date - delta | |
start_date = start_date.strftime(DATE_FORMAT) | |
end_date = end_date.strftime(DATE_FORMAT) | |
lst_time = [] | |
lst_values = [] | |
if len(lst_symbols) != 100: | |
updating_price = False | |
return | |
for symbol in lst_symbols: | |
df = longterm_ohlc_data(symbol, | |
start_date, | |
end_date, | |
resolution="D", | |
type="stock" | |
).reset_index(drop=True).tail(150) | |
lst_time = list(df["time"]) | |
df[["open", "high", "low", "close"]] = \ | |
df[["open", "high", "low", "close"]] * 1000 | |
# convert open, high, low, close to int | |
df[["open", "high", "low", "close"]] = \ | |
df[["open", "high", "low", "close"]].astype(int) | |
df = df[["ticker", "open", "high", "low", "close"]] | |
lst_values.append(df.to_dict(orient="records")) | |
records = [] | |
for i in range(len(lst_time)): | |
record = {} | |
record["time"] = lst_time[i] | |
values = [] | |
for symbol_index in range(100): | |
values.append(lst_values[symbol_index][i]) | |
record["value"] = values | |
records.append(record) | |
uri = os.environ.get("MONGODB_URI") | |
client = MongoClient(uri) | |
database = client.get_database("data") | |
collection = database.get_collection("price") | |
if "price" in database.list_collection_names(): | |
collection.drop() | |
collection.insert_many(records) | |
print("Updated entire price") | |
updating_price = False | |
except Exception as e: | |
print(f"Error: {e}") | |
updating_price = False | |
def get_price(symbol: str, | |
count_back: int = 150, | |
source: str = "database") -> pd.DataFrame: | |
try: | |
symbol = symbol.upper() | |
if source == "database": | |
uri = os.environ.get("MONGODB_URI") | |
client = MongoClient(uri) | |
database = client.get_database("data") | |
collection = database.get_collection("price") | |
result = list(collection.find()) | |
tmp_df = pd.DataFrame(result[0]["value"]) | |
lst_symbols = tmp_df["ticker"].values | |
if symbol not in lst_symbols: | |
return pd.DataFrame( | |
[{"message": "The symbol is not existed"}] | |
) | |
symbol_index = np.argwhere(lst_symbols == symbol)[0][0] | |
lst_values = [] | |
for record in result: | |
value = record["value"][symbol_index] | |
value["time"] = record["time"] | |
lst_values.append(value) | |
return pd.DataFrame(lst_values) | |
elif source == "vnstock": | |
datetime_now = datetime.utcnow() | |
end_date = datetime_now.date() | |
delta = timedelta(days=count_back) | |
start_date = end_date - delta | |
start_date = start_date.strftime(DATE_FORMAT) | |
end_date = end_date.strftime(DATE_FORMAT) | |
df = longterm_ohlc_data(symbol, | |
start_date, | |
end_date, | |
resolution="D", | |
type="stock" | |
).reset_index(drop=True) | |
df[["open", "high", "low", "close"]] = \ | |
df[["open", "high", "low", "close"]] * 1000 | |
# convert open, high, low, close to int | |
df[["open", "high", "low", "close"]] = \ | |
df[["open", "high", "low", "close"]].astype(int) | |
return df[["time", "ticker", "open", "high", "low", "close"]] | |
except Exception as e: | |
return pd.DataFrame( | |
[{"message": f"Caught error {e}"}] | |
) | |
def update_daily_rsi() -> None: | |
global updating_rsi | |
if updating_rsi: | |
return | |
updating_rsi = True | |
datetime_now = datetime.utcnow() | |
hour = datetime_now.hour | |
weekday = datetime_now.weekday() | |
start_date = datetime_now.date() | |
delta = timedelta(days=1) | |
end_date = start_date + delta | |
start_date = start_date.strftime(DATE_FORMAT) | |
end_date = end_date.strftime(DATE_FORMAT) | |
try: | |
if weekday < 5 and hour >= 12: | |
uri = os.environ.get("MONGODB_URI") | |
client = MongoClient(uri) | |
database = client.get_database("data") | |
collection = database.get_collection("price") | |
tmp_df = longterm_ohlc_data("MBB", | |
start_date, | |
end_date, | |
"D", | |
"stock").reset_index(drop=True) | |
if tmp_df.shape[0] == 0: | |
updating_rsi = False | |
return | |
last_date = str(tmp_df["time"]) | |
newest_record = \ | |
collection.find_one(sort=[("_id", DESCENDING)]) | |
if newest_record["time"] != last_date: | |
lst_symbols = list(newest_record["value"].keys()) | |
record = {} | |
record["time"] = last_date | |
str_symbols = ','.join(lst_symbols) | |
data = requests.get('https://apipubaws.tcbs.com.vn/stock-insight/v1/stock/second-tc-price?tickers={}'.format(str_symbols)).json() | |
for i in data["data"]: | |
record[i["t"]] = i["rsi"] | |
collection.insert_one(record) | |
collection.find_one_and_delete({}, sort=[("_id", ASCENDING)]) | |
print("Updated daily rsi") | |
updating_rsi = False | |
except Exception as e: | |
print(f"Error: {e}") | |
updating_rsi = False | |
def update_entire_rsi() -> None: | |
global updating_rsi | |
if updating_rsi: | |
return | |
updating_rsi = True | |
datetime_now = datetime.utcnow() | |
hour = datetime_now.hour | |
weekday = datetime_now.weekday() | |
try: | |
symbol_dict = Indicator.get_vn100(source="hsx") | |
lst_symbols = symbol_dict["symbols"] | |
if weekday < 5 and hour < 12: | |
updating_rsi = False | |
return | |
get_time = True | |
lst_values = {} | |
if len(lst_symbols) != 100: | |
updating_rsi = False | |
return | |
cnt = 0 | |
for symbol in lst_symbols: | |
cnt += 1 | |
url = PREFIX + INDICATORS["RSI"] \ | |
+ f"?stockSymbol={symbol}&rangeSelector=3&periods=14" | |
data = requests.get(url).json() | |
rsi_df = pd.DataFrame(data["SeriesColection"][0]["Points"]) | |
if get_time: | |
lst_values["time"] = \ | |
rsi_df["Time"].apply(lambda x: | |
Utility.ts_to_date(x/1000)) | |
get_time = False | |
lst_values[symbol] = rsi_df["Value"].apply(lambda x: x[0]) | |
df = pd.DataFrame(lst_values) | |
records = df.to_dict(orient="records") | |
uri = os.environ.get("MONGODB_URI") | |
client = MongoClient(uri) | |
database = client.get_database("data") | |
collection = database.get_collection("rsi") | |
if "rsi" in database.list_collection_names(): | |
collection.drop() | |
collection.insert_many(records) | |
print("Updated entire RSI") | |
updating_rsi = False | |
except Exception as e: | |
print(f"Error: {e}") | |
updating_rsi = False | |
def get_rsi( | |
symbol: str, | |
periods: int = 14, | |
smooth_k: int = 3, | |
smooth_d: int = 3, | |
) -> pd.DataFrame: | |
try: | |
symbol = symbol.upper() | |
uri = os.environ.get("MONGODB_URI") | |
client = MongoClient(uri) | |
database = client.get_database("data") | |
collection = database.get_collection("rsi") | |
records = list(collection.find()) | |
record_df = pd.DataFrame(records).drop(columns=["_id"]) | |
record_df = \ | |
record_df[["time", symbol]].rename(columns={symbol: "rsi"}) | |
record_df["stoch_rsi"] = \ | |
Indicator.stoch_rsi(record_df["rsi"], periods) | |
record_df["stoch_rsi_smooth_k"] = \ | |
Indicator.stoch_rsi_smooth_k(record_df["stoch_rsi"], smooth_k) | |
record_df["stoch_rsi_smooth_d"] = Indicator.stoch_rsi_smooth_d( | |
record_df["stoch_rsi_smooth_k"], smooth_d | |
) | |
return record_df | |
except Exception as e: | |
return pd.DataFrame( | |
[{"message": f"Caught error {e}"}] | |
) | |
def stoch_rsi(rsi: pd.Series, periods: int = 14) -> pd.Series: | |
ma, mi = ( | |
rsi.rolling(window=periods).max(), | |
rsi.rolling(window=periods).min(), | |
) | |
return (rsi - mi) * 100 / (ma - mi) | |
def stoch_rsi_smooth_k(stoch_rsi: pd.Series, k: int) -> pd.Series: | |
return stoch_rsi.rolling(window=k).mean() | |
def stoch_rsi_smooth_d(stoch_rsi_k: pd.Series, d: int) -> pd.Series: | |
return stoch_rsi_k.rolling(window=d).mean() | |
def update_entire_macd() -> None: | |
global updating_macd | |
if updating_macd: | |
return | |
updating_macd = True | |
datetime_now = datetime.utcnow() | |
hour = datetime_now.hour | |
weekday = datetime_now.weekday() | |
try: | |
symbol_dict = Indicator.get_vn100(source="hsx") | |
lst_symbols = symbol_dict["symbols"] | |
if weekday < 5 and hour < 12: | |
updating_macd = False | |
return | |
get_time = True | |
lst_values = {} | |
if len(lst_symbols) != 100: | |
updating_macd = False | |
return | |
cnt = 0 | |
for symbol in lst_symbols: | |
cnt += 1 | |
url = PREFIX + INDICATORS["MACD"] \ | |
+ f"?stockSymbol={symbol}&rangeSelector=4&fastPeriod=12&slowPeriod=26&signalPeriod=9" | |
data = requests.get(url).json() | |
rsi_df = pd.DataFrame( | |
data["SeriesColection"][0]["Points"]).tail(20) | |
if get_time: | |
lst_values["time"] = \ | |
rsi_df["Time"].apply(lambda x: | |
Utility.ts_to_date(x/1000)) | |
get_time = False | |
lst_values[symbol] = rsi_df["Value"].apply(lambda x: int(x[0])) | |
df = pd.DataFrame(lst_values) | |
records = df.to_dict(orient="records") | |
uri = os.environ.get("MONGODB_URI") | |
client = MongoClient(uri) | |
database = client.get_database("data") | |
collection = database.get_collection("macd") | |
if "macd" in database.list_collection_names(): | |
collection.drop() | |
collection.insert_many(records) | |
print("Updated entire MACD") | |
updating_macd = False | |
except Exception as e: | |
print(f"Error: {e}") | |
updating_macd = False | |
def get_macd( | |
symbol: str, | |
) -> pd.DataFrame: | |
try: | |
symbol = symbol.upper() | |
uri = os.environ.get("MONGODB_URI") | |
client = MongoClient(uri) | |
database = client.get_database("data") | |
collection = database.get_collection("macd") | |
records = list(collection.find()) | |
record_df = pd.DataFrame(records).drop(columns=["_id"]) | |
record_df = \ | |
record_df[["time", symbol]].rename(columns={symbol: "macd"}) | |
return record_df | |
except Exception as e: | |
return pd.DataFrame( | |
[{"message": f"Caught error {e}"}] | |
) | |
def get_ichimoku_cloud( | |
df: pd.DataFrame, | |
conversion_period=9, | |
base_period=26, | |
span_b_period=52, | |
displacement=26, | |
) -> pd.DataFrame: | |
space_displacement = np.full(displacement, np.nan) | |
tenkan_sen = ( | |
df["high"].rolling(window=conversion_period).max() | |
+ df["low"].rolling(window=conversion_period).min() | |
) / 2 | |
kijun_sen = ( | |
df["high"].rolling(window=base_period).max() | |
+ df["low"].rolling(window=base_period).min() | |
) / 2 | |
senkou_span_a = (tenkan_sen + kijun_sen) / 2 | |
senkou_span_b = ( | |
df["high"].rolling(window=span_b_period).max() | |
+ df["low"].rolling(window=span_b_period).min() | |
) / 2 | |
chikou_span = df["close"].shift(-displacement) | |
last_date = datetime.strptime(df["time"].iloc[-1], DATE_FORMAT) | |
lst_date = Utility.generate_dates(last_date, displacement) | |
time = np.concatenate((df["time"], lst_date)) | |
tenkan_sen = np.concatenate((tenkan_sen, space_displacement)) | |
kijun_sen = np.concatenate((kijun_sen, space_displacement)) | |
senkou_span_a = np.concatenate((space_displacement, senkou_span_a)) | |
senkou_span_b = np.concatenate((space_displacement, senkou_span_b)) | |
chikou_span = np.concatenate((chikou_span, space_displacement)) | |
data_dict = { | |
"time": time, | |
"tenkan_sen": tenkan_sen, | |
"kijun_sen": kijun_sen, | |
"senkou_span_a": senkou_span_a, | |
"senkou_span_b": senkou_span_b, | |
"chikou_span": chikou_span, | |
"tenkan_kijun": tenkan_sen - kijun_sen, | |
"kumo_cloud": senkou_span_a - senkou_span_b | |
} | |
return pd.DataFrame(data_dict) | |