Transformers documentation

Webサーバー用のパイプラインの使用

You are viewing v4.37.1 version. A newer version v4.47.1 is available.
Hugging Face's logo
Join the Hugging Face community

and get access to the augmented documentation experience

to get started

Webサーバー用のパイプラインの使用

推論エンジンの作成は複雑なトピックであり、"最適な"ソリューションはおそらく問題の領域に依存するでしょう。CPUまたはGPUを使用していますか?最低のレイテンシ、最高のスループット、多くのモデルのサポート、または特定のモデルの高度な最適化を望んでいますか? このトピックに取り組むための多くの方法があり、私たちが紹介するのは、おそらく最適なソリューションではないかもしれないが、始めるための良いデフォルトです。

重要なことは、Webサーバーはリクエストを待機し、受信したように扱うシステムであるため、データセットのように、イテレータを使用できることです。

通常、Webサーバーは並列処理(マルチスレッド、非同期など)されて、さまざまなリクエストを同時に処理します。一方、パイプライン(および主にその基礎となるモデル)は並列処理にはあまり適していません。それらは多くのRAMを使用するため、実行中に利用可能なリソースをすべて提供するか、計算集約型のジョブである場合に最適です。

Webサーバーは受信と送信の軽い負荷を処理し、実際の作業を1つのスレッドで処理するようにします。この例ではstarletteを使用します。実際のフレームワークはあまり重要ではありませんが、別のフレームワークを使用している場合は、同じ効果を得るためにコードを調整または変更する必要があるかもしれません。

server.pyを作成してください:

from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route
from transformers import pipeline
import asyncio


async def homepage(request):
    payload = await request.body()
    string = payload.decode("utf-8")
    response_q = asyncio.Queue()
    await request.app.model_queue.put((string, response_q))
    output = await response_q.get()
    return JSONResponse(output)


async def server_loop(q):
    pipe = pipeline(model="bert-base-uncased")
    while True:
        (string, response_q) = await q.get()
        out = pipe(string)
        await response_q.put(out)


app = Starlette(
    routes=[
        Route("/", homepage, methods=["POST"]),
    ],
)


@app.on_event("startup")
async def startup_event():
    q = asyncio.Queue()
    app.model_queue = q
    asyncio.create_task(server_loop(q))

ここから始めることができます:

uvicorn server:app

そして、次のようにクエリできます:

curl -X POST -d "test [MASK]" http://localhost:8000/
#[{"score":0.7742936015129089,"token":1012,"token_str":".","sequence":"test."},...]

そして、これでウェブサーバーを作成する方法の良いアイデアを持っています!

本当に重要なのは、モデルを一度だけロードすることです。これにより、ウェブサーバー上にモデルのコピーがないため、不必要なRAMが使用されなくなります。 その後、キューイングメカニズムを使用して、動的バッチ処理を行うなど、いくつかのアイテムを蓄積してから推論を行うなど、高度な処理を行うことができます:

以下のコードサンプルは、可読性のために擬似コードのように書かれています。システムリソースに合理的かどうかを確認せずに実行しないでください!

(string, rq) = await q.get()
strings = []
queues = []
while True:
    try:
        (string, rq) = await asyncio.wait_for(q.get(), timeout=0.001)  # 1ms
    except asyncio.exceptions.TimeoutError:
        break
    strings.append(string)
    queues.append(rq)
strings
outs = pipe(strings, batch_size=len(strings))
for rq, out in zip(queues, outs):
    await rq.put(out)

まず第一に、通常はあまり良いアイデアではないバッチサイズの制限がありません。次に、タイムアウトはキューの取得ごとにリセットされるため、推論を実行する前に1ms以上待つ可能性があります(最初のリクエストの遅延に1ms分遅れが生じます)。

1msの締め切りを1回だけ持つのが良いでしょう。

これは、キューに何もない場合でも常に1ms待機しますが、キューに何もない場合に推論を開始したい場合は適していないかもしれません。ただし、バッチ処理が本当に重要な場合には意味があるかもしれません。再度、1つの最適な解決策は存在しません。

Few things you might want to consider

Error checking

本番環境では多くの問題が発生する可能性があります:メモリ不足、スペース不足、モデルの読み込みが失敗するかもしれません、クエリが誤っているかもしれません、クエリが正しい場合でもモデルの構成エラーのために実行に失敗するかもしれませんなど。

一般的には、サーバーがエラーをユーザーに出力すると良いため、これらのエラーを表示するための多くのtry..exceptステートメントを追加することは良いアイデアです。ただし、セキュリティコンテキストに応じてこれらのエラーをすべて表示することはセキュリティリスクになる可能性があることに注意してください。

Circuit breaking

Webサーバーは通常、過負荷時に正しいエラーを返す方が良いです。クエリを無期限に待つ代わりに適切なエラーを返します。長時間待つ代わりに503エラーを返すか、長時間待ってから504エラーを返すかです。

提案されたコードでは単一のキューがあるため、キューサイズを見ることは、Webサーバーが負荷に耐える前にエラーを返すための基本的な方法です。

Blocking the main thread

現在、PyTorchは非同期を認識していないため、計算はメインスレッドをブロックします。つまり、PyTorchが独自のスレッド/プロセスで実行されるようにすると良いでしょう。提案されたコードは、スレッドと非同期とキューがうまく連携しないため、これは行われていませんが、最終的には同じことを行います。

これは、単一のアイテムの推論が長い場合(>1秒)に重要です。この場合、推論中にすべてのクエリが1秒待たなければならないことを意味します。

Dynamic batching

一般的に、バッチ処理は1回のアイテムを1回渡すよりも改善されることは必ずしもありません(詳細はバッチ処理の詳細を参照)。しかし、正しい設定で使用すると非常に効果的です。APIではデフォルトで動的バッチ処理は行われません(遅延の機会が多すぎます)。しかし、非常に大規模なモデルであるBLOOM推論の場合、動的バッチ処理は重要です。これにより、すべてのユーザーにとってまともなエクスペリエンスを提供できます。

以上が、提供されたテキストのMarkdown形式の翻訳です。