|
import sys |
|
import time |
|
from pathlib import Path |
|
from typing import Literal, Optional |
|
|
|
import lightning as L |
|
import torch |
|
from lightning.fabric.plugins import BitsandbytesPrecision |
|
from lightning.fabric.strategies import FSDPStrategy |
|
|
|
|
|
# Resolve the directory two levels above this file and append it to the import
# search path. NOTE(review): presumably this makes repository-level modules
# importable when the file is run directly -- confirm against the project layout.
wd = Path(__file__).parent.parent.resolve()
sys.path.append(str(wd))
|
|
|
|
|
from .config import Config |
|
from .tokenizer import Tokenizer |
|
from .model import GPT, Block |
|
from .utils import ( |
|
check_valid_checkpoint_dir, |
|
get_default_supported_precision, |
|
gptq_quantization, |
|
load_checkpoint, |
|
) |
|
|
|
|
|
def sample(logits: torch.Tensor, temperature: float = 1.0, top_k: Optional[int] = None) -> torch.Tensor:
    """Choose the next token index from the logits of the last sequence position.

    Args:
        logits: Model output, indexed as ``logits[0, -1]`` (first entry along the
            leading dimension, last position along the second). Assumed to be
            laid out as (batch, sequence, vocabulary) -- TODO confirm with the model.
        temperature: Divisor applied to the logits before the softmax. Any value
            that is not strictly positive disables sampling and picks the argmax.
        top_k: If given, only the ``top_k`` largest logits remain eligible; the
            value is capped at the size of the last dimension.

    Returns:
        A tensor containing the chosen index (the reduced dimension is kept).
    """
    last_logits = logits[0, -1]

    if top_k is not None:
        keep = min(top_k, last_logits.size(-1))
        top_values, top_indices = torch.topk(last_logits, keep)
        # Start from an all -inf tensor and write back only the kept logits, so
        # every other entry gets zero probability after the softmax.
        masked = torch.full_like(last_logits, float("-inf"))
        last_logits = masked.scatter_(-1, top_indices, top_values)

    # Greedy decoding whenever the temperature is not strictly positive.
    if not (temperature > 0.0):
        return torch.argmax(last_logits, dim=-1, keepdim=True)

    scaled = last_logits / temperature
    probs = torch.nn.functional.softmax(scaled, dim=-1)
    return torch.multinomial(probs, num_samples=1)
|
|
|
|
|
@torch.inference_mode()
def generate(
    model: GPT,
    idx: torch.Tensor,
    max_returned_tokens: int,
    *,
    temperature: float = 1.0,
    top_k: Optional[int] = None,
    eos_id: Optional[int] = None,
) -> torch.Tensor:
    """Continue a prompt token by token until the requested length or an EOS token.

    The implementation of this function is modified from A. Karpathy's nanoGPT.

    Args:
        model: The model to use. It is called as ``model(tokens, positions)`` and
            must expose ``max_seq_length``.
        idx: Tensor of shape (T) with indices of the prompt sequence.
        max_returned_tokens: The maximum number of tokens to return (given plus generated).
        temperature: Scales the predicted logits by 1 / temperature.
        top_k: If specified, only sample among the tokens with the k highest probabilities.
        eos_id: If specified, stop generating once this token id is sampled.

    Returns:
        A 1-D tensor with the prompt followed by the generated tokens. When
        generation stops on ``eos_id``, the result is cut right before the
        position where that token was written.

    Raises:
        AssertionError: If ``max_returned_tokens`` does not exceed the prompt length.
        NotImplementedError: If ``model.max_seq_length`` is smaller than
            ``max_returned_tokens - 1``.
    """
    prompt_len = idx.size(0)
    assert max_returned_tokens > prompt_len
    if model.max_seq_length < max_returned_tokens - 1:
        raise NotImplementedError(f"max_seq_length {model.max_seq_length} needs to be >= {max_returned_tokens - 1}")

    device, dtype = idx.device, idx.dtype

    # Allocate the full output buffer up front and place the prompt at its start.
    tokens = torch.empty(max_returned_tokens, dtype=dtype, device=device)
    tokens[:prompt_len] = idx
    positions = torch.arange(0, prompt_len, device=device)

    for _ in range(max_returned_tokens - prompt_len):
        # First pass feeds the whole prompt; afterwards `positions` holds a single
        # index, so only the most recently written token is fed. The model is
        # expected to retain earlier positions itself (e.g. through a KV cache).
        model_input = tokens.index_select(0, positions).view(1, -1)

        next_token = sample(model(model_input, positions), temperature, top_k).to(dtype=dtype)

        # Move to the slot right after the last processed position ...
        positions = positions[-1:] + 1

        # ... and store the new token there.
        tokens = tokens.index_copy(0, positions, next_token)

        if next_token == eos_id:
            # Stop early; the slice ends before the slot just written.
            return tokens[:positions]

    return tokens
|
|
|
|
|
def generate_for_app(
    prompt: str = "Hello, my name is",
    *,
    num_samples: int = 1,
    max_new_tokens: int = 50,
    top_k: Optional[int] = 200,
    temperature: float = 0.8,
    checkpoint_dir: Path = Path("checkpoints/stabilityai/stablelm-base-alpha-3b"),
    quantize: Optional[Literal["bnb.nf4", "bnb.nf4-dq", "bnb.fp4", "bnb.fp4-dq", "bnb.int8", "gptq.int4"]] = None,
    strategy: str = "auto",
    devices: int = 1,
    precision: Optional[str] = None,
) -> str:
    """Generates text samples based on a pre-trained model and tokenizer.

    Args:
        prompt: The prompt string to use for generating the samples.
        num_samples: The number of text samples to generate.
        max_new_tokens: The number of generation steps to take.
        top_k: The number of top most probable tokens to consider in the sampling process.
        temperature: A value controlling the randomness of the sampling process. Higher values result in more random
            samples.
        checkpoint_dir: The checkpoint directory to load.
        quantize: Whether to quantize the model and using which method:
            - bnb.nf4, bnb.nf4-dq, bnb.fp4, bnb.fp4-dq: 4-bit quantization from bitsandbytes
            - bnb.int8: 8-bit quantization from bitsandbytes
            - gptq.int4: 4-bit quantization from GPTQ
            for more details, see https://github.com/Lightning-AI/lit-gpt/blob/main/tutorials/quantize.md
        strategy: Indicates the Fabric strategy setting to use.
        devices: How many devices to use.
        precision: Indicates the Fabric precision setting to use.

    Returns:
        All generated samples (each decoded from prompt plus continuation),
        joined with a single space.

    Raises:
        NotImplementedError: If quantization is requested together with ``devices > 1``.
        ValueError: If a bitsandbytes mode is combined with a mixed precision
            setting, or the GPTQ checkpoint file is missing.
        KeyError: If a bitsandbytes mode is combined with a precision other than
            "16-true", "bf16-true" or "32-true".
    """
    precision = precision or get_default_supported_precision(training=False)

    plugins = None
    if quantize is not None:
        if devices > 1:
            raise NotImplementedError(
                "Quantization is currently not supported for multi-GPU training. Please set devices=1 when using the"
                " --quantize flag."
            )
        if quantize.startswith("bnb."):
            if "mixed" in precision:
                raise ValueError("Quantization and mixed precision is not supported.")
            dtype = {"16-true": torch.float16, "bf16-true": torch.bfloat16, "32-true": torch.float32}[precision]
            # Strip the "bnb." prefix to obtain the bitsandbytes mode name.
            plugins = BitsandbytesPrecision(quantize[4:], dtype)
            # The precision is carried by the plugin from here on.
            precision = None

    fabric = L.Fabric(devices=devices, precision=precision, strategy=strategy, plugins=plugins)
    fabric.launch()

    check_valid_checkpoint_dir(checkpoint_dir)

    config = Config.from_json(checkpoint_dir / "lit_config.json")

    if quantize == "gptq.int4":
        model_file = "lit_model_gptq.4bit.pth"
        if not (checkpoint_dir / model_file).is_file():
            raise ValueError("Please run `python quantize/gptq.py` first")
    else:
        model_file = "lit_model.pth"
    checkpoint_path = checkpoint_dir / model_file

    fabric.print(f"Loading model {str(checkpoint_path)!r} with {config.__dict__}", file=sys.stderr)
    with fabric.init_module(empty_init=True), gptq_quantization(quantize == "gptq.int4"):
        model = GPT(config)

    model.eval()
    model = fabric.setup_module(model)

    # The module was created with `empty_init=True`, so its parameters are
    # placeholders until the checkpoint is read. Without this call generation
    # would run on uninitialized weights.
    # NOTE(review): assumes the project helper takes (fabric, model, path) -- confirm.
    load_checkpoint(fabric, model, checkpoint_path)

    tokenizer = Tokenizer(checkpoint_dir)
    encoded = tokenizer.encode(prompt, device=fabric.device)
    prompt_length = encoded.size(0)
    max_returned_tokens = prompt_length + max_new_tokens

    with fabric.init_tensor():
        # Limit the model's sequence length to what this request needs.
        model.max_seq_length = max_returned_tokens

    # Fixed seed, so repeated calls with the same arguments sample identically.
    L.seed_everything(1234)
    generated_text = []
    for _ in range(num_samples):
        with fabric.init_tensor():
            # (Re)create the KV cache for a single sequence before each sample.
            model.set_kv_cache(batch_size=1)
        y = generate(model, encoded, max_returned_tokens, temperature=temperature, top_k=top_k)
        generated_text.append(tokenizer.decode(y))

    if fabric.device.type == "cuda":
        fabric.print(f"Memory used: {torch.cuda.max_memory_allocated() / 1e9:.02f} GB", file=sys.stderr)

    joined = " ".join(generated_text)
    fabric.print(joined)

    return joined
|
|
|
|
|
|
|
if __name__ == "__main__":
    # jsonargparse is only needed for command-line use, so it is imported here
    # rather than at module import time.
    import jsonargparse

    # Select the "high" float32 matmul precision mode before any model code runs.
    torch.set_float32_matmul_precision("high")

    # Expose generate_for_app's parameters as command-line arguments.
    jsonargparse.CLI(generate_for_app)