algorembrant's picture
Upload 31 files
d8bad25 verified
"""
FastAPI Server for Gemini 3 Flash AI Trading Platform.
Connects the MCP MT5 bridge, Gemini Agent, and Next.js frontend via WebSockets.
"""
import os
import asyncio
import json
from contextlib import asynccontextmanager
from typing import List, Optional
from datetime import datetime
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv
from models import (
CandleData, TickData, PositionInfo, AccountInfo, AgentDecision,
TradeRequest, WSMessage
)
from mt5_mcp import MT5Bridge
from agent import TradingAgent
from ws_manager import ConnectionManager
# Load environment variables (python-dotenv) before any component below is
# constructed, so constructors that read the environment see them.
load_dotenv()
# Initialize components (module-level singletons shared by every handler)
mt5_bridge = MT5Bridge()
agent = TradingAgent()
ws_manager = ConnectionManager()
# Global state
# Toggled by POST /api/agent/toggle and read by the market data loop.
agent_running = False
# Slot for a background task handle (presumably the market data loop task).
background_task_handle = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    Startup connects to MT5 and launches the background market data loop.
    Shutdown cancels that loop and shuts the MT5 bridge down, even when the
    application exits because of an error.

    Raises:
        RuntimeError: If the MT5 connection fails while the ``FORCE_MT5_DATA``
            environment variable is set to ``"true"`` (strict mode).
    """
    global background_task_handle

    print("[Main] Starting up...")

    # 1. Connect to MT5
    result = mt5_bridge.initialize()
    if not result["success"]:
        print(f"[Main] SEVERE: {result['message']}")
        # In strict mode, refuse to start without a working MT5 connection.
        if os.getenv("FORCE_MT5_DATA", "false").lower() == "true":
            raise RuntimeError(f"STRICT MODE: Failed to connect to MT5. {result['message']}")
    else:
        print(f"[Main] {result['message']}")

    # 2. Start background data loop (non-blocking).
    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage-collected mid-execution.
    background_task_handle = asyncio.create_task(market_data_loop())

    try:
        yield
    finally:
        print("[Main] Shutting down...")
        # Stop the loop before closing the bridge it reads from.
        background_task_handle.cancel()
        # gather(..., return_exceptions=True) waits for the cancellation to
        # finish without re-raising the task's CancelledError here.
        await asyncio.gather(background_task_handle, return_exceptions=True)
        background_task_handle = None
        mt5_bridge.shutdown()
# ASGI application; startup and shutdown work is delegated to `lifespan`.
app = FastAPI(title="Gemini 3 Flash Trader", lifespan=lifespan)
# CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# development-only setting; restrict allow_origins before deploying.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, specify frontend URL
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ── REST API ────────────────────────────────────────────────
@app.get("/api/health")
async def health_check():
    """Report server liveness together with MT5 connection and agent state."""
    payload = {"status": "online"}
    payload["mt5_connected"] = mt5_bridge.connected
    payload["agent_running"] = agent_running
    # The bridge's simulation flag decides which mode label is reported.
    if mt5_bridge.simulation_mode:
        payload["mode"] = "simulation"
    else:
        payload["mode"] = "live"
    return payload
@app.get("/api/account", response_model=AccountInfo)
async def get_account():
    """Return the account snapshot reported by the MT5 bridge.

    Raises:
        HTTPException: 500 when the bridge result carries an ``"error"`` key;
            the error text is used as the response detail.
    """
    account = mt5_bridge.get_account_info()
    if "error" not in account:
        return account
    raise HTTPException(status_code=500, detail=account["error"])
@app.get("/api/positions", response_model=List[PositionInfo])
async def get_positions():
    """Return the open positions reported by the MT5 bridge."""
    open_positions = mt5_bridge.get_positions()
    return open_positions
@app.get("/api/candles", response_model=List[CandleData])
async def get_candles(timeframe: str = "M5", count: int = 200):
    """Return historical candles from the MT5 bridge.

    Args:
        timeframe: Timeframe identifier forwarded to the bridge (default "M5").
        count: Number of candles requested from the bridge (default 200).
    """
    query = {"timeframe": timeframe, "count": count}
    return mt5_bridge.get_rates(**query)
@app.post("/api/trade")
async def execute_trade(trade: TradeRequest):
    """Carry out a manually requested trade and broadcast the outcome.

    A ``"close"`` action closes the position identified by ``trade.ticket``;
    any other action is forwarded to the bridge as a new order.

    Raises:
        HTTPException: 400 when a close request has no ticket, or when the
            bridge result is not marked successful.
    """
    closing = trade.action == "close"
    if closing and not trade.ticket:
        raise HTTPException(status_code=400, detail="Ticket required for close")

    if closing:
        outcome = mt5_bridge.close_position(trade.ticket)
    else:
        # Missing stop-loss / take-profit values are sent to the bridge as 0.0.
        order_args = {
            "action": trade.action,
            "symbol": trade.symbol,
            "volume": trade.volume,
            "sl": trade.sl or 0.0,
            "tp": trade.tp or 0.0,
        }
        outcome = mt5_bridge.place_order(**order_args)

    succeeded = outcome.get("success")
    if not succeeded:
        raise HTTPException(status_code=400, detail=outcome.get("message"))

    # Let every connected client know about the executed trade.
    await ws_manager.broadcast("trade_event", outcome)
    return outcome
@app.post("/api/agent/toggle")
async def toggle_agent(enable: bool):
    """Switch the autonomous trading agent on or off.

    Updates the module-level ``agent_running`` flag, broadcasts the new status
    to WebSocket clients, and returns it.
    """
    global agent_running
    agent_running = enable

    if agent_running:
        state = "running"
    else:
        state = "stopped"

    print(f"[Main] Agent {state}")
    response = {"status": state}
    await ws_manager.broadcast("agent_status", {"status": state})
    return response
# ── WebSocket ───────────────────────────────────────────────
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Serve one WebSocket client for the lifetime of its connection.

    Registers the socket with the connection manager, sends the current
    MT5/agent status once, then waits on incoming messages until the client
    disconnects. The socket is always unregistered on exit, whether the
    handler ends through a normal disconnect or an unexpected error.
    """
    await ws_manager.connect(websocket)
    try:
        # Send initial state
        await ws_manager.send_personal(websocket, "status", {
            "mt5": mt5_bridge.connected,
            "agent": agent_running
        })
        while True:
            # Incoming messages (e.g. ping) are currently ignored; awaiting
            # them keeps the handler alive and surfaces the disconnect.
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Normal client departure; cleanup happens in `finally`.
        pass
    finally:
        # Unregister on every exit path so that a failed send or a non-text
        # frame does not leave a dead socket registered with the manager.
        ws_manager.disconnect(websocket)
# ── Background Loops ────────────────────────────────────────
async def market_data_loop():
    """
    Main loop (runs until the task is cancelled):
    1. Stream prices (candles/ticks) to frontend
    2. Invoke Agent if enabled (about every 10 seconds)
    3. Broadcast account/positions (about every 2 seconds)

    Periodic work is scheduled with monotonic deadlines rather than by testing
    the wall-clock second against a modulus. Each iteration takes one second
    plus its own work time, so a modulus test can skip the matching second and
    starve the agent or the account broadcasts. The first agent cycle and the
    first snapshot are due immediately.

    Any exception raised inside an iteration is logged and the loop resumes
    after a 5 second pause.
    """
    print("[Main] Market data loop started")

    agent_interval = 10.0     # seconds between agent cycles
    snapshot_interval = 2.0   # seconds between account/position broadcasts
    clock = asyncio.get_running_loop().time  # monotonic event-loop clock
    next_agent_run = clock()
    next_snapshot = clock()

    while True:
        try:
            # 1. Fetch & Broadcast Market Data
            current_tick = mt5_bridge.get_tick()
            candles = mt5_bridge.get_rates(count=2)  # Just need latest for updates
            if candles:
                await ws_manager.broadcast("candle_update", candles[-1])
            if current_tick:
                await ws_manager.broadcast("tick_update", current_tick)

            # 2. Agent Logic (if running)
            now = clock()
            if agent_running and now >= next_agent_run:
                # Advance the deadline before running so that a failing cycle
                # is not retried on every iteration.
                next_agent_run = now + agent_interval
                await run_agent_cycle(current_tick)

            # 3. Broadcast Account/Positions periodically
            now = clock()
            if now >= next_snapshot:
                next_snapshot = now + snapshot_interval
                positions = mt5_bridge.get_positions()
                account = mt5_bridge.get_account_info()
                await ws_manager.broadcast("positions", positions)
                await ws_manager.broadcast("account", account)

            await asyncio.sleep(1)  # 1Hz update rate
        except Exception as e:
            # Top-level boundary: log and keep the loop alive. Task
            # cancellation is not an `Exception`, so it still propagates.
            print(f"[Loop Error] {e}")
            await asyncio.sleep(5)
async def run_agent_cycle(tick: dict, *, min_confidence: float = 0.7):
    """One cycle of the AI agent: Analyze -> Reason -> Act.

    Gathers recent candles, open positions and account info from the bridge,
    asks the agent for a decision, broadcasts its reasoning, then acts on it:

    * ``BUY`` / ``SELL``: places an order when the decision's confidence is
      at least ``min_confidence``.
    * ``CLOSE``: closes every position from the snapshot taken at the start
      of the cycle.
    * Any other action: nothing is executed.

    Args:
        tick: Latest tick as returned by the bridge; passed to the agent as-is.
        min_confidence: Minimum confidence required to place a BUY/SELL order.
    """
    print("[Agent] Analyzing market...")

    # Gather context
    candles = mt5_bridge.get_rates(count=50)  # Give agent more history
    positions = mt5_bridge.get_positions()
    account = mt5_bridge.get_account_info()

    # 1. Ask Gemini
    decision = await agent.analyze(candles, tick, account, positions)
    action = decision["action"]
    confidence = decision["confidence"]

    # 2. Broadcast Reasoning
    await ws_manager.broadcast("reasoning", {
        "action": action,
        "reasoning": decision["reasoning"],
        "confidence": confidence,
        "timestamp": datetime.now().isoformat()
    })

    # 3. Execute Decision
    if action in ("BUY", "SELL"):
        # Check confidence threshold
        if confidence >= min_confidence:
            print(f"[Agent] Executing {action} ({confidence})")
            # `or` rather than a .get() default: a key that is present with
            # a null value must also fall back, matching how the manual trade
            # endpoint treats missing sl/tp.
            result = mt5_bridge.place_order(
                action=action,
                volume=decision.get("volume") or 0.01,
                sl=decision.get("sl") or 0.0,
                tp=decision.get("tp") or 0.0
            )
            await ws_manager.broadcast("trade_event", result)
    elif action == "CLOSE":
        # Close every position from the snapshot taken before the analysis.
        for p in positions:
            res = mt5_bridge.close_position(p["ticket"])
            await ws_manager.broadcast("trade_event", res)
if __name__ == "__main__":
    # Run the ASGI app directly when this file is executed as a script;
    # listens on all interfaces, port 8000.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)