# HaileyStorm's picture
# Added random move opening option
# 67472fe verified
# raw
# history blame
# No virus
# 23.6 kB
import openai
import chess
import chess.engine
import os
import csv
import random
import time
import platform
# NOTE: LLAMA AND NANOGPT ARE EXPERIMENTAL PLAYERS, if not using them, comment them out
# from llama_module import BaseLlamaPlayer, LocalLlamaPlayer, LocalLoraLlamaPlayer
from nanogpt.nanogpt_module import NanoGptPlayer
from mamba_module import MambaPlayer
import gpt_query
from lczero.backends import Weights, Backend, GameState
import numpy as np
from typing import Optional, Tuple
from dataclasses import dataclass
@dataclass
class LegalMoveResponse:
    """Result of asking a player for one move, as produced by ``get_legal_move``."""

    # SAN text of the accepted move; stays None on resignation or failure.
    move_san: Optional[str] = None
    # Parsed move for ``move_san``; stays None on resignation or failure.
    move_uci: Optional[chess.Move] = None
    # Zero-based index of the attempt that ended the request (i.e. how many
    # attempts failed first), or the attempt limit when every attempt failed.
    attempts: int = 0
    # True when the player answered with a game result ("1-0", "0-1", "1/2-1/2").
    is_resignation: bool = False
    # True when no legal move was produced within the attempt limit.
    is_illegal_move: bool = False
# Define base Player class
class Player:
    """Interface implemented by every kind of player in this file."""

    def get_move(self, board: chess.Board, game_state: str, temperature: float) -> Optional[str]:
        """Return the next move in SAN for ``board``, or None if no move is available.

        ``game_state`` is the textual transcript so far and ``temperature`` is a
        sampling temperature; implementations may ignore either.
        """
        raise NotImplementedError

    def get_config(self) -> dict:
        """Return a dict describing this player (used to label recorded results)."""
        raise NotImplementedError
class GPTPlayer(Player):
    """Player that asks an OpenAI model for its next move."""

    def __init__(self, model: str):
        """Load the API key from ``gpt_inputs/api_key.txt`` and remember ``model``."""
        with open("gpt_inputs/api_key.txt", "r") as key_file:
            api_key = key_file.read().strip()
        openai.api_key = api_key
        self.model = model

    def get_move(
        self, board: chess.Board, game_state: str, temperature: float
    ) -> Optional[str]:
        """Query the model with the transcript and return the first token of its reply."""
        raw_reply = get_gpt_response(game_state, self.model, temperature)
        return get_move_from_gpt_response(raw_reply)

    def get_config(self) -> dict:
        """Describe this player by its model name."""
        config = {"model": self.model}
        return config
class LC0PLayer(Player):
    """Player that picks the highest-policy move of an lc0 network (no search)."""

    # "11258-32x4-se.pb.gz" = stockfish level 0- = skill 0
    # "11258-48x5-se.pb.gz" = stockfish level 0+ = skill 1
    # "11258-80x7-se.pb.gz" = stockfish level 1 = skill 2
    # "11258-104x9-se.pb.gz" = stockfish level 2 = skill 3
    # "TK-6430 aka 128x10-BPR-64M-6430000.pb.gz" = stockfish level 3 = skill 4
    # "00af53b081e80147172e6f281c01daf5ca19ada173321438914c730370aa4267" = stockfish level 4 = skill 5
    # "b2ec465d0fb5b5eb39d2e1e3f74041a5d2fc92d413b71aa7ea0b6fb082ccba9c" = stockfish level 5+ = skill 6
    def __init__(self, skill):
        """Load the network whose position in the list below equals ``skill``."""
        self.skill = skill
        network_paths = [
            "./lc0/build/release/11258-32x4-se.pb.gz",
            "./lc0/build/release/11258-48x5-se.pb.gz",
            "./lc0/build/release/11258-80x7-se.pb.gz",
            "./lc0/build/release/11258-104x9-se.pb.gz",
            "./lc0/build/release/TK-6430 aka 128x10-BPR-64M-6430000.pb.gz",
            "./lc0/build/release/00af53b081e80147172e6f281c01daf5ca19ada173321438914c730370aa4267",
            "./lc0/build/release/b2ec465d0fb5b5eb39d2e1e3f74041a5d2fc92d413b71aa7ea0b6fb082ccba9c",
        ]
        print(f"\n\nLoading lc0 network: {network_paths[skill]}\n\n")
        self.weights = Weights(network_paths[skill])
        self.backend = Backend(weights=self.weights)
        self.gamestate = GameState()

    def get_move(self, board: chess.Board, game_state: str, temperature: float):
        """Return, in SAN, the move with the largest policy probability.

        The lc0 game state is rebuilt from the board's FEN on every call;
        ``game_state`` and ``temperature`` are not used.
        """
        self.gamestate = GameState(fen=board.fen())
        planes = self.gamestate.as_input(self.backend)
        evaluation = self.backend.evaluate(planes)[0]
        candidate_moves = self.gamestate.moves()
        indices = self.gamestate.policy_indices()
        probabilities = np.array(evaluation.p_softmax(*indices))
        chosen_uci = candidate_moves[probabilities.argmax()]
        return board.san(chess.Move.from_uci(chosen_uci))

    def get_config(self) -> dict:
        """Describe this player; ``play_time`` is reported as 0."""
        return {"network": self.weights, "skill_level": self.skill, "play_time": 0}
class StockfishPlayer(Player):
    """Player backed by a Stockfish UCI engine process.

    Skill levels of 0 and above are passed to the engine's "Skill Level"
    option and searched for ``play_time`` seconds. Two negative values are
    special: ``-2`` plays a uniformly random legal move without consulting the
    engine, and any other negative value plays at Skill Level 0 with a minimal
    search limit.
    """

    @staticmethod
    def get_stockfish_path() -> str:
        """
        Determines the path of the Stockfish executable.

        A non-empty ``STOCKFISH_PATH`` environment variable takes precedence,
        so the binary can be relocated without editing this file. Otherwise
        the operating system selects one of the built-in default paths.

        Returns:
            str: Path to the Stockfish executable.

        Raises:
            OSError: If no override is set and the operating system is not
                Linux, macOS or Windows.
        """
        override = os.environ.get("STOCKFISH_PATH")
        if override:
            return override

        system = platform.system()
        if system == 'Linux':
            return "/usr/games/stockfish"
        if system == 'Darwin':  # Darwin is the system name for macOS
            return "stockfish"
        if system == 'Windows':
            return r"C:\Users\Haile\Downloads\stockfish\stockfish-windows-x86-64-avx2.exe"
        raise OSError("Unsupported operating system")

    def __init__(self, skill_level: int, play_time: float):
        """Start the engine process for the given skill level and per-move time."""
        self._skill_level = skill_level
        self._play_time = play_time
        # If getting started, you need to run brew install stockfish
        stockfish_path = StockfishPlayer.get_stockfish_path()
        self._engine = chess.engine.SimpleEngine.popen_uci(stockfish_path)

    def get_move(
        self, board: chess.Board, game_state: str, temperature: float
    ) -> Optional[str]:
        """Return the chosen move in SAN, or None if the engine returns no move.

        ``game_state`` and ``temperature`` are not used.
        """
        if self._skill_level == -2:
            # Random mover: the engine is not consulted at all.
            return board.san(random.choice(list(board.legal_moves)))

        if self._skill_level < 0:
            # Weakest engine play: lowest skill with a minimal search limit.
            self._engine.configure({"Skill Level": 0})
            limit = chess.engine.Limit(time=1e-8, depth=1, nodes=1)
        else:
            self._engine.configure({"Skill Level": self._skill_level})
            limit = chess.engine.Limit(time=self._play_time)

        result = self._engine.play(board, limit)
        if result.move is None:
            return None
        return board.san(result.move)

    def get_config(self) -> dict:
        """Describe this player by skill level and per-move time."""
        return {"skill_level": self._skill_level, "play_time": self._play_time}

    def close(self):
        """Shut down the engine process."""
        self._engine.quit()
class HumanPlayer(Player):
    """Interactive player that reads moves in SAN from standard input."""

    def get_move(self, board: chess.Board, game_state: str, temperature: float) -> str:
        """Prompt until the user enters a legal SAN move and return it as typed.

        ``game_state`` and ``temperature`` are not used.
        """
        # Print board for human player
        print(board)
        while True:
            move = input("Enter your move (SAN format): ")
            try:
                move_uci = board.parse_san(move)
            except ValueError:
                # parse_san reports invalid, illegal and ambiguous SAN via
                # ValueError subclasses. Catching only that keeps Ctrl-C and
                # real programming errors from being swallowed.
                print("Illegal move, try again.")
                continue
            if move_uci in board.legal_moves:
                return move
            # Parsed but not legal: tell the user instead of re-prompting silently.
            print("Illegal move, try again.")

    def get_config(self) -> dict:
        """Describe this player as a human."""
        return {"player": "human"}
def get_gpt_response(game_state: str, model: str, temperature: float) -> Optional[str]:
    """Forward the transcript to ``gpt_query`` and return its reply.

    Requests for "gpt-4" are preceded by a 0.4 second pause.
    """
    # trying to prevent what I believe to be rate limit issues
    needs_throttle = model == "gpt-4"
    if needs_throttle:
        time.sleep(0.4)
    return gpt_query.get_gpt_response(game_state, model, temperature)
def get_move_from_gpt_response(response: Optional[str]) -> Optional[str]:
    """Return the first whitespace-separated token of ``response``.

    Returns None when ``response`` is None or contains no tokens.
    """
    if response is None:
        return None
    # Only the first move of the reply is of interest.
    return next(iter(response.split()), None)
def record_results(
    board: chess.Board,
    player_one: Player,
    player_two: Player,
    game_state: str,
    player_one_illegal_moves: int,
    player_one_illegal_attempts: int,
    player_two_illegal_moves: int,
    player_one_legal_moves: int,
    player_two_legal_moves: int,
    total_time: float,
    player_one_resignation: bool,
    player_two_resignation: bool,
    player_one_failed_to_find_legal_move: bool,
    player_two_failed_to_find_legal_move: bool,
    total_moves: int,
    illegal_moves: int,
    opening_moves: int,
    illegal_move_numbers: list[int]
):
    """Append one game's statistics to the results CSV and dump the transcript.

    The CSV path comes from module-level settings: with ``RUN_FOR_ANALYSIS``
    set it is built from ``player_one_recording_name`` and
    ``player_two_recording_name`` under ``logs/``, otherwise ``recording_file``
    is used. A header row is written only when the file does not exist yet.
    The transcript is also written to ``game.txt``.

    ``opening_moves`` is accepted but not recorded.
    """
    unique_game_id = generate_unique_game_id()

    (
        player_one_title,
        player_two_title,
        player_one_time,
        player_two_time,
    ) = get_player_titles_and_time(player_one, player_two)

    if player_one_resignation or player_one_failed_to_find_legal_move:
        result = "0-1"
        player_one_score = 0
        player_two_score = 1
    elif player_two_resignation or player_two_failed_to_find_legal_move:
        result = "1-0"
        player_one_score = 1
        player_two_score = 0
    else:
        result = board.result()
        # Hmmm.... debating this one. Annoying if I leave it running and it fails here for some reason, probably involving some
        # resignation / failed move situation I didn't think of
        # -1e10 at least ensures it doesn't fail silently
        if "-" in result:
            # "1-0", "0-1" or "1/2-1/2": the scores are kept as the text on
            # each side of the dash.
            player_one_score, player_two_score = result.split("-")
        elif result == "*":
            # Unfinished game (e.g. the move cap was hit). Currently scored as
            # a loss for player one; the earlier 1/2 - 1/2 scoring is disabled.
            player_one_score = 0#1/2
            player_two_score = 1#1/2
        else:
            player_one_score = -1e10
            player_two_score = -1e10

    played_moves = player_one_illegal_moves + player_one_legal_moves
    info_dict = {
        "game_id": unique_game_id,
        "transcript": game_state,
        "result": result,
        "player_one": player_one_title,
        "player_two": player_two_title,
        "player_one_time": player_one_time,
        "player_two_time": player_two_time,
        "player_one_score": player_one_score,
        "player_two_score": player_two_score,
        "player_one_illegal_moves": player_one_illegal_moves,
        "player_two_illegal_moves": player_two_illegal_moves,
        "player_one_legal_moves": player_one_legal_moves,
        "player_two_legal_moves": player_two_legal_moves,
        "player_one_resignation": player_one_resignation,
        "player_two_resignation": player_two_resignation,
        "player_one_failed_to_find_legal_move": player_one_failed_to_find_legal_move,
        "player_two_failed_to_find_legal_move": player_two_failed_to_find_legal_move,
        "game_title": f"{player_one_title} vs. {player_two_title}",
        "number_of_moves": board.fullmove_number,
        "p1_illegal_attempts": player_one_illegal_attempts,
        "p1_avg_attempts_per_illegal": 0 if player_one_illegal_moves == 0 else player_one_illegal_attempts / float(player_one_illegal_moves),
        # The misspelled key is a CSV column name; it is left as is so rows
        # keep lining up with files that were already written.
        "p1_illegal_attemtps_pct": 1.0 if played_moves == 0 else player_one_illegal_attempts / float(player_one_illegal_attempts + player_one_legal_moves),
        "p1_illegal_moves_pct": 1.0 if played_moves == 0 else player_one_illegal_moves / float(played_moves),
        "p1_first_illegal_move_num": illegal_move_numbers[0] if illegal_move_numbers else 0,
        "p1_avg_illegal_move_num": np.average(illegal_move_numbers) if illegal_move_numbers else 0,
        "time_taken": total_time,
        "total_moves": total_moves,
        "illegal_moves": illegal_moves,
    }

    if RUN_FOR_ANALYSIS:
        csv_file_path = f"logs/{player_one_recording_name}_vs_{player_two_recording_name}"
        csv_file_path = csv_file_path.replace(".", "_")  # Because I'm using ckpt filenames for nanogpt models
        csv_file_path += ".csv"
    else:
        csv_file_path = recording_file

    # Determine if we need to write headers (in case the file doesn't exist yet)
    write_headers = not os.path.exists(csv_file_path)

    # Create the target directory if the path has one. A bare file name has an
    # empty dirname, and os.makedirs("") raises FileNotFoundError.
    csv_dir = os.path.dirname(csv_file_path)
    if csv_dir:
        os.makedirs(csv_dir, exist_ok=True)

    # Append the results to the CSV file
    with open(csv_file_path, "a", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=info_dict.keys())
        if write_headers:
            writer.writeheader()
        writer.writerow(info_dict)

    with open("game.txt", "w") as f:
        f.write(game_state)
def generate_unique_game_id() -> str:
    """Build a game id of the form ``<unix seconds>-<random 4-digit number>``."""
    timestamp, random_num = int(time.time()), random.randint(1000, 9999)
    return f"{timestamp}-{random_num}"
def _player_title_and_time(config: dict) -> Tuple[str, Optional[float]]:
    """Derive a display title and per-move time from one player's config dict."""
    if "model" in config:
        return config["model"], None
    if "skill_level" in config:
        # NOTE(review): LC0PLayer configs carry "skill_level" and no "model",
        # so they are labelled "Stockfish N" as well. The label is left
        # unchanged so existing result files stay comparable.
        return f"Stockfish {config['skill_level']}", config.get("play_time")
    # Configs with neither key (e.g. HumanPlayer's {"player": "human"}) used
    # to raise KeyError here; fall back to the "player" entry.
    return str(config.get("player", "unknown")), None


def get_player_titles_and_time(
    player_one: "Player", player_two: "Player"
) -> Tuple[str, str, Optional[float], Optional[float]]:
    """Return ``(title_one, title_two, time_one, time_two)`` for two players.

    The title is the config's "model" when present, else
    ``"Stockfish <skill_level>"`` when a skill level is present, else the
    config's "player" value (or "unknown"). The time is the config's
    "play_time" for skill-level players and None otherwise.
    """
    player_one_title, player_one_time = _player_title_and_time(player_one.get_config())
    player_two_title, player_two_time = _player_title_and_time(player_two.get_config())
    return (player_one_title, player_two_title, player_one_time, player_two_time)
used_openings = []


def random_book_opening(
    game_state: str, board: chess.Board
) -> Tuple[str, chess.Board, int]:
    """Play a random, not-yet-used opening from ``openings.csv`` onto ``board``.

    The first line of the file is skipped as a header and blank lines are
    ignored. Chosen openings are remembered in the module-level
    ``used_openings`` list; once every opening has been used the list is
    cleared so selection starts over instead of looping forever.

    The incoming ``game_state`` is replaced, not extended: the returned
    transcript is the opening line itself when ``move_num_in_gamestate`` is
    set, otherwise the opening with each move number reduced to a bare ".".

    Returns:
        ``(game_state, board, opening_moves)`` where ``opening_moves`` is the
        number of tokens in the opening line divided by two (rounded down).

    Raises:
        ValueError: If ``openings.csv`` contains no openings.
    """
    with open("openings.csv", "r") as file:
        lines = file.readlines()[1:]  # Skip header
    lines = [line for line in lines if line.strip()]
    if not lines:
        raise ValueError("openings.csv contains no openings")

    unused = [line for line in lines if line not in used_openings]
    if not unused:
        # Every opening has been played: start a fresh cycle rather than
        # retrying random picks that can never succeed.
        used_openings.clear()
        unused = lines

    moves_string = random.choice(unused)
    used_openings.append(moves_string)

    tokens = moves_string.split()
    if move_num_in_gamestate:
        game_state = moves_string.rstrip()
    else:
        game_state = ' '.join(
            '.' + token.split(".")[-1] if "." in token else token for token in tokens
        )

    for token in tokens:
        # "1.e4" -> "e4"; a token without a period is already a bare move.
        board.push_san(token.split(".")[-1])

    return game_state, board, len(tokens) // 2
def add_random_moves(
    game_state: str, board: chess.Board, num_moves: int = 20
) -> Tuple[str, chess.Board, int]:
    """Extend ``board`` and ``game_state`` with up to ``num_moves`` random full moves.

    Each half move is drawn uniformly from the legal moves. White's moves are
    written as ``<n>.<san>`` when ``move_num_in_gamestate`` is set and as
    ``.<san>`` otherwise; Black's moves are written as the bare SAN. The loop
    stops early when the position has no legal moves.

    Returns:
        ``(game_state, board, full_moves_played)``. The count equals
        ``num_moves`` unless the loop stopped early, in which case it is the
        number of half moves actually played divided by two (rounded down).
    """
    half_moves_played = 0
    for i in range(num_moves * 2):  # Full moves to half moves
        legal_moves = list(board.legal_moves)
        if not legal_moves:
            break
        chosen = random.choice(legal_moves)
        move = board.san(chosen)  # SAN must be computed before the move is pushed
        board.push(chosen)
        half_moves_played += 1
        if board.turn == chess.BLACK:
            # Black is now to move, so the move just played was White's.
            game_state += f" {i//2 + 1}.{move}" if move_num_in_gamestate else f" .{move}"
        else:
            game_state += f" {move}"
    game_state = game_state.strip()
    return game_state, board, half_moves_played // 2
def get_legal_move(
    player: Player,
    board: chess.Board,
    game_state: str,
    player_one: bool,
    max_attempts: int = 5,
) -> LegalMoveResponse:
    """Request a move from the player and ensure it's legal.

    The player is asked up to ``max_attempts`` times. The temperature passed
    to the player rises with each attempt, from 0.001 up to a cap of 0.75.

    Returns:
        A ``LegalMoveResponse``:
        - on success, the SAN text (with a leading space for player two and
          without one for player one), the parsed move, and the zero-based
          index of the successful attempt;
        - if the player answers with a game result ("1-0", "0-1",
          "1/2-1/2"), ``is_resignation=True``;
        - if no attempt yields a legal move, ``is_illegal_move=True`` with
          ``attempts`` equal to ``max_attempts``.
    """
    move_san = None
    move_uci = None

    for attempt in range(max_attempts):
        #print(f"get_legal_move: |{game_state}|")
        move_san = player.get_move(
            board, game_state, min(((attempt / max_attempts) * 1) + 0.001, 0.75)
        )

        # Sometimes when GPT thinks it's the end of the game, it will just output the result
        # Like "1-0". If so, this really isn't an illegal move, so we'll add a check for that.
        if move_san is not None and move_san in ("1-0", "0-1", "1/2-1/2"):
            print(f"{move_san}, player has resigned")
            return LegalMoveResponse(
                move_san=None,
                move_uci=None,
                attempts=attempt,
                is_resignation=True,
            )

        try:
            move_uci = board.parse_san(move_san)
        except Exception as e:
            # Deliberately broad: any unparsable answer (including None)
            # counts as a failed attempt rather than aborting the game.
            print(f"Error parsing move {move_san}: {e}")
            # check if player is gpt-3.5-turbo-instruct
            # only recording errors for gpt-3.5-turbo-instruct because its errors are so rare
            # .get() is required: engine and human configs have no "model"
            # key, and indexing it raised KeyError from inside this handler.
            if player.get_config().get("model") == "gpt-3.5-turbo-instruct":
                with open("gpt-3.5-turbo-instruct-illegal-moves.txt", "a") as f:
                    f.write(f"{game_state}\n{move_san}\n")
            continue

        if move_uci in board.legal_moves:
            # Normalise spacing for the transcript: player two's move is
            # preceded by a space, player one's move is not.
            if not player_one:
                if not move_san.startswith(" "):
                    move_san = " " + move_san
            else:
                if move_san.startswith(" "):
                    move_san = move_san[1:]
            return LegalMoveResponse(move_san, move_uci, attempt)
        print(f"Illegal move: {move_san}")

    # If we reach here, the player has made illegal moves for all attempts.
    print(f"{player} provided illegal moves for {max_attempts} attempts.")
    return LegalMoveResponse(
        move_san=None, move_uci=None, attempts=max_attempts, is_illegal_move=True
    )
def play_turn(
    player: Player, board: chess.Board, game_state: str, player_one: bool
) -> Tuple[str, bool, bool, int]:
    """Let ``player`` make one move, updating ``board`` and the transcript.

    Returns ``(game_state, resigned, failed_to_find_legal_move, attempts)``
    where ``attempts`` is the attempt count reported by ``get_legal_move``.
    The board and transcript are only changed when a legal move was found.
    """
    response = get_legal_move(player, board, game_state, player_one, 5)

    if response.is_resignation:
        print(f"{player} resigned with result: {board.result()}")
    elif response.is_illegal_move:
        print(f"Game over: 5 consecutive illegal moves from {player}")
    elif response.move_san is None or response.move_uci is None:
        print(f"Game over: {player} failed to find a legal move")
    else:
        board.push(response.move_uci)
        game_state += response.move_san
        print(response.move_san, end=" ")

    return (
        game_state,
        response.is_resignation,
        response.is_illegal_move,
        response.attempts,
    )
def play_game(
    player_one: Player,
    player_two: Player,
    max_games: int = 10,
    book_opening: bool = False,
    random_opening: bool = False,
    random_opening_moves: int = 20,
):
    """Play ``max_games`` games between the two players and record each one.

    Every game starts from the prompt in ``gpt_inputs/prompt.txt`` and a fresh
    board, optionally followed by a book opening (``book_opening``) or by
    ``random_opening_moves`` random full moves (``random_opening``); the book
    opening takes precedence when both flags are set. Player one moves first
    in each loop iteration. A game ends when the board reports game over,
    when a player resigns or fails to produce a legal move, or when more than
    ``MAX_MOVES`` loop iterations have been played.

    Any ``StockfishPlayer`` passed in is closed before this function returns,
    including when a game raises, so the engine process is not left running.
    """
    try:
        for z in range(max_games):
            print(f"\nGame {z} of {max_games}\n")

            with open("gpt_inputs/prompt.txt", "r") as f:
                game_state = f.read()
            board = chess.Board()

            if book_opening:
                game_state, board, opening_moves = random_book_opening(game_state, board)
            elif random_opening:
                game_state, board, opening_moves = add_random_moves(game_state, board, random_opening_moves)
            else:
                opening_moves = 0

            player_one_illegal_moves = 0
            player_one_illegal_attempts = 0
            player_two_illegal_moves = 0
            player_one_legal_moves = 0
            player_two_legal_moves = 0
            player_one_resignation = False
            player_two_resignation = False
            player_one_failed_to_find_legal_move = False
            player_two_failed_to_find_legal_move = False
            start_time = time.time()

            total_moves = 0
            # NOTE(review): illegal_moves is never incremented below, so the
            # recorded value is always 0 - confirm whether it should accumulate.
            illegal_moves = 0
            illegal_move_numbers = []
            print_for_human = isinstance(player_one, HumanPlayer) or isinstance(player_two, HumanPlayer)

            while not board.is_game_over():
                if print_for_human:
                    print(board)

                # Keep the transcript of the running game on disk.
                with open("game.txt", "w") as f:
                    f.write(game_state)

                current_move_num = f"{board.fullmove_number if move_num_in_gamestate else ''}."
                total_moves += 1
                # I increment legal moves here so player_two isn't penalized for the game ending before its turn
                player_one_legal_moves += 1
                player_two_legal_moves += 1
                # this if statement may be overkill, just trying to get format to exactly match PGN notation
                if board.fullmove_number != 1:
                    game_state += " "
                game_state += current_move_num
                #print(f"|{game_state}|")
                #print(f"{current_move_num}", end=" ")

                (
                    game_state,
                    player_one_resignation,
                    player_one_failed_to_find_legal_move,
                    illegal_moves_one,
                ) = play_turn(player_one, board, game_state, player_one=True)
                player_one_illegal_moves += 1 if illegal_moves_one > 0 else 0
                player_one_illegal_attempts += illegal_moves_one
                if illegal_moves_one != 0:
                    player_one_legal_moves -= 1
                    illegal_move_numbers.append(board.fullmove_number)
                if (
                    board.is_game_over()
                    or player_one_resignation
                    or player_one_failed_to_find_legal_move
                ):
                    break

                (
                    game_state,
                    player_two_resignation,
                    player_two_failed_to_find_legal_move,
                    illegal_moves_two,
                ) = play_turn(player_two, board, game_state, player_one=False)
                player_two_illegal_moves += 1 if illegal_moves_two > 0 else 0
                if illegal_moves_two != 0:
                    player_two_legal_moves -= 1
                if (
                    board.is_game_over()
                    or player_two_resignation
                    or player_two_failed_to_find_legal_move
                ):
                    break

                print("\n", end="")

                if total_moves > MAX_MOVES:
                    break

            end_time = time.time()
            total_time = end_time - start_time
            print(f"\nGame over. Total time: {total_time} seconds")
            print(f"Result: {board.result()}")
            print(board)
            print()
            record_results(
                board,
                player_one,
                player_two,
                game_state,
                player_one_illegal_moves,
                player_one_illegal_attempts,
                player_two_illegal_moves,
                player_one_legal_moves,
                player_two_legal_moves,
                total_time,
                player_one_resignation,
                player_two_resignation,
                player_one_failed_to_find_legal_move,
                player_two_failed_to_find_legal_move,
                total_moves,
                illegal_moves,
                opening_moves,
                illegal_move_numbers
            )
    finally:
        # Always release engine subprocesses, even if a game raised.
        if isinstance(player_one, StockfishPlayer):
            player_one.close()
        if isinstance(player_two, StockfishPlayer):
            player_two.close()
# Run configuration. These module-level names are read as globals by the
# functions above.

# When True, record_results writes to logs/<player one>_vs_<player two>.csv
# (built from the two recording names); when False it writes to recording_file.
RUN_FOR_ANALYSIS = True
# play_game stops a game once more than this many loop iterations were played.
MAX_MOVES = 999 # Due to nanogpt max input length of 1024
recording_file = "logs/determine.csv" # default recording file. Because we are using list [player_ones], recording_file is overwritten
# player_one_recording_name = "ckpt_8.pt"
#player_ones = ["ckpt_iter_20000.pt","ckpt_iter_40000.pt","ckpt_iter_60000.pt","ckpt_iter_80000.pt"] #["ckpt.pt"]
# Model names to evaluate as player one; the script below loops over them.
player_ones = ["50M/ckpt_9120050b.pt"]
player_two_recording_name = "lc0_sweep" #"stockfish_sweep"
# Whether transcripts carry move numbers ("1.e4") or only a bare "." before
# White's moves.
move_num_in_gamestate = False
# Opening selection passed to play_game: a book opening takes precedence over
# random opening moves when both are True.
book_opening = True
random_opening = False
random_opening_moves = 20
# Script entry point: for every model in player_ones, play num_games games
# against each opponent level in the inner range and record the results.
# The assignments below must stay at module level: record_results reads
# player_one_recording_name as a global.
if __name__ == "__main__":
    for nanogpt_player in player_ones:
        for i in range(1): #range(11):
            num_games = 540
            # Alternative player-one choices, kept for quick switching:
            # player_one = GPTPlayer(model="gpt-3.5-turbo-instruct")
            # player_one = LocalLlamaPlayer(model_name="meta-llama/Llama-2-7b-hf")
            # player_one = LocalLoraLlamaPlayer("meta-llama/Llama-2-7b-hf", "/workspace/axolotl/lora2-out")
            # player_one = GPTPlayer(model="gpt-4")
            # player_one = StockfishPlayer(skill_level=-1, play_time=0.1)

            # Used by record_results to name the CSV file.
            player_one_recording_name = nanogpt_player
            #player_one = NanoGptPlayer(model_name=player_one_recording_name, move_num_in_gamestate=move_num_in_gamestate)
            #player_one_recording_name = "xformer_" + nanogpt_player
            player_one = MambaPlayer(model_name=player_one_recording_name, move_num_in_gamestate=move_num_in_gamestate)
            # The loop index doubles as the opponent's skill level.
            #player_two = StockfishPlayer(skill_level=i, play_time=0.1)
            player_two = LC0PLayer(skill=i)
            # player_two = GPTPlayer(model="gpt-4")
            # player_two = GPTPlayer(model="gpt-3.5-turbo-instruct")

            #print(f"\n\nSTARTING GAMES AGAINST STOCKFISH LEVEL {i}\n\n")
            print(f"\n\nSTARTING GAMES AGAINST LC0 LEVEL {i}\n\n")

            play_game(player_one, player_two, num_games, book_opening=book_opening, random_opening=random_opening,random_opening_moves=random_opening_moves)

    print("\n\n\n********\nDONE!\n********\n\n\n")