Spaces:
Running
Running
File size: 11,877 Bytes
02eac4b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 |
import asyncio
from contextlib import AsyncExitStack

import pytest
from lerobot_arena_client import (
    RoboticsProducer,
    create_consumer_client,
    create_producer_client,
)
class TestIntegration:
    """End-to-end integration tests for the producer/consumer clients.

    Every test talks to the server at ``SERVER_URL``; nothing is mocked.
    Cleanup (disconnects, room deletion) is registered on an
    ``AsyncExitStack`` as soon as each resource exists, so a failure at any
    point -- including while creating a later client -- still releases
    everything that was created before it.
    """

    SERVER_URL = "http://localhost:8000"

    async def _open_room(self, stack: AsyncExitStack):
        """Create a producer with its room and register cleanup on *stack*.

        Args:
            stack: Exit stack that owns the producer's cleanup.

        Returns:
            A ``(producer, room_id)`` tuple.
        """
        producer = await create_producer_client(self.SERVER_URL)
        room_id = producer.room_id
        # Callbacks run last-in-first-out: the producer disconnects first and
        # the room is deleted afterwards, even if the disconnect raises.
        stack.push_async_callback(producer.delete_room, room_id)
        stack.push_async_callback(producer.disconnect)
        return producer, room_id

    async def _join_room(self, stack: AsyncExitStack, room_id):
        """Create a consumer in *room_id* and register its disconnect.

        Args:
            stack: Exit stack that owns the consumer's cleanup.
            room_id: Identifier of the room to join.

        Returns:
            The consumer client.
        """
        consumer = await create_consumer_client(room_id, self.SERVER_URL)
        stack.push_async_callback(consumer.disconnect)
        return consumer

    @pytest.mark.asyncio
    async def test_full_producer_consumer_workflow(self):
        """Test complete producer-consumer workflow."""
        async with AsyncExitStack() as stack:
            # Create producer and room, then a consumer in the same room
            producer, room_id = await self._open_room(stack)
            consumer = await self._join_room(stack, room_id)

            # Set up consumer to collect messages. Only the joint updates are
            # asserted on below; states and errors are collected as well so
            # they can be inspected when debugging a failure.
            received_states = []
            received_updates = []
            received_errors = []
            consumer.on_state_sync(lambda state: received_states.append(state))
            consumer.on_joint_update(
                lambda joints: received_updates.append(joints)
            )
            consumer.on_error(lambda error: received_errors.append(error))

            # Wait for connections to stabilize
            await asyncio.sleep(0.2)

            # Producer sends initial state
            initial_state = {"shoulder": 0.0, "elbow": 0.0, "wrist": 0.0}
            await producer.send_state_sync(initial_state)
            await asyncio.sleep(0.1)

            # Producer sends series of joint updates
            joint_sequences = [
                [{"name": "shoulder", "value": 45.0}],
                [{"name": "elbow", "value": -30.0}],
                [{"name": "wrist", "value": 15.0}],
                [
                    {"name": "shoulder", "value": 90.0},
                    {"name": "elbow", "value": -60.0},
                ],
            ]
            for joints in joint_sequences:
                await producer.send_joint_update(joints)
                await asyncio.sleep(0.1)

            # Wait for all messages to be received
            await asyncio.sleep(0.3)

            # Verify consumer received at least the four joint updates
            assert len(received_updates) >= 4

            # Verify final state: the last value sent for each joint wins
            final_state = await consumer.get_state_sync()
            expected_final_state = {
                "shoulder": 90.0,
                "elbow": -60.0,
                "wrist": 15.0,
            }
            assert final_state == expected_final_state

    @pytest.mark.asyncio
    async def test_multiple_consumers_same_room(self):
        """Test multiple consumers receiving same messages."""
        async with AsyncExitStack() as stack:
            producer, room_id = await self._open_room(stack)

            # Each consumer's disconnect is registered as soon as it exists,
            # so consumer1 is still released if creating consumer2 fails.
            consumer1 = await self._join_room(stack, room_id)
            consumer2 = await self._join_room(stack, room_id)

            # Set up message collection for both consumers
            consumer1_updates = []
            consumer2_updates = []
            consumer1.on_joint_update(
                lambda joints: consumer1_updates.append(joints)
            )
            consumer2.on_joint_update(
                lambda joints: consumer2_updates.append(joints)
            )

            # Wait for connections
            await asyncio.sleep(0.2)

            # Producer sends updates
            test_joints = [
                {"name": "joint1", "value": 10.0},
                {"name": "joint2", "value": 20.0},
            ]
            await producer.send_joint_update(test_joints)

            # Wait for message propagation
            await asyncio.sleep(0.2)

            # Both consumers should receive the same update
            assert len(consumer1_updates) >= 1
            assert len(consumer2_updates) >= 1
            assert consumer1_updates[-1] == consumer2_updates[-1]

    @pytest.mark.asyncio
    async def test_emergency_stop_propagation(self):
        """Test emergency stop propagation to all consumers."""
        async with AsyncExitStack() as stack:
            producer, room_id = await self._open_room(stack)
            consumer1 = await self._join_room(stack, room_id)
            consumer2 = await self._join_room(stack, room_id)

            # Set up error collection
            consumer1_errors = []
            consumer2_errors = []
            consumer1.on_error(lambda error: consumer1_errors.append(error))
            consumer2.on_error(lambda error: consumer2_errors.append(error))

            # Wait for connections
            await asyncio.sleep(0.2)

            # Producer sends emergency stop
            await producer.send_emergency_stop("Integration test emergency stop")

            # Wait for message propagation
            await asyncio.sleep(0.2)

            # Both consumers should receive emergency stop
            assert len(consumer1_errors) >= 1
            assert len(consumer2_errors) >= 1

            # Verify error messages contain emergency stop info
            assert "emergency stop" in consumer1_errors[-1].lower()
            assert "emergency stop" in consumer2_errors[-1].lower()

    @pytest.mark.asyncio
    async def test_producer_reconnection_workflow(self):
        """Test producer reconnecting and resuming operation."""
        async with AsyncExitStack() as stack:
            # Create room first, through a client that never connects to it
            room_owner = RoboticsProducer(self.SERVER_URL)
            room_id = await room_owner.create_room()
            stack.push_async_callback(room_owner.delete_room, room_id)

            # Create consumer before the producer connects
            consumer = await self._join_room(stack, room_id)
            received_updates = []
            consumer.on_joint_update(
                lambda joints: received_updates.append(joints)
            )

            producer = RoboticsProducer(self.SERVER_URL)

            # First session. try/finally pairs every successful connect with
            # exactly one disconnect, whether or not the session body fails.
            await producer.connect(room_id)
            try:
                await producer.send_state_sync({"joint1": 10.0})
                await asyncio.sleep(0.1)
            finally:
                await producer.disconnect()

            # Second session: reconnect and send another update
            await producer.connect(room_id)
            try:
                await producer.send_state_sync({"joint1": 20.0})
                await asyncio.sleep(0.2)

                # Consumer should have received both updates
                assert len(received_updates) >= 2
            finally:
                await producer.disconnect()

    @pytest.mark.asyncio
    async def test_consumer_late_join(self):
        """Test consumer joining room after producer has sent updates."""
        async with AsyncExitStack() as stack:
            producer, room_id = await self._open_room(stack)

            # Producer sends some updates before consumer joins
            await producer.send_state_sync({"joint1": 10.0, "joint2": 20.0})
            await asyncio.sleep(0.1)
            await producer.send_joint_update([{"name": "joint3", "value": 30.0}])
            await asyncio.sleep(0.1)

            # Now consumer joins
            consumer = await self._join_room(stack, room_id)

            # Consumer should be able to get current state, containing all
            # previously sent updates
            current_state = await consumer.get_state_sync()
            expected_state = {"joint1": 10.0, "joint2": 20.0, "joint3": 30.0}
            assert current_state == expected_state

    @pytest.mark.asyncio
    async def test_room_cleanup_on_producer_disconnect(self):
        """Test room state when producer disconnects."""
        async with AsyncExitStack() as stack:
            # The room is deleted through a separate client because the
            # producer is already disconnected by the time cleanup runs.
            cleanup_client = RoboticsProducer(self.SERVER_URL)
            producer = await create_producer_client(self.SERVER_URL)
            room_id = producer.room_id
            stack.push_async_callback(cleanup_client.delete_room, room_id)

            try:
                consumer = await self._join_room(stack, room_id)

                # Send some state
                await producer.send_state_sync({"joint1": 42.0})
                await asyncio.sleep(0.1)

                # Verify state exists
                state_before = await consumer.get_state_sync()
                assert state_before == {"joint1": 42.0}
            finally:
                # The disconnect is both the step under test and the
                # producer's cleanup, so it runs even if the setup fails.
                await producer.disconnect()
            await asyncio.sleep(0.1)

            # State should still be accessible to consumer
            state_after = await consumer.get_state_sync()
            assert state_after == {"joint1": 42.0}

    @pytest.mark.asyncio
    async def test_high_frequency_updates(self):
        """Test handling high frequency updates."""
        async with AsyncExitStack() as stack:
            producer, room_id = await self._open_room(stack)
            consumer = await self._join_room(stack, room_id)

            received_updates = []
            consumer.on_joint_update(
                lambda joints: received_updates.append(joints)
            )

            # Wait for connection
            await asyncio.sleep(0.1)

            # Send rapid updates
            for i in range(20):
                await producer.send_state_sync(
                    {"joint1": float(i), "timestamp": i}
                )
                await asyncio.sleep(0.01)  # 10ms intervals

            # Wait for all messages
            await asyncio.sleep(0.5)

            # Should have received multiple updates
            # (exact number may vary due to change detection)
            assert len(received_updates) >= 5

            # Final state should reflect last update
            final_state = await consumer.get_state_sync()
            assert final_state["joint1"] == 19.0
            assert final_state["timestamp"] == 19
|