logic-center / routes /signal.py
camphong24032002
Test
c39532a
from services.preprocess import Preprocess
from services.strategy import Strategy
from services.notification import Notification
from models.rsi import RSISignalPayload
from models.ichimoku import IchimokuPayload
from models.macd import MACDSignalPayload
from models.summary import SummarySignalPayload
from pymongo import MongoClient, ASCENDING, DESCENDING
from fastapi import status, APIRouter
from typing import Sequence
import requests
import pandas as pd
import os
import asyncio
# from dotenv import load_dotenv
# load_dotenv()
router = APIRouter()
DATA_URL = "https://intellistock-data-api.hf.space/data"
os.environ["HTTP_PROXY"] = "http://127.0.0.1:8888"
os.environ["HTTPS_PROXY"] = "http://127.0.0.1:8888"
@router.get(
"/test",
name="Get daily signal report",
status_code=status.HTTP_200_OK
)
async def test():
prefix_url = os.environ.get("PREFIX_URL")
await Notification.send_msg(prefix_url, "Test")
@router.get(
"/daily_report",
name="Get daily signal report",
status_code=status.HTTP_200_OK
)
async def get_daily_report():
uri = os.environ.get("MONGODB_URI")
client = MongoClient(uri)
database = client.get_database("data")
rsi_collection = database.get_collection("rsi")
price_collection = database.get_collection("price")
macd_collection = database.get_collection("macd")
signal_collection = database.get_collection("signal")
rsi_records = list(rsi_collection.find(sort=[("_id", ASCENDING)]))
price_records = list(price_collection.find(sort=[("_id", ASCENDING)]))
macd_records = list(macd_collection.find(sort=[("_id", ASCENDING)]))
rsi_df = pd.DataFrame(rsi_records).drop(columns=["_id"])
macd_df = pd.DataFrame(macd_records).drop(columns=["_id"])
lst_symbols = rsi_df.columns[1:]
last_date = rsi_df["time"].iloc[-1]
newest_record = \
signal_collection.find_one(sort=[("_id", DESCENDING)])
if newest_record["time"] == last_date:
return {"message": "The data is up to date!"}
result = Strategy.get_daily_report(price_records,
rsi_df,
macd_df,
last_date,
lst_symbols)
today_df = result[result["time"] == last_date][["ticker",
"rsi_signal",
"macd_signal",
"ichimoku_signal_1",
"ichimoku_signal_3"]]
today_dict = today_df.to_dict(orient="list")
today_dict["time"] = last_date
signal_collection.insert_one(today_dict)
prefix_url = os.environ.get("PREFIX_URL")
asyncio.create_task(
Notification.daily_message(today_df, prefix_url, last_date))
return {"message": "Updated signal"}
@router.get(
"/daily_nadaraya",
name="Get daily nadaraya signal",
status_code=status.HTTP_200_OK
)
async def get_daily_nadaraya_report():
uri = os.environ.get("MONGODB_URI")
client = MongoClient(uri)
database = client.get_database("data")
nada_collection = database.get_collection("nada")
rsi_collection = database.get_collection("rsi")
rsi_records = list(rsi_collection.find(sort=[("_id", ASCENDING)]))
price_collection = database.get_collection("price")
price_records = list(price_collection.find(sort=[("_id", ASCENDING)]))
rsi_df = pd.DataFrame(rsi_records).drop(columns=["_id"])
lst_symbols = rsi_df.columns[1:]
last_date = rsi_df["time"].iloc[-1]
newest_record = \
nada_collection.find_one(sort=[("_id", DESCENDING)])
if newest_record["time"] == last_date:
return {"message": "The data is up to date!"}
result = Strategy.get_daily_nadaraya(price_records,
lst_symbols,
last_date)
result_dict = result.to_dict(orient="list")
result_dict["time"] = last_date
nada_collection.insert_one(result_dict)
prefix_url = os.environ.get("PREFIX_URL")
asyncio.create_task(
Notification.notify_nadaraya(result, prefix_url, last_date))
return {"message": "Updated nadaraya"}
@router.post(
"/rsi",
name="Get signal of RSI",
status_code=status.HTTP_200_OK
)
async def get_rsi_signal(payload: RSISignalPayload) -> Sequence[dict]:
rsi_dict = {"symbol": payload.symbol,
"periods": payload.periods,
"smooth_k": payload.smooth_k,
"smooth_d": payload.smooth_d}
try:
response = requests.post(DATA_URL+"/get_rsi", json=rsi_dict)
rsi_json = response.json()
rsi_df = Preprocess.lst_dict_to_df(rsi_json)
rsi_df = Strategy.rsi_strategy(
rsi_df, threshold=(
payload.threshold_low, payload.threshold_high
)
)
rsi_signal_lst_dict = \
rsi_df[["time", "rsi_signal"]].to_dict(orient="records")
except Exception as e:
return [{"message": f"Caught error {e}"}]
return rsi_signal_lst_dict
@router.post(
"/ichimoku",
name="Get signal of Ichimoku",
status_code=status.HTTP_200_OK
)
async def get_ichimoku_data(payload: IchimokuPayload) -> Sequence[dict]:
ichimoku_dict = {
"symbol": payload.symbol
}
try:
response = requests.post(DATA_URL+"/get_ichimoku", json=ichimoku_dict)
ichimoku_json = response.json()
ichimoku_df = Preprocess.lst_dict_to_df(ichimoku_json)
ichimoku_df = Strategy.ichimoku_strategy(ichimoku_df)
ichimoku_signal_lst_dict = \
ichimoku_df[["time",
"ichimoku_signal_1",
"ichimoku_signal_3"]].to_dict(orient="records")
except Exception as e:
return [{"message": f"Caught error {e}"}]
return ichimoku_signal_lst_dict
@router.post(
"/macd",
name="Get signal of MACD",
status_code=status.HTTP_200_OK
)
async def get_macd_data(payload: MACDSignalPayload) -> Sequence[dict]:
macd_dict = {
"symbol": payload.symbol
}
try:
response = requests.post(DATA_URL+"/get_macd", json=macd_dict)
macd_json = response.json()
macd_df = Preprocess.lst_dict_to_df(macd_json)
macd_df = Strategy.macd_strategy(macd_df)
macd_signal_lst_dict = \
macd_df[["time", "macd_signal"]].to_dict(orient="records")
except Exception as e:
return [{"message": f"Caught error {e}"}]
return macd_signal_lst_dict
@router.post(
"/summary",
name="Get summary of signal",
status_code=status.HTTP_200_OK
)
async def get_summary_data(payload: SummarySignalPayload) -> Sequence[dict]:
summary_dict = {"symbol": payload.symbol,
"periods": payload.periods,
"smooth_k": payload.smooth_k,
"smooth_d": payload.smooth_d}
try:
response = requests.post(DATA_URL+"/get_rsi",
json=summary_dict)
rsi_json = response.json()
rsi_df = Preprocess.lst_dict_to_df(rsi_json)
rsi_df = Strategy.rsi_strategy(rsi_df)
response = requests.post(DATA_URL+"/get_macd",
json=summary_dict)
macd_json = response.json()
macd_df = Preprocess.lst_dict_to_df(macd_json)
macd_df = Strategy.macd_strategy(macd_df)
df = pd.merge(rsi_df, macd_df, how="left", on="time")
response = requests.post(DATA_URL+"/get_ichimoku",
json=summary_dict)
ichimoku_json = response.json()
ichimoku_df = Preprocess.lst_dict_to_df(ichimoku_json)
ichimoku_df = Strategy.ichimoku_strategy(ichimoku_df)
df = pd.merge(df, ichimoku_df, how="right", on="time")
query_df = \
df.query("ichimoku_signal_1 != -1 | ichimoku_signal_3 != -1")
query_df = query_df.fillna(-1)
summary_lst_dict = query_df.to_dict(orient="records")
except Exception as e:
return [{"message": f"Caught error {e}"}]
return summary_lst_dict