# generation_utils.py

from threading import Thread
from time import perf_counter
from typing import List

import gradio as gr
import numpy as np
from transformers import AutoTokenizer, TextIteratorStreamer

def get_special_token_id(tokenizer: AutoTokenizer, key: str) -> int:
    """
    Gets the token ID for a given string that has been added to the tokenizer as a special token.

    Args:
        tokenizer (AutoTokenizer): the tokenizer
        key (str): the key to convert to a single token

    Raises:
        ValueError: if more than one ID was generated

    Returns:
        int: the token ID for the given key
    """
    token_ids = tokenizer.encode(key)
    if len(token_ids) > 1:
        raise ValueError(f"Expected only a single token for '{key}' but found {token_ids}")
    return token_ids[0]
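
# Example (hypothetical key: Dolly-style instruction models register "### End"
# as a special token, so it encodes to exactly one ID):
#
#     end_token_id = get_special_token_id(tokenizer, "### End")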


def estimate_latency(
    current_time: float,
    current_perf_text: str,
    new_gen_text: str,
    per_token_time: List[float],
    num_tokens: int,
    tokenizer: AutoTokenizer,
) -> tuple:
    """
    Helper function for performance estimation.

    Parameters:
      current_time (float): This step's time in seconds.
      current_perf_text (str): Current content of the performance UI field.
      new_gen_text (str): Newly generated text.
      per_token_time (List[float]): History of per-step generation speeds (tokens/s).
      num_tokens (int): Total number of generated tokens.
      tokenizer (AutoTokenizer): Tokenizer used to count generated tokens.

    Returns:
      update for the performance text field
      update for the total number of tokens
    """
    num_current_toks = len(tokenizer.encode(new_gen_text))
    num_tokens += num_current_toks
    # Record this step's generation speed in tokens per second.
    per_token_time.append(num_current_toks / current_time)
    # Refresh the UI every 4 steps once enough history has accumulated,
    # averaging over the 10 most recent measurements.
    if len(per_token_time) > 10 and len(per_token_time) % 4 == 0:
        current_bucket = per_token_time[-10:]
        return (
            f"Average generation speed: {np.mean(current_bucket):.2f} tokens/s. Total generated tokens: {num_tokens}",
            num_tokens,
        )
    return current_perf_text, num_tokens
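
# Worked example (hypothetical numbers): a step that produces 3 tokens in
# 0.25 s appends 12.0 tokens/s to per_token_time; once more than 10 entries
# exist, every 4th step reports the mean of the 10 most recent entries.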


def run_generation(
    user_text: str,
    top_p: float,
    temperature: float,
    top_k: int,
    max_new_tokens: int,
    perf_text: str,
    tokenizer: AutoTokenizer,
    tokenizer_kwargs: dict,
    model_configuration: dict,
    ov_model,
):
    """
    Text generation function. Runs the model on a background thread and yields
    partial results as tokens are streamed.

    Parameters:
      user_text (str): User-provided instruction for generation.
      top_p (float): Nucleus sampling. If < 1, keeps the smallest set of most probable tokens.
      temperature (float): Modulates the logits distribution.
      top_k (int): Number of highest-probability vocabulary tokens to keep for top-k filtering.
      max_new_tokens (int): Maximum length of the generated sequence.
      perf_text (str): Content of the text field for performance results.
      tokenizer (AutoTokenizer): The tokenizer object.
      tokenizer_kwargs (dict): Additional kwargs for the tokenizer.
      model_configuration (dict): Configuration for the model.
      ov_model: The OpenVINO model object.

    Yields:
      model_output (str): Model-generated text accumulated so far.
      perf_text (str): Updated performance text.
    """

    # Extract necessary configurations from model_configuration
    response_key = model_configuration.get("response_key")
    prompt_template = model_configuration.get("prompt_template", "{instruction}")
    end_key = model_configuration.get("end_key")
    end_key_token_id = None

    # Handle special tokens
    if response_key:
        tokenizer_response_key = next(
            (token for token in tokenizer.additional_special_tokens if token.startswith(response_key)),
            None,
        )
        if tokenizer_response_key and end_key:
            try:
                end_key_token_id = get_special_token_id(tokenizer, end_key)
            except ValueError:
                pass

    # Ensure defaults for token IDs
    end_key_token_id = end_key_token_id or tokenizer.eos_token_id
    pad_token_id = end_key_token_id or tokenizer.pad_token_id
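    # With the fallback above, padding reuses the end/eos token ID, a common
    # workaround for tokenizers that do not define a dedicated pad token.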

    # Prepare input prompt according to model expected template
    prompt_text = prompt_template.format(instruction=user_text)

    # Tokenize the user text.
    model_inputs = tokenizer(prompt_text, return_tensors="pt", **tokenizer_kwargs)

    # Start generation on a separate thread, so that we don't block the UI.
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = {
        **model_inputs,
        "streamer": streamer,
        "max_new_tokens": max_new_tokens,
        "do_sample": True,
        "top_p": top_p,
        "temperature": float(temperature),
        "top_k": top_k,
        "eos_token_id": end_key_token_id,
        "pad_token_id": pad_token_id,
    }

    # Start generation in a separate thread
    t = Thread(target=ov_model.generate, kwargs=generate_kwargs)
    t.start()

    # Pull the generated text from the streamer and update model output
    model_output = ""
    per_token_time = []
    num_tokens = 0
    start = perf_counter()

    for new_text in streamer:
        current_time = perf_counter() - start
        model_output += new_text
        perf_text, num_tokens = estimate_latency(
            current_time, perf_text, new_text, per_token_time, num_tokens, tokenizer
        )
        yield model_output, perf_text
        start = perf_counter()

    return model_output, perf_text
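

# Example (hypothetical values: `tokenizer`, `config`, and `ov_model` must be
# created by the caller, e.g. via transformers and optimum-intel):
#
#     for partial_text, perf in run_generation(
#         "Explain OpenVINO in one sentence.",
#         top_p=0.92, temperature=0.8, top_k=50, max_new_tokens=128,
#         perf_text="", tokenizer=tokenizer, tokenizer_kwargs={},
#         model_configuration=config, ov_model=ov_model,
#     ):
#         print(partial_text)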


def reset_textbox(instruction: str, response: str, perf: str):
    """
    Helper function for resetting the content of all text fields.

    Parameters:
      instruction (str): Content of the user instruction field.
      response (str): Content of the model response field.
      perf (str): Content of the performance info field.

    Returns:
      an empty string for each placeholder
    """
    return "", "", ""