Spaces:
Running
Running
import pandas as pd | |
import numpy as np | |
from .indicator import Indicator | |
from .config import QUERY_CONDITION | |
from logic.nadaraya import NadarayaWatson | |
class Strategy:
    """Turn indicator values into discrete trading signals.

    Signal encoding used by every method: -1 is null (no signal),
    0 is buy and 1 is sell.

    All strategy methods are static, so they can be called on the class
    (``Strategy.rsi_strategy(df)``) as well as on an instance.
    """

    # Number of most recent price rows passed to the Nadaraya-Watson envelope.
    # NOTE(review): presumably a 200-bar lookback plus the current bar;
    # confirm against NadarayaWatson.
    _NADARAYA_WINDOW = 201

    # Column layout of the frame returned by get_daily_nadaraya.
    _NADARAYA_COLUMNS = ["ticker", "open_signal", "close_signal"]

    def __init__(self) -> None:
        pass

    @staticmethod
    def _symbol_price_frame(price_records, index) -> pd.DataFrame:
        """Build the price frame of the symbol stored at position *index*.

        Each record must expose ``record["time"]`` and ``record["value"]``,
        where ``record["value"][index]`` is a mapping of price fields.
        The mappings are copied so the caller's records are not modified.
        """
        return pd.DataFrame(
            [
                {**record["value"][index], "time": record["time"]}
                for record in price_records
            ]
        )

    @staticmethod
    def _sign_flip(prev, cur):
        """Mask rows where *cur* is not NaN and its sign differs from *prev*.

        A NaN in *prev* makes the product NaN, so the row is not flagged.
        """
        return ~np.isnan(cur) & (prev * cur < 0)

    @staticmethod
    def _envelope_signal(value, envelope) -> int:
        """Return 1 if *value* >= envelope[0], 0 if <= envelope[1], else -1.

        The ``envelope[0]`` test takes precedence when both would match.
        """
        if value >= envelope[0]:
            return 1
        if value <= envelope[1]:
            return 0
        return -1

    @staticmethod
    def rsi_strategy(rsi_df, threshold=(20, 80)) -> pd.DataFrame:
        """Add an ``rsi_signal`` column derived from the stochastic RSI lines.

        Args:
            rsi_df: frame with ``stoch_rsi``, ``stoch_rsi_smooth_k`` and
                ``stoch_rsi_smooth_d`` columns. It is modified in place.
            threshold: ``(low, high)`` pair. A row is a buy (0) when all
                three lines are <= low, a sell (1) when all three are
                >= high, and null (-1) otherwise (including NaN rows).

        Returns:
            The same ``rsi_df`` object with the new column.
        """
        low, high = threshold[0], threshold[1]
        lines = rsi_df[["stoch_rsi",
                        "stoch_rsi_smooth_k",
                        "stoch_rsi_smooth_d"]]
        oversold = (lines <= low).all(axis=1).to_numpy()
        overbought = (lines >= high).all(axis=1).to_numpy()
        # np.select takes the first matching condition, so buy wins over
        # sell exactly like the if/elif chain it replaces.
        rsi_df["rsi_signal"] = np.select([oversold, overbought],
                                         [0, 1],
                                         default=-1)
        return rsi_df

    @staticmethod
    def ichimoku_strategy(df) -> pd.DataFrame:
        """Add ``ichimoku_signal_1`` and ``ichimoku_signal_3`` columns.

        Both signals fire on the row where a series changes sign relative
        to the previous row; the first row is always -1.

        * ``ichimoku_signal_1``: ``tenkan_kijun`` changes sign. Buy (0) if
          the previous value was negative and the previous ``tenkan_sen``
          was below both previous senkou spans; sell (1) if the previous
          value was positive and the previous ``tenkan_sen`` was above both
          previous senkou spans.
        * ``ichimoku_signal_3``: ``kumo_cloud`` changes sign. Buy (0) if the
          previous value was negative, sell (1) if it was positive.

        Args:
            df: frame with ``tenkan_kijun``, ``tenkan_sen``,
                ``senkou_span_a``, ``senkou_span_b`` and ``kumo_cloud``
                columns. It is modified in place; an empty frame is
                accepted and simply gains two empty columns.

        Returns:
            The same ``df`` object with the two new columns.
        """
        row_count = df.shape[0]
        signal_1 = np.full(row_count, -1, dtype=int)
        signal_3 = np.full(row_count, -1, dtype=int)

        if row_count > 1:
            tenkan_kijun = df["tenkan_kijun"].to_numpy(dtype=float)
            kumo = df["kumo_cloud"].to_numpy(dtype=float)
            # Every condition is evaluated on the previous row.
            prev_tk = tenkan_kijun[:-1]
            prev_kumo = kumo[:-1]
            prev_tenkan = df["tenkan_sen"].to_numpy(dtype=float)[:-1]
            prev_span_a = df["senkou_span_a"].to_numpy(dtype=float)[:-1]
            prev_span_b = df["senkou_span_b"].to_numpy(dtype=float)[:-1]

            # Equivalent to the built-in min/max(span_a, span_b), NaN
            # handling included: span_b is only picked when the comparison
            # is True, so a NaN span_b falls back to span_a.
            cloud_bottom = np.where(prev_span_b < prev_span_a,
                                    prev_span_b, prev_span_a)
            cloud_top = np.where(prev_span_b > prev_span_a,
                                 prev_span_b, prev_span_a)

            tk_cross = Strategy._sign_flip(prev_tk, tenkan_kijun[1:])
            signal_1[1:] = np.select(
                [tk_cross & (prev_tk < 0) & (prev_tenkan < cloud_bottom),
                 tk_cross & (prev_tk > 0) & (prev_tenkan > cloud_top)],
                [0, 1],
                default=-1)

            kumo_cross = Strategy._sign_flip(prev_kumo, kumo[1:])
            signal_3[1:] = np.select(
                [kumo_cross & (prev_kumo < 0),
                 kumo_cross & (prev_kumo > 0)],
                [0, 1],
                default=-1)

        df["ichimoku_signal_1"] = signal_1
        df["ichimoku_signal_3"] = signal_3
        return df

    @staticmethod
    def macd_strategy(macd_df) -> pd.DataFrame:
        """Add a ``macd_signal`` column based on the sign of ``macd``.

        Negative values map to buy (0), positive values to sell (1), and
        zero or NaN to null (-1). ``macd_df`` is modified in place and
        returned.
        """
        macd = macd_df["macd"].to_numpy()
        macd_df["macd_signal"] = np.select([macd < 0, macd > 0],
                                           [0, 1],
                                           default=-1)
        return macd_df

    @staticmethod
    def get_daily_report(price_records,
                         rsi_df,
                         macd_df,
                         last_date,
                         lst_symbols):
        """Collect RSI, MACD and Ichimoku signals for every symbol.

        Args:
            price_records: iterable of records with a ``"time"`` entry and
                a ``"value"`` sequence holding one price mapping per
                symbol, in the same order as ``lst_symbols``. The records
                are not modified.
            rsi_df: frame with a ``time`` column and one column per symbol.
            macd_df: frame with a ``time`` column and one column per symbol.
            last_date: only rows with ``time >= last_date`` are reported.
            lst_symbols: symbols to process.

        Returns:
            A DataFrame of the rows that satisfy ``QUERY_CONDITION`` (missing
            values filled with -1), or ``{"message": ...}`` if any step
            raised an exception.
        """
        try:
            result = []
            for index, symbol in enumerate(lst_symbols):
                symbol_rsi_df = rsi_df[["time", symbol]].rename(
                    columns={symbol: "rsi"})
                symbol_macd_df = macd_df[["time", symbol]].rename(
                    columns={symbol: "macd"})
                symbol_price_df = Strategy._symbol_price_frame(
                    price_records, index)

                # Only the most recent RSI / MACD row is reported.
                symbol_rsi_df = Indicator.get_all_rsi(symbol_rsi_df)
                symbol_rsi_df = Strategy.rsi_strategy(symbol_rsi_df).iloc[-1:]
                symbol_macd_df = Strategy.macd_strategy(
                    symbol_macd_df).iloc[-1:]
                df = pd.merge(symbol_rsi_df,
                              symbol_macd_df,
                              how="inner",
                              on="time")

                symbol_ichi_df = Indicator.get_ichimoku_cloud(symbol_price_df)
                symbol_ichi_df = Strategy.ichimoku_strategy(symbol_ichi_df)
                # assign() builds a new frame, so the ticker column is not
                # written onto a slice of the indicator frame.
                symbol_ichi_df = symbol_ichi_df[["time",
                                                 "ichimoku_signal_1",
                                                 "ichimoku_signal_3"]].assign(
                    ticker=symbol)

                # Right merge keeps every Ichimoku row; rows without
                # RSI/MACD data get NaN, replaced by -1 below.
                df = pd.merge(df, symbol_ichi_df, how="right", on="time")
                query_df = df[df["time"] >= last_date]
                query_df = query_df.query(QUERY_CONDITION)
                query_df = query_df.fillna(-1)
                result.extend(query_df.to_dict(orient="records"))
            return pd.DataFrame(result)
        except Exception as e:
            # Callers receive the failure as a message dict, not an exception.
            return {"message": f"Caught error {e}"}

    @staticmethod
    def nadaraya_strategy(symbol, price, envelope) -> list:
        """Compare the open and close price against an envelope.

        Args:
            symbol: ticker stored in the result.
            price: mapping with ``"open"`` and ``"close"`` entries.
            envelope: indexable pair; a price >= ``envelope[0]`` is a sell
                (1), a price <= ``envelope[1]`` is a buy (0), anything
                else is null (-1).

        Returns:
            A one-element list holding a dict with the keys ``ticker``,
            ``open_signal`` and ``close_signal``.
        """
        return [{"ticker": symbol,
                 "open_signal": Strategy._envelope_signal(price["open"],
                                                          envelope),
                 "close_signal": Strategy._envelope_signal(price["close"],
                                                           envelope)}]

    @staticmethod
    def get_daily_nadaraya(price_records,
                           lst_symbols,
                           last_date):
        """Report symbols whose latest open or close breaches the envelope.

        Args:
            price_records: iterable of records with a ``"time"`` entry and
                a ``"value"`` sequence holding one price mapping per
                symbol, in the same order as ``lst_symbols``. The records
                are not modified.
            lst_symbols: symbols to process.
            last_date: accepted for interface compatibility; not used.

        Returns:
            A DataFrame with the columns ``ticker``, ``open_signal`` and
            ``close_signal`` containing only rows where at least one signal
            is not -1 (empty when there are no symbols), or
            ``{"message": ...}`` if any step raised an exception.
        """
        try:
            result = []
            for index, symbol in enumerate(lst_symbols):
                symbol_price_df = Strategy._symbol_price_frame(
                    price_records, index).iloc[-Strategy._NADARAYA_WINDOW:]
                nada_envelopes = \
                    NadarayaWatson.nadaraya_watson_envelope_indicator(
                        symbol_price_df["close"])
                current_price = symbol_price_df.iloc[-1]
                result.extend(Strategy.nadaraya_strategy(symbol,
                                                         current_price,
                                                         nada_envelopes[-1]))
            # Explicit columns keep the filter valid when result is empty.
            result_df = pd.DataFrame(result,
                                     columns=Strategy._NADARAYA_COLUMNS)
            has_signal = ((result_df["open_signal"] > -1)
                          | (result_df["close_signal"] > -1))
            return result_df[has_signal]
        except Exception as e:
            # Callers receive the failure as a message dict, not an exception.
            return {"message": f"Caught error {e}"}