# Source path: gradio/client/python/test/test_client.py
# Hugging Face Hub listing: uploaded by mindmime ("Upload folder using huggingface_hub"),
# commit a03b3ba (verified).
import json
import os
import pathlib
import tempfile
import time
import uuid
from concurrent.futures import CancelledError, TimeoutError, wait
from contextlib import contextmanager
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
import gradio as gr
import huggingface_hub
import pytest
import uvicorn
from fastapi import FastAPI
from gradio.networking import Server
from huggingface_hub import HfFolder
from huggingface_hub.utils import RepositoryNotFoundError
from gradio_client import Client
from gradio_client.client import DEFAULT_TEMP_DIR
from gradio_client.utils import (
Communicator,
ProgressUnit,
QueueError,
Status,
StatusUpdate,
)
# Hugging Face token used by the tests below: the HF_TOKEN environment variable
# when it is set to a non-empty value, otherwise whatever HfFolder.get_token() returns.
HF_TOKEN = os.getenv("HF_TOKEN") or HfFolder.get_token()
@contextmanager
def connect(
    demo: gr.Blocks, serialize: bool = True, output_dir: str = DEFAULT_TEMP_DIR
):
    """Launch ``demo`` without blocking and yield a ``Client`` pointed at it.

    Args:
        demo: The Blocks app to launch for the duration of the ``with`` body.
        serialize: Forwarded to ``Client`` as ``serialize``.
        output_dir: Forwarded to ``Client`` as ``output_dir``.

    Yields:
        A ``Client`` connected to the local URL returned by ``demo.launch``.
    """
    _first, local_url, _third = demo.launch(prevent_thread_lock=True)
    try:
        client = Client(local_url, serialize=serialize, output_dir=output_dir)
        yield client
    finally:
        # Spelled-out equivalent of .close(): the join needs a timeout,
        # because tests that call .cancel() can otherwise hang here
        # waiting for the server thread to finish.
        if demo.enable_queue:
            demo._queue.close()
        demo.is_running = False
        server = demo.server
        server.should_exit = True
        server.thread.join(timeout=1)
class TestClientPredictions:
    """Prediction, job-lifecycle and file-handling tests for ``Client``.

    Tests that receive a demo argument launch it locally through ``connect``
    (the demos are presumably pytest fixtures defined outside this file);
    the remaining tests connect to Spaces under ``gradio-tests`` by name.
    """

    @pytest.mark.flaky
    def test_raise_error_invalid_state(self):
        """Constructing a client for the paused Space raises ``ValueError``."""
        with pytest.raises(ValueError, match="invalid state"):
            Client("gradio-tests/paused-space")

    @pytest.mark.flaky
    def test_numerical_to_label_space(self):
        """Predict returns a JSON file path; bad endpoint selection raises."""
        client = Client("gradio-tests/titanic-survival")
        # Use a context manager so the result file handle is always closed.
        with open(client.predict("male", 77, 10, api_name="/predict")) as label_file:
            label = json.load(label_file)
        assert label["label"] == "Perishes"
        with pytest.raises(
            ValueError,
            match="This Gradio app might have multiple endpoints. Please specify an `api_name` or `fn_index`",
        ):
            client.predict("male", 77, 10)
        with pytest.raises(
            ValueError,
            match="Cannot find a function with `api_name`: predict. Did you mean to use a leading slash?",
        ):
            client.predict("male", 77, 10, api_name="predict")

    @pytest.mark.flaky
    def test_numerical_to_label_space_v4(self):
        """The v4 Space returns the label as a dict directly."""
        client = Client("gradio-tests/titanic-survivalv4-sse")
        label = client.predict("male", 77, 10, api_name="/predict")
        assert label["label"] == "Perishes"

    @pytest.mark.flaky
    def test_private_space(self):
        """A Space reported as private by the Hub API can be queried."""
        space_id = "gradio-tests/not-actually-private-space"
        api = huggingface_hub.HfApi()
        assert api.space_info(space_id).private
        client = Client(space_id)
        output = client.predict("abc", api_name="/predict")
        assert output == "abc"

    @pytest.mark.flaky
    def test_private_space_v4(self):
        """Same as ``test_private_space`` against the v4 SSE Space."""
        space_id = "gradio-tests/not-actually-private-spacev4-sse"
        api = huggingface_hub.HfApi()
        assert api.space_info(space_id).private
        client = Client(
            space_id,
        )
        output = client.predict("abc", api_name="/predict")
        assert output == "abc"

    @pytest.mark.flaky
    def test_private_space_v4_sse_v1(self):
        """Same as ``test_private_space`` against the v4 SSE-v1 Space."""
        space_id = "gradio-tests/not-actually-private-spacev4-sse-v1"
        api = huggingface_hub.HfApi()
        assert api.space_info(space_id).private
        client = Client(
            space_id,
        )
        output = client.predict("abc", api_name="/predict")
        assert output == "abc"

    def test_state(self, increment_demo):
        """The counter advances per call and restarts after ``reset_session``."""
        with connect(increment_demo) as client:
            output = client.predict(api_name="/increment_without_queue")
            assert output == 1
            output = client.predict(api_name="/increment_without_queue")
            assert output == 2
            output = client.predict(api_name="/increment_without_queue")
            assert output == 3
            client.reset_session()
            output = client.predict(api_name="/increment_without_queue")
            assert output == 1
            output = client.predict(api_name="/increment_with_queue")
            assert output == 2
            client.reset_session()
            output = client.predict(api_name="/increment_with_queue")
            assert output == 1
            output = client.predict(api_name="/increment_with_queue")
            assert output == 2

    def test_job_status(self, calculator_demo):
        """Polled statuses are non-empty and ordered by time and by code."""
        with connect(calculator_demo) as client:
            statuses = []
            job = client.submit(5, "add", 4, api_name="/predict")
            while not job.done():
                time.sleep(0.1)
                statuses.append(job.status())
            assert statuses
            # Messages are sorted by time
            assert sorted([s.time for s in statuses if s]) == [
                s.time for s in statuses if s
            ]
            assert sorted([s.code for s in statuses if s]) == [
                s.code for s in statuses if s
            ]

    @pytest.mark.flaky
    def test_intermediate_outputs(self, count_generator_demo):
        """Generator outputs are available via ``outputs()`` and by iterating."""
        with connect(count_generator_demo) as client:
            job = client.submit(3, fn_index=0)
            while not job.done():
                time.sleep(0.1)
            assert job.outputs() == [str(i) for i in range(3)]
            outputs = list(client.submit(3, fn_index=0))
            assert outputs == [str(i) for i in range(3)]

    @pytest.mark.flaky
    def test_intermediate_outputs_with_exception(self, count_generator_demo_exception):
        """``predict`` raises for ``/count`` and rejects ``/count_forever``."""
        with connect(count_generator_demo_exception) as client:
            with pytest.raises(Exception):
                client.predict(7, api_name="/count")
            with pytest.raises(
                ValueError, match="Cannot call predict on this function"
            ):
                client.predict(5, api_name="/count_forever")

    def test_break_in_loop_if_error(self, calculator_demo):
        """Iterating a job submitted with a bad input yields nothing."""
        with connect(calculator_demo) as client:
            job = client.submit("foo", "add", 4, fn_index=0)
            output = list(job)
            assert output == []

    @pytest.mark.flaky
    def test_timeout(self, sentiment_classification_demo):
        """``result(timeout=...)`` raises ``TimeoutError`` with the queue on."""
        with pytest.raises(TimeoutError):
            with connect(sentiment_classification_demo.queue()) as client:
                job = client.submit(api_name="/sleep")
                job.result(timeout=0.05)

    @pytest.mark.flaky
    def test_timeout_no_queue(self, sentiment_classification_demo):
        """``result(timeout=...)`` raises ``TimeoutError`` without ``.queue()``."""
        with pytest.raises(TimeoutError):
            with connect(sentiment_classification_demo) as client:
                job = client.submit(api_name="/sleep")
                job.result(timeout=0.1)

    def test_raises_exception(self, calculator_demo):
        """``result()`` raises when the submitted input is invalid."""
        with pytest.raises(Exception):
            with connect(calculator_demo) as client:
                job = client.submit("foo", "add", 9, fn_index=0)
                job.result()

    def test_raises_exception_no_queue(self, sentiment_classification_demo):
        """``result()`` raises for ``/sleep`` called with an argument."""
        with pytest.raises(Exception):
            with connect(sentiment_classification_demo) as client:
                job = client.submit([5], api_name="/sleep")
                job.result()

    def test_job_output_video(self, video_component):
        """Result videos land under the default, then a custom, output dir."""
        with connect(video_component) as client:
            job = client.submit(
                {
                    "video": "https://huggingface.co/spaces/gradio/video_component/resolve/main/files/a.mp4"
                },
                fn_index=0,
            )
            assert Path(job.result()["video"]).exists()
            assert (
                Path(DEFAULT_TEMP_DIR).resolve()
                in Path(job.result()["video"]).resolve().parents
            )
        temp_dir = tempfile.mkdtemp()
        with connect(video_component, output_dir=temp_dir) as client:
            job = client.submit(
                {
                    "video": "https://huggingface.co/spaces/gradio/video_component/resolve/main/files/a.mp4"
                },
                fn_index=0,
            )
            assert Path(job.result()["video"]).exists()
            assert (
                Path(temp_dir).resolve()
                in Path(job.result()["video"]).resolve().parents
            )

    def test_progress_updates(self, progress_demo):
        """At least one polled status carries a 20-step ``ProgressUnit``."""
        with connect(progress_demo) as client:
            job = client.submit("hello", api_name="/predict")
            statuses = []
            while not job.done():
                statuses.append(job.status())
                time.sleep(0.02)
            assert any(s.code == Status.PROGRESS for s in statuses)
            assert any(s.progress_data is not None for s in statuses)
            all_progress_data = [
                p for s in statuses if s.progress_data for p in s.progress_data
            ]
            count = 0
            for i in range(20):
                unit = ProgressUnit(
                    index=i, length=20, unit="steps", progress=None, desc=None
                )
                count += unit in all_progress_data
            assert count

    def test_cancel_from_client_queued(self, cancel_from_client_demo):
        """Jobs can be cancelled while starting and while iterating."""
        with connect(cancel_from_client_demo) as client:
            start = time.time()
            job = client.submit(api_name="/long")
            while not job.done():
                if job.status().code == Status.STARTING:
                    job.cancel()
                    break
            with pytest.raises(CancelledError):
                job.result()
            # The whole prediction takes 10 seconds to run
            # and does not iterate. So this tests that we can cancel
            # halfway through a prediction
            assert time.time() - start < 10
            assert job.status().code == Status.CANCELLED
            job = client.submit(api_name="/iterate")
            iteration_count = 0
            while not job.done():
                if job.status().code == Status.ITERATING:
                    iteration_count += 1
                    if iteration_count == 3:
                        job.cancel()
                        break
                    time.sleep(0.5)
            # result() on a cancelled iterative job raises as well
            with pytest.raises(CancelledError):
                job.result()
            # Same wall-clock bound as above. `start` was taken before the
            # first job, so this covers both jobs together.
            assert time.time() - start < 10
            # Test that we did not iterate all the way to the end
            assert all(o in [0, 1, 2, 3, 4, 5] for o in job.outputs())
            assert job.status().code == Status.CANCELLED

    def test_cancel_subsequent_jobs_state_reset(self, yield_demo):
        """A job submitted after a cancelled one starts from scratch."""
        with connect(yield_demo) as client:
            job1 = client.submit("abcdefefadsadfs", api_name="/predict")
            time.sleep(3)
            job1.cancel()
            assert len(job1.outputs()) > 0
            assert len(job1.outputs()) < len("abcdefefadsadfs")
            assert job1.status().code == Status.CANCELLED
            job2 = client.submit("abcd", api_name="/predict")
            assert len(job2.outputs()) == 0
            while not job2.done():
                time.sleep(0.1)
            # Ran all iterations from scratch
            assert job2.status().code == Status.FINISHED
            assert len(job2.outputs()) == 4

    @pytest.mark.xfail
    def test_stream_audio(self, stream_audio):
        """Streaming results and intermediate outputs exist on disk."""
        with connect(stream_audio) as client:
            job1 = client.submit(
                "https://gradio-builds.s3.amazonaws.com/demo-files/bark_demo.mp4",
                api_name="/predict",
            )
            assert Path(job1.result()).exists()
            job2 = client.submit(
                "https://gradio-builds.s3.amazonaws.com/demo-files/audio_sample.wav",
                api_name="/predict",
            )
            assert Path(job2.result()).exists()
            assert all(Path(p).exists() for p in job2.outputs())

    @pytest.mark.xfail
    def test_upload_file_private_space_v4(self):
        """File inputs round-trip through the v4 Space with one upload each."""
        client = Client(
            src="gradio-tests/not-actually-private-file-uploadv4-sse",
        )
        with patch.object(
            client.endpoints[0], "_upload", wraps=client.endpoints[0]._upload
        ) as upload:
            with patch.object(
                client.endpoints[0], "serialize", wraps=client.endpoints[0].serialize
            ) as serialize:
                with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
                    f.write("Hello from private space!")
                # Submit after the file is closed so its content is flushed.
                output = client.submit(
                    1, "foo", f.name, api_name="/file_upload"
                ).result()
            with open(output) as f:
                assert f.read() == "Hello from private space!"
            upload.assert_called_once()
            assert all(f["is_file"] for f in serialize.return_value())
        with patch.object(
            client.endpoints[1], "_upload", wraps=client.endpoints[0]._upload
        ) as upload:
            with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
                f.write("Hello from private space!")
            with open(client.submit(f.name, api_name="/upload_btn").result()) as f:
                assert f.read() == "Hello from private space!"
            upload.assert_called_once()
        with patch.object(
            client.endpoints[2], "_upload", wraps=client.endpoints[0]._upload
        ) as upload:
            # `delete=False` is required for Windows compat
            with tempfile.NamedTemporaryFile(mode="w", delete=False) as f1:
                with tempfile.NamedTemporaryFile(mode="w", delete=False) as f2:
                    f1.write("File1")
                    f2.write("File2")
            # Both files are closed (and flushed) before they are uploaded.
            r1, r2 = client.submit(
                3,
                [f1.name, f2.name],
                "hello",
                api_name="/upload_multiple",
            ).result()
            with open(r1) as f:
                assert f.read() == "File1"
            with open(r2) as f:
                assert f.read() == "File2"
            upload.assert_called_once()

    @pytest.mark.flaky
    def test_upload_file_private_space(self):
        """File inputs round-trip through the Space, authenticated by token."""
        client = Client(
            src="gradio-tests/not-actually-private-file-upload",
            hf_token=HF_TOKEN,
        )
        with patch.object(
            client.endpoints[0], "serialize", wraps=client.endpoints[0].serialize
        ) as serialize:
            with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
                f.write("Hello from private space!")
            # Submit after the file is closed so its content is flushed.
            output = client.submit(1, "foo", f.name, api_name="/file_upload").result()
        with open(output) as f:
            assert f.read() == "Hello from private space!"
        assert all(f["is_file"] for f in serialize.return_value())
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
            f.write("Hello from private space!")
        with open(client.submit(f.name, api_name="/upload_btn").result()) as f:
            assert f.read() == "Hello from private space!"
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as f1:
            with tempfile.NamedTemporaryFile(mode="w", delete=False) as f2:
                f1.write("File1")
                f2.write("File2")
        # Both files are closed (and flushed) before they are uploaded.
        r1, r2 = client.submit(
            3,
            [f1.name, f2.name],
            "hello",
            api_name="/upload_multiple",
        ).result()
        with open(r1) as f:
            assert f.read() == "File1"
        with open(r2) as f:
            assert f.read() == "File2"

    @pytest.mark.flaky
    def test_upload_file_upload_route_does_not_exist(self):
        """``serialize`` is called once with the raw inputs on the old Space."""
        client = Client(
            src="gradio-tests/not-actually-private-file-upload-old-version",
            hf_token=HF_TOKEN,
        )
        with patch.object(
            client.endpoints[0], "serialize", wraps=client.endpoints[0].serialize
        ) as serialize:
            with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
                f.write("Hello from private space!")
            client.submit(1, "foo", f.name, fn_index=0).result()
            serialize.assert_called_once_with(1, "foo", f.name)

    def test_state_without_serialize(self, stateful_chatbot):
        """With ``serialize=False`` the chat history is passed and returned raw."""
        with connect(stateful_chatbot, serialize=False) as client:
            initial_history = [["", None]]
            message = "Hello"
            ret = client.predict(message, initial_history, api_name="/submit")
            assert ret == ("", [["", None], ["Hello", "I love you"]])

    def test_can_call_mounted_app_via_api(self):
        """A Gradio app mounted on a FastAPI sub-path is reachable by URL."""

        def greet(name):
            return "Hello " + name + "!"

        gradio_app = gr.Interface(
            fn=greet,
            inputs=gr.Textbox(lines=2, placeholder="Name Here..."),
            outputs="text",
        )
        app = FastAPI()
        app = gr.mount_gradio_app(app, gradio_app, path="/test/gradio")
        config = uvicorn.Config(
            app=app,
            port=8000,
            log_level="info",
        )
        server = Server(config=config)
        # Using the gradio Server class to not have
        # to implement code again to run uvicorn in a separate thread
        # However, that means we need to set this flag to prevent
        # run_in_thread_from_blocking
        server.started = True
        try:
            server.run_in_thread()
            time.sleep(1)
            client = Client("http://127.0.0.1:8000/test/gradio/")
            assert client.predict("freddy") == "Hello freddy!"
        finally:
            # Ask the server to stop before joining, mirroring the shutdown
            # sequence used by `connect`; a bare join would only time out.
            server.should_exit = True
            server.thread.join(timeout=1)

    def test_predict_with_space_with_api_name_false(self):
        """Named endpoints of the bool-api-name Space return the expected text."""
        client = Client("gradio-tests/client-bool-api-name-error")
        assert client.predict("Hello!", api_name="/run") == "Hello!"
        assert client.predict("Freddy", api_name="/say_hello") == "hello"

    def test_return_layout_component(self, hello_world_with_group):
        """An endpoint with no value outputs returns an empty tuple."""
        with connect(hello_world_with_group) as demo:
            assert demo.predict("Freddy", api_name="/greeting") == "Hello Freddy"
            assert demo.predict(api_name="/show_group") == ()

    def test_return_layout_and_state_components(
        self, hello_world_with_state_and_accordion
    ):
        """The returned counter increases by one on every call."""
        with connect(hello_world_with_state_and_accordion) as demo:
            assert demo.predict("Freddy", api_name="/greeting") == ("Hello Freddy", 1)
            assert demo.predict("Abubakar", api_name="/greeting") == (
                "Hello Abubakar",
                2,
            )
            assert demo.predict(api_name="/open") == 3
            assert demo.predict(api_name="/close") == 4
            assert demo.predict("Ali", api_name="/greeting") == ("Hello Ali", 5)

    def test_long_response_time_with_gr_info_and_big_payload(
        self, long_response_with_info
    ):
        """A 360000-character payload is returned intact."""
        with connect(long_response_with_info) as demo:
            assert demo.predict(api_name="/predict") == "\ta\nb" * 90000

    def test_queue_full_raises_error(self):
        """Three submissions against ``max_size=1`` raise ``QueueError``."""
        demo = gr.Interface(lambda s: f"Hello {s}", "textbox", "textbox").queue(
            max_size=1
        )
        with connect(demo) as client:
            with pytest.raises(QueueError):
                job1 = client.submit("Freddy", api_name="/predict")
                job2 = client.submit("Abubakar", api_name="/predict")
                job3 = client.submit("Pete", api_name="/predict")
                wait([job1, job2, job3])
                job1.result()
                job2.result()
                job3.result()

    def test_json_parse_error(self):
        """Non-ASCII text with a trailing newline comes back unchanged."""
        data = (
            "Bonjour Olivier, tu as l'air bien r\u00e9veill\u00e9 ce matin. Tu veux que je te pr\u00e9pare tes petits-d\u00e9j.\n",
            None,
        )

        def return_bad():
            return data

        demo = gr.Interface(return_bad, None, ["text", "text"])
        with connect(demo) as client:
            pred = client.predict(api_name="/predict")
            assert pred[0] == data[0]
class TestStatusUpdates:
    """Checks that statuses written by the end-to-end function reach ``Job.status()``.

    ``Endpoint.make_end_to_end_fn`` is patched with a callable class that
    writes a scripted list of ``StatusUpdate`` objects into
    ``communicator.job.latest_status``.
    """

    @staticmethod
    def _update(code, at, eta=None, rank=None, queue_size=None, success=None):
        """Build a ``StatusUpdate`` with ``progress_data=None`` and the given fields."""
        return StatusUpdate(
            code=code,
            eta=eta,
            rank=rank,
            success=success,
            queue_size=queue_size,
            time=at,
            progress_data=None,
        )

    @patch("gradio_client.client.Endpoint.make_end_to_end_fn")
    def test_messages_passed_correctly(self, mock_make_end_to_end_fn, calculator_demo):
        """Every polled status of a single job is one of the scripted messages."""
        now = datetime.now()
        messages = [
            self._update(Status.STARTING, now),
            self._update(Status.SENDING_DATA, now + timedelta(seconds=1)),
            self._update(
                Status.IN_QUEUE,
                now + timedelta(seconds=2),
                eta=3,
                rank=2,
                queue_size=2,
            ),
            self._update(
                Status.IN_QUEUE,
                now + timedelta(seconds=3),
                eta=2,
                rank=1,
                queue_size=1,
            ),
            self._update(Status.ITERATING, now + timedelta(seconds=3)),
            self._update(Status.FINISHED, now + timedelta(seconds=4), success=True),
        ]

        class MockEndToEndFunction:
            def __init__(self, communicator: Communicator):
                self.communicator = communicator

            def __call__(self, *args, **kwargs):
                # Publish one scripted status every 0.1 s, holding the lock
                # only for the assignment itself.
                for m in messages:
                    with self.communicator.lock:
                        self.communicator.job.latest_status = m
                    time.sleep(0.1)

        mock_make_end_to_end_fn.side_effect = MockEndToEndFunction
        with connect(calculator_demo) as client:
            job = client.submit(5, "add", 6, api_name="/predict")
            statuses = []
            while not job.done():
                statuses.append(job.status())
                time.sleep(0.09)
            assert all(s in messages for s in statuses)

    @patch("gradio_client.client.Endpoint.make_end_to_end_fn")
    def test_messages_correct_two_concurrent(
        self, mock_make_end_to_end_fn, calculator_demo
    ):
        """Two concurrent jobs each get their own scripted message list."""
        now = datetime.now()
        messages_1 = [
            self._update(Status.STARTING, now),
            self._update(Status.FINISHED, now + timedelta(seconds=4), success=True),
        ]
        messages_2 = [
            self._update(
                Status.IN_QUEUE,
                now + timedelta(seconds=2),
                eta=3,
                rank=2,
                queue_size=2,
            ),
            self._update(
                Status.IN_QUEUE,
                now + timedelta(seconds=3),
                eta=2,
                rank=1,
                queue_size=1,
            ),
        ]

        class MockEndToEndFunction:
            # Number of instances created so far; the first instance replays
            # messages_1 and every later one replays messages_2.
            n_counts = 0

            def __init__(self, communicator: Communicator):
                self.communicator = communicator
                self.messages = (
                    messages_1 if MockEndToEndFunction.n_counts == 0 else messages_2
                )
                MockEndToEndFunction.n_counts += 1

            def __call__(self, *args, **kwargs):
                for m in self.messages:
                    with self.communicator.lock:
                        self.communicator.job.latest_status = m
                    time.sleep(0.1)

        mock_make_end_to_end_fn.side_effect = MockEndToEndFunction
        with connect(calculator_demo) as client:
            job_1 = client.submit(5, "add", 6, api_name="/predict")
            job_2 = client.submit(11, "subtract", 1, api_name="/predict")
            statuses_1 = []
            statuses_2 = []
            while not (job_1.done() and job_2.done()):
                statuses_1.append(job_1.status())
                statuses_2.append(job_2.status())
                time.sleep(0.05)
            assert all(s in messages_1 for s in statuses_1)
            # NOTE(review): statuses_2 is collected but never asserted against
            # messages_2 -- confirm whether that check was intended.
class TestAPIInfo:
    """Tests for the API description returned by ``Client.view_api``."""

    @staticmethod
    def _entry(label, json_type, py_type, component, **extra):
        """Build one parameter/return description with an empty python_type description."""
        described = {
            "label": label,
            "type": {"type": json_type},
            "python_type": {"type": py_type, "description": ""},
            "component": component,
        }
        described.update(extra)
        return described

    @pytest.mark.parametrize("trailing_char", ["/", ""])
    def test_test_endpoint_src(self, trailing_char):
        """The endpoint root URL ends in a slash whether or not ``src`` does."""
        client = Client(src="https://gradio-calculator.hf.space" + trailing_char)
        root_url = client.endpoints[0].root_url
        assert root_url == "https://gradio-calculator.hf.space/"

    @pytest.mark.flaky
    def test_numerical_to_label_space(self):
        """The titanic Space exposes three identical named endpoints."""
        client = Client("gradio-tests/titanic-survival")

        def slider(label):
            return self._entry(
                label,
                "number",
                "int | float",
                "Slider",
                example_input=5,
                serializer="NumberSerializable",
            )

        def endpoint():
            return {
                "parameters": [
                    self._entry(
                        "Sex",
                        "string",
                        "str",
                        "Radio",
                        example_input="Howdy!",
                        serializer="StringSerializable",
                    ),
                    slider("Age"),
                    slider("Fare (british pounds)"),
                ],
                "returns": [
                    {
                        "label": "output",
                        "type": {"type": {}, "description": "any valid json"},
                        "python_type": {
                            "type": "str",
                            "description": "filepath to JSON file",
                        },
                        "component": "Label",
                        "serializer": "JSONSerializable",
                    }
                ],
            }

        expected = {
            "named_endpoints": {
                api_name: endpoint()
                for api_name in ("/predict", "/predict_1", "/predict_2")
            },
            "unnamed_endpoints": {},
        }
        assert client.view_api(return_format="dict") == expected

    def test_state_does_not_appear(self, state_demo):
        """No ``/predict`` parameter is reported as a ``State`` component."""
        with connect(state_demo) as client:
            api_info = client.view_api(return_format="dict")
            assert isinstance(api_info, dict)
            parameters = api_info["named_endpoints"]["/predict"]["parameters"]
            for parameter in parameters:
                assert parameter["component"] != "State"

    @pytest.mark.flaky
    def test_private_space(self):
        """Endpoint counts and the single named endpoint of the Space match."""
        client = Client(
            "gradio-tests/not-actually-private-space",
        )
        valid = [e for e in client.endpoints if e.is_valid]
        valid_and_named = [e for e in client.endpoints if e.is_valid and e.api_name]
        assert len(client.endpoints) == 3
        assert len(valid) == 2
        assert len(valid_and_named) == 1
        expected = {
            "named_endpoints": {
                "/predict": {
                    "parameters": [
                        self._entry(
                            "x",
                            "string",
                            "str",
                            "Textbox",
                            example_input="Howdy!",
                            serializer="StringSerializable",
                        )
                    ],
                    "returns": [
                        self._entry(
                            "output",
                            "string",
                            "str",
                            "Textbox",
                            serializer="StringSerializable",
                        )
                    ],
                }
            },
            "unnamed_endpoints": {},
        }
        assert client.view_api(return_format="dict") == expected

    def test_api_info_of_local_demo(self, calculator_demo):
        """The local calculator describes two numbers, a radio and one output."""
        with connect(calculator_demo) as client:
            api_info = client.view_api(return_format="dict")
        assert isinstance(api_info, dict)
        operation = {
            "label": "operation",
            "type": {
                "enum": ["add", "subtract", "multiply", "divide"],
                "title": "Radio",
                "type": "string",
            },
            "python_type": {
                "type": "Literal['add', 'subtract', 'multiply', 'divide']",
                "description": "",
            },
            "component": "Radio",
            "example_input": "add",
        }
        expected_predict = {
            "parameters": [
                self._entry("num1", "number", "float", "Number", example_input=3),
                operation,
                self._entry("num2", "number", "float", "Number", example_input=3),
            ],
            "returns": [self._entry("output", "number", "float", "Number")],
        }
        assert api_info["named_endpoints"]["/predict"] == expected_predict
        assert api_info["unnamed_endpoints"] == {}

    def test_unnamed_endpoints_use_fn_index(self, count_generator_demo):
        """The string listing mentions ``api_name`` and never ``fn_index``."""
        with connect(count_generator_demo) as client:
            info = client.view_api(return_format="str")
            assert "fn_index" not in info
            assert "api_name" in info

    def test_api_false_endpoints_do_not_appear(self, count_generator_no_api):
        """The demo reports no named endpoints at all."""
        with connect(count_generator_no_api) as client:
            info = client.view_api(return_format="dict")
            assert len(info["named_endpoints"]) == 0

    def test_api_false_endpoints_cannot_be_accessed_with_fn_index(self, increment_demo):
        """Submitting to ``fn_index=2`` raises ``ValueError``."""
        with connect(increment_demo) as client:
            with pytest.raises(ValueError):
                client.submit(1, fn_index=2)

    def test_file_io(self, file_io_demo):
        """File inputs and outputs are described as filepaths and filepath lists."""
        filepath_list = {"type": "List[filepath]", "description": ""}
        filepath = {"type": "filepath", "description": ""}
        with connect(file_io_demo) as client:
            info = client.view_api(return_format="dict")
            predict = info["named_endpoints"]["/predict"]
            inputs = predict["parameters"]
            outputs = predict["returns"]
            assert inputs[0]["type"]["type"] == "array"
            assert inputs[0]["python_type"] == filepath_list
            assert isinstance(inputs[0]["example_input"], list)
            assert isinstance(inputs[0]["example_input"][0], str)
            assert inputs[1]["python_type"] == filepath
            assert isinstance(inputs[1]["example_input"], str)
            assert outputs[0]["python_type"] == filepath_list
            assert outputs[0]["type"]["type"] == "array"
            assert outputs[1]["python_type"] == filepath

    def test_layout_components_in_output(self, hello_world_with_group):
        """The ``/show_group`` endpoint is listed with no parameters or returns."""
        with connect(hello_world_with_group) as client:
            info = client.view_api(return_format="dict")
            expected = {
                "named_endpoints": {
                    "/greeting": {
                        "parameters": [
                            self._entry(
                                "name",
                                "string",
                                "str",
                                "Textbox",
                                example_input="Hello!!",
                            )
                        ],
                        "returns": [
                            self._entry("greeting", "string", "str", "Textbox")
                        ],
                    },
                    "/show_group": {"parameters": [], "returns": []},
                },
                "unnamed_endpoints": {},
            }
            assert info == expected

    def test_layout_and_state_components_in_output(
        self, hello_world_with_state_and_accordion
    ):
        """Only the textbox and number outputs are listed for each endpoint."""

        def count():
            return self._entry("count", "number", "float", "Number")

        with connect(hello_world_with_state_and_accordion) as client:
            info = client.view_api(return_format="dict")
            expected = {
                "named_endpoints": {
                    "/greeting": {
                        "parameters": [
                            self._entry(
                                "name",
                                "string",
                                "str",
                                "Textbox",
                                example_input="Hello!!",
                            )
                        ],
                        "returns": [
                            self._entry("greeting", "string", "str", "Textbox"),
                            count(),
                        ],
                    },
                    "/open": {"parameters": [], "returns": [count()]},
                    "/close": {"parameters": [], "returns": [count()]},
                },
                "unnamed_endpoints": {},
            }
            assert info == expected
class TestEndpoints:
    """Tests for ``Endpoint._upload`` with the HTTP layer mocked out."""

    @staticmethod
    def _uploaded(src, key):
        """Run ``_upload`` on a nested list of names and collect ``key`` from each result.

        ``httpx.post`` is replaced by a mock whose JSON body is
        ``["file1", ..., "file7"]``; ``builtins.open`` and ``pathlib.Path.name``
        are mocked as well. The returned structure mirrors the nesting of the
        upload result.
        """
        client = Client(
            src=src,
        )
        response = MagicMock(status_code=200)
        response.json.return_value = [f"file{i}" for i in range(1, 8)]
        with patch("httpx.post", MagicMock(return_value=response)):
            with patch("builtins.open", MagicMock()):
                with patch.object(pathlib.Path, "name") as mock_name:
                    mock_name.side_effect = lambda x: x
                    results = client.endpoints[0]._upload(
                        ["pre1", ["pre2", "pre3", "pre4"], ["pre5", "pre6"], "pre7"]
                    )
        return [
            [single[key] for single in item] if isinstance(item, list) else item[key]
            for item in results
        ]

    @pytest.mark.flaky
    def test_upload(self):
        """Uploaded names keep the input nesting (``name`` key)."""
        collected = self._uploaded(
            "gradio-tests/not-actually-private-file-upload", "name"
        )
        assert collected == [
            "file1",
            ["file2", "file3", "file4"],
            ["file5", "file6"],
            "file7",
        ]

    @pytest.mark.flaky
    def test_upload_v4(self):
        """Uploaded paths keep the input nesting (``path`` key, v4 Space)."""
        collected = self._uploaded(
            "gradio-tests/not-actually-private-file-uploadv4-sse", "path"
        )
        assert collected == [
            "file1",
            ["file2", "file3", "file4"],
            ["file5", "file6"],
            "file7",
        ]
cpu = huggingface_hub.SpaceHardware.CPU_BASIC


class TestDuplication:
    """Tests for ``Client.duplicate`` with the Hub calls patched out."""

    @pytest.mark.flaky
    @patch("huggingface_hub.get_space_runtime", return_value=MagicMock(hardware=cpu))
    @patch("gradio_client.client.Client.__init__", return_value=None)
    def test_new_space_id(self, mock_init, mock_runtime):
        """Both a bare and an owner-qualified target id query the source runtime."""
        for target in ("test", "gradio-tests/test"):
            Client.duplicate(
                "gradio/calculator",
                target,
                hf_token=HF_TOKEN,
            )
            mock_runtime.assert_any_call("gradio/calculator", token=HF_TOKEN)
            mock_init.assert_called()

    @pytest.mark.flaky
    @patch("gradio_client.utils.set_space_timeout")
    @patch("huggingface_hub.get_space_runtime", return_value=MagicMock(hardware=cpu))
    @patch("gradio_client.client.Client.__init__", return_value=None)
    def test_dont_set_timeout_if_default_hardware(
        self, mock_init, mock_runtime, mock_set_timeout
    ):
        """No sleep timeout is set when the runtime reports ``CPU_BASIC``."""
        Client.duplicate(
            "gradio/calculator",
            "test",
        )
        mock_set_timeout.assert_not_called()

    @pytest.mark.flaky
    @patch("huggingface_hub.request_space_hardware")
    @patch("gradio_client.utils.set_space_timeout")
    @patch(
        "huggingface_hub.get_space_runtime",
        return_value=MagicMock(hardware=huggingface_hub.SpaceHardware.CPU_UPGRADE),
    )
    @patch("gradio_client.client.Client.__init__", return_value=None)
    def test_set_timeout_if_not_default_hardware(
        self, mock_init, mock_runtime, mock_set_timeout, mock_request_hardware
    ):
        """A 15-minute ``sleep_timeout`` is forwarded once, in seconds."""
        Client.duplicate(
            "gradio/calculator",
            "test",
            hardware="cpu-upgrade",
            sleep_timeout=15,
            hf_token=HF_TOKEN,
        )
        assert mock_set_timeout.call_count == 1
        called_kwargs = mock_set_timeout.call_args[1]
        assert called_kwargs["timeout_in_seconds"] == 15 * 60

    @pytest.mark.flaky
    @patch("huggingface_hub.add_space_secret")
    @patch("huggingface_hub.duplicate_space")
    @patch("gradio_client.client.Client.__init__", return_value=None)
    @patch("gradio_client.utils.set_space_timeout")
    def test_add_secrets(self, mock_time, mock_init, mock_duplicate, mock_add_secret):
        """Secrets are added to the new Space even though the call then fails."""
        name = str(uuid.uuid4())
        new_space_id = f"gradio-tests/{name}"
        with pytest.raises(RepositoryNotFoundError):
            Client.duplicate(
                "gradio/calculator",
                name,
                hf_token=HF_TOKEN,
                secrets={"test_key": "test_value", "test_key2": "test_value2"},
            )
        mock_add_secret.assert_called_with(
            new_space_id,
            "test_key",
            "test_value",
            token=HF_TOKEN,
        )
        mock_add_secret.assert_any_call(
            new_space_id,
            "test_key2",
            "test_value2",
            token=HF_TOKEN,
        )