Spaces:
Running
Running
from services.preprocess import Preprocess | |
from services.strategy import Strategy | |
from services.notification import Notification | |
from models.rsi import RSISignalPayload | |
from models.ichimoku import IchimokuPayload | |
from models.macd import MACDSignalPayload | |
from models.summary import SummarySignalPayload | |
from pymongo import MongoClient, ASCENDING, DESCENDING | |
from fastapi import status, APIRouter | |
from typing import Sequence | |
import requests | |
import pandas as pd | |
import os | |
import asyncio | |
# from dotenv import load_dotenv | |
# load_dotenv() | |
router = APIRouter() | |
DATA_URL = "https://intellistock-data-api.hf.space/data" | |
os.environ["HTTP_PROXY"] = "http://127.0.0.1:8888" | |
os.environ["HTTPS_PROXY"] = "http://127.0.0.1:8888" | |
async def test(): | |
prefix_url = os.environ.get("PREFIX_URL") | |
await Notification.send_msg(prefix_url, "Test") | |
async def get_daily_report(): | |
uri = os.environ.get("MONGODB_URI") | |
client = MongoClient(uri) | |
database = client.get_database("data") | |
rsi_collection = database.get_collection("rsi") | |
price_collection = database.get_collection("price") | |
macd_collection = database.get_collection("macd") | |
signal_collection = database.get_collection("signal") | |
rsi_records = list(rsi_collection.find(sort=[("_id", ASCENDING)])) | |
price_records = list(price_collection.find(sort=[("_id", ASCENDING)])) | |
macd_records = list(macd_collection.find(sort=[("_id", ASCENDING)])) | |
rsi_df = pd.DataFrame(rsi_records).drop(columns=["_id"]) | |
macd_df = pd.DataFrame(macd_records).drop(columns=["_id"]) | |
lst_symbols = rsi_df.columns[1:] | |
last_date = rsi_df["time"].iloc[-1] | |
newest_record = \ | |
signal_collection.find_one(sort=[("_id", DESCENDING)]) | |
if newest_record["time"] == last_date: | |
return {"message": "The data is up to date!"} | |
result = Strategy.get_daily_report(price_records, | |
rsi_df, | |
macd_df, | |
last_date, | |
lst_symbols) | |
today_df = result[result["time"] == last_date][["ticker", | |
"rsi_signal", | |
"macd_signal", | |
"ichimoku_signal_1", | |
"ichimoku_signal_3"]] | |
today_dict = today_df.to_dict(orient="list") | |
today_dict["time"] = last_date | |
signal_collection.insert_one(today_dict) | |
prefix_url = os.environ.get("PREFIX_URL") | |
asyncio.create_task( | |
Notification.daily_message(today_df, prefix_url, last_date)) | |
return {"message": "Updated signal"} | |
async def get_daily_nadaraya_report(): | |
uri = os.environ.get("MONGODB_URI") | |
client = MongoClient(uri) | |
database = client.get_database("data") | |
nada_collection = database.get_collection("nada") | |
rsi_collection = database.get_collection("rsi") | |
rsi_records = list(rsi_collection.find(sort=[("_id", ASCENDING)])) | |
price_collection = database.get_collection("price") | |
price_records = list(price_collection.find(sort=[("_id", ASCENDING)])) | |
rsi_df = pd.DataFrame(rsi_records).drop(columns=["_id"]) | |
lst_symbols = rsi_df.columns[1:] | |
last_date = rsi_df["time"].iloc[-1] | |
newest_record = \ | |
nada_collection.find_one(sort=[("_id", DESCENDING)]) | |
if newest_record["time"] == last_date: | |
return {"message": "The data is up to date!"} | |
result = Strategy.get_daily_nadaraya(price_records, | |
lst_symbols, | |
last_date) | |
result_dict = result.to_dict(orient="list") | |
result_dict["time"] = last_date | |
nada_collection.insert_one(result_dict) | |
prefix_url = os.environ.get("PREFIX_URL") | |
asyncio.create_task( | |
Notification.notify_nadaraya(result, prefix_url, last_date)) | |
return {"message": "Updated nadaraya"} | |
async def get_rsi_signal(payload: RSISignalPayload) -> Sequence[dict]: | |
rsi_dict = {"symbol": payload.symbol, | |
"periods": payload.periods, | |
"smooth_k": payload.smooth_k, | |
"smooth_d": payload.smooth_d} | |
try: | |
response = requests.post(DATA_URL+"/get_rsi", json=rsi_dict) | |
rsi_json = response.json() | |
rsi_df = Preprocess.lst_dict_to_df(rsi_json) | |
rsi_df = Strategy.rsi_strategy( | |
rsi_df, threshold=( | |
payload.threshold_low, payload.threshold_high | |
) | |
) | |
rsi_signal_lst_dict = \ | |
rsi_df[["time", "rsi_signal"]].to_dict(orient="records") | |
except Exception as e: | |
return [{"message": f"Caught error {e}"}] | |
return rsi_signal_lst_dict | |
async def get_ichimoku_data(payload: IchimokuPayload) -> Sequence[dict]: | |
ichimoku_dict = { | |
"symbol": payload.symbol | |
} | |
try: | |
response = requests.post(DATA_URL+"/get_ichimoku", json=ichimoku_dict) | |
ichimoku_json = response.json() | |
ichimoku_df = Preprocess.lst_dict_to_df(ichimoku_json) | |
ichimoku_df = Strategy.ichimoku_strategy(ichimoku_df) | |
ichimoku_signal_lst_dict = \ | |
ichimoku_df[["time", | |
"ichimoku_signal_1", | |
"ichimoku_signal_3"]].to_dict(orient="records") | |
except Exception as e: | |
return [{"message": f"Caught error {e}"}] | |
return ichimoku_signal_lst_dict | |
async def get_macd_data(payload: MACDSignalPayload) -> Sequence[dict]: | |
macd_dict = { | |
"symbol": payload.symbol | |
} | |
try: | |
response = requests.post(DATA_URL+"/get_macd", json=macd_dict) | |
macd_json = response.json() | |
macd_df = Preprocess.lst_dict_to_df(macd_json) | |
macd_df = Strategy.macd_strategy(macd_df) | |
macd_signal_lst_dict = \ | |
macd_df[["time", "macd_signal"]].to_dict(orient="records") | |
except Exception as e: | |
return [{"message": f"Caught error {e}"}] | |
return macd_signal_lst_dict | |
async def get_summary_data(payload: SummarySignalPayload) -> Sequence[dict]: | |
summary_dict = {"symbol": payload.symbol, | |
"periods": payload.periods, | |
"smooth_k": payload.smooth_k, | |
"smooth_d": payload.smooth_d} | |
try: | |
response = requests.post(DATA_URL+"/get_rsi", | |
json=summary_dict) | |
rsi_json = response.json() | |
rsi_df = Preprocess.lst_dict_to_df(rsi_json) | |
rsi_df = Strategy.rsi_strategy(rsi_df) | |
response = requests.post(DATA_URL+"/get_macd", | |
json=summary_dict) | |
macd_json = response.json() | |
macd_df = Preprocess.lst_dict_to_df(macd_json) | |
macd_df = Strategy.macd_strategy(macd_df) | |
df = pd.merge(rsi_df, macd_df, how="left", on="time") | |
response = requests.post(DATA_URL+"/get_ichimoku", | |
json=summary_dict) | |
ichimoku_json = response.json() | |
ichimoku_df = Preprocess.lst_dict_to_df(ichimoku_json) | |
ichimoku_df = Strategy.ichimoku_strategy(ichimoku_df) | |
df = pd.merge(df, ichimoku_df, how="right", on="time") | |
query_df = \ | |
df.query("ichimoku_signal_1 != -1 | ichimoku_signal_3 != -1") | |
query_df = query_df.fillna(-1) | |
summary_lst_dict = query_df.to_dict(orient="records") | |
except Exception as e: | |
return [{"message": f"Caught error {e}"}] | |
return summary_lst_dict | |