Parallelism and batch jobs
To get your answers as quickly as possible, you will probably want to run your jobs with some form of parallelism. There are two options at your disposal for this:
- The streaming option
- The dataset option
Streaming
To maximize the speed of inference, it is more efficient to stream your data to the API than to run many individual HTTP requests. This requires the use of websockets on your end. Internally we use a queue system from which machines pull work, so parallelism is handled seamlessly for you. This is meant as a batching mechanism, and a single stream should be open at any given time. If multiple streams are open, requests will go to either of them with no guarantee about which one. This is intentional, as it allows recovering from a stream cut: simply reinitializing the stream will recover results (see the recovery sketch after the example below), and everything that was sent keeps being processed even if you are no longer connected. So make sure you don't send items multiple times, otherwise you will be billed multiple times.
Here is a small example:
# !pip install websockets
import asyncio
import json
import uuid

import websockets

# API_TOKEN should contain your API token
MODEL_ID = "facebook/bart-large-mnli"
COMPUTE_TYPE = "cpu"  # or "gpu"


async def send(websocket, payloads):
    # You need to login with a first message as headers are not forwarded
    # for websockets
    await websocket.send(f"Bearer {API_TOKEN}".encode("utf-8"))
    for payload in payloads:
        await websocket.send(json.dumps(payload).encode("utf-8"))
        print("Sent ")


async def recv(websocket, last_id):
    outputs = []
    while True:
        data = await websocket.recv()
        payload = json.loads(data)
        if payload["type"] == "results":
            # {"type": "results", "outputs": JSONFormatted results, "id": the id we sent}
            print(payload["outputs"])
            outputs.append(payload["outputs"])
            if payload["id"] == last_id:
                return outputs
        else:
            # {"type": "status", "message": "Some information about the queue"}
            print(f"< {payload['message']}")


async def main():
    uri = f"wss://api-inference.huggingface.co/bulk/stream/{COMPUTE_TYPE}/{MODEL_ID}"
    async with websockets.connect(uri) as websocket:
        # inputs and parameters are classic, "id" is a way to track that query
        payloads = [
            {
                "id": str(uuid.uuid4()),
                "inputs": "This is a test about a new dress shop that opened up on 5th avenue",
                "parameters": {"candidate_labels": ["medical", "fashion", "politics"]},
            }
            for i in range(10)
        ]
        last_id = payloads[-1]["id"]
        future = send(websocket, payloads)
        future_r = recv(websocket, last_id)
        _, outputs = await asyncio.gather(future, future_r)
        results = [out["labels"][0] for out in outputs]
        return results


loop = asyncio.get_event_loop()
if loop.is_running():
    # When running in notebooks
    import nest_asyncio

    nest_asyncio.apply()
results = loop.run_until_complete(main())
# All results are "fashion"
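Because everything you sent keeps being processed server-side, recovering from a cut stream is mostly a matter of reopening the connection and resuming recv, without re-sending anything. Here is a rough sketch of that idea, reusing recv, uri and API_TOKEN from the example above; the retry wrapper itself (retry count, exception handling) is an assumption on our side, not part of the API:

async def recv_with_recovery(uri, last_id, max_retries=3):
    # On a stream cut, reopen the connection and keep listening.
    # Never re-send the payloads themselves, or they would be
    # processed (and billed) again.
    for attempt in range(max_retries):
        try:
            async with websockets.connect(uri) as websocket:
                # Authenticate again on the fresh connection
                await websocket.send(f"Bearer {API_TOKEN}".encode("utf-8"))
                return await recv(websocket, last_id)
        except websockets.ConnectionClosed:
            print(f"Stream cut, reconnecting ({attempt + 1}/{max_retries})")
    raise RuntimeError("Could not recover the stream")

If the cut happens mid-stream, results already collected by an earlier recv call live only in your own code, so in practice you may want to merge outputs by id across reconnections.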
The messages you send need to contain an inputs key.
Optionally, you can specify an id key that will be sent back with the result. We try to maintain the ordering of results as you sent them, but it is better to be sure; the id key is there for that.
Optionally, you can specify a parameters key that corresponds to the detailed_parameters of the pipeline you are using.
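For instance, a single message for the zero-shot classification model used above could look like the dictionary below; the id value is just an illustration, any string of your choosing works:

# "inputs" is required, "id" and "parameters" are optional.
message = {
    "id": "my-request-1",
    "inputs": "The new collection will be presented during fashion week",
    "parameters": {"candidate_labels": ["medical", "fashion", "politics"]},
}
# Messages are sent over the websocket as UTF-8 encoded JSON:
# await websocket.send(json.dumps(message).encode("utf-8"))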
The received messages will always contain a type key.
- status message: These messages contain a message key that informs you about the current status of the job.
- results message: These messages contain the actual results of the computation, as illustrated below. id contains the id you sent (or one generated automatically), and outputs contains the result as it would be returned by the HTTP endpoint. See detailed_parameters for more information.
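Concretely, for the zero-shot classification example above, the two kinds of messages look roughly as follows; the exact content of outputs depends on the pipeline you are using, and the values shown here are purely illustrative:

# A status message, purely informational:
# {"type": "status", "message": "Some information about the queue"}
#
# A results message; "outputs" matches the HTTP endpoint response format:
# {
#     "type": "results",
#     "id": "my-request-1",
#     "outputs": {"sequence": "...", "labels": ["fashion", "politics", "medical"], "scores": [0.9, 0.06, 0.04]},
# }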
Dataset
If you regularly run inference against the same dataset, to compare models or check for drift, we recommend using a dataset.
Or use any of the 2000 available datasets: here.
The output of this method will automatically create a private dataset on your account, and use git mechanisms to store versions of the various outputs.
import json

import requests

model_id = "facebook/wav2vec2-base-960h"
API_URL = f"https://api-inference.huggingface.co/bulk/run/cpu/{model_id}"
headers = {"Authorization": f"Bearer {API_TOKEN}"}
data = json.dumps(
    {
        "dataset_name": "ApiInferenceTest/asr_dummy",
        "dataset_config": "asr",
        "dataset_split": "test",
        "dataset_column": "file",
    }
)
response = requests.request("POST", API_URL, headers=headers, data=data)
data = json.loads(response.content)
assert "jobid" in data, f"Expected some job id, got {data}"

# The response contains a `jobid` to track the job, and `bulk_name`, the name of the output
# dataset, accessible at https://huggingface.co/datasets/{bulk_name}
STATUS_URL = f"https://api-inference.huggingface.co/bulk/status/{data['jobid']}"
status = requests.request("GET", STATUS_URL, headers=headers)
# status == {"status": {"active": 1, "start_time": 1627479091.0}}
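The bulk job runs asynchronously, so in practice you poll the status endpoint until the job is no longer active. Here is a minimal polling sketch, assuming the job is finished once active is absent or zero; the exact shape of the final status response may differ, so check what your job actually returns:

import time

while True:
    status = requests.request("GET", STATUS_URL, headers=headers)
    status_data = json.loads(status.content)
    print(status_data)
    # While running, the status looks like {"status": {"active": 1, "start_time": ...}}
    if not status_data.get("status", {}).get("active"):
        break
    time.sleep(30)

Once the job is done, the output dataset named by bulk_name can be browsed at https://huggingface.co/datasets/{bulk_name} or loaded with the datasets library.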