| |
| import gradio as gr |
| import asyncio |
| import threading |
| import json |
| import sys |
| import os |
| import importlib |
| import random |
| from datetime import datetime |
|
|
| |
| theme = gr.themes.Soft() |
| css = """ |
| :root { --radius: 14px; } |
| #topbar { |
| display:flex; align-items:center; justify-content:space-between; |
| padding:12px 16px; |
| background:linear-gradient(90deg,#f8fafc,#eef2ff,#e0f2fe); |
| color:#0f172a; |
| border:1px solid #e5e7eb; |
| border-radius: var(--radius); |
| box-shadow: 0 4px 14px rgba(15,23,42,.06); |
| margin-bottom:12px; |
| width: 100%; |
| box-sizing: border-box; |
| } |
| #topbar h1 { font-size:18px; margin:0; font-weight:800; letter-spacing:.5px } |
| #topbar .right { display:flex; gap:8px; align-items:center } |
| .badge { padding:4px 10px; border-radius:999px; background:#e0f2fe; color:#0369a1; font-weight:700; border:1px solid #bae6fd } |
| .footer-note { opacity:.7; font-size:12px; } |
| """ |
|
|
| |
| def _修复导入路径与别名(): |
| root = os.path.dirname(os.path.abspath(__file__)) |
| if root not in sys.path: |
| sys.path.insert(0, root) |
| for d in ['插件_引擎', '插件_交易', '插件_行情', '插件_资产']: |
| p = os.path.join(root, d) |
| if os.path.isdir(p) and p not in sys.path: |
| sys.path.insert(0, p) |
| try: |
| mod = importlib.import_module('插件_交易.策略基类') |
| if '策略基类' not in sys.modules: |
| sys.modules['策略基类'] = mod |
| except Exception: |
| pass |
|
|
| _修复导入路径与别名() |
|
|
| |
| class _LiteExchange: |
| def __init__(self): |
| self.净值 = 1.0 |
| self.峰值 = 1.0 |
| self.回撤 = 0.0 |
| def tick(self): |
| drift = 0.0002 |
| noise = random.gauss(0, 0.002) |
| self.净值 = max(0.5, self.净值 * (1 + drift + noise)) |
| self.峰值 = max(self.峰值, self.净值) |
| if self.峰值 > 0: |
| self.回撤 = (self.峰值 - self.净值) / self.峰值 |
|
|
| class 轻量自适应策略: |
| def __init__(self, 名称: str, 参数: dict): |
| self.名称 = 名称 |
| self.参数 = 参数 |
|
|
| class 轻量引擎: |
| def __init__(self, 记录函数): |
| self._停 = False |
| self.交易所 = _LiteExchange() |
| self._log = 记录函数 |
| self._策略 = {} |
| def 注册策略(self, 策略ID: str, 策略对象, 合约: str, 周期分钟: int): |
| self._策略[策略ID] = {"对象": 策略对象, "合约": 合约, "周期": 周期分钟} |
| self._log(f"[轻量引擎] 已注册策略: {策略ID} -> {合约} / {周期分钟}m") |
| async def 启动(self, 合约列表): |
| self._log("[轻量引擎] 启动(演示模式,无真实行情订阅)") |
| try: |
| i = 0 |
| while not self._停: |
| self.交易所.tick() |
| i += 1 |
| if i % 5 == 0: |
| self._log(f"[轻量引擎] 心跳 | 净值={self.交易所.净值:.4f} 回撤={self.交易所.回撤:.2%}") |
| await asyncio.sleep(2) |
| finally: |
| self._log("[轻量引擎] 已停止") |
| def 停止(self): |
| self._停 = True |
|
|
| |
| 引擎实例 = None |
| 运行线程 = None |
| 状态锁 = threading.Lock() |
| 运行状态 = { |
| "运行中": False, |
| "合约": "", |
| "周期": 0, |
| "净值": 1.0, |
| "回撤": 0.0, |
| "日志": [] |
| } |
|
|
| 默认参数 = { |
| "ver": "v1", |
| "adx_thr": 25, |
| "atr_buf_tr": 0.1, |
| "rsi_low": 30, |
| "rsi_high": 70, |
| "mr_stop_atr": 2.0, |
| "trend_logic": "adx", |
| "alloc_mode": "fixed" |
| } |
|
|
| def 从参数推断周期及选中(JSON文本: str): |
| try: |
| d = json.loads(JSON文本) |
| except Exception: |
| d = {} |
| tf = None |
| for k in ('tf', 'period', '周期', 'bar', 'bar_min', 'timeframe', 'k', 'k_min'): |
| v = d.get(k) |
| if isinstance(v, (int, float)) and int(v) > 0: |
| tf = int(v); break |
| if isinstance(v, str) and v.strip().isdigit(): |
| tf = int(v.strip()); break |
| if not tf: |
| tf = 60 |
| base = [30, 60, 120, 240] |
| if tf not in base: |
| base.append(tf) |
| base = sorted(set(base)) |
| return [str(x) for x in base], str(tf) |
|
|
| def 参数变更更新周期(参数JSON文本: str): |
| choices, value = 从参数推断周期及选中(参数JSON文本) |
| import gradio as gr |
| return gr.update(choices=choices, value=value) |
|
|
| def _记录(msg: str): |
| with 状态锁: |
| 运行状态["日志"].append(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}") |
| if len(运行状态["日志"]) > 500: |
| 运行状态["日志"] = 运行状态["日志"][-300:] |
|
|
| |
| async def _用真实引擎运行(合约: str, 周期: int, 参数: dict): |
| global 引擎实例 |
| _修复导入路径与别名() |
| from 插件_引擎.多策略引擎 import 多策略引擎 |
| from 插件_交易.策略_自适应 import 自适应策略 |
| 引擎实例 = 多策略引擎() |
| 策略 = 自适应策略(f"{合约}_{周期}分", 参数) |
| 引擎实例.注册策略(f"{合约}_{周期}", 策略, 合约, int(周期)) |
| _记录(f"引擎启动:{合约} / {周期} 分(真实引擎)") |
| await 引擎实例.启动([合约]) |
|
|
| async def _用轻量引擎运行(合约: str, 周期: int, 参数: dict): |
| global 引擎实例 |
| 引擎实例 = 轻量引擎(_记录) |
| 策略 = 轻量自适应策略(f"{合约}_{周期}分", 参数) |
| 引擎实例.注册策略(f"{合约}_{周期}", 策略, 合约, int(周期)) |
| _记录(f"引擎启动:{合约} / {周期} 分(轻量引擎:演示模式)") |
| await 引擎实例.启动([合约]) |
|
|
| def _后台任务(合约: str, 周期: int, 参数: dict): |
| global 引擎实例 |
| loop = asyncio.new_event_loop() |
| asyncio.set_event_loop(loop) |
| try: |
| async def 主循环(): |
| try: |
| await _用真实引擎运行(合约, 周期, 参数) |
| except Exception as e: |
| _记录(f"真实引擎启动失败,切换至内置轻量引擎。原因:{e}") |
| try: |
| await _用轻量引擎运行(合约, 周期, 参数) |
| except Exception as e2: |
| _记录(f"轻量引擎也启动失败:{e2}") |
| finally: |
| with 状态锁: |
| 运行状态["运行中"] = False |
| _记录("引擎结束") |
| loop.run_until_complete(主循环()) |
| finally: |
| try: |
| loop.run_until_complete(asyncio.sleep(0)) |
| except Exception: |
| pass |
| loop.close() |
|
|
| |
| def 启动模拟(合约: str, 周期: str, 参数json: str): |
| global 运行线程 |
| with 状态锁: |
| if 运行状态["运行中"]: |
| return f"已有任务运行中:{运行状态['合约']} / {运行状态['周期']}分", "\n".join(运行状态["日志"][-100:]) |
| if 参数json.strip(): |
| try: |
| 参数 = json.loads(参数json) |
| except Exception as e: |
| return f"参数JSON格式错误:{e}", "\n".join(运行状态["日志"][-100:]) |
| else: |
| 参数 = 默认参数 |
| try: |
| p = int(周期) |
| except: |
| return "周期必须是数字(30/60/120/240)", "\n".join(运行状态["日志"][-100:]) |
| with 状态锁: |
| 运行状态["运行中"] = True |
| 运行状态["合约"] = 合约 |
| 运行状态["周期"] = p |
| 运行状态["净值"] = 1.0 |
| 运行状态["回撤"] = 0.0 |
| 运行状态["日志"].clear() |
| _记录(f"准备启动:合约={合约} 周期={p} 参数={参数}") |
| 运行线程 = threading.Thread(target=_后台任务, args=(合约, p, 参数), daemon=True) |
| 运行线程.start() |
| return "模拟启动成功", "\n".join(运行状态["日志"][-100:]) |
|
|
| def 停止模拟(): |
| global 引擎实例 |
| if 引擎实例 is None: |
| _记录("当前没有运行中的任务") |
| with 状态锁: |
| 运行状态["运行中"] = False |
| return "当前没有运行中的任务" |
| try: |
| 引擎实例.停止() |
| _记录("已请求停止(引擎将不再处理新信号/演示将退出)") |
| return "已发送停止信号" |
| except Exception as e: |
| _记录(f"停止失败:{e}") |
| return f"停止失败:{e}" |
|
|
| def 获取状态(): |
| with 状态锁: |
| try: |
| if 运行状态["运行中"] and 引擎实例 is not None and hasattr(引擎实例, '交易所') and 引擎实例.交易所 is not None: |
| nv = float(getattr(引擎实例.交易所, "净值", 1.0) or 1.0) |
| dd = float(getattr(引擎实例.交易所, "回撤", 0.0) or 0.0) |
| 运行状态["净值"] = nv |
| 运行状态["回撤"] = dd |
| except Exception as e: |
| _记录(f"读取状态失败:{e}") |
| 状态文本 = f"运行中: {运行状态['运行中']} | 合约: {运行状态['合约']} | 周期: {运行状态['周期']}m | 净值: {运行状态['净值']:.4f} | 回撤: {运行状态['回撤']:.2%}" |
| 日志文本 = "\n".join(运行状态["日志"][-120:]) |
| return 状态文本, 日志文本 |
|
|
| |
| try: |
| import 配置 as 配置模块 |
| except Exception: |
| 配置模块 = None |
|
|
| |
| with gr.Blocks(title="模拟实盘(OKX 公共WS + 多策略引擎)", theme=theme, css=css) as demo: |
| |
| gr.HTML( |
| '<div id="topbar"><h1>📈 模拟实盘控制台</h1><div class="right"><span class="badge">中文界面</span></div></div>', |
| elem_id="topbar_wrap", |
| elem_classes=["prose", "gr-prose"] |
| ) |
| gr.Markdown("提示:点击“⚙️ 设置”进入配置页(上传CSV、参数加载/保存)。若后端模块缺失将自动进入演示引擎。") |
|
|
| with gr.Row(): |
| with gr.Column(scale=7): |
| with gr.Group(visible=True) as 运行面板: |
| with gr.Row(): |
| with gr.Column(scale=1): |
| 合约输入 = gr.Textbox(label="合约ID", value="UXLINK-USDT-SWAP", placeholder="例:BTC-USDT-SWAP / ETH-USDT-SWAP") |
| 周期选择 = gr.Radio(label="周期(分钟)", choices=["30", "60", "120", "240"], value="60") |
| 参数输入 = gr.Textbox(label="策略参数(JSON)", value=json.dumps(默认参数, ensure_ascii=False, indent=2), lines=12) |
| with gr.Row(): |
| 启动按钮 = gr.Button("🚀 启动模拟", variant="primary") |
| 停止按钮 = gr.Button("⏹️ 停止模拟", variant="stop") |
| 刷新按钮 = gr.Button("🔄 刷新状态") |
| 设置按钮 = gr.Button("⚙️ 设置", variant="secondary") |
| with gr.Column(scale=1): |
| 状态显示 = gr.Textbox(label="运行状态", lines=2, interactive=False) |
| 日志显示 = gr.Textbox(label="实时日志(最近120行)", lines=22, interactive=False) |
|
|
| with gr.Group(visible=False) as 设置面板: |
| 返回按钮 = gr.Button("← 返回运行页", variant="secondary") |
| gr.Markdown("## 设置") |
| if 配置模块 is not None: |
| 构件 = 配置模块.构建设置面板(容器=设置面板, 默认参数=默认参数, 记录函数=_记录) |
| if isinstance(构件, dict): |
| |
| if "参数编辑框" in 构件 and ("复制按钮" in 构件 or "复制参数按钮" in 构件): |
| 回填参数按钮 = 构件.get("复制按钮") or 构件.get("复制参数按钮") |
| 回填参数按钮.click(lambda s: s, inputs=构件["参数编辑框"], outputs=参数输入) |
| |
| if "回填合约按钮" in 构件 and "合约ID框" in 构件: |
| 构件["回填合约按钮"].click(lambda s: s.strip(), inputs=构件["合约ID框"], outputs=合约输入) |
| else: |
| gr.Markdown("未找到 配置.py,无法加载设置面板。") |
|
|
| with gr.Column(scale=5): |
| gr.Markdown("### 使用说明") |
| gr.Markdown("- 在设置页上传CSV并完成列映射与合约登记\n- 将参数与合约一键回填到运行页\n- 启动后可在右侧查看状态和日志\n- 空闲时 Space 会休眠,策略不会持续运行(演示环境)") |
| gr.Markdown("<div class='footer-note'>提示:若需长期运行,请在自有服务器部署引擎,Space 仅作前端/演示。</div>") |
|
|
| |
| 设置按钮.click(lambda: (gr.update(visible=False), gr.update(visible=True)), outputs=[运行面板, 设置面板]) |
| 返回按钮.click(lambda: (gr.update(visible=True), gr.update(visible=False)), outputs=[运行面板, 设置面板]) |
|
|
| |
| 启动按钮.click(fn=启动模拟, inputs=[合约输入, 周期选择, 参数输入], outputs=[状态显示, 日志显示]) |
| 停止按钮.click(fn=停止模拟, outputs=状态显示) |
| 刷新按钮.click(fn=获取状态, outputs=[状态显示, 日志显示]) |
|
|
| |
| 参数输入.change(fn=参数变更更新周期, inputs=参数输入, outputs=周期选择) |
|
|
| |
| demo.load(fn=获取状态, outputs=[状态显示, 日志显示]) |
| |
| demo.load(fn=lambda s: 参数变更更新周期(s), inputs=参数输入, outputs=周期选择) |
|
|
| timer = gr.Timer(2) |
| timer.tick(fn=获取状态, outputs=[状态显示, 日志显示]) |
|
|
| if __name__ == "__main__": |
| demo.launch(server_name="0.0.0.0", server_port=7860) |