RayMelius Claude Sonnet 4.6 commited on
Commit
1f55553
·
1 Parent(s): 6ddeb15

Add Start/End of Day controls and price history chart

Browse files

- shared/config.py: add CONTROL_TOPIC for start/stop signals
- md_feeder: listen on control topic; pause/resume simulation on stop/start signals, reload securities on start
- dashboard: add SQLite OHLCV history DB (1-min buckets), record every Kafka trade
- dashboard: POST /session/start (reset prices → start), POST /session/end (save closing prices → stop)
- dashboard: GET /history/<symbol>?period=1h|8h|1d|1w|1m returns aggregated OHLCV candles (≤150 bars)
- dashboard UI: Start/End of Day buttons + session status badge in header
- dashboard UI: full-width Price History panel with candlestick + volume chart (canvas)
- docker-compose: mount shared_data volume into dashboard so it can read/write securities.txt

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

dashboard/dashboard.py CHANGED
@@ -2,7 +2,7 @@ import sys
2
  sys.path.insert(0, "/app")
3
 
4
  from flask import Flask, render_template, jsonify, Response, request
5
- import threading, json, os, time, requests
6
  from queue import Queue, Empty
7
 
8
  from shared.config import Config
@@ -19,48 +19,136 @@ lock = threading.Lock()
19
  sse_clients = []
20
  sse_clients_lock = threading.Lock()
21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22
  def broadcast_event(event_type, data):
23
- """Send an event to all connected SSE clients."""
24
  message = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
25
  with sse_clients_lock:
26
- dead_clients = []
27
  for q in sse_clients:
28
  try:
29
  q.put_nowait(message)
30
- except:
31
- dead_clients.append(q)
32
- for q in dead_clients:
33
  sse_clients.remove(q)
34
 
35
- # Kafka consumer thread
 
 
36
  def consume_kafka():
37
  consumer = create_consumer(
38
  topics=[Config.ORDERS_TOPIC, Config.SNAPSHOTS_TOPIC, Config.TRADES_TOPIC],
39
  group_id="dashboard",
40
- component_name="Dashboard"
41
  )
42
  for msg in consumer:
43
  with lock:
44
  if msg.topic == "orders":
45
  order = msg.value
46
  orders.insert(0, order)
47
- orders[:] = orders[:50] # keep last 50
48
  broadcast_event("order", order)
49
 
50
  elif msg.topic == "snapshots":
51
  snap = msg.value
52
- # Handle both simple snapshots and MDF snapshots
53
  symbol = snap.get("symbol")
54
  if not symbol:
55
  continue
56
-
57
  bbo_data = {
58
  "best_bid": snap.get("best_bid"),
59
  "best_ask": snap.get("best_ask"),
60
  "bid_size": snap.get("bid_size"),
61
  "ask_size": snap.get("ask_size"),
62
  "timestamp": snap.get("timestamp", time.time()),
63
- "source": snap.get("source", "unknown")
64
  }
65
  bbos[symbol] = bbo_data
66
  broadcast_event("snapshot", {"symbol": symbol, **bbo_data})
@@ -68,18 +156,30 @@ def consume_kafka():
68
  elif msg.topic == "trades":
69
  trade = msg.value
70
  trades_cache.insert(0, trade)
71
- trades_cache[:] = trades_cache[:200] # keep last 200
72
  broadcast_event("trade", trade)
 
 
 
 
 
 
 
73
 
 
 
74
  threading.Thread(target=consume_kafka, daemon=True).start()
75
 
 
 
 
76
  @app.route("/")
77
  def index():
78
  return render_template("index.html")
79
 
 
80
  @app.route("/health")
81
  def health():
82
- """Health check endpoint."""
83
  status = {
84
  "status": "healthy",
85
  "service": "dashboard",
@@ -88,147 +188,228 @@ def health():
88
  "orders_cached": len(orders),
89
  "trades_cached": len(trades_cache),
90
  "symbols_tracked": len(bbos),
91
- "sse_clients": len(sse_clients)
92
- }
93
  }
94
- # Check matcher connectivity
95
  try:
96
  r = requests.get(f"{Config.MATCHER_URL}/health", timeout=2)
97
- if r.status_code == 200:
98
- status["matcher"] = "connected"
99
- else:
100
- status["matcher"] = f"error: status {r.status_code}"
101
- status["status"] = "degraded"
102
  except Exception as e:
103
  status["matcher"] = f"error: {e}"
104
  status["status"] = "degraded"
105
-
106
  return jsonify(status)
107
 
 
108
  @app.route("/data")
109
  def data():
110
- """Fallback polling endpoint (still useful for initial load or SSE fallback)."""
111
- # Always fetch trades from matcher (more reliable)
112
  try:
113
  r = requests.get(f"{Config.MATCHER_URL}/trades", timeout=2)
114
  resp = r.json()
115
- trades = resp.get('trades', []) if isinstance(resp, dict) else resp
116
  except Exception as e:
117
  print("Cannot fetch trades:", e)
118
  with lock:
119
  trades = list(trades_cache)
120
 
121
- # Order book for dropdown (fetch latest for all symbols)
122
  try:
123
  r = requests.get(f"{Config.MATCHER_URL}/orderbook/ALPHA", timeout=2)
124
  book = r.json()
125
- except Exception as e:
126
- print("Cannot fetch orderbook:", e)
127
  book = {"bids": [], "asks": []}
128
 
129
  with lock:
130
- return jsonify({
131
- "orders": list(orders),
132
- "bbos": dict(bbos),
133
- "trades": trades,
134
- "book": book
135
- })
136
 
137
  @app.route("/orderbook/<symbol>")
138
  def orderbook(symbol):
139
- """Proxy to matcher orderbook API."""
140
  try:
141
  r = requests.get(f"{Config.MATCHER_URL}/orderbook/{symbol}", timeout=2)
142
  return (r.text, r.status_code, {"Content-Type": "application/json"})
143
  except Exception as e:
144
  return jsonify({"error": str(e), "bids": [], "asks": []}), 500
145
 
146
- # Kafka producer for order management (lazy init)
 
147
  _producer = None
 
 
148
  def get_producer():
149
  global _producer
150
  if _producer is None:
151
  _producer = create_producer(component_name="Dashboard")
152
  return _producer
153
 
 
154
  @app.route("/order/cancel", methods=["POST"])
155
  def cancel_order():
156
- """Send cancel request to matcher via Kafka."""
157
  try:
158
  data = request.get_json()
159
  orig_cl_ord_id = data.get("orig_cl_ord_id")
160
  symbol = data.get("symbol")
161
-
162
  if not orig_cl_ord_id:
163
  return jsonify({"status": "error", "error": "Missing orig_cl_ord_id"}), 400
164
-
165
  cancel_msg = {
166
  "type": "cancel",
167
  "orig_cl_ord_id": orig_cl_ord_id,
168
  "symbol": symbol,
169
- "timestamp": time.time()
170
  }
171
-
172
- producer = get_producer()
173
- producer.send(Config.ORDERS_TOPIC, cancel_msg)
174
- producer.flush()
175
-
176
  return jsonify({"status": "ok", "message": f"Cancel request sent for {orig_cl_ord_id}"})
177
  except Exception as e:
178
  return jsonify({"status": "error", "error": str(e)}), 500
179
 
 
180
  @app.route("/order/amend", methods=["POST"])
181
  def amend_order():
182
- """Send amend request to matcher via Kafka."""
183
  try:
184
  data = request.get_json()
185
  orig_cl_ord_id = data.get("orig_cl_ord_id")
186
- symbol = data.get("symbol")
187
- quantity = data.get("quantity")
188
- price = data.get("price")
189
-
190
  if not orig_cl_ord_id:
191
  return jsonify({"status": "error", "error": "Missing orig_cl_ord_id"}), 400
192
-
193
  amend_msg = {
194
  "type": "amend",
195
  "orig_cl_ord_id": orig_cl_ord_id,
196
  "cl_ord_id": f"amend-{int(time.time()*1000)}",
197
- "symbol": symbol,
198
- "quantity": quantity,
199
- "price": price,
200
- "timestamp": time.time()
201
  }
 
 
 
 
 
 
202
 
203
- producer = get_producer()
204
- producer.send(Config.ORDERS_TOPIC, amend_msg)
205
- producer.flush()
206
 
207
- return jsonify({"status": "ok", "message": f"Amend request sent for {orig_cl_ord_id}"})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
208
  except Exception as e:
209
  return jsonify({"status": "error", "error": str(e)}), 500
210
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
211
  @app.route("/stream")
212
  def stream():
213
- """SSE endpoint for real-time updates."""
214
  def event_stream():
215
  q = Queue(maxsize=100)
216
  with sse_clients_lock:
217
  sse_clients.append(q)
218
  try:
219
- # Send initial connection event
220
  yield f"event: connected\ndata: {json.dumps({'status': 'connected'})}\n\n"
221
-
222
- # Send current state on connect (including trades)
223
  with lock:
224
- yield f"event: init\ndata: {json.dumps({'orders': list(orders), 'bbos': dict(bbos), 'trades': list(trades_cache)})}\n\n"
225
-
 
 
 
 
226
  while True:
227
  try:
228
- message = q.get(timeout=30) # 30s timeout for keepalive
229
  yield message
230
  except Empty:
231
- # Send keepalive comment
232
  yield ": keepalive\n\n"
233
  finally:
234
  with sse_clients_lock:
@@ -240,10 +421,11 @@ def stream():
240
  mimetype="text/event-stream",
241
  headers={
242
  "Cache-Control": "no-cache",
243
- "X-Accel-Buffering": "no", # Disable nginx buffering
244
- "Connection": "keep-alive"
245
- }
246
  )
247
 
 
248
  if __name__ == "__main__":
249
  app.run(host="0.0.0.0", port=5000, debug=True, use_reloader=False)
 
2
  sys.path.insert(0, "/app")
3
 
4
  from flask import Flask, render_template, jsonify, Response, request
5
+ import threading, json, os, time, requests, sqlite3
6
  from queue import Queue, Empty
7
 
8
  from shared.config import Config
 
19
  sse_clients = []
20
  sse_clients_lock = threading.Lock()
21
 
22
+ # Session state
23
+ session_state = {"active": False, "start_time": None}
24
+
25
+ # ── OHLCV History ──────────────────────────────────────────────────────────────
26
+ HISTORY_DB = os.getenv("HISTORY_DB", "/app/data/dashboard_history.db")
27
+ BUCKET_SIZE = 60 # 1-minute candles
28
+
29
+ PERIOD_SECONDS = {
30
+ "1h": 3600,
31
+ "8h": 28800,
32
+ "1d": 86400,
33
+ "1w": 604800,
34
+ "1m": 2592000,
35
+ }
36
+
37
+
38
+ def init_history_db():
39
+ os.makedirs(os.path.dirname(HISTORY_DB), exist_ok=True)
40
+ conn = sqlite3.connect(HISTORY_DB)
41
+ conn.execute("""
42
+ CREATE TABLE IF NOT EXISTS ohlcv (
43
+ symbol TEXT,
44
+ bucket INTEGER,
45
+ open REAL,
46
+ high REAL,
47
+ low REAL,
48
+ close REAL,
49
+ volume INTEGER,
50
+ PRIMARY KEY (symbol, bucket)
51
+ )
52
+ """)
53
+ conn.commit()
54
+ conn.close()
55
+
56
+
57
+ def record_trade(symbol, price, qty, ts):
58
+ if not symbol or price <= 0:
59
+ return
60
+ bucket = int(ts // BUCKET_SIZE) * BUCKET_SIZE
61
+ try:
62
+ conn = sqlite3.connect(HISTORY_DB)
63
+ existing = conn.execute(
64
+ "SELECT open FROM ohlcv WHERE symbol=? AND bucket=?", (symbol, bucket)
65
+ ).fetchone()
66
+ if existing:
67
+ conn.execute(
68
+ "UPDATE ohlcv SET high=MAX(high,?), low=MIN(low,?), close=?, volume=volume+? "
69
+ "WHERE symbol=? AND bucket=?",
70
+ (price, price, price, qty, symbol, bucket),
71
+ )
72
+ else:
73
+ conn.execute(
74
+ "INSERT INTO ohlcv VALUES (?,?,?,?,?,?,?)",
75
+ (symbol, bucket, price, price, price, price, qty),
76
+ )
77
+ conn.commit()
78
+ conn.close()
79
+ except Exception as e:
80
+ print(f"[History] Error recording trade: {e}")
81
+
82
+
83
+ def load_securities_file():
84
+ securities = {}
85
+ try:
86
+ with open(Config.SECURITIES_FILE) as f:
87
+ for line in f:
88
+ line = line.strip()
89
+ if not line or line.startswith("#"):
90
+ continue
91
+ parts = line.split()
92
+ if len(parts) >= 3:
93
+ securities[parts[0]] = {
94
+ "start": float(parts[1]),
95
+ "current": float(parts[2]),
96
+ }
97
+ except Exception as e:
98
+ print(f"[Dashboard] Cannot read securities: {e}")
99
+ return securities
100
+
101
+
102
+ def save_securities_file(securities):
103
+ with open(Config.SECURITIES_FILE, "w") as f:
104
+ f.write("#SYMBOL\t<start_price>\t<current_price>\n")
105
+ for sym, vals in securities.items():
106
+ f.write(f"{sym}\t{vals['start']:.2f}\t{vals['current']:.2f}\n")
107
+
108
+
109
+ # ── SSE broadcast ──────────────────────────────────────────────────────────────
110
+
111
  def broadcast_event(event_type, data):
 
112
  message = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
113
  with sse_clients_lock:
114
+ dead = []
115
  for q in sse_clients:
116
  try:
117
  q.put_nowait(message)
118
+ except Exception:
119
+ dead.append(q)
120
+ for q in dead:
121
  sse_clients.remove(q)
122
 
123
+
124
+ # ── Kafka consumer thread ──────────────────────────────────────────────────────
125
+
126
  def consume_kafka():
127
  consumer = create_consumer(
128
  topics=[Config.ORDERS_TOPIC, Config.SNAPSHOTS_TOPIC, Config.TRADES_TOPIC],
129
  group_id="dashboard",
130
+ component_name="Dashboard",
131
  )
132
  for msg in consumer:
133
  with lock:
134
  if msg.topic == "orders":
135
  order = msg.value
136
  orders.insert(0, order)
137
+ orders[:] = orders[:50]
138
  broadcast_event("order", order)
139
 
140
  elif msg.topic == "snapshots":
141
  snap = msg.value
 
142
  symbol = snap.get("symbol")
143
  if not symbol:
144
  continue
 
145
  bbo_data = {
146
  "best_bid": snap.get("best_bid"),
147
  "best_ask": snap.get("best_ask"),
148
  "bid_size": snap.get("bid_size"),
149
  "ask_size": snap.get("ask_size"),
150
  "timestamp": snap.get("timestamp", time.time()),
151
+ "source": snap.get("source", "unknown"),
152
  }
153
  bbos[symbol] = bbo_data
154
  broadcast_event("snapshot", {"symbol": symbol, **bbo_data})
 
156
  elif msg.topic == "trades":
157
  trade = msg.value
158
  trades_cache.insert(0, trade)
159
+ trades_cache[:] = trades_cache[:200]
160
  broadcast_event("trade", trade)
161
+ # Record for OHLCV history
162
+ sym = trade.get("symbol", "")
163
+ price = float(trade.get("price") or 0)
164
+ qty = int(trade.get("quantity") or trade.get("qty") or 0)
165
+ ts = float(trade.get("timestamp") or time.time())
166
+ record_trade(sym, price, qty, ts)
167
+
168
 
169
+ # Initialise DB then start consumer thread
170
+ init_history_db()
171
  threading.Thread(target=consume_kafka, daemon=True).start()
172
 
173
+
174
+ # ── Flask routes ───────────────────────────────────────────────────────────────
175
+
176
  @app.route("/")
177
  def index():
178
  return render_template("index.html")
179
 
180
+
181
  @app.route("/health")
182
  def health():
 
183
  status = {
184
  "status": "healthy",
185
  "service": "dashboard",
 
188
  "orders_cached": len(orders),
189
  "trades_cached": len(trades_cache),
190
  "symbols_tracked": len(bbos),
191
+ "sse_clients": len(sse_clients),
192
+ },
193
  }
 
194
  try:
195
  r = requests.get(f"{Config.MATCHER_URL}/health", timeout=2)
196
+ status["matcher"] = "connected" if r.status_code == 200 else f"error: {r.status_code}"
 
 
 
 
197
  except Exception as e:
198
  status["matcher"] = f"error: {e}"
199
  status["status"] = "degraded"
 
200
  return jsonify(status)
201
 
202
+
203
  @app.route("/data")
204
  def data():
 
 
205
  try:
206
  r = requests.get(f"{Config.MATCHER_URL}/trades", timeout=2)
207
  resp = r.json()
208
+ trades = resp.get("trades", []) if isinstance(resp, dict) else resp
209
  except Exception as e:
210
  print("Cannot fetch trades:", e)
211
  with lock:
212
  trades = list(trades_cache)
213
 
 
214
  try:
215
  r = requests.get(f"{Config.MATCHER_URL}/orderbook/ALPHA", timeout=2)
216
  book = r.json()
217
+ except Exception:
 
218
  book = {"bids": [], "asks": []}
219
 
220
  with lock:
221
+ return jsonify({"orders": list(orders), "bbos": dict(bbos), "trades": trades, "book": book})
222
+
 
 
 
 
223
 
224
  @app.route("/orderbook/<symbol>")
225
  def orderbook(symbol):
 
226
  try:
227
  r = requests.get(f"{Config.MATCHER_URL}/orderbook/{symbol}", timeout=2)
228
  return (r.text, r.status_code, {"Content-Type": "application/json"})
229
  except Exception as e:
230
  return jsonify({"error": str(e), "bids": [], "asks": []}), 500
231
 
232
+
233
+ # Kafka producer (lazy init)
234
  _producer = None
235
+
236
+
237
  def get_producer():
238
  global _producer
239
  if _producer is None:
240
  _producer = create_producer(component_name="Dashboard")
241
  return _producer
242
 
243
+
244
  @app.route("/order/cancel", methods=["POST"])
245
  def cancel_order():
 
246
  try:
247
  data = request.get_json()
248
  orig_cl_ord_id = data.get("orig_cl_ord_id")
249
  symbol = data.get("symbol")
 
250
  if not orig_cl_ord_id:
251
  return jsonify({"status": "error", "error": "Missing orig_cl_ord_id"}), 400
 
252
  cancel_msg = {
253
  "type": "cancel",
254
  "orig_cl_ord_id": orig_cl_ord_id,
255
  "symbol": symbol,
256
+ "timestamp": time.time(),
257
  }
258
+ p = get_producer()
259
+ p.send(Config.ORDERS_TOPIC, cancel_msg)
260
+ p.flush()
 
 
261
  return jsonify({"status": "ok", "message": f"Cancel request sent for {orig_cl_ord_id}"})
262
  except Exception as e:
263
  return jsonify({"status": "error", "error": str(e)}), 500
264
 
265
+
266
  @app.route("/order/amend", methods=["POST"])
267
  def amend_order():
 
268
  try:
269
  data = request.get_json()
270
  orig_cl_ord_id = data.get("orig_cl_ord_id")
 
 
 
 
271
  if not orig_cl_ord_id:
272
  return jsonify({"status": "error", "error": "Missing orig_cl_ord_id"}), 400
 
273
  amend_msg = {
274
  "type": "amend",
275
  "orig_cl_ord_id": orig_cl_ord_id,
276
  "cl_ord_id": f"amend-{int(time.time()*1000)}",
277
+ "symbol": data.get("symbol"),
278
+ "quantity": data.get("quantity"),
279
+ "price": data.get("price"),
280
+ "timestamp": time.time(),
281
  }
282
+ p = get_producer()
283
+ p.send(Config.ORDERS_TOPIC, amend_msg)
284
+ p.flush()
285
+ return jsonify({"status": "ok", "message": f"Amend request sent for {orig_cl_ord_id}"})
286
+ except Exception as e:
287
+ return jsonify({"status": "error", "error": str(e)}), 500
288
 
 
 
 
289
 
290
+ # ── Session endpoints ──────────────────────────────────────────────────────────
291
+
292
+ @app.route("/session/start", methods=["POST"])
293
+ def session_start():
294
+ try:
295
+ # Reset current prices to start prices
296
+ securities = load_securities_file()
297
+ if securities:
298
+ for sym in securities:
299
+ securities[sym]["current"] = securities[sym]["start"]
300
+ save_securities_file(securities)
301
+
302
+ # Signal md_feeder to start
303
+ p = get_producer()
304
+ p.send(Config.CONTROL_TOPIC, {"action": "start"})
305
+ p.flush()
306
+
307
+ session_state["active"] = True
308
+ session_state["start_time"] = time.time()
309
+ broadcast_event("session", {"status": "started", "time": session_state["start_time"]})
310
+ return jsonify({"status": "ok", "message": "Day started"})
311
+ except Exception as e:
312
+ return jsonify({"status": "error", "error": str(e)}), 500
313
+
314
+
315
+ @app.route("/session/end", methods=["POST"])
316
+ def session_end():
317
+ try:
318
+ securities = load_securities_file()
319
+ # Update current prices from BBO mid where available
320
+ for sym in list(securities.keys()):
321
+ try:
322
+ r = requests.get(f"{Config.MATCHER_URL}/orderbook/{sym}", timeout=2)
323
+ book = r.json()
324
+ bids = book.get("bids", [])
325
+ asks = book.get("asks", [])
326
+ if bids and asks:
327
+ best_bid = max(b["price"] for b in bids)
328
+ best_ask = min(a["price"] for a in asks)
329
+ securities[sym]["current"] = round((best_bid + best_ask) / 2, 2)
330
+ except Exception:
331
+ pass
332
+ if securities:
333
+ save_securities_file(securities)
334
+
335
+ # Signal md_feeder to stop
336
+ p = get_producer()
337
+ p.send(Config.CONTROL_TOPIC, {"action": "stop"})
338
+ p.flush()
339
+
340
+ session_state["active"] = False
341
+ broadcast_event("session", {"status": "ended", "time": time.time()})
342
+ return jsonify({"status": "ok", "message": "Day ended, closing prices saved"})
343
  except Exception as e:
344
  return jsonify({"status": "error", "error": str(e)}), 500
345
 
346
+
347
+ @app.route("/session/status")
348
+ def session_status():
349
+ return jsonify(session_state)
350
+
351
+
352
+ # ── History endpoint ───────────────────────────────────────────────────────────
353
+
354
+ @app.route("/history/<symbol>")
355
+ def history(symbol):
356
+ period = request.args.get("period", "1d")
357
+ seconds = PERIOD_SECONDS.get(period, 86400)
358
+ since = int(time.time()) - seconds
359
+ try:
360
+ conn = sqlite3.connect(HISTORY_DB)
361
+ rows = conn.execute(
362
+ "SELECT bucket, open, high, low, close, volume FROM ohlcv "
363
+ "WHERE symbol=? AND bucket>=? ORDER BY bucket ASC",
364
+ (symbol, since),
365
+ ).fetchall()
366
+ conn.close()
367
+ except Exception as e:
368
+ return jsonify({"error": str(e), "candles": []}), 500
369
+
370
+ candles = [{"t": r[0], "o": r[1], "h": r[2], "l": r[3], "c": r[4], "v": r[5]} for r in rows]
371
+
372
+ # Aggregate to at most 150 bars for display
373
+ if len(candles) > 150:
374
+ step = len(candles) // 150 + 1
375
+ agg = []
376
+ for i in range(0, len(candles), step):
377
+ chunk = candles[i : i + step]
378
+ agg.append({
379
+ "t": chunk[0]["t"],
380
+ "o": chunk[0]["o"],
381
+ "h": max(c["h"] for c in chunk),
382
+ "l": min(c["l"] for c in chunk),
383
+ "c": chunk[-1]["c"],
384
+ "v": sum(c["v"] for c in chunk),
385
+ })
386
+ candles = agg
387
+
388
+ return jsonify({"symbol": symbol, "period": period, "candles": candles})
389
+
390
+
391
+ # ── SSE stream ─────────────────────────────────────────────────────────────────
392
+
393
  @app.route("/stream")
394
  def stream():
 
395
  def event_stream():
396
  q = Queue(maxsize=100)
397
  with sse_clients_lock:
398
  sse_clients.append(q)
399
  try:
 
400
  yield f"event: connected\ndata: {json.dumps({'status': 'connected'})}\n\n"
 
 
401
  with lock:
402
+ yield (
403
+ f"event: init\ndata: "
404
+ f"{json.dumps({'orders': list(orders), 'bbos': dict(bbos), 'trades': list(trades_cache)})}\n\n"
405
+ )
406
+ # Also send current session state
407
+ yield f"event: session\ndata: {json.dumps({'status': 'started' if session_state['active'] else 'ended'})}\n\n"
408
  while True:
409
  try:
410
+ message = q.get(timeout=30)
411
  yield message
412
  except Empty:
 
413
  yield ": keepalive\n\n"
414
  finally:
415
  with sse_clients_lock:
 
421
  mimetype="text/event-stream",
422
  headers={
423
  "Cache-Control": "no-cache",
424
+ "X-Accel-Buffering": "no",
425
+ "Connection": "keep-alive",
426
+ },
427
  )
428
 
429
+
430
  if __name__ == "__main__":
431
  app.run(host="0.0.0.0", port=5000, debug=True, use_reloader=False)
dashboard/templates/index.html CHANGED
@@ -127,10 +127,43 @@
127
  .ticker-change.up { color: #4caf50; }
128
  .ticker-change.down { color: #f44336; }
129
  .ticker-arrow { margin-right: 3px; }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
130
  </style>
131
  </head>
132
  <body>
133
- <h1>Trading Dashboard <span id="status" class="status connecting"><span class="dot"></span><span id="status-text">Connecting...</span></span></h1>
 
 
 
 
 
 
134
  <div class="container">
135
 
136
  <div class="panel">
@@ -266,6 +299,24 @@
266
 
267
  </div>
268
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
269
  <script>
270
  // State
271
  const state = {
@@ -598,6 +649,25 @@
598
  }
599
  });
600
  if (chartCurrent) chartSel.value = chartCurrent;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
601
  }
602
 
603
  async function renderOrderBookPanel() {
@@ -916,6 +986,11 @@
916
  updateBBO(snap.symbol, snap);
917
  });
918
 
 
 
 
 
 
919
  eventSource.onerror = () => {
920
  setStatus("disconnected", "Disconnected");
921
  state.connected = false;
@@ -943,6 +1018,158 @@
943
  }
944
  }
945
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
946
  // Initial load: fetch data via REST, then connect SSE
947
  async function init() {
948
  await fetchData();
 
127
  .ticker-change.up { color: #4caf50; }
128
  .ticker-change.down { color: #f44336; }
129
  .ticker-arrow { margin-right: 3px; }
130
+
131
+ /* Start/End of Day buttons */
132
+ .btn-day {
133
+ padding: 4px 14px;
134
+ border: none;
135
+ border-radius: 20px;
136
+ cursor: pointer;
137
+ font-size: 12px;
138
+ font-weight: bold;
139
+ }
140
+ .btn-start { background: #28a745; color: #fff; }
141
+ .btn-start:hover { background: #218838; }
142
+ .btn-end { background: #ff9800; color: #fff; }
143
+ .btn-end:hover { background: #e68900; }
144
+ .status.active { background: #d4edda; color: #155724; }
145
+ .status.active .dot { background: #28a745; }
146
+ .status.idle { background: #e8e8e8; color: #666; }
147
+ .status.idle .dot { background: #9e9e9e; }
148
+
149
+ /* Price History panel */
150
+ .panel-full {
151
+ background: #fff;
152
+ border-radius: 8px;
153
+ padding: 10px;
154
+ box-shadow: 0 2px 4px rgba(0,0,0,0.1);
155
+ margin-top: 20px;
156
+ }
157
  </style>
158
  </head>
159
  <body>
160
+ <h1>
161
+ Trading Dashboard
162
+ <span id="status" class="status connecting"><span class="dot"></span><span id="status-text">Connecting...</span></span>
163
+ <span id="session-badge" class="status idle"><span class="dot"></span><span id="session-text">IDLE</span></span>
164
+ <button onclick="startDay()" class="btn-day btn-start">Start of Day</button>
165
+ <button onclick="endDay()" class="btn-day btn-end">End of Day</button>
166
+ </h1>
167
  <div class="container">
168
 
169
  <div class="panel">
 
299
 
300
  </div>
301
 
302
+ <!-- Price History panel (full width) -->
303
+ <div class="panel-full">
304
+ <h2>Price History
305
+ <select id="history-symbol" onchange="loadHistory()" style="margin-left:10px; padding:2px 5px;">
306
+ <option value="">Select symbol...</option>
307
+ </select>
308
+ <select id="history-period" onchange="loadHistory()" style="margin-left:5px; padding:2px 5px;">
309
+ <option value="1h">1 Hour</option>
310
+ <option value="8h">8 Hours</option>
311
+ <option value="1d" selected>1 Day</option>
312
+ <option value="1w">1 Week</option>
313
+ <option value="1m">1 Month</option>
314
+ </select>
315
+ <span id="history-status" style="font-size:12px; color:#999; margin-left:12px;"></span>
316
+ </h2>
317
+ <canvas id="history-chart" style="width:100%; height:350px; display:block;"></canvas>
318
+ </div>
319
+
320
  <script>
321
  // State
322
  const state = {
 
649
  }
650
  });
651
  if (chartCurrent) chartSel.value = chartCurrent;
652
+
653
+ // Update history symbol select
654
+ const histSel = document.getElementById("history-symbol");
655
+ const histCurrent = histSel.value;
656
+ const histOpts = Array.from(histSel.options).map(o => o.value).filter(v => v !== "");
657
+ symbols.forEach(sym => {
658
+ if (!histOpts.includes(sym)) {
659
+ const opt = document.createElement("option");
660
+ opt.value = sym;
661
+ opt.textContent = sym;
662
+ histSel.appendChild(opt);
663
+ }
664
+ });
665
+ if (!histCurrent && histSel.options.length > 1) {
666
+ histSel.value = histSel.options[1].value;
667
+ loadHistory();
668
+ } else if (histCurrent) {
669
+ histSel.value = histCurrent;
670
+ }
671
  }
672
 
673
  async function renderOrderBookPanel() {
 
986
  updateBBO(snap.symbol, snap);
987
  });
988
 
989
+ eventSource.addEventListener("session", (e) => {
990
+ const data = JSON.parse(e.data);
991
+ updateSessionBadge(data.status === "started");
992
+ });
993
+
994
  eventSource.onerror = () => {
995
  setStatus("disconnected", "Disconnected");
996
  state.connected = false;
 
1018
  }
1019
  }
1020
 
1021
+ // ── Session control ────────────────────────────────────────────────────────
1022
+
1023
+ function updateSessionBadge(active) {
1024
+ const badge = document.getElementById("session-badge");
1025
+ const text = document.getElementById("session-text");
1026
+ badge.className = "status " + (active ? "active" : "idle");
1027
+ text.textContent = active ? "ACTIVE" : "IDLE";
1028
+ }
1029
+
1030
+ async function startDay() {
1031
+ if (!confirm("Start of Day: reset all security prices to their opening values and begin simulation?")) return;
1032
+ try {
1033
+ const r = await fetch("/session/start", { method: "POST" });
1034
+ const result = await r.json();
1035
+ if (result.status === "ok") {
1036
+ updateSessionBadge(true);
1037
+ } else {
1038
+ alert("Error: " + (result.error || "Unknown error"));
1039
+ }
1040
+ } catch (e) {
1041
+ alert("Error: " + e.message);
1042
+ }
1043
+ }
1044
+
1045
+ async function endDay() {
1046
+ if (!confirm("End of Day: stop simulation and save closing prices?")) return;
1047
+ try {
1048
+ const r = await fetch("/session/end", { method: "POST" });
1049
+ const result = await r.json();
1050
+ if (result.status === "ok") {
1051
+ updateSessionBadge(false);
1052
+ alert("Day ended. Closing prices saved.");
1053
+ } else {
1054
+ alert("Error: " + (result.error || "Unknown error"));
1055
+ }
1056
+ } catch (e) {
1057
+ alert("Error: " + e.message);
1058
+ }
1059
+ }
1060
+
1061
+ // ── Price History chart ────────────────────────────────────────────────────
1062
+
1063
+ async function loadHistory() {
1064
+ const sym = document.getElementById("history-symbol").value;
1065
+ const period = document.getElementById("history-period").value;
1066
+ const statusEl = document.getElementById("history-status");
1067
+ if (!sym) return;
1068
+ statusEl.textContent = "Loading...";
1069
+ try {
1070
+ const r = await fetch(`/history/${sym}?period=${period}`);
1071
+ const data = await r.json();
1072
+ drawHistoryChart(data.candles || []);
1073
+ statusEl.textContent = (data.candles && data.candles.length)
1074
+ ? `${data.candles.length} candles`
1075
+ : "No data yet";
1076
+ } catch (e) {
1077
+ statusEl.textContent = "Error: " + e.message;
1078
+ }
1079
+ }
1080
+
1081
+ function drawHistoryChart(candles) {
1082
+ const canvas = document.getElementById("history-chart");
1083
+ const ctx = canvas.getContext("2d");
1084
+ canvas.width = canvas.offsetWidth || 900;
1085
+ canvas.height = 350;
1086
+
1087
+ ctx.clearRect(0, 0, canvas.width, canvas.height);
1088
+
1089
+ if (!candles || candles.length === 0) {
1090
+ ctx.fillStyle = "#999";
1091
+ ctx.font = "14px Arial";
1092
+ ctx.textAlign = "center";
1093
+ ctx.fillText("No history data for this period", canvas.width / 2, canvas.height / 2);
1094
+ return;
1095
+ }
1096
+
1097
+ const pad = { top: 25, right: 20, bottom: 40, left: 65 };
1098
+ const W = canvas.width - pad.left - pad.right;
1099
+ const H = canvas.height - pad.top - pad.bottom;
1100
+ const priceH = H * 0.70;
1101
+ const gapH = H * 0.05;
1102
+ const volH = H * 0.25;
1103
+ const priceY = pad.top;
1104
+ const volY = pad.top + priceH + gapH;
1105
+
1106
+ const maxP = Math.max(...candles.map(c => c.h)) * 1.002;
1107
+ const minP = Math.min(...candles.map(c => c.l)) * 0.998;
1108
+ const maxV = Math.max(...candles.map(c => c.v), 1);
1109
+
1110
+ // Price grid lines
1111
+ ctx.strokeStyle = "#eee"; ctx.lineWidth = 1;
1112
+ for (let i = 0; i <= 5; i++) {
1113
+ const y = priceY + (priceH * i / 5);
1114
+ ctx.beginPath(); ctx.moveTo(pad.left, y); ctx.lineTo(canvas.width - pad.right, y); ctx.stroke();
1115
+ const p = maxP - (maxP - minP) * i / 5;
1116
+ ctx.fillStyle = "#666"; ctx.font = "10px Arial"; ctx.textAlign = "right";
1117
+ ctx.fillText(p.toFixed(2), pad.left - 5, y + 4);
1118
+ }
1119
+
1120
+ // Divider between price and volume areas
1121
+ ctx.strokeStyle = "#ddd"; ctx.lineWidth = 1;
1122
+ ctx.beginPath(); ctx.moveTo(pad.left, volY); ctx.lineTo(canvas.width - pad.right, volY); ctx.stroke();
1123
+
1124
+ const n = candles.length;
1125
+ const slotW = W / n;
1126
+ const barW = Math.max(1, Math.floor(slotW * 0.7));
1127
+
1128
+ candles.forEach((c, i) => {
1129
+ const x = pad.left + i * slotW + (slotW - barW) / 2;
1130
+ const isUp = c.c >= c.o;
1131
+ const color = isUp ? "#26a69a" : "#ef5350";
1132
+ const toY = p => priceY + priceH - ((p - minP) / (maxP - minP)) * priceH;
1133
+
1134
+ // Wick
1135
+ const midX = x + barW / 2;
1136
+ ctx.strokeStyle = color; ctx.lineWidth = 1;
1137
+ ctx.beginPath(); ctx.moveTo(midX, toY(c.h)); ctx.lineTo(midX, toY(c.l)); ctx.stroke();
1138
+
1139
+ // Candle body
1140
+ const bodyTop = Math.min(toY(c.o), toY(c.c));
1141
+ const bodyH = Math.max(1, Math.abs(toY(c.c) - toY(c.o)));
1142
+ ctx.fillStyle = color;
1143
+ ctx.fillRect(x, bodyTop, barW, bodyH);
1144
+
1145
+ // Volume bar
1146
+ const vH = (c.v / maxV) * volH;
1147
+ ctx.fillStyle = isUp ? "rgba(38,166,154,0.5)" : "rgba(239,83,80,0.5)";
1148
+ ctx.fillRect(x, volY + volH - vH, barW, vH);
1149
+ });
1150
+
1151
+ // Time axis labels (up to 6 evenly spaced)
1152
+ ctx.fillStyle = "#666"; ctx.font = "10px Arial"; ctx.textAlign = "center";
1153
+ const step = Math.max(1, Math.floor(n / 6));
1154
+ for (let i = 0; i < n; i += step) {
1155
+ const dt = new Date(candles[i].t * 1000);
1156
+ const lbl = (dt.getMonth()+1) + "/" + dt.getDate() + " "
1157
+ + dt.getHours().toString().padStart(2,"0") + ":"
1158
+ + dt.getMinutes().toString().padStart(2,"0");
1159
+ const x = pad.left + i * slotW + slotW / 2;
1160
+ ctx.fillText(lbl, x, canvas.height - pad.bottom + 15);
1161
+ }
1162
+
1163
+ // VOL label rotated
1164
+ ctx.save();
1165
+ ctx.translate(10, volY + volH / 2);
1166
+ ctx.rotate(-Math.PI / 2);
1167
+ ctx.textAlign = "center";
1168
+ ctx.fillStyle = "#aaa"; ctx.font = "9px Arial";
1169
+ ctx.fillText("VOL", 0, 0);
1170
+ ctx.restore();
1171
+ }
1172
+
1173
  // Initial load: fetch data via REST, then connect SSE
1174
  async function init() {
1175
  await fetchData();
docker-compose.yml CHANGED
@@ -155,6 +155,7 @@ services:
155
  volumes:
156
  - ./dashboard:/app
157
  - ./shared:/app/shared
 
158
  depends_on:
159
  - kafka
160
  - matcher
 
155
  volumes:
156
  - ./dashboard:/app
157
  - ./shared:/app/shared
158
+ - ./shared_data:/app/data
159
  depends_on:
160
  - kafka
161
  - matcher
md_feeder/mdf_simulator.py CHANGED
@@ -2,19 +2,23 @@
2
  import sys
3
  sys.path.insert(0, "/app")
4
 
5
- import json, time, random, os
6
 
7
  from shared.config import Config
8
- from shared.kafka_utils import create_producer
9
 
10
  ORDER_INTERVAL = 60.0 / Config.ORDERS_PER_MIN
11
 
 
 
 
 
 
12
  def load_securities():
13
  """Load securities from file: SYMBOL start_price current_price"""
14
  securities = {}
15
  if not os.path.exists(Config.SECURITIES_FILE):
16
  raise FileNotFoundError(f"{Config.SECURITIES_FILE} not found")
17
-
18
  with open(Config.SECURITIES_FILE) as f:
19
  for line in f:
20
  if not line.strip() or line.startswith("#"):
@@ -25,6 +29,7 @@ def load_securities():
25
  securities[symbol] = {"start": start, "current": current}
26
  return securities
27
 
 
28
  def save_securities(securities):
29
  """Persist securities with header line"""
30
  with open(Config.SECURITIES_FILE, "w") as f:
@@ -32,8 +37,10 @@ def save_securities(securities):
32
  for sym, vals in securities.items():
33
  f.write(f"{sym}\t{vals['start']:.2f}\t{vals['current']:.2f}\n")
34
 
 
35
  _order_counter = 0
36
 
 
37
  def make_order(symbol, side, price, qty):
38
  global _order_counter
39
  _order_counter += 1
@@ -47,6 +54,7 @@ def make_order(symbol, side, price, qty):
47
  "source": "MDF"
48
  }
49
 
 
50
  def make_snapshot(symbol, best_bid, best_ask, bid_size, ask_size):
51
  return {
52
  "symbol": symbol,
@@ -58,65 +66,96 @@ def make_snapshot(symbol, best_bid, best_ask, bid_size, ask_size):
58
  "source": "MDF"
59
  }
60
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61
  if __name__ == "__main__":
62
  producer = create_producer(component_name="MDF")
63
 
64
- # Load securities and reset start=current
65
- securities = load_securities()
66
- for sym in securities:
67
- securities[sym]["start"] = securities[sym]["current"]
68
- save_securities(securities)
69
- print(f"[MDF] Loaded securities: {list(securities.keys())}")
 
 
 
 
 
 
 
 
 
 
 
 
70
 
71
  try:
72
  while True:
73
- for sym, vals in securities.items():
 
 
 
 
 
 
 
74
  mid = vals["current"]
75
- # Define spread: bids below (mid - spread), asks above (mid + spread)
76
- half_spread = 0.10 # 10 cents spread
77
 
78
- # --- generate order ---
79
- # 90% passive orders (build book), 10% aggressive (create trades)
80
  side = random.choice(["BUY", "SELL"])
81
  rand = random.random()
82
 
83
  if rand < 0.90:
84
  # Passive: place orders away from mid to rest on book
85
  if side == "BUY":
86
- # Bids: from (mid - spread) down to (mid - spread - 0.50)
87
  base = mid - half_spread
88
  offset = random.randint(1, 50) * Config.TICK_SIZE
89
  price = round(base - offset, 2)
90
  else:
91
- # Asks: from (mid + spread) up to (mid + spread + 0.50)
92
  base = mid + half_spread
93
  offset = random.randint(1, 50) * Config.TICK_SIZE
94
  price = round(base + offset, 2)
95
  else:
96
- # Aggressive: cross the spread to create trades (rare)
97
  if side == "BUY":
98
- # Buy into asks
99
  price = round(mid + half_spread + random.randint(1, 5) * Config.TICK_SIZE, 2)
100
  else:
101
- # Sell into bids
102
  price = round(mid - half_spread - random.randint(1, 5) * Config.TICK_SIZE, 2)
103
 
104
  qty = random.choice([50, 100, 150, 200, 250])
105
-
106
  order = make_order(sym, side, price, qty)
107
  producer.send(Config.ORDERS_TOPIC, order)
108
  print(f"[MDF] Order: {order}")
109
 
110
- # --- simulate small price drift (10% chance, max 2 ticks) ---
111
  if random.random() < 0.10:
112
  drift = random.choice([-2, -1, 1, 2]) * Config.TICK_SIZE
113
  new_price = vals["current"] + drift
114
- # Keep price within reasonable bounds (min 1.00)
115
  if new_price >= 1.00:
116
  vals["current"] = round(new_price, 2)
117
- save_securities(securities)
118
 
119
- # --- snapshot around mid price ---
120
  best_bid = mid - half_spread
121
  best_ask = mid + half_spread
122
  bid_size = random.choice([50, 100, 200])
 
2
  import sys
3
  sys.path.insert(0, "/app")
4
 
5
+ import json, time, random, os, threading
6
 
7
  from shared.config import Config
8
+ from shared.kafka_utils import create_producer, create_consumer
9
 
10
  ORDER_INTERVAL = 60.0 / Config.ORDERS_PER_MIN
11
 
12
+ # Module-level state (shared with control listener thread)
13
+ _securities = {}
14
+ _running = True
15
+
16
+
17
  def load_securities():
18
  """Load securities from file: SYMBOL start_price current_price"""
19
  securities = {}
20
  if not os.path.exists(Config.SECURITIES_FILE):
21
  raise FileNotFoundError(f"{Config.SECURITIES_FILE} not found")
 
22
  with open(Config.SECURITIES_FILE) as f:
23
  for line in f:
24
  if not line.strip() or line.startswith("#"):
 
29
  securities[symbol] = {"start": start, "current": current}
30
  return securities
31
 
32
+
33
  def save_securities(securities):
34
  """Persist securities with header line"""
35
  with open(Config.SECURITIES_FILE, "w") as f:
 
37
  for sym, vals in securities.items():
38
  f.write(f"{sym}\t{vals['start']:.2f}\t{vals['current']:.2f}\n")
39
 
40
+
41
  _order_counter = 0
42
 
43
+
44
  def make_order(symbol, side, price, qty):
45
  global _order_counter
46
  _order_counter += 1
 
54
  "source": "MDF"
55
  }
56
 
57
+
58
  def make_snapshot(symbol, best_bid, best_ask, bid_size, ask_size):
59
  return {
60
  "symbol": symbol,
 
66
  "source": "MDF"
67
  }
68
 
69
+
70
+ def listen_control(ctrl_consumer):
71
+ """Background thread: listen for start/stop control messages."""
72
+ global _running, _securities
73
+ print("[MDF] Control listener started")
74
+ for msg in ctrl_consumer:
75
+ action = (msg.value or {}).get("action")
76
+ if action == "stop":
77
+ _running = False
78
+ print("[MDF] STOP signal received – pausing simulation")
79
+ elif action == "start":
80
+ try:
81
+ new_secs = load_securities()
82
+ _securities.clear()
83
+ _securities.update(new_secs)
84
+ print(f"[MDF] START signal – reloaded securities: {list(_securities.keys())}")
85
+ except Exception as e:
86
+ print(f"[MDF] Error reloading securities on start: {e}")
87
+ _running = True
88
+ print("[MDF] Simulation resumed")
89
+
90
+
91
  if __name__ == "__main__":
92
  producer = create_producer(component_name="MDF")
93
 
94
+ # Load securities and snapshot start prices
95
+ _securities = load_securities()
96
+ for sym in _securities:
97
+ _securities[sym]["start"] = _securities[sym]["current"]
98
+ save_securities(_securities)
99
+ print(f"[MDF] Loaded securities: {list(_securities.keys())}")
100
+
101
+ # Start control consumer in background thread
102
+ try:
103
+ ctrl_consumer = create_consumer(
104
+ topics=[Config.CONTROL_TOPIC],
105
+ group_id="md-feeder-control",
106
+ component_name="MDF-Control",
107
+ auto_offset_reset="latest",
108
+ )
109
+ threading.Thread(target=listen_control, args=(ctrl_consumer,), daemon=True).start()
110
+ except Exception as e:
111
+ print(f"[MDF] Warning: could not start control consumer: {e}")
112
 
113
  try:
114
  while True:
115
+ if not _running:
116
+ time.sleep(0.5)
117
+ continue
118
+
119
+ for sym, vals in list(_securities.items()):
120
+ if not _running:
121
+ break
122
+
123
  mid = vals["current"]
124
+ half_spread = 0.10
 
125
 
 
 
126
  side = random.choice(["BUY", "SELL"])
127
  rand = random.random()
128
 
129
  if rand < 0.90:
130
  # Passive: place orders away from mid to rest on book
131
  if side == "BUY":
 
132
  base = mid - half_spread
133
  offset = random.randint(1, 50) * Config.TICK_SIZE
134
  price = round(base - offset, 2)
135
  else:
 
136
  base = mid + half_spread
137
  offset = random.randint(1, 50) * Config.TICK_SIZE
138
  price = round(base + offset, 2)
139
  else:
140
+ # Aggressive: cross the spread to create trades
141
  if side == "BUY":
 
142
  price = round(mid + half_spread + random.randint(1, 5) * Config.TICK_SIZE, 2)
143
  else:
 
144
  price = round(mid - half_spread - random.randint(1, 5) * Config.TICK_SIZE, 2)
145
 
146
  qty = random.choice([50, 100, 150, 200, 250])
 
147
  order = make_order(sym, side, price, qty)
148
  producer.send(Config.ORDERS_TOPIC, order)
149
  print(f"[MDF] Order: {order}")
150
 
151
+ # Simulate small price drift (10% chance, max 2 ticks)
152
  if random.random() < 0.10:
153
  drift = random.choice([-2, -1, 1, 2]) * Config.TICK_SIZE
154
  new_price = vals["current"] + drift
 
155
  if new_price >= 1.00:
156
  vals["current"] = round(new_price, 2)
157
+ save_securities(_securities)
158
 
 
159
  best_bid = mid - half_spread
160
  best_ask = mid + half_spread
161
  bid_size = random.choice([50, 100, 200])
shared/config.py CHANGED
@@ -26,6 +26,9 @@ class Config:
26
  SECURITIES_FILE: str = os.getenv("SECURITIES_FILE", "/app/data/securities.txt")
27
  ORDER_ID_FILE: str = os.getenv("ORDER_ID_FILE", "/app/data/order_id.txt")
28
 
 
 
 
29
  # Trading simulation
30
  TICK_SIZE: float = float(os.getenv("TICK_SIZE", "0.05"))
31
  ORDERS_PER_MIN: int = int(os.getenv("ORDERS_PER_MIN", "8"))
 
26
  SECURITIES_FILE: str = os.getenv("SECURITIES_FILE", "/app/data/securities.txt")
27
  ORDER_ID_FILE: str = os.getenv("ORDER_ID_FILE", "/app/data/order_id.txt")
28
 
29
+ # Control topic for start/end of day signals
30
+ CONTROL_TOPIC: str = os.getenv("CONTROL_TOPIC", "control")
31
+
32
  # Trading simulation
33
  TICK_SIZE: float = float(os.getenv("TICK_SIZE", "0.05"))
34
  ORDERS_PER_MIN: int = int(os.getenv("ORDERS_PER_MIN", "8"))
shared_data/securities.txt CHANGED
@@ -1,6 +1,6 @@
1
  #SYMBOL <start_price> <current_price>
2
- ALPHA 24.90 24.95
3
- PEIR 18.50 18.05
4
- EXAE 42.00 42.05
5
- QUEST 12.70 12.60
6
- NBG 18.50 18.05
 
1
  #SYMBOL <start_price> <current_price>
2
+ ALPHA 24.95 25.15
3
+ PEIR 18.05 18.05
4
+ EXAE 42.05 42.05
5
+ QUEST 12.60 12.65
6
+ NBG 18.05 18.05