# blitz_diffusion_builtin/externalmod3.py
"""This module should not be used directly as its API is subject to change. Instead,
use the `gr.Blocks.load()` or `gr.Interface.load()` functions."""
from __future__ import annotations
import json
import re
import uuid
import warnings
from copy import deepcopy
from typing import TYPE_CHECKING, Callable, Dict
import requests
import gradio
from gradio import components, utils
from gradio.exceptions import TooManyRequestsError
from gradio.external_utils import (
cols_to_rows,
encode_to_base64,
get_tabular_examples,
get_ws_fn,
postprocess_label,
rows_to_cols,
streamline_spaces_interface,
use_websocket,
)
from gradio.processing_utils import to_binary
if TYPE_CHECKING:
from gradio.blocks import Blocks
from gradio.interface import Interface
# (connect, read) timeouts in seconds for requests to the Hugging Face API below
connect_timeout = 30
server_timeout = 600
def load_blocks_from_repo(
    name: str, src: str | None = None, api_key: str | None = None, alias: str | None = None, **kwargs
) -> Blocks:
"""Creates and returns a Blocks instance from a Hugging Face model or Space repo."""
if src is None:
# Separate the repo type (e.g. "model") from repo name (e.g. "google/vit-base-patch16-224")
tokens = name.split("/")
assert (
len(tokens) > 1
), "Either `src` parameter must be provided, or `name` must be formatted as {src}/{repo name}"
src = tokens[0]
name = "/".join(tokens[1:])
factory_methods: Dict[str, Callable] = {
# for each repo type, we have a method that returns the Interface given the model name & optionally an api_key
"huggingface": from_model,
"models": from_model,
"spaces": from_spaces,
}
    assert src.lower() in factory_methods, "parameter: src must be one of {}".format(
        list(factory_methods.keys())
    )
blocks: gradio.Blocks = factory_methods[src](name, api_key, alias, **kwargs)
return blocks
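# A minimal usage sketch, hedged: it assumes this file is importable alongside a
# working gradio install, and the repo names are illustrative.
#
#   demo = load_blocks_from_repo("models/gpt2")          # src inferred from the prefix
#   demo = load_blocks_from_repo("gpt2", src="models")   # same thing, src given explicitly
#   demo.launch()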
def from_model(model_name: str, api_key: str | None, alias: str | None, **kwargs):
model_url = "https://huggingface.co/{}".format(model_name)
api_url = "https://api-inference.huggingface.co/models/{}".format(model_name)
print("Fetching model from: {}".format(model_url))
headers = {"Authorization": f"Bearer {api_key}"} if api_key is not None else {}
    # Check whether the model exists; if so, retrieve its pipeline tag
response = requests.request("GET", api_url, headers=headers, timeout=(connect_timeout, server_timeout))
assert (
response.status_code == 200
), f"Could not find model: {model_name}. If it is a private or gated model, please provide your Hugging Face access token (https://huggingface.co/settings/tokens) as the argument for the `api_key` parameter."
p = response.json().get("pipeline_tag")
    pipelines = {
        "audio-classification": {
            # example model: ehcalabres/wav2vec2-lg-xlsr-en-speech-emotion-recognition
            "inputs": components.Audio(source="upload", type="filepath", label="Input"),
            "outputs": components.Label(label="Class"),
            "preprocess": to_binary,
            "postprocess": lambda r: postprocess_label(
                {i["label"].split(", ")[0]: i["score"] for i in r.json()}
            ),
        },
"audio-to-audio": {
# example model: facebook/xm_transformer_sm_all-en
"inputs": components.Audio(source="upload", type="filepath", label="Input"),
"outputs": components.Audio(label="Output"),
"preprocess": to_binary,
"postprocess": encode_to_base64,
},
"automatic-speech-recognition": {
# example model: facebook/wav2vec2-base-960h
"inputs": components.Audio(source="upload", type="filepath", label="Input"),
"outputs": components.Textbox(label="Output"),
"preprocess": to_binary,
"postprocess": lambda r: r.json()["text"],
},
"feature-extraction": {
# example model: julien-c/distilbert-feature-extraction
"inputs": components.Textbox(label="Input"),
"outputs": components.Dataframe(label="Output"),
"preprocess": lambda x: {"inputs": x},
"postprocess": lambda r: r.json()[0],
},
"fill-mask": {
"inputs": components.Textbox(label="Input"),
"outputs": components.Label(label="Classification"),
"preprocess": lambda x: {"inputs": x},
"postprocess": lambda r: postprocess_label(
{i["token_str"]: i["score"] for i in r.json()}
),
},
"image-classification": {
# Example: google/vit-base-patch16-224
"inputs": components.Image(type="filepath", label="Input Image"),
"outputs": components.Label(label="Classification"),
"preprocess": to_binary,
"postprocess": lambda r: postprocess_label(
{i["label"].split(", ")[0]: i["score"] for i in r.json()}
),
},
"question-answering": {
# Example: deepset/xlm-roberta-base-squad2
"inputs": [
components.Textbox(lines=7, label="Context"),
components.Textbox(label="Question"),
],
"outputs": [
components.Textbox(label="Answer"),
components.Label(label="Score"),
],
"preprocess": lambda c, q: {"inputs": {"context": c, "question": q}},
"postprocess": lambda r: (r.json()["answer"], {"label": r.json()["score"]}),
},
"summarization": {
# Example: facebook/bart-large-cnn
"inputs": components.Textbox(label="Input"),
"outputs": components.Textbox(label="Summary"),
"preprocess": lambda x: {"inputs": x},
"postprocess": lambda r: r.json()[0]["summary_text"],
},
"text-classification": {
# Example: distilbert-base-uncased-finetuned-sst-2-english
"inputs": components.Textbox(label="Input"),
"outputs": components.Label(label="Classification"),
"preprocess": lambda x: {"inputs": x},
"postprocess": lambda r: postprocess_label(
{i["label"].split(", ")[0]: i["score"] for i in r.json()[0]}
),
},
"text-generation": {
# Example: gpt2
"inputs": components.Textbox(label="Input"),
"outputs": components.Textbox(label="Output"),
"preprocess": lambda x: {"inputs": x},
"postprocess": lambda r: r.json()[0]["generated_text"],
},
"text2text-generation": {
# Example: valhalla/t5-small-qa-qg-hl
"inputs": components.Textbox(label="Input"),
"outputs": components.Textbox(label="Generated Text"),
"preprocess": lambda x: {"inputs": x},
"postprocess": lambda r: r.json()[0]["generated_text"],
},
"translation": {
"inputs": components.Textbox(label="Input"),
"outputs": components.Textbox(label="Translation"),
"preprocess": lambda x: {"inputs": x},
"postprocess": lambda r: r.json()[0]["translation_text"],
},
"zero-shot-classification": {
# Example: facebook/bart-large-mnli
"inputs": [
components.Textbox(label="Input"),
                components.Textbox(label="Possible class names (comma-separated)"),
components.Checkbox(label="Allow multiple true classes"),
],
"outputs": components.Label(label="Classification"),
"preprocess": lambda i, c, m: {
"inputs": i,
"parameters": {"candidate_labels": c, "multi_class": m},
},
"postprocess": lambda r: postprocess_label(
{
r.json()["labels"][i]: r.json()["scores"][i]
for i in range(len(r.json()["labels"]))
}
),
},
"sentence-similarity": {
# Example: sentence-transformers/distilbert-base-nli-stsb-mean-tokens
"inputs": [
components.Textbox(
value="That is a happy person", label="Source Sentence"
),
components.Textbox(
lines=7,
placeholder="Separate each sentence by a newline",
label="Sentences to compare to",
),
],
"outputs": components.Label(label="Classification"),
"preprocess": lambda src, sentences: {
"inputs": {
"source_sentence": src,
"sentences": [s for s in sentences.splitlines() if s != ""],
}
},
"postprocess": lambda r: postprocess_label(
{f"sentence {i}": v for i, v in enumerate(r.json())}
),
},
"text-to-speech": {
# Example: julien-c/ljspeech_tts_train_tacotron2_raw_phn_tacotron_g2p_en_no_space_train
"inputs": components.Textbox(label="Input"),
"outputs": components.Audio(label="Audio"),
"preprocess": lambda x: {"inputs": x},
"postprocess": encode_to_base64,
},
"text-to-image": {
# example model: osanseviero/BigGAN-deep-128
"inputs": components.Textbox(label="Input"),
"outputs": components.Image(label="Output"),
"preprocess": lambda x: {"inputs": x},
"postprocess": encode_to_base64,
},
"token-classification": {
# example model: huggingface-course/bert-finetuned-ner
"inputs": components.Textbox(label="Input"),
"outputs": components.HighlightedText(label="Output"),
"preprocess": lambda x: {"inputs": x},
"postprocess": lambda r: r, # Handled as a special case in query_huggingface_api()
},
}
if p in ["tabular-classification", "tabular-regression"]:
example_data = get_tabular_examples(model_name)
col_names, example_data = cols_to_rows(example_data)
example_data = [[example_data]] if example_data else None
pipelines[p] = {
"inputs": components.Dataframe(
label="Input Rows",
type="pandas",
headers=col_names,
col_count=(len(col_names), "fixed"),
),
"outputs": components.Dataframe(
label="Predictions", type="array", headers=["prediction"]
),
"preprocess": rows_to_cols,
"postprocess": lambda r: {
"headers": ["prediction"],
"data": [[pred] for pred in json.loads(r.text)],
},
"examples": example_data,
}
    if p is None or p not in pipelines:
raise ValueError("Unsupported pipeline type: {}".format(p))
pipeline = pipelines[p]
# https://github.com/huggingface/huggingface.js/tree/main/packages/tasks/src/tasks/text-to-image
# https://huggingface.co/docs/huggingface_hub/main/package_reference/inference_types
# https://huggingface.co/docs/api-inference/detailed_parameters
    def query_huggingface_api(*params, **kwargs):
        # Convert the input component values into the payload expected by the API
        data = pipeline["preprocess"](*params)
        # HF doesn't allow additional parameters for binary files (e.g. images or audio files)
        if isinstance(data, dict):
            data.update({"options": {"wait_for_model": True}})
            # The Inference API expects `negative_prompt` as a list of strings
            if "negative_prompt" in kwargs:
                kwargs["negative_prompt"] = [kwargs["negative_prompt"]]
            # Map separate width/height kwargs onto the API's `target_size` parameter
            width = kwargs.pop("width", None)
            height = kwargs.pop("height", None)
            if width is not None and height is not None:
                kwargs["target_size"] = {"height": int(height), "width": int(width)}
            data.update({"parameters": kwargs.copy()})
data = json.dumps(data)
response = requests.request("POST", api_url, headers=headers, data=data, timeout=(connect_timeout, server_timeout))
        if response.status_code != 200:
errors_json = response.json()
errors, warns = "", ""
if errors_json.get("error"):
errors = f", Error: {errors_json.get('error')}"
if errors_json.get("warnings"):
warns = f", Warnings: {errors_json.get('warnings')}"
raise ValueError(
f"Could not complete request to HuggingFace API, Status Code: {response.status_code}"
+ errors
+ warns
)
        # Handle as a special case, since the HF API only returns the named entities
        # and we need the input text as well
        if p == "token-classification":
ner_groups = response.json()
input_string = params[0]
response = utils.format_ner_list(input_string, ner_groups)
output = pipeline["postprocess"](response)
return output
if alias is None:
query_huggingface_api.__name__ = model_name
else:
query_huggingface_api.__name__ = alias
interface_info = {
"fn": query_huggingface_api,
"inputs": pipeline["inputs"],
"outputs": pipeline["outputs"],
"title": model_name,
#"examples": pipeline.get("examples"),
}
kwargs = dict(interface_info, **kwargs)
kwargs["_api_mode"] = True # So interface doesn't run pre/postprocess.
interface = gradio.Interface(**kwargs)
return interface
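# Hedged note on from_model: the wrapped query fn forwards any extra generation
# kwargs to the Inference API. For a text-to-image model (the model name is
# illustrative, and parameter support depends on the model and API version):
#
#   demo = from_model("stabilityai/stable-diffusion-2", api_key=None, alias=None)
#   # calling the query fn with width=768, height=768 sends
#   # {"parameters": {"target_size": {"width": 768, "height": 768}, ...}}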
def from_spaces(space_name: str, api_key: str | None, alias: str | None, **kwargs) -> Blocks:
space_url = "https://huggingface.co/spaces/{}".format(space_name)
print("Fetching Space from: {}".format(space_url))
headers = {}
if api_key is not None:
headers["Authorization"] = f"Bearer {api_key}"
    iframe_url = (
        requests.get(
            f"https://huggingface.co/api/spaces/{space_name}/host",
            headers=headers,
            timeout=(connect_timeout, server_timeout),
        )
        .json()
        .get("host")
    )
if iframe_url is None:
raise ValueError(
f"Could not find Space: {space_name}. If it is a private or gated Space, please provide your Hugging Face access token (https://huggingface.co/settings/tokens) as the argument for the `api_key` parameter."
)
    r = requests.get(iframe_url, headers=headers, timeout=(connect_timeout, server_timeout))
result = re.search(
r"window.gradio_config = (.*?);[\s]*</script>", r.text
) # some basic regex to extract the config
try:
config = json.loads(result.group(1))
except AttributeError:
raise ValueError("Could not load the Space: {}".format(space_name))
if "allow_flagging" in config: # Create an Interface for Gradio 2.x Spaces
return from_spaces_interface(
space_name, config, alias, api_key, iframe_url, **kwargs
)
else: # Create a Blocks for Gradio 3.x Spaces
if kwargs:
warnings.warn(
"You cannot override parameters for this Space by passing in kwargs. "
"Instead, please load the Space as a function and use it to create a "
"Blocks or Interface locally. You may find this Guide helpful: "
"https://gradio.app/using_blocks_like_functions/"
)
return from_spaces_blocks(config, api_key, iframe_url)
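# Hedged usage sketch (the Space name is illustrative): Gradio 2.x Spaces, whose
# config contains "allow_flagging", come back as an Interface; 3.x Spaces are
# rebuilt as a Blocks from their serialized config.
#
#   demo = from_spaces("gradio/hello_world", api_key=None, alias=None)
#   demo.launch()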
def from_spaces_blocks(config: Dict, api_key: str | None, iframe_url: str) -> Blocks:
api_url = "{}/api/predict/".format(iframe_url)
headers = {"Content-Type": "application/json"}
if api_key is not None:
headers["Authorization"] = f"Bearer {api_key}"
ws_url = "{}/queue/join".format(iframe_url).replace("https", "wss")
ws_fn = get_ws_fn(ws_url, headers)
fns = []
for d, dependency in enumerate(config["dependencies"]):
if dependency["backend_fn"]:
def get_fn(outputs, fn_index, use_ws):
def fn(*data):
data = json.dumps({"data": data, "fn_index": fn_index})
hash_data = json.dumps(
{"fn_index": fn_index, "session_hash": str(uuid.uuid4())}
)
if use_ws:
result = utils.synchronize_async(ws_fn, data, hash_data)
output = result["data"]
else:
                        response = requests.post(api_url, headers=headers, data=data, timeout=(connect_timeout, server_timeout))
result = json.loads(response.content.decode("utf-8"))
try:
output = result["data"]
except KeyError:
if "error" in result and "429" in result["error"]:
raise TooManyRequestsError(
"Too many requests to the Hugging Face API"
)
raise KeyError(
f"Could not find 'data' key in response from external Space. Response received: {result}"
)
if len(outputs) == 1:
output = output[0]
return output
return fn
fn = get_fn(
deepcopy(dependency["outputs"]), d, use_websocket(config, dependency)
)
fns.append(fn)
else:
fns.append(None)
return gradio.Blocks.from_config(config, fns, iframe_url)
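# Note: each dependency with a backend fn is dispatched either over the queue
# websocket (ws_fn) or as a plain POST to /api/predict/. A hand-rolled equivalent
# of the HTTP path, for illustration only:
#
#   payload = json.dumps({"data": ["hello"], "fn_index": 0})
#   result = requests.post(api_url, headers=headers, data=payload).json()["data"]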
def from_spaces_interface(
model_name: str,
config: Dict,
    alias: str | None,
api_key: str | None,
iframe_url: str,
**kwargs,
) -> Interface:
config = streamline_spaces_interface(config)
api_url = "{}/api/predict/".format(iframe_url)
headers = {"Content-Type": "application/json"}
if api_key is not None:
headers["Authorization"] = f"Bearer {api_key}"
    # The function calls the Space's API with the already-preprocessed data
def fn(*data):
data = json.dumps({"data": data})
        response = requests.post(api_url, headers=headers, data=data, timeout=(connect_timeout, server_timeout))
result = json.loads(response.content.decode("utf-8"))
try:
output = result["data"]
except KeyError:
if "error" in result and "429" in result["error"]:
raise TooManyRequestsError("Too many requests to the Hugging Face API")
raise KeyError(
f"Could not find 'data' key in response from external Space. Response received: {result}"
)
        if len(config["outputs"]) == 1:
            # if the fn is supposed to return a single value, pop it
            output = output[0]
            if isinstance(output, list):
                # Needed to support Output.Image() returning bounding boxes as well
                # (TODO: handle different versions of gradio since they have slightly different APIs)
                output = output[0]
return output
fn.__name__ = alias if (alias is not None) else model_name
config["fn"] = fn
kwargs = dict(config, **kwargs)
kwargs["_api_mode"] = True
interface = gradio.Interface(**kwargs)
return interface
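# Note: the resulting Interface runs with _api_mode=True, so all pre/postprocessing
# happens inside the remote Space; locally, fn only JSON-serializes the inputs and
# unwraps the "data" key of the response.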
def gr_Interface_load(
name: str,
src: str | None = None,
hf_token: str | None = None,
alias: str | None = None,
**kwargs,
) -> Blocks:
    try:
        return load_blocks_from_repo(name, src, hf_token, alias, **kwargs)
    except Exception as e:
        print(e)
        # Fall back to a dummy text-to-image Interface so callers still get a mountable app
        return gradio.Interface(lambda: None, ["text"], ["image"])
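# Hedged usage sketch: gr_Interface_load never raises; on any failure it prints the
# error and returns the dummy Interface instead (the model name is illustrative).
#
#   demo = gr_Interface_load("models/runwayml/stable-diffusion-v1-5", hf_token=None)
#   demo.launch()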
def list_uniq(l: list) -> list:
    """Return the unique elements of `l`, preserving first-occurrence order."""
    return sorted(set(l), key=l.index)
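# Example: list_uniq(["a", "b", "a", "c"]) -> ["a", "b", "c"]; sorted() keys each
# element by its first index in the original list, which preserves insertion order.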
def get_status(model_name: str):
    """Query the Inference API status of a model via huggingface_hub."""
    from huggingface_hub import InferenceClient
    client = InferenceClient(timeout=10)
    return client.get_model_status(model_name)
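# Hedged example (requires a huggingface_hub version where InferenceClient exposes
# get_model_status; the field values shown are illustrative):
#
#   status = get_status("gpt2")
#   print(status.state)         # e.g. "Loadable" or "Loaded"
#   print(status.compute_type)  # e.g. {"gpu": ...} on GPU-backed deployments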
def is_loadable(model_name: str, force_gpu: bool = False):
try:
status = get_status(model_name)
except Exception as e:
print(e)
print(f"Couldn't load {model_name}.")
return False
    gpu_state = isinstance(status.compute_type, dict) and "gpu" in status.compute_type.keys()
    loadable = status is not None and status.state in ["Loadable", "Loaded"] and (not force_gpu or gpu_state)
    if not loadable:
        print(f"Couldn't load {model_name}. Model state: '{status.state}', GPU: {gpu_state}")
    return loadable
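# Hedged example: gate loading on Inference API availability (model name illustrative):
#
#   if is_loadable("runwayml/stable-diffusion-v1-5", force_gpu=True):
#       demo = gr_Interface_load("models/runwayml/stable-diffusion-v1-5")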
def find_model_list(author: str = "", tags: list[str] = [], not_tag: str = "",
                    sort: str = "last_modified", limit: int = 30, force_gpu: bool = False, check_status: bool = False):
from huggingface_hub import HfApi
api = HfApi()
default_tags = ["diffusers"]
if not sort: sort = "last_modified"
    # Request extra models from the API, since private/gated/unloadable ones are
    # filtered out below, while `limit` still caps how many model ids are returned
    api_limit = limit * 20 if check_status and force_gpu else limit * 5
    models = []
    try:
        model_infos = api.list_models(author=author, task="text-to-image",
            tags=list_uniq(default_tags + tags), cardData=True, sort=sort, limit=api_limit)
except Exception as e:
        print("Error: Failed to list models.")
print(e)
return models
for model in model_infos:
if not model.private and not model.gated:
loadable = is_loadable(model.id, force_gpu) if check_status else True
            if (not_tag and not_tag in model.tags) or not loadable: continue
models.append(model.id)
if len(models) == limit: break
return models
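if __name__ == "__main__":
    # Minimal smoke test, hedged: needs network access and a recent huggingface_hub;
    # the author name is illustrative. Lists up to 5 public diffusers text-to-image models.
    print(find_model_list(author="stabilityai", limit=5))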