# generation_utils.py
from threading import Thread
from time import perf_counter
from typing import List
import gradio as gr
from transformers import AutoTokenizer, TextIteratorStreamer
import numpy as np
import os

def get_special_token_id(tokenizer: AutoTokenizer, key: str) -> int:
    """
    Gets the token ID for a given string that has been added to the tokenizer as a special token.

    Args:
        tokenizer (PreTrainedTokenizer): the tokenizer
        key (str): the key to convert to a single token

    Raises:
        ValueError: if more than one ID was generated

    Returns:
        int: the token ID for the given key
    """
    token_ids = tokenizer.encode(key)
    if len(token_ids) > 1:
        raise ValueError(f"Expected only a single token for '{key}' but found {token_ids}")
    return token_ids[0]
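
# Illustrative usage (assumption, not from the original app): for an
# instruction-tuned model whose tokenizer registers an end-of-response marker
# such as "### End" as a single special token, the ID can be looked up with:
#
#     end_token_id = get_special_token_id(tokenizer, "### End")
#
# The "### End" string is a placeholder; use whatever end key the model's
# prompt format actually defines.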


def estimate_latency(
    current_time: float,
    current_perf_text: str,
    new_gen_text: str,
    per_token_time: List[float],
    num_tokens: int,
    tokenizer: AutoTokenizer,
) -> tuple:
    """
    Helper function for performance estimation.

    Parameters:
        current_time (float): Duration of this generation step in seconds.
        current_perf_text (str): Current content of the performance UI field.
        new_gen_text (str): Newly generated text.
        per_token_time (List[float]): History of generation speeds from previous steps.
        num_tokens (int): Total number of generated tokens.
        tokenizer (AutoTokenizer): Tokenizer used to count the newly generated tokens.

    Returns:
        Update for the performance text field.
        Update for the total number of generated tokens.
    """
    num_current_toks = len(tokenizer.encode(new_gen_text))
    num_tokens += num_current_toks
    per_token_time.append(num_current_toks / current_time)
    # Refresh the reported speed every 4 steps once at least 10 measurements exist,
    # averaging over the 10 most recent measurements.
    if len(per_token_time) > 10 and len(per_token_time) % 4 == 0:
        current_bucket = per_token_time[-10:]
        return (
            f"Average generation speed: {np.mean(current_bucket):.2f} tokens/s. Total generated tokens: {num_tokens}",
            num_tokens,
        )
    return current_perf_text, num_tokens


def run_generation(
    user_text: str,
    top_p: float,
    temperature: float,
    top_k: int,
    max_new_tokens: int,
    perf_text: str,
    tokenizer: AutoTokenizer,
    tokenizer_kwargs: dict,
    model_configuration: dict,
    ov_model,
) -> tuple:
    """
    Text generation function.

    Parameters:
        user_text (str): User-provided instruction for generation.
        top_p (float): Nucleus sampling. If < 1, keeps the smallest set of most probable tokens.
        temperature (float): Modulates the logits distribution.
        top_k (int): Number of highest-probability vocabulary tokens to keep for top-k filtering.
        max_new_tokens (int): Maximum length of the generated sequence.
        perf_text (str): Content of the text field for performance results.
        tokenizer (AutoTokenizer): The tokenizer object.
        tokenizer_kwargs (dict): Additional kwargs for the tokenizer.
        model_configuration (dict): Configuration for the model.
        ov_model: Your OpenVINO model object.

    Yields:
        model_output (str): Model-generated text accumulated so far.
        perf_text (str): Updated performance text.
    """
    # Extract necessary configurations from model_configuration
    response_key = model_configuration.get("response_key")
    prompt_template = model_configuration.get("prompt_template", "{instruction}")
    end_key = model_configuration.get("end_key")
    end_key_token_id = None

    # Handle special tokens
    if response_key:
        tokenizer_response_key = next(
            (token for token in tokenizer.additional_special_tokens if token.startswith(response_key)),
            None,
        )
        if tokenizer_response_key and end_key:
            try:
                end_key_token_id = get_special_token_id(tokenizer, end_key)
            except ValueError:
                pass

    # Ensure defaults for token IDs
    end_key_token_id = end_key_token_id or tokenizer.eos_token_id
    pad_token_id = end_key_token_id or tokenizer.pad_token_id

    # Prepare the input prompt according to the template the model expects
    prompt_text = prompt_template.format(instruction=user_text)

    # Tokenize the user text.
    model_inputs = tokenizer(prompt_text, return_tensors="pt", **tokenizer_kwargs)

    # Stream tokens as they are produced so that the UI is not blocked.
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = {
        **model_inputs,
        "streamer": streamer,
        "max_new_tokens": max_new_tokens,
        "do_sample": True,
        "top_p": top_p,
        "temperature": float(temperature),
        "top_k": top_k,
        "eos_token_id": end_key_token_id,
        "pad_token_id": pad_token_id,
    }

    # Start generation in a separate thread
    t = Thread(target=ov_model.generate, kwargs=generate_kwargs)
    t.start()

    # Pull the generated text from the streamer and update the model output
    model_output = ""
    per_token_time = []
    num_tokens = 0
    start = perf_counter()
    for new_text in streamer:
        current_time = perf_counter() - start
        model_output += new_text
        perf_text, num_tokens = estimate_latency(current_time, perf_text, new_text, per_token_time, num_tokens, tokenizer)
        yield model_output, perf_text
        start = perf_counter()
    return model_output, perf_text
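
# Illustrative usage (assumptions, not part of the original app): run_generation
# is a generator, so Gradio can stream partial results from it. Outside Gradio it
# can be consumed directly; `tok`, `ov_model`, and `model_config` are assumed to
# have been created elsewhere.
#
#     for partial_text, perf in run_generation(
#         "Explain OpenVINO in one sentence.",
#         top_p=0.9, temperature=0.7, top_k=50, max_new_tokens=128,
#         perf_text="", tokenizer=tok, tokenizer_kwargs={},
#         model_configuration=model_config, ov_model=ov_model,
#     ):
#         print(partial_text)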


def reset_textbox(instruction: str, response: str, perf: str):
    """
    Helper function for resetting the content of all text fields.

    Parameters:
        instruction (str): Content of the user instruction field.
        response (str): Content of the model response field.
        perf (str): Content of the performance info field.

    Returns:
        An empty string for each placeholder.
    """
    return "", "", ""