import os
import sys
import time
from pathlib import Path
from typing import Any, Optional

import torch
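
# Make the repository root importable so that `modello_italia` resolves when this
# script is run directly from its subdirectory.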
wd = Path(__file__).parent.parent.resolve()
sys.path.append(str(wd))

from modello_italia import Italia, ItaliaConfig, Tokenizer

device = 'cuda' if torch.cuda.is_available() else 'cpu'

MI_SYSTEM_PROMPT_SHORT = (
    "Tu sei Modello Italia, un modello di linguaggio naturale addestrato da iGenius."
)


def multinomial_num_samples_1(probs: torch.Tensor) -> torch.Tensor:
    if torch._dynamo.is_compiling():
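        # Equivalent in distribution to `torch.multinomial(probs, num_samples=1)`:
        # argmax(probs / Exp(1)) samples from Categorical(probs) via an exponential-race
        # (Gumbel-max style) trick, used here because it plays better with torch.compile.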
        distribution = torch.empty_like(probs).exponential_(1)
        return torch.argmax(probs / distribution, dim=-1, keepdim=True)
    return torch.multinomial(probs, num_samples=1)


def sample(
    logits: torch.Tensor, temperature: float = 1.0, top_k: Optional[int] = None
) -> torch.Tensor:
    logits = logits[0, -1]
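    # Optionally restrict sampling to the k highest-scoring tokens by masking all others to -inf.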
    if top_k is not None:
        v, i = torch.topk(logits, min(top_k, logits.size(-1)))
        logits = torch.full_like(logits, float("-inf")).scatter_(-1, i, v)
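    # Rescale the logits by the temperature and sample; temperature <= 0 falls back to greedy argmax.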
    if temperature > 0.0:
        probs = torch.nn.functional.softmax(logits / temperature, dim=-1)
        return multinomial_num_samples_1(probs)
    return torch.argmax(logits, dim=-1, keepdim=True)


def next_token(
    model: Italia, input_pos: torch.Tensor, x: torch.Tensor, **kwargs: Any
) -> torch.Tensor:
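    # `input_pos` tells the model which KV-cache positions the tokens in `x` occupy.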
    logits = model(x, input_pos)
    _next = sample(logits, **kwargs)
    return _next.to(dtype=x.dtype)


@torch.inference_mode()
def generate(
    model: Italia,
    prompt: torch.Tensor,
    tokenizer: Tokenizer,
    max_returned_tokens: int,
    *,
    temperature: float = 1.0,
    top_k: Optional[int] = None,
    eos_id: Optional[int] = None,
) -> torch.Tensor:
    """Takes a conditioning sequence (prompt) as input and continues to generate as many tokens as requested.

    The implementation of this function is modified from A. Karpathy's nanoGPT.

    Args:
        model: The model to use.
        prompt: Tensor of shape (T) with indices of the prompt sequence.
        tokenizer: Tokenizer instance used to decode the generated tokens.
        max_returned_tokens: The maximum number of tokens to return (prompt plus generated).
        temperature: Scales the predicted logits by 1 / temperature.
        top_k: If specified, only sample among the tokens with the k highest probabilities.
        eos_id: If specified, stop generating once this token is produced; defaults to the
            tokenizer's EOS token.
    """
    T = prompt.size(0)
    assert max_returned_tokens > T
    if eos_id is None:
        eos_id = tokenizer.eos_id

    device = prompt.device
    tokens = [prompt]
    input_pos = torch.tensor([T], device=device)
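    # Prefill: run the full prompt through the model once to populate the KV cache and
    # sample the first new token.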
    token = next_token(
        model,
        torch.arange(0, T, device=device),
        prompt.view(1, -1),
        temperature=temperature,
        top_k=top_k,
    ).clone()
    tokens.append(token)
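    # Decode: generate one token at a time, feeding back the previous token and advancing
    # the cache position by one each step.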
    for _ in range(2, max_returned_tokens - T + 1):
        token = next_token(
            model, input_pos, token.view(1, -1), temperature=temperature, top_k=top_k
        ).clone()
        tokens.append(token)

        if token == eos_id:
            break
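        # Simple streaming display: clear the terminal and reprint everything decoded so far.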
        os.system('cls' if os.name == 'nt' else 'clear')
        print(tokenizer.decode(torch.cat(tokens)[T:]))
        input_pos = input_pos.add_(1)
    return torch.cat(tokens)


@torch.inference_mode()
def main(
    prompt: str = "Ciao, chi sei?",
    *,
    num_samples: int = 1,
    max_new_tokens: int = 200,
    top_k: Optional[int] = 200,
    temperature: float = 0.4,
    checkpoint_dir: Path = Path("."),
) -> None:
    """Generates text samples based on a pre-trained model and tokenizer.

    Args:
        prompt: The prompt string to use for generating the samples.
        num_samples: The number of text samples to generate.
        max_new_tokens: The number of generation steps to take.
        top_k: The number of top most probable tokens to consider in the sampling process.
        temperature: A value controlling the randomness of the sampling process. Higher values
            result in more random samples.
        checkpoint_dir: The checkpoint directory to load.
    """

    config = ItaliaConfig()
    checkpoint_path = checkpoint_dir / "italia.bin"
    tokenizer = Tokenizer(checkpoint_dir)
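    # Wrap the user prompt in Modello Italia's chat template (system / user / assistant tags).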
    prompt = f"<|system|>{MI_SYSTEM_PROMPT_SHORT}\n<|user|>{prompt}\n<|assistant|>"
    encoded = tokenizer.encode(prompt, device=device)
    prompt_length = encoded.size(0)
    max_returned_tokens = prompt_length + max_new_tokens

    print(f"Loading model {str(checkpoint_path)!r}")

    t0 = time.perf_counter()

    model = Italia(config)
    model.load_state_dict(torch.load(checkpoint_path, mmap=True))
    model.to(device)

    print(
        f"Time to instantiate model: {time.perf_counter() - t0:.02f} seconds.",
        file=sys.stderr,
    )
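    # Size the KV cache for the prompt plus all tokens that will be generated.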
    model.max_seq_length = max_returned_tokens
    model.set_kv_cache(batch_size=1, device=device)
    model.eval()

    for _ in range(num_samples):
        t0 = time.perf_counter()
        y = generate(
            model,
            encoded,
            tokenizer,
            max_returned_tokens,
            temperature=temperature,
            top_k=top_k,
        )
        t = time.perf_counter() - t0
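        # Reset the KV cache so the next sample starts from a clean state.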
        for block in model.transformer.h:
            block.attn.kv_cache.reset_parameters()

        tokens_generated = y.size(0) - prompt_length
        print(f"\nTime for inference: {t:.02f} sec total, {tokens_generated / t:.02f} tokens/sec")


if __name__ == "__main__":
    from jsonargparse import CLI

    torch.set_float32_matmul_precision("high")
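    # jsonargparse turns `main`'s keyword arguments into command-line flags, e.g.
    # (assuming this script is saved as generate.py next to the checkpoint files):
    #   python generate.py --prompt "Ciao, chi sei?" --checkpoint_dir . --max_new_tokens 200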
    CLI(main)