Spaces:
Runtime error
Runtime error
import threading | |
import gym | |
import numpy as np | |
import requests | |
import pandas as pd | |
from datetime import datetime, timedelta | |
from stable_baselines3 import PPO | |
from stable_baselines3.common.vec_env import DummyVecEnv | |
from gym import spaces | |
import time | |
import firebase_admin | |
from firebase_admin import credentials, db | |
import os | |
import gradio as gr | |
# Service-account credentials for the Firebase Admin SDK.  This assignment
# was previously commented out, which left `cred` undefined and made the
# initialize_app() call below fail with NameError at import time.
# NOTE(review): assumes a service-account key file named "credentials.json"
# sits next to this script -- confirm the deployment provides it.
cred = credentials.Certificate("credentials.json")
firebase_admin.initialize_app(cred, {"databaseURL": "https://socail-swap-default-rtdb.asia-southeast1.firebasedatabase.app/"})

# Root reference of the Realtime Database; run_model() writes below it.
ref = db.reference()

# Accumulated (timestamp, price, ema) tuples, one list per predicted action.
# They are module-level so the background worker keeps them across cycles.
buy_signals = []
sell_signals = []
hold_signals = []
class TradingEnv(gym.Env):
    """Gym environment that replays a price/EMA frame one row per step.

    The observation is the most recent ``window_size`` rows of the ``close``
    and ``EMA`` columns, min-max scaled over the whole window.  The action
    space is ``Discrete(3)``: action 1 is rewarded with the change in close
    between the previous and the current row, action 2 with the negated
    change, and any other action with 0.

    Args:
        data: DataFrame with ``close`` and ``EMA`` columns.
        window_size: Number of rows in each observation.
    """

    def __init__(self, data, window_size=50):
        # A single base-class initialisation is enough (it used to run twice).
        super().__init__()
        self.data = data
        self.window_size = window_size
        self.current_step = window_size
        self.action_space = spaces.Discrete(3)
        self.observation_space = spaces.Box(
            low=0, high=1, shape=(window_size, 2), dtype=np.float32)

    def reset(self):
        """Rewind to the first full window and return its observation."""
        self.current_step = self.window_size
        return self._get_observation()

    def _get_observation(self):
        """Return the current window scaled to [0, 1] as float32."""
        start = self.current_step - self.window_size
        window_data = self.data[start:self.current_step]
        obs = window_data[['close', 'EMA']].values
        lowest = obs.min()
        value_range = obs.max() - lowest
        if value_range == 0:
            # A perfectly flat window would otherwise divide by zero and
            # feed NaNs to the policy; report it as all zeros instead.
            return np.zeros(obs.shape, dtype=np.float32)
        # Scale in the source precision, then cast to the dtype declared in
        # observation_space.
        return ((obs - lowest) / value_range).astype(np.float32)

    def step(self, action):
        """Advance one row and return ``(observation, reward, done, info)``."""
        reward = 0
        self.current_step += 1
        done = self.current_step >= len(self.data)
        if not done:
            closes = self.data['close']
            change = closes.iloc[self.current_step] - closes.iloc[self.current_step - 1]
            if action == 1:
                reward = change
            elif action == 2:
                reward = -change
        return self._get_observation(), reward, done, {}
def fetch_data(symbol='ETH', tsym='USD', start_date='2021-01-01', api_key='66bc686cb714fadda1fad0320704c98869d4b31ce7d9d27560c6c574b4d04c54'):
    """Download hourly close prices from the CryptoCompare ``histohour`` API.

    Args:
        symbol: Asset symbol sent as ``fsym``.
        tsym: Quote currency sent as ``tsym``.
        start_date: ``YYYY-MM-DD`` string; rows before it are dropped after
            the download.
        api_key: CryptoCompare API key.

    Returns:
        A DataFrame indexed by ``time`` with a single ``close`` column, or
        ``None`` (after printing the API message) when the API does not
        report success.

    Note:
        No ``limit`` parameter is sent, so the number of rows is whatever
        the API returns by default; ``start_date`` can only trim that result.
    """
    # NOTE(review): the API key is hard-coded as a default argument and is
    # therefore public with the source.  It is kept so existing callers keep
    # working, but it should be rotated and loaded from a secret instead.
    start_date = datetime.strptime(start_date, '%Y-%m-%d')

    # time.time() is already a UTC epoch.  The previous
    # datetime.utcnow().timestamp() interpreted the naive UTC value as local
    # time, which shifted `toTs` by the host's UTC offset.
    to_ts = int(time.time())

    response = requests.get(
        'https://min-api.cryptocompare.com/data/v2/histohour',
        params={'fsym': symbol, 'tsym': tsym, 'toTs': to_ts, 'api_key': api_key},
        timeout=30,  # never let a stalled connection hang the caller forever
    )
    data = response.json()

    if data.get('Response') != 'Success':
        print(f"Error fetching data: {data.get('Message')}")
        return None

    df = pd.DataFrame(data['Data']['Data'])
    df['time'] = pd.to_datetime(df['time'], unit='s')
    df.set_index('time', inplace=True)

    # Filter on start_date and hand back an independent frame, so that
    # callers adding columns do not write into a slice of `df`.
    return df.loc[df.index >= start_date, ['close']].copy()
def calculate_ema(data, span=20):
    """Add an ``EMA`` column computed from ``close`` and return the frame.

    The exponential moving average uses the recursive form
    (``adjust=False``) with the given ``span``.  The column is written into
    ``data`` itself, and that same object is returned.
    """
    close_prices = data['close']
    smoothed = close_prices.ewm(span=span, adjust=False).mean()
    data['EMA'] = smoothed
    return data
def _signal_records(signals, kind):
    """Turn (timestamp, price, ema) tuples into dicts tagged with ``kind``."""
    return [
        {
            'timestamp': stamp.strftime('%Y-%m-%d %H:%M:%S'),
            'type': kind,
            'price': round(float(price), 2),
            'ema': round(float(ema), 2),
        }
        for stamp, price, ema in signals
    ]


def run_model():
    """Worker loop: fetch recent prices, predict actions, publish signals.

    Each cycle downloads the last three days of hourly data, runs the
    module-level ``model`` over it, records one signal per timestamp that has
    not been recorded before (1 -> buy, 2 -> sell, 0 -> hold), and overwrites
    ``signals/data`` in Firebase with every signal collected so far, newest
    first.  The loop then sleeps for roughly an hour.

    Errors no longer terminate the loop: they are printed and the cycle is
    retried after a short pause, so one transient network failure does not
    silently stop signal generation for the rest of the process lifetime.
    """
    window_size = 50   # must match the window the model was trained with
    retry_delay = 60   # seconds to wait after a failed cycle
    signals_by_action = {1: buy_signals, 2: sell_signals, 0: hold_signals}

    while True:
        try:
            since = (datetime.utcnow() - timedelta(days=3)).strftime('%Y-%m-%d')
            new_data = fetch_data(start_date=since)
            if new_data is None:
                # fetch_data() has already printed the API error.
                time.sleep(retry_delay)
                continue

            new_data = calculate_ema(new_data)
            if len(new_data) < window_size:
                print("Not enough data to update the environment.")
                time.sleep(3600)
                continue

            env = DummyVecEnv([lambda: TradingEnv(new_data, window_size)])
            model.set_env(env)
            obs = env.reset()

            # Rows before the first full window have no prediction.
            dates = new_data.index[window_size:]
            prices = new_data['close'][window_size:]
            emas = new_data['EMA'][window_size:]

            actions = []
            for _ in range(len(dates)):
                action, _state = model.predict(obs)
                actions.append(action[0])
                obs, _, done, _ = env.step(action)
                if done[0]:
                    break

            # A timestamp is stored at most once across all three lists.
            # One set lookup replaces rebuilding the lists for every check.
            seen = {
                stamp
                for bucket in signals_by_action.values()
                for stamp, _, _ in bucket
            }
            for stamp, price, ema, action in zip(dates, prices, emas, actions):
                bucket = signals_by_action.get(int(action))
                if bucket is None or stamp in seen:
                    continue
                seen.add(stamp)
                bucket.append((stamp, price, ema))

            all_signals_data = (
                _signal_records(buy_signals, 'b')
                + _signal_records(sell_signals, 's')
                + _signal_records(hold_signals, 'h')
            )
            all_signals_data.sort(key=lambda x: x['timestamp'], reverse=True)
            ref.child('signals').child('data').set(all_signals_data)

            time.sleep(3590)
        except Exception as e:
            # Top-level boundary of the worker thread: report and retry
            # rather than letting the thread die.
            print(f"An error occurred: {e}")
            time.sleep(retry_delay)
def start_model_in_background():
    """Launch ``run_model`` on a daemon thread and return immediately."""
    # daemon=True: the worker must not keep the process alive on shutdown.
    worker = threading.Thread(target=run_model, daemon=True)
    worker.start()
def dummy_interface():
    """Return the fixed status text shown by the Gradio interface."""
    status_message = "Model is running in the background."
    return status_message
if __name__ == "__main__":
    # Build an initial environment so the saved agent can be loaded.
    data = fetch_data()
    if data is None:
        # fetch_data() returns None when the API call fails.  Stop with a
        # clear message instead of an opaque TypeError in calculate_ema().
        raise RuntimeError("Could not fetch initial price data.")
    data = calculate_ema(data)
    if len(data) < 50:
        raise ValueError("Not enough data to fill the window size.")

    env = DummyVecEnv([lambda: TradingEnv(data)])
    # `model` is a module-level name that run_model() reads from its thread.
    model = PPO.load("ppo_trading_agent", env=env)

    start_model_in_background()
    # The Gradio app only serves a status message; launch() is the last
    # statement of the script.
    gr.Interface(fn=dummy_interface, inputs=[], outputs="text").launch()