# generation_utils.py
from threading import Thread
from time import perf_counter
from typing import List
import gradio as gr
from transformers import AutoTokenizer, TextIteratorStreamer
import numpy as np
import os
def get_special_token_id(tokenizer: AutoTokenizer, key: str) -> int:
"""
Gets the token ID for a given string that has been added to the tokenizer as a special token.
Args:
tokenizer (PreTrainedTokenizer): the tokenizer
key (str): the key to convert to a single token
Raises:
ValueError: if more than one ID was generated
Returns:
int: the token ID for the given key
"""
token_ids = tokenizer.encode(key)
if len(token_ids) > 1:
raise ValueError(f"Expected only a single token for '{key}' but found {token_ids}")
return token_ids[0]
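
# Usage sketch (illustrative assumption, not defined in this module): Dolly-style
# instruction models register markers such as "### End" as special tokens, and the
# single id returned here can then be passed to generate() as eos_token_id.
#
#     end_key_token_id = get_special_token_id(tokenizer, "### End")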
def estimate_latency(
    current_time: float,
    current_perf_text: str,
    new_gen_text: str,
    per_token_time: List[float],
    num_tokens: int,
    tokenizer: AutoTokenizer,
) -> tuple:
"""
Helper function for performance estimation
Parameters:
current_time (float): This step time in seconds.
current_perf_text (str): Current content of performance UI field.
new_gen_text (str): New generated text.
      per_token_time (List[float]): History of per-step throughput (tokens/s) from previous steps.
      num_tokens (int): Total number of generated tokens.
      tokenizer (AutoTokenizer): Tokenizer used to count the tokens in new_gen_text.
Returns:
update for performance text field
update for a total number of tokens
"""
num_current_toks = len(tokenizer.encode(new_gen_text))
num_tokens += num_current_toks
per_token_time.append(num_current_toks / current_time)
    if len(per_token_time) > 10 and len(per_token_time) % 4 == 0:
        # Average the most recent 10 per-step throughput samples, refreshed every 4 steps.
        current_bucket = per_token_time[-10:]
return (
f"Average generation speed: {np.mean(current_bucket):.2f} tokens/s. Total generated tokens: {num_tokens}",
num_tokens,
)
return current_perf_text, num_tokens
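
# Worked illustration of the update cadence (synthetic numbers, an assumption for
# clarity): once the streamer has delivered 12 chunks, per_token_time holds 12
# tokens/s samples; 12 > 10 and 12 % 4 == 0, so the mean of the last 10 samples is
# reported. On any other step the previous performance string is returned unchanged.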
def run_generation(
user_text: str,
top_p: float,
temperature: float,
top_k: int,
max_new_tokens: int,
perf_text: str,
tokenizer: AutoTokenizer,
tokenizer_kwargs: dict,
model_configuration: dict,
ov_model,
) -> tuple:
"""
Text generation function
Parameters:
user_text (str): User-provided instruction for generation.
top_p (float): Nucleus sampling. If < 1, keeps smallest set of most probable tokens.
temperature (float): Modulates logits distribution.
top_k (int): Number of highest probability vocabulary tokens to keep for top-k-filtering.
max_new_tokens (int): Maximum length of generated sequence.
perf_text (str): Content of text field for performance results.
tokenizer (AutoTokenizer): The tokenizer object.
tokenizer_kwargs (dict): Additional kwargs for tokenizer.
model_configuration (dict): Configuration for the model.
ov_model: Your OpenVINO model object.
    Yields:
model_output (str): Model-generated text.
perf_text (str): Updated performance text.
"""
# Extract necessary configurations from model_configuration
response_key = model_configuration.get("response_key")
prompt_template = model_configuration.get("prompt_template", "{instruction}")
end_key = model_configuration.get("end_key")
end_key_token_id = None
# Handle special tokens
if response_key:
tokenizer_response_key = next(
(token for token in tokenizer.additional_special_tokens if token.startswith(response_key)),
None,
)
if tokenizer_response_key and end_key:
try:
end_key_token_id = get_special_token_id(tokenizer, end_key)
except ValueError:
pass
# Ensure defaults for token IDs
end_key_token_id = end_key_token_id or tokenizer.eos_token_id
pad_token_id = end_key_token_id or tokenizer.pad_token_id
# Prepare input prompt according to model expected template
prompt_text = prompt_template.format(instruction=user_text)
# Tokenize the user text.
model_inputs = tokenizer(prompt_text, return_tensors="pt", **tokenizer_kwargs)
# Start generation on a separate thread, so that we don't block the UI.
streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
generate_kwargs = {
**model_inputs,
"streamer": streamer,
"max_new_tokens": max_new_tokens,
"do_sample": True,
"top_p": top_p,
"temperature": float(temperature),
"top_k": top_k,
"eos_token_id": end_key_token_id,
"pad_token_id": pad_token_id,
}
# Start generation in a separate thread
t = Thread(target=ov_model.generate, kwargs=generate_kwargs)
t.start()
# Pull the generated text from the streamer and update model output
model_output = ""
per_token_time = []
num_tokens = 0
start = perf_counter()
for new_text in streamer:
current_time = perf_counter() - start
model_output += new_text
        perf_text, num_tokens = estimate_latency(current_time, perf_text, new_text, per_token_time, num_tokens, tokenizer)
yield model_output, perf_text
start = perf_counter()
return model_output, perf_text
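
# Usage sketch for run_generation (illustrative only, not part of this module). It
# assumes an OpenVINO-exported causal LM loaded through optimum-intel; the model id
# "databricks/dolly-v2-3b", the export directory "dolly-v2-3b-ov" and the minimal
# model_configuration below are placeholder assumptions.
#
#     from optimum.intel import OVModelForCausalLM
#
#     tokenizer = AutoTokenizer.from_pretrained("databricks/dolly-v2-3b")
#     ov_model = OVModelForCausalLM.from_pretrained("dolly-v2-3b-ov", device="CPU")
#     model_configuration = {"prompt_template": "{instruction}", "response_key": None, "end_key": None}
#     for partial_text, perf in run_generation(
#         "Explain what OpenVINO is in one sentence.",
#         top_p=0.92, temperature=0.8, top_k=50, max_new_tokens=128, perf_text="",
#         tokenizer=tokenizer, tokenizer_kwargs={}, model_configuration=model_configuration,
#         ov_model=ov_model,
#     ):
#         print(perf)  # partial_text grows with each streamed chunk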
def reset_textbox(instruction: str, response: str, perf: str):
"""
Helper function for resetting content of all text fields
Parameters:
instruction (str): Content of user instruction field.
response (str): Content of model response field.
      perf (str): Content of performance info field.
Returns:
empty string for each placeholder
"""
return "", "", ""