|
""" |
|
Sample from a trained model |
|
""" |
|
import os |
|
import pickle |
|
from contextlib import nullcontext |
|
import torch |
|
import tiktoken |
|
from nanogpt.model import GPTConfig, GPT |
|
|
|
BASE_DIR = "nanogpt/" |
|
|
|
|
|
class NanoGptPlayer: |
|
def __init__(self, model_name: str, move_num_in_gamestate: bool=False): |
|
self.model_name = model_name |
|
|
|
|
|
init_from = "resume" |
|
out_dir = "out" |
|
input_dir = "addition" |
|
test_name = "test.txt" |
|
start = "12+44=" |
|
num_samples = 1 |
|
max_new_tokens = 6 |
|
temperature = 0.01 |
|
top_k = 200 |
|
seed = 1337 |
|
device = "cuda" |
|
|
|
dtype = "float16" |
|
compile = False |
|
exec( |
|
open(f"{BASE_DIR}configurator.py").read() |
|
) |
|
|
|
|
|
torch.manual_seed(seed) |
|
torch.cuda.manual_seed(seed) |
|
torch.backends.cuda.matmul.allow_tf32 = True |
|
torch.backends.cudnn.allow_tf32 = True |
|
device_type = ( |
|
"cuda" if "cuda" in device else "cpu" |
|
) |
|
ptdtype = { |
|
"float32": torch.float32, |
|
"bfloat16": torch.bfloat16, |
|
"float16": torch.float16, |
|
}[dtype] |
|
ctx = ( |
|
nullcontext() |
|
if device_type == "cpu" |
|
else torch.amp.autocast(device_type=device_type, dtype=ptdtype) |
|
) |
|
|
|
|
|
if init_from == "resume": |
|
|
|
|
|
ckpt_path = os.path.normpath(f"../../mamba.py/out/{self.model_name}") |
|
checkpoint = torch.load(ckpt_path, map_location=device) |
|
|
|
|
|
model = GPT(checkpoint["model_args"]) |
|
state_dict = checkpoint["model"] |
|
unwanted_prefix = "_orig_mod." |
|
for k, v in list(state_dict.items()): |
|
if k.startswith(unwanted_prefix): |
|
state_dict[k[len(unwanted_prefix) :]] = state_dict.pop(k) |
|
model.load_state_dict(state_dict) |
|
elif init_from.startswith("gpt2"): |
|
|
|
model = GPT.from_pretrained(init_from, dict(dropout=0.0)) |
|
|
|
model.eval() |
|
model.to(device) |
|
if compile: |
|
model = torch.compile(model) |
|
|
|
|
|
meta_path = os.path.join(BASE_DIR, "out", "meta.pkl") |
|
load_meta = os.path.exists(meta_path) |
|
if move_num_in_gamestate and load_meta: |
|
with open(meta_path, "rb") as f: |
|
meta = pickle.load(f) |
|
stoi, itos = meta["stoi"], meta["itos"] |
|
vocab_size = meta['vocab_size'] |
|
encode = lambda s: [stoi[c] for c in s] |
|
decode = lambda l: "".join([itos[i] for i in l]) |
|
else: |
|
stoi = {' ': 0, '.': 1, 'a': 2, 'b': 3, 'c': 4, 'd': 5, 'e': 6, 'f': 7, 'g': 8, 'h': 9, '1': 10, '2': 11, '3': 12, '4': 13, '5': 14, '6': 15, '7': 16, '8': 17, 'B': 18, 'N': 19, 'R': 20, 'Q': 21, 'K': 22, 'O': 23, 'x': 24, '+': 25, '#': 26, '=': 27} |
|
itos = {0: ' ', 1: '.', 2: 'a', 3: 'b', 4: 'c', 5: 'd', 6: 'e', 7: 'f', 8: 'g', 9: 'h', 10: '1', 11: '2', 12: '3', 13: '4', 14: '5', 15: '6', 16: '7', 17: '8', 18: 'B', 19: 'N', 20: 'R', 21: 'Q', 22: 'K', 23: 'O', 24: 'x', 25: '+', 26: '#', 27: '='} |
|
for s in stoi: |
|
assert itos[stoi[s]] == s |
|
vocab_size = len(stoi) |
|
print(f"Vocab size {vocab_size}") |
|
encode = lambda s: [stoi[c] for c in s.replace('-', '')] |
|
decode = lambda l: "".join([itos[i] for i in l]).replace("OOO", "O-O-O").replace("OO", "O-O") |
|
|
|
self.encode = encode |
|
self.decode = decode |
|
self.model = model |
|
self.ctx = ctx |
|
self.device = device |
|
|
|
def get_nanogpt_response(self, game_state: str, temperature: float) -> str: |
|
num_samples = 1 |
|
top_k = 200 |
|
max_new_tokens = 8 |
|
|
|
|
|
|
|
game_state = game_state.split("\n\n")[-1].strip() |
|
|
|
|
|
|
|
|
|
|
|
start_ids = self.encode(game_state) |
|
|
|
x = torch.tensor(start_ids, dtype=torch.long, device=self.device)[None, ...] |
|
with torch.no_grad(): |
|
with self.ctx: |
|
for k in range(num_samples): |
|
y = self.model.generate( |
|
x, max_new_tokens, temperature=temperature, top_k=top_k |
|
) |
|
|
|
model_response = self.decode(y[0].tolist()) |
|
|
|
|
|
|
|
model_response = model_response[len(game_state):].split(";")[0] |
|
return model_response |
|
|
|
def get_move_from_response(self, response: str) -> str: |
|
|
|
moves = response.split() |
|
first_move = moves[0] |
|
|
|
return first_move |
|
|
|
def get_move(self, board: str, game_state: str, temperature: float) -> str: |
|
completion = self.get_nanogpt_response(game_state, temperature) |
|
return self.get_move_from_response(completion) |
|
|
|
def get_config(self) -> dict: |
|
return {"model": self.model_name} |
|
|