Test2 / app.py
johnpaulbin's picture
Update app.py
67c8829
#!/usr/bin/env python3
'''A native client simulating the plugin to use for testing the server'''
import asyncio
import itertools
import struct
import json
import time
import sys
import csv
from pathlib import Path
from pprint import pprint
from tqdm import tqdm
from flask import Flask, request, jsonify
import asyncio
from hypercorn.asyncio import serve
from hypercorn.config import Config
app = Flask(__name__)
class Timer:
"""Little helper class top measure runtime of async function calls and dump
all of those to a CSV.
"""
def __init__(self):
self.measurements = []
async def measure(self, coro, *details):
start = time.perf_counter()
result = await coro
end = time.perf_counter()
self.measurements.append([end - start, *details])
return result
def dump(self, fh):
# TODO stats? For now I just export to Excel or something
writer = csv.writer(fh)
writer.writerows(self.measurements)
class Client:
"""asyncio based native messaging client. Main interface is just calling
`request()` with the right parameters and awaiting the future it returns.
"""
def __init__(self, *args):
self.serial = itertools.count(1)
self.futures = {}
self.args = args
async def __aenter__(self):
self.proc = await asyncio.create_subprocess_exec(*self.args, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
self.read_task = asyncio.create_task(self.reader())
return self
async def __aexit__(self, *args):
self.proc.stdin.close()
await self.proc.wait()
def request(self, command, data, *, update=lambda data: None):
message_id = next(self.serial)
message = json.dumps({"command": command, "id": message_id, "data": data}).encode()
# print(f"Sending: {message}", file=sys.stderr)
future = asyncio.get_running_loop().create_future()
self.futures[message_id] = future, update
self.proc.stdin.write(struct.pack("@I", len(message)))
self.proc.stdin.write(message)
return future
async def reader(self):
while True:
try:
raw_length = await self.proc.stdout.readexactly(4)
length = struct.unpack("@I", raw_length)[0]
raw_message = await self.proc.stdout.readexactly(length)
# print(f"Receiving: {raw_message.decode()}", file=sys.stderr)
message = json.loads(raw_message)
# Not cool if there is no response message "id" here
if not "id" in message:
continue
# print(f"Receiving response to {message['id']}", file=sys.stderr)
future, update = self.futures[message["id"]]
if "success" in message:
del self.futures[message["id"]]
if message["success"]:
future.set_result(message["data"])
else:
future.set_exception(Exception(message["error"]))
elif "update" in message:
update(message["data"])
except asyncio.IncompleteReadError:
break # Stop read loop if EOF is reached
except asyncio.CancelledError:
break # Also stop reading if we're cancelled
class TranslateLocally(Client):
"""TranslateLocally wrapper around Client that translates
our defined messages into functions with arguments.
"""
async def list_models(self, *, include_remote=False):
return await self.request("ListModels", {"includeRemote": bool(include_remote)})
async def translate(self, text, src=None, trg=None, *, model=None, pivot=None, html=False):
if src and trg:
if model or pivot:
raise InvalidArgumentException("Cannot combine src + trg and model + pivot arguments")
spec = {"src": str(src), "trg": str(trg)}
elif model:
if pivot:
spec = {"model": str(model), "pivot": str(pivot)}
else:
spec = {"model": str(model)}
else:
raise InvalidArgumentException("Missing src + trg or model argument")
result = await self.request("Translate", {**spec, "text": str(text), "html": bool(html)})
return result["target"]["text"]
async def download_model(self, model_id, *, update=lambda data: None):
return await self.request("DownloadModel", {"modelID": str(model_id)}, update=update)
def first(iterable, *default):
"""Returns the first value of anything iterable, or throws StopIteration
if it is empty. Or, if you specify a default argument, it will return that.
"""
return next(iter(iterable), *default) # passing as rest argument so it can be nothing and trigger StopIteration exception
def get_build():
"""Instantiate an asyncio TranslateLocally client that connects to
tranlateLocally in your local build directory.
"""
paths = [
Path("./translateLocally"),
Path(__file__).resolve().parent / Path("../build/translateLocally")
];
for path in paths:
if path.exists():
return TranslateLocally(path.resolve(), "-p", "--debug")
raise RuntimeError("Could not find translateLocally binary")
async def download_with_progress(tl, model, position):
"""tl.download but with a tqdm powered progress bar."""
with tqdm(position=position, desc=model["modelName"], unit="b", unit_scale=True, leave=False) as bar:
def update(data):
assert data["read"] <= data["size"]
bar.total = data["size"]
diff = data["read"] - bar.n
bar.update(diff)
return await tl.download_model(model["id"], update=update)
async def test():
"""Test TranslateLocally functionality."""
async with get_build() as tl:
models = await tl.list_models(include_remote=True)
pprint(models)
# Models necessary for tests, both direct & pivot
necessary_models = {("en", "de"), ("en", "es"), ("es", "en")}
# From all models available, pick one for every necessary language pair
# (preferring tiny ones) so we can make sure these are downloaded.
selected_models = {
(src,trg): first(sorted(
(
model
for model in models
if src in model["srcTags"] and trg == model["trgTag"]
),
key=lambda model: 0 if model["type"] == "tiny" else 1
))
for src, trg in necessary_models
}
pprint(selected_models)
# Download them. Even if they're already model['local'] == True, to test
# that in that case this is a no-op.
await asyncio.gather(*(
download_with_progress(tl, model, position)
for position, model in enumerate(selected_models.values())
))
print() # tqdm messes a lot with the print position, this makes it less bad
# Test whether the model list has been updated to reflect that the
# downloaded models are now local.
models = await tl.list_models(include_remote=True)
assert all(
model["local"]
for selected_model in selected_models.values()
for model in models
if model["id"] == selected_model["id"]
)
# Perform some translations, switching between the models
translations = await asyncio.gather(
tl.translate("Hello world!", "en", "de"),
tl.translate("Let's translate another sentence to German.", "en", "de"),
tl.translate("Sticks and stones may break my bones but words WILL NEVER HURT ME!", "en", "es"),
tl.translate("I <i>like</i> to drive my car. But I don't have one.", "en", "de", html=True),
tl.translate("¿Por qué no funciona bien?", "es", "de"),
tl.translate("This will be the last sentence of the day.", "en", "de"),
)
pprint(translations)
assert translations == [
"Hallo Welt!",
"Übersetzen wir einen weiteren Satz mit Deutsch.",
"Palos y piedras pueden romper mis huesos, pero las palabras NUNCA HURT ME.",
"Ich <i>fahre gerne</i> mein Auto. Aber ich habe keine.", #<i>fahre</i>???
"Warum funktioniert es nicht gut?",
"Dies wird der letzte Satz des Tages sein.",
]
# Test bad input
try:
await tl.translate("This is impossible to translate", "en", "xx")
assert False, "How are we able to translate to 'xx'???"
except Exception as e:
assert "Could not find the necessary translation models" in str(e)
print("Fin")
async def test_third_party():
"""Test whether TranslateLocally can switch between different types of
models. This test assumes you have the OPUS repository in your list:
https://object.pouta.csc.fi/OPUS-MT-models/app/models.json
"""
async with get_build() as tl:
models_to_try = [
'en-de-tiny',
'en-de-base',
'eng-fin-tiny', # model has broken model_info.json so won't work anyway :(
'eng-ukr-tiny',
]
models = await tl.list_models(include_remote=True)
# Select a model from the model list for each of models_to_try, but
# leave it out if there is no model available.
selected_models = {
shortname: model
for shortname in models_to_try
if (model := first((model for model in models if model["shortname"] == shortname), None))
}
await asyncio.gather(*(
download_with_progress(tl, model, position)
for position, model in enumerate(selected_models.values())
))
# TODO: Temporary filter to figure out 'failed' downloads. eng-fin-tiny
# has a broken JSON file so it will download correctly, but still not
# be available or show up in this list. We should probably make the
# download fail in that scenario.
models = await tl.list_models(include_remote=False)
for shortname in list(selected_models.keys()):
if not any(True for model in models if model["shortname"] == shortname):
print(f"Skipping {shortname} because it didn't show up in model list after downloading", file=sys.stderr)
del selected_models[shortname]
translations = await asyncio.gather(*[
tl.translate("This is a very simple test sentence", model=model["id"])
for model in selected_models.values()
])
pprint(list(zip(selected_models.keys(), translations)))
async def test_latency():
timer = Timer()
# Our line generator: just read Crime & Punishment from stdin :D
lines = (line.strip() for line in sys.stdin)
async with get_build() as tl:
for epoch in range(100):
print(f"Epoch {epoch}...", file=sys.stderr)
for batch_size in [1, 5, 10, 20, 50, 100]:
await asyncio.gather(*(
timer.measure(
tl.translate(line, "en", "de"),
epoch,
batch_size,
len(line.split(' ')))
for n, line in zip(range(batch_size), lines)
))
timer.dump(sys.stdout)
async def test_concurrency():
async with get_build() as tl:
fetch_one = tl.list_models(include_remote=True)
fetch_two = tl.list_models(include_remote=False)
fetch_three = tl.list_models(include_remote=True)
await asyncio.gather(fetch_one, fetch_two, fetch_three)
async def test_shutdown():
tasks = []
async with get_build() as tl:
for n in range(10):
print(f"Requesting translation {n}")
tasks.append(tl.request("Translate", {
"src": "en",
"trg": "de",
"text": f"This is simple sentence number {n}!",
"html": False
}))
print("Shutting down")
print("Shutdown complete")
for translation in asyncio.as_completed(tasks):
print(await translation)
print("Fin.")
async def test_concurrent_download():
"""Test parallel downloads."""
async with get_build() as tl:
models = await tl.list_models(include_remote=True)
remote = [model for model in models if not model["local"]]
downloads = [
tl.download_model(model["id"])
for model, _ in zip(remote, range(3))
]
await asyncio.gather(*downloads)
def run_async_function(func):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(func)
@app.route('/list_models', methods=['GET'])
def list_models():
include_remote = request.args.get('include_remote', 'false').lower() == 'true'
async def async_func():
async with get_build() as tl:
models = await tl.list_models(include_remote=include_remote)
return models
result = run_async_function(async_func())
return jsonify(result)
@app.route('/translate', methods=['POST'])
def translate():
data = request.get_json()
async def async_func():
async with get_build() as tl:
result = await tl.translate(
data['text'],
src=data.get('src'),
trg=data.get('trg'),
model=data.get('model'),
pivot=data.get('pivot'),
html=data.get('html', False)
)
return result
result = run_async_function(async_func())
return jsonify(result)
# Define more routes for other operations like download_model, etc.
if __name__ == "__main__":
config = Config()
config.bind = ["0.0.0.0:7860"] # You can specify the host and port here
asyncio.run(serve(app, config))