Spaces:
Running
Running
| import asyncio | |
| import numpy as np | |
| import pytest | |
| from reachy_mini.daemon.daemon import Daemon | |
| from reachy_mini.reachy_mini import ReachyMini | |
async def test_daemon_start_stop() -> None:
    """Start a simulated, headless daemon once and stop it again.

    The daemon is started with ``wake_up_on_start=False`` and stopped with
    ``goto_sleep_on_stop=False``. The test passes when neither call raises.
    """
    # ``Daemon`` is provided by the module-level import; no local import needed.
    daemon = Daemon()
    await daemon.start(
        sim=True,
        headless=True,
        wake_up_on_start=False,
    )
    await daemon.stop(goto_sleep_on_stop=False)
async def test_daemon_multiple_start_stop() -> None:
    """Run three consecutive start/stop cycles on a single ``Daemon`` instance.

    The test passes when none of the ``start`` or ``stop`` calls raises.
    """
    daemon = Daemon()

    async def run_one_cycle() -> None:
        """Start the daemon in simulated, headless mode, then stop it."""
        await daemon.start(sim=True, headless=True, wake_up_on_start=False)
        await daemon.stop(goto_sleep_on_stop=False)

    remaining_cycles = 3
    while remaining_cycles > 0:
        await run_one_cycle()
        remaining_cycles -= 1
async def test_daemon_client_disconnection() -> None:
    """Verify the status reported to a client, then stop the daemon.

    A simulated, headless daemon is started. One coroutine opens a
    ``ReachyMini`` client with ``media_backend="no_media"``, asserts on the
    fields of the status it gets back and sets an event; a second coroutine
    waits for that event and then stops the daemon.
    """
    daemon = Daemon()
    await daemon.start(sim=True, headless=True, wake_up_on_start=False)

    status_checked = asyncio.Event()

    async def check_status_as_client() -> None:
        """Connect, assert on the daemon status, then signal completion."""
        with ReachyMini(media_backend="no_media") as robot:
            report = robot.client.get_status()

            # Daemon-level fields.
            assert report["state"] == "running"
            assert report["simulation_enabled"]
            assert report["error"] is None

            # Backend-level fields.
            assert report["backend_status"]["motor_control_mode"] == "enabled"
            assert report["backend_status"]["error"] is None

            assert report["wlan_ip"] is None

            status_checked.set()

    async def stop_daemon_after_check() -> None:
        """Stop the daemon once the client has finished its checks."""
        await status_checked.wait()
        await daemon.stop(goto_sleep_on_stop=False)

    # The client coroutine is passed first, matching the original gather order.
    client_coro = check_status_as_client()
    stopper_coro = stop_daemon_after_check()
    await asyncio.gather(client_coro, stopper_coro)
async def test_daemon_early_stop() -> None:
    """Check that a client raises ``ConnectionError`` after the daemon stops.

    A simulated, headless daemon is started and a ``ReachyMini`` client
    connects to it. The daemon is then stopped while the client is still
    open, and ``set_target`` on that client is expected to raise
    ``ConnectionError``.
    """
    daemon = Daemon()
    await daemon.start(sim=True, headless=True, wake_up_on_start=False)

    client_ready = asyncio.Event()
    daemon_down = asyncio.Event()

    async def connected_client() -> None:
        """Hold a client open across the daemon shutdown, then send a target."""
        with ReachyMini(media_backend="no_media") as robot:
            client_ready.set()
            await daemon_down.wait()

            # Make sure the keep-alive check runs at least once
            # NOTE(review): `_check_alive_evt` looks like a threading.Event;
            # if so, this wait blocks the event loop for up to 100 s — confirm.
            robot.client._check_alive_evt.clear()
            robot.client._check_alive_evt.wait(timeout=100.0)

            expected_failure = pytest.raises(
                ConnectionError, match="Lost connection with the server."
            )
            with expected_failure:
                robot.set_target(head=np.eye(4))

    async def stop_once_connected() -> None:
        """Stop the daemon after the client has connected, then signal it."""
        await client_ready.wait()
        await daemon.stop(goto_sleep_on_stop=False)
        daemon_down.set()

    # The client coroutine is passed first, matching the original gather order.
    client_coro = connected_client()
    stopper_coro = stop_once_connected()
    await asyncio.gather(client_coro, stopper_coro)