| """ |
| Tests for public ComfyAPI and ComfyAPISync functions. |
| |
| These tests verify that the public API methods work correctly in both sync and async contexts, |
| ensuring that the sync wrapper generation (via get_type_hints() in async_to_sync.py) correctly |
| handles string annotations from 'from __future__ import annotations'. |
| """ |
|
|
| import pytest |
| import time |
| import subprocess |
| import torch |
| from pytest import fixture |
| from comfy_execution.graph_utils import GraphBuilder |
| from tests.execution.test_execution import ComfyClient |
|
|
|
|
| @pytest.mark.execution |
| class TestPublicAPI: |
| """Test suite for public ComfyAPI and ComfyAPISync methods.""" |
|
|
| @fixture(scope="class", autouse=True) |
| def _server(self, args_pytest): |
| """Start ComfyUI server for testing.""" |
| pargs = [ |
| 'python', 'main.py', |
| '--output-directory', args_pytest["output_dir"], |
| '--listen', args_pytest["listen"], |
| '--port', str(args_pytest["port"]), |
| '--extra-model-paths-config', 'tests/execution/extra_model_paths.yaml', |
| '--cpu', |
| ] |
| p = subprocess.Popen(pargs) |
| yield |
| p.kill() |
| torch.cuda.empty_cache() |
|
|
| @fixture(scope="class", autouse=True) |
| def shared_client(self, args_pytest, _server): |
| """Create shared client with connection retry.""" |
| client = ComfyClient() |
| n_tries = 5 |
| for i in range(n_tries): |
| time.sleep(4) |
| try: |
| client.connect(listen=args_pytest["listen"], port=args_pytest["port"]) |
| break |
| except ConnectionRefusedError: |
| if i == n_tries - 1: |
| raise |
| yield client |
| del client |
| torch.cuda.empty_cache() |
|
|
| @fixture |
| def client(self, shared_client, request): |
| """Set test name for each test.""" |
| shared_client.set_test_name(f"public_api[{request.node.name}]") |
| yield shared_client |
|
|
| @fixture |
| def builder(self, request): |
| """Create GraphBuilder for each test.""" |
| yield GraphBuilder(prefix=request.node.name) |
|
|
| def test_sync_progress_update_executes(self, client: ComfyClient, builder: GraphBuilder): |
| """Test that TestSyncProgressUpdate executes without errors. |
| |
| This test validates that api_sync.execution.set_progress() works correctly, |
| which is the primary code path fixed by adding get_type_hints() to async_to_sync.py. |
| """ |
| g = builder |
| image = g.node("StubImage", content="BLACK", height=256, width=256, batch_size=1) |
|
|
| |
| progress_node = g.node("TestSyncProgressUpdate", |
| value=image.out(0), |
| sleep_seconds=0.5) |
| output = g.node("SaveImage", images=progress_node.out(0)) |
|
|
| |
| result = client.run(g) |
|
|
| |
| assert result.did_run(progress_node), "Progress node should have executed" |
| assert result.did_run(output), "Output node should have executed" |
|
|
| |
| images = result.get_images(output) |
| assert len(images) == 1, "Should have produced 1 image" |
|
|
| def test_async_progress_update_executes(self, client: ComfyClient, builder: GraphBuilder): |
| """Test that TestAsyncProgressUpdate executes without errors. |
| |
| This test validates that await api.execution.set_progress() works correctly |
| in async contexts. |
| """ |
| g = builder |
| image = g.node("StubImage", content="WHITE", height=256, width=256, batch_size=1) |
|
|
| |
| progress_node = g.node("TestAsyncProgressUpdate", |
| value=image.out(0), |
| sleep_seconds=0.5) |
| output = g.node("SaveImage", images=progress_node.out(0)) |
|
|
| |
| result = client.run(g) |
|
|
| |
| assert result.did_run(progress_node), "Async progress node should have executed" |
| assert result.did_run(output), "Output node should have executed" |
|
|
| |
| images = result.get_images(output) |
| assert len(images) == 1, "Should have produced 1 image" |
|
|
| def test_sync_and_async_progress_together(self, client: ComfyClient, builder: GraphBuilder): |
| """Test both sync and async progress updates in same workflow. |
| |
| This test ensures that both ComfyAPISync and ComfyAPI can coexist and work |
| correctly in the same workflow execution. |
| """ |
| g = builder |
| image1 = g.node("StubImage", content="BLACK", height=256, width=256, batch_size=1) |
| image2 = g.node("StubImage", content="WHITE", height=256, width=256, batch_size=1) |
|
|
| |
| sync_progress = g.node("TestSyncProgressUpdate", |
| value=image1.out(0), |
| sleep_seconds=0.3) |
| async_progress = g.node("TestAsyncProgressUpdate", |
| value=image2.out(0), |
| sleep_seconds=0.3) |
|
|
| |
| output1 = g.node("SaveImage", images=sync_progress.out(0)) |
| output2 = g.node("SaveImage", images=async_progress.out(0)) |
|
|
| |
| result = client.run(g) |
|
|
| |
| assert result.did_run(sync_progress), "Sync progress node should have executed" |
| assert result.did_run(async_progress), "Async progress node should have executed" |
| assert result.did_run(output1), "First output node should have executed" |
| assert result.did_run(output2), "Second output node should have executed" |
|
|
| |
| images1 = result.get_images(output1) |
| images2 = result.get_images(output2) |
| assert len(images1) == 1, "Should have produced 1 image from sync node" |
| assert len(images2) == 1, "Should have produced 1 image from async node" |
|
|