import threading import gym import numpy as np import requests import pandas as pd from datetime import datetime, timedelta from stable_baselines3 import PPO from stable_baselines3.common.vec_env import DummyVecEnv from gym import spaces import time import firebase_admin from firebase_admin import credentials, db import os import gradio as gr #cred = credentials.Certificate("credentials.json") firebase_admin.initialize_app(cred, {"databaseURL": "https://socail-swap-default-rtdb.asia-southeast1.firebasedatabase.app/"}) ref = db.reference() buy_signals = [] sell_signals = [] hold_signals = [] class TradingEnv(gym.Env): def __init__(self, data, window_size=50): super(TradingEnv, self).__init__() super(TradingEnv, self).__init__() self.data = data self.window_size = window_size self.current_step = window_size self.action_space = spaces.Discrete(3) self.observation_space = spaces.Box( low=0, high=1, shape=(window_size, 2), dtype=np.float32) def reset(self): self.current_step = self.window_size return self._get_observation() def _get_observation(self): window_data = self.data[self.current_step-self.window_size:self.current_step] obs = window_data[['close', 'EMA']].values obs = (obs - obs.min()) / (obs.max() - obs.min()) return obs def step(self, action): reward = 0 done = False self.current_step += 1 if self.current_step >= len(self.data): done = True else: if action == 1: reward = self.data['close'].iloc[self.current_step] - self.data['close'].iloc[self.current_step - 1] elif action == 2: reward = self.data['close'].iloc[self.current_step - 1] - self.data['close'].iloc[self.current_step] return self._get_observation(), reward, done, {} def fetch_data(symbol='ETH', tsym='USD', start_date='2021-01-01', api_key='66bc686cb714fadda1fad0320704c98869d4b31ce7d9d27560c6c574b4d04c54'): start_date = datetime.strptime(start_date, '%Y-%m-%d') end_date = datetime.utcnow() to_ts = int(end_date.timestamp()) url = f'https://min-api.cryptocompare.com/data/v2/histohour?fsym={symbol}&tsym={tsym}&toTs={to_ts}&api_key={api_key}' response = requests.get(url) data = response.json() if data['Response'] == 'Success': data_points = data['Data']['Data'] df = pd.DataFrame(data_points) df['time'] = pd.to_datetime(df['time'], unit='s') df.set_index('time', inplace=True) # Filter data based on start_date df = df[df.index >= start_date] return df[['close']] else: print(f"Error fetching data: {data['Message']}") return None def calculate_ema(data, span=20): data['EMA'] = data['close'].ewm(span=span, adjust=False).mean() return data def run_model(): while True: try: new_data = fetch_data(start_date=(datetime.utcnow() - timedelta(days=3)).strftime('%Y-%m-%d')) new_data = calculate_ema(new_data) if len(new_data) < 50: print("Not enough data to update the environment.") time.sleep(3600) continue env = DummyVecEnv([lambda: TradingEnv(new_data)]) model.set_env(env) obs = env.reset() dates = new_data.index[50:] prices = new_data['close'][50:] emas = new_data['EMA'][50:] actions = [] for date, price, ema in zip(dates, prices, emas): action, _ = model.predict(obs) actions.append(action[0]) obs, _, done, _ = env.step(action) if done: break new_buy_signals = [(date, price, ema) for date, price, ema, action in zip(dates, prices, emas, actions) if action == 1 and date not in [signal[0] for signal in buy_signals]] new_sell_signals = [(date, price, ema) for date, price, ema, action in zip(dates, prices, emas, actions) if action == 2 and date not in [signal[0] for signal in sell_signals]] new_hold_signals = [(date, price, ema) for date, price, ema, action in zip(dates, prices, emas, actions) if action == 0 and date not in [signal[0] for signal in hold_signals]] for signal in new_buy_signals: if signal[0] not in [s[0] for s in buy_signals] and signal[0] not in [s[0] for s in sell_signals] and signal[0] not in [s[0] for s in hold_signals]: buy_signals.append(signal) for signal in new_sell_signals: if signal[0] not in [s[0] for s in sell_signals] and signal[0] not in [s[0] for s in buy_signals] and signal[0] not in [s[0] for s in hold_signals]: sell_signals.append(signal) for signal in new_hold_signals: if signal[0] not in [s[0] for s in hold_signals] and signal[0] not in [s[0] for s in buy_signals] and signal[0] not in [s[0] for s in sell_signals]: hold_signals.append(signal) buy_signals_data = [{'timestamp': signal[0].strftime('%Y-%m-%d %H:%M:%S'), 'type': 'b', 'price': round(signal[1], 2), 'ema': round(signal[2], 2)} for signal in buy_signals] sell_signals_data = [{'timestamp': signal[0].strftime('%Y-%m-%d %H:%M:%S'), 'type': 's', 'price': round(signal[1], 2), 'ema': round(signal[2], 2)} for signal in sell_signals] hold_signals_data = [{'timestamp': signal[0].strftime('%Y-%m-%d %H:%M:%S'), 'type': 'h', 'price': round(signal[1], 2), 'ema': round(signal[2], 2)} for signal in hold_signals] all_signals_data = buy_signals_data + sell_signals_data + hold_signals_data all_signals_data.sort(key=lambda x: x['timestamp'], reverse=True) ref.child('signals').child('data').set(all_signals_data) time.sleep(3590) except Exception as e: print(f"An error occurred: {e}") break def start_model_in_background(): thread = threading.Thread(target=run_model) thread.daemon = True thread.start() def dummy_interface(): return "Model is running in the background." if __name__ == "__main__": data = fetch_data() data = calculate_ema(data) if len(data) < 50: raise ValueError("Not enough data to fill the window size.") env = DummyVecEnv([lambda: TradingEnv(data)]) model = PPO.load("ppo_trading_agent", env=env) start_model_in_background() gr.Interface(fn=dummy_interface, inputs=[], outputs="text").launch()