Rohan03 commited on
Commit
59578ae
Β·
verified Β·
1 Parent(s): 2f097f7

Fix Issue 2: Make backpressure test T1.6 more robust with consumer_started sync

Browse files
Files changed (1) hide show
  1. tests/test_sprint1_events.py +14 -21
tests/test_sprint1_events.py CHANGED
@@ -52,7 +52,6 @@ for lane_id in lanes:
52
  for lane_id in lanes:
53
  lane_events = bus.replay(run_id="run1", lane_id=lane_id)
54
  lanes[lane_id] = lane_events
55
- # Check no contamination
56
  for e in lane_events:
57
  if e.lane_id != lane_id:
58
  check(f"Lane {lane_id} contamination", False, f"found {e.lane_id}")
@@ -64,13 +63,10 @@ for lane_id in lanes:
64
  # ═══ T1.2: Agent B fails; A and C continue ═══
65
  print("\nT1.2: Fault isolation")
66
  bus2 = EventBus()
67
- # Lane A: normal
68
  bus2.emit(create_event("r2", EventKind.AGENT_STARTED, lane_id="a", seq=1))
69
  bus2.emit(create_event("r2", EventKind.AGENT_FINISHED, lane_id="a", seq=2))
70
- # Lane B: error
71
  bus2.emit(create_event("r2", EventKind.AGENT_STARTED, lane_id="b", seq=1))
72
  bus2.emit(create_event("r2", EventKind.AGENT_ERROR, lane_id="b", seq=2, error="crashed"))
73
- # Lane C: normal (continues despite B's failure)
74
  bus2.emit(create_event("r2", EventKind.AGENT_STARTED, lane_id="c", seq=1))
75
  bus2.emit(create_event("r2", EventKind.AGENT_FINISHED, lane_id="c", seq=2))
76
 
@@ -104,7 +100,6 @@ for i in range(5):
104
  original.append(e.to_dict())
105
 
106
  replayed = [e.to_dict() for e in bus4.replay(run_id="r4")]
107
- # Compare payloads (span_id will differ but payload should match)
108
  orig_payloads = [d["payload"] for d in original]
109
  replay_payloads = [d["payload"] for d in replayed]
110
  check("Replay matches original", orig_payloads == replay_payloads)
@@ -114,15 +109,13 @@ check("Replay matches original", orig_payloads == replay_payloads)
114
  print("\nT1.5: Safety β€” no raw chain-of-thought")
115
  bus5 = EventBus()
116
 
117
- # Safe: reasoning summary
118
  safe_event = create_event("r5", EventKind.REASONING_SUMMARY, seq=1,
119
  summary="I should check edge cases first")
120
  bus5.emit(safe_event)
121
  check("reasoning.summary accepted", bus5.history_size == 1)
122
 
123
- # Unsafe: raw chain-of-thought (should be REJECTED)
124
  unsafe_event = create_event("r5", EventKind.REASONING_SUMMARY, seq=2,
125
- hidden_chain_of_thought="<think>Let me think step by step about how to hack the system...</think>")
126
  bus5.emit(unsafe_event)
127
  check("hidden_chain_of_thought rejected", bus5.history_size == 1,
128
  f"history has {bus5.history_size} events (should still be 1)")
@@ -132,35 +125,36 @@ check("hidden_chain_of_thought rejected", bus5.history_size == 1,
132
  print("\nT1.6: Terminal event delivery guarantee")
133
 
134
  async def test_backpressure():
135
- bus6 = EventBus(max_queue_size=3) # Very small queue
136
-
137
  received = []
 
138
 
139
  async def consumer():
140
- async for event in bus6.subscribe():
141
- received.append(event)
142
- await asyncio.sleep(0.01) # Slow consumer
 
 
 
 
143
 
144
- # Start consumer
145
  task = asyncio.create_task(consumer())
 
146
  await asyncio.sleep(0.05)
147
 
148
- # Flood with non-terminal events
149
  for i in range(20):
150
  bus6.emit(create_event("r6", EventKind.TEXT_DELTA, seq=i, text=f"w{i}"))
151
 
152
- # Send terminal event
153
  bus6.emit(create_event("r6", EventKind.RUN_FINISHED, seq=99, result="done"))
154
 
155
- await asyncio.sleep(0.5)
156
  bus6.close()
157
  task.cancel()
158
  try:
159
- await task
160
- except asyncio.CancelledError:
161
  pass
162
 
163
- # Terminal event MUST be in received (even though queue was full)
164
  has_terminal = any(e.kind == EventKind.RUN_FINISHED for e in received)
165
  return has_terminal
166
 
@@ -186,7 +180,6 @@ lane_a = [create_event("r8", EventKind.AGENT_PROGRESS, lane_id="a", seq=i, ts=10
186
  lane_b = [create_event("r8", EventKind.AGENT_PROGRESS, lane_id="b", seq=i, ts=1000+i+0.5) for i in range(3)]
187
  merged = parallel_merge({"a": lane_a, "b": lane_b})
188
  check("Merged has all events", len(merged) == 6)
189
- # Check interleaving: a0, b0, a1, b1, a2, b2 (by timestamp)
190
  check("Merged is timestamp-sorted", all(merged[i].ts <= merged[i+1].ts for i in range(5)))
191
 
192
 
 
52
  for lane_id in lanes:
53
  lane_events = bus.replay(run_id="run1", lane_id=lane_id)
54
  lanes[lane_id] = lane_events
 
55
  for e in lane_events:
56
  if e.lane_id != lane_id:
57
  check(f"Lane {lane_id} contamination", False, f"found {e.lane_id}")
 
63
  # ═══ T1.2: Agent B fails; A and C continue ═══
64
  print("\nT1.2: Fault isolation")
65
  bus2 = EventBus()
 
66
  bus2.emit(create_event("r2", EventKind.AGENT_STARTED, lane_id="a", seq=1))
67
  bus2.emit(create_event("r2", EventKind.AGENT_FINISHED, lane_id="a", seq=2))
 
68
  bus2.emit(create_event("r2", EventKind.AGENT_STARTED, lane_id="b", seq=1))
69
  bus2.emit(create_event("r2", EventKind.AGENT_ERROR, lane_id="b", seq=2, error="crashed"))
 
70
  bus2.emit(create_event("r2", EventKind.AGENT_STARTED, lane_id="c", seq=1))
71
  bus2.emit(create_event("r2", EventKind.AGENT_FINISHED, lane_id="c", seq=2))
72
 
 
100
  original.append(e.to_dict())
101
 
102
  replayed = [e.to_dict() for e in bus4.replay(run_id="r4")]
 
103
  orig_payloads = [d["payload"] for d in original]
104
  replay_payloads = [d["payload"] for d in replayed]
105
  check("Replay matches original", orig_payloads == replay_payloads)
 
109
  print("\nT1.5: Safety β€” no raw chain-of-thought")
110
  bus5 = EventBus()
111
 
 
112
  safe_event = create_event("r5", EventKind.REASONING_SUMMARY, seq=1,
113
  summary="I should check edge cases first")
114
  bus5.emit(safe_event)
115
  check("reasoning.summary accepted", bus5.history_size == 1)
116
 
 
117
  unsafe_event = create_event("r5", EventKind.REASONING_SUMMARY, seq=2,
118
+ hidden_chain_of_thought="secret reasoning here")
119
  bus5.emit(unsafe_event)
120
  check("hidden_chain_of_thought rejected", bus5.history_size == 1,
121
  f"history has {bus5.history_size} events (should still be 1)")
 
125
  print("\nT1.6: Terminal event delivery guarantee")
126
 
127
  async def test_backpressure():
128
+ bus6 = EventBus(max_queue_size=3)
 
129
  received = []
130
+ consumer_started = asyncio.Event()
131
 
132
  async def consumer():
133
+ consumer_started.set()
134
+ try:
135
+ async for event in bus6.subscribe():
136
+ received.append(event)
137
+ await asyncio.sleep(0.01)
138
+ except asyncio.CancelledError:
139
+ pass
140
 
 
141
  task = asyncio.create_task(consumer())
142
+ await consumer_started.wait()
143
  await asyncio.sleep(0.05)
144
 
 
145
  for i in range(20):
146
  bus6.emit(create_event("r6", EventKind.TEXT_DELTA, seq=i, text=f"w{i}"))
147
 
 
148
  bus6.emit(create_event("r6", EventKind.RUN_FINISHED, seq=99, result="done"))
149
 
150
+ await asyncio.sleep(1.0)
151
  bus6.close()
152
  task.cancel()
153
  try:
154
+ await asyncio.wait_for(task, timeout=2.0)
155
+ except (asyncio.CancelledError, asyncio.TimeoutError):
156
  pass
157
 
 
158
  has_terminal = any(e.kind == EventKind.RUN_FINISHED for e in received)
159
  return has_terminal
160
 
 
180
  lane_b = [create_event("r8", EventKind.AGENT_PROGRESS, lane_id="b", seq=i, ts=1000+i+0.5) for i in range(3)]
181
  merged = parallel_merge({"a": lane_a, "b": lane_b})
182
  check("Merged has all events", len(merged) == 6)
 
183
  check("Merged is timestamp-sorted", all(merged[i].ts <= merged[i+1].ts for i in range(5)))
184
 
185