|
|
"""
Nexus-Nano Search Engine
Fast alpha-beta with minimal overhead

Focus: Speed > Depth
Target: Sub-second responses
"""
|
|
|
|
|
# Standard library
import logging
from typing import Optional, Tuple, List, Dict

# Third-party
import chess

# Package-local search components
from .evaluate import NexusNanoEvaluator
from .transposition import TranspositionTable, NodeType
from .move_ordering import MoveOrderer
from .time_manager import TimeManager
from .endgame import EndgameDetector

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class NexusNanoEngine:
    """Ultra-fast 2.8M parameter chess engine.

    Runs an iterative-deepening negamax alpha-beta search backed by a
    transposition table, a move orderer, a time manager and a capture-only
    quiescence search. Leaf positions are scored with
    ``self.evaluator.evaluate_hybrid`` and then passed through
    ``self.endgame_detector.adjust_evaluation``.

    NOTE(review): the negamax sign flips assume ``evaluate_hybrid`` returns a
    score relative to the side to move -- confirm against the evaluator.
    """

    MATE_SCORE = 100000
    MAX_PLY = 100
    # Transposition table size in MB (used for construction and logging).
    TT_SIZE_MB = 64
    # Maximum number of nested capture plies explored by quiescence.
    MAX_QS_DEPTH = 6

    def __init__(self, model_path: str, num_threads: int = 1):
        """Initialize with single-threaded config.

        Args:
            model_path: Forwarded to ``NexusNanoEvaluator``.
            num_threads: Forwarded to ``NexusNanoEvaluator`` (default 1).
        """
        self.evaluator = NexusNanoEvaluator(model_path, num_threads)
        self.tt = TranspositionTable(size_mb=self.TT_SIZE_MB)
        self.move_orderer = MoveOrderer()
        self.time_manager = TimeManager()
        self.endgame_detector = EndgameDetector()

        # Per-search statistics; reset at the start of every get_best_move().
        self.nodes_evaluated = 0
        self.depth_reached = 0
        self.sel_depth = 0
        self.principal_variation = []

        logger.info("⚡ Nexus-Nano Engine initialized")
        logger.info(" Model: %.2f MB", self.evaluator.get_model_size_mb())
        logger.info(" TT: %d MB", self.TT_SIZE_MB)

    def get_best_move(
        self,
        fen: str,
        depth: int = 4,
        time_limit: int = 2000
    ) -> Dict:
        """Fast move search using iterative deepening.

        Args:
            fen: Position in FEN notation.
            depth: Max depth (1-6 recommended).
            time_limit: Time in ms.

        Returns:
            A result dict. A full search yields the keys ``best_move`` (UCI
            string), ``evaluation`` (search score divided by 100),
            ``depth_searched``, ``seldepth``, ``nodes_evaluated``,
            ``time_taken`` (ms), ``pv`` (UCI strings), ``nps``, ``tt_stats``
            and ``move_ordering_stats``. Positions with zero or one legal
            move return the smaller dicts built by ``_no_legal_moves`` /
            ``_single_move`` (no ``tt_stats`` / ``move_ordering_stats``).

        Raises:
            Whatever ``chess.Board(fen)`` raises for an unparsable FEN; use
            ``validate_fen`` first when the input is untrusted.
        """
        board = chess.Board(fen)

        # Reset per-search statistics.
        self.nodes_evaluated = 0
        self.depth_reached = 0
        self.sel_depth = 0
        self.principal_variation = []

        # The time manager works in seconds; the same budget is passed for
        # both of its arguments.
        time_limit_sec = time_limit / 1000.0
        self.time_manager.start_search(time_limit_sec, time_limit_sec)

        self.move_orderer.clear()
        self.tt.increment_age()

        legal_moves = list(board.legal_moves)
        if not legal_moves:
            return self._no_legal_moves()
        if len(legal_moves) == 1:
            # Forced move: nothing to search.
            return self._single_move(board, legal_moves[0])

        best_move = legal_moves[0]
        best_score = None  # stays None until one iteration has produced a score

        for current_depth in range(1, depth + 1):
            if self.time_manager.should_stop(current_depth):
                break

            score, move, pv = self._search_root(
                board, current_depth, float('-inf'), float('inf')
            )

            if move is not None:
                best_move = move
                best_score = score
                self.depth_reached = current_depth
                self.principal_variation = pv

        if best_score is None:
            # No iteration ran (depth < 1, or the time manager stopped the
            # search before depth 1). Report a static evaluation instead of
            # the -inf sentinel, mirroring _single_move().
            best_score = self.evaluator.evaluate_hybrid(board)
            self.nodes_evaluated += 1
            self.principal_variation = [best_move]

        # Read the clock once so time_taken and nps agree.
        elapsed = self.time_manager.elapsed()

        return {
            'best_move': best_move.uci(),
            'evaluation': round(best_score / 100.0, 2),
            'depth_searched': self.depth_reached,
            'seldepth': self.sel_depth,
            'nodes_evaluated': self.nodes_evaluated,
            'time_taken': int(elapsed * 1000),
            'pv': [m.uci() for m in self.principal_variation],
            'nps': int(self.nodes_evaluated / max(elapsed, 0.001)),
            'tt_stats': self.tt.get_stats(),
            'move_ordering_stats': self.move_orderer.get_stats(),
        }

    def _search_root(
        self,
        board: chess.Board,
        depth: int,
        alpha: float,
        beta: float
    ) -> Tuple[float, Optional[chess.Move], List[chess.Move]]:
        """Root search: score every root move at the given depth.

        Args:
            board: Position to search; restored to its input state on return.
            depth: Nominal search depth for this iteration.
            alpha: Lower bound of the search window.
            beta: Upper bound of the search window.

        Returns:
            ``(best_score, best_move, best_pv)``. ``best_move`` is ``None``
            only when the position has no legal moves. If the time manager
            stops the iteration early, the best result among the moves
            searched so far is returned.
        """
        legal_moves = list(board.legal_moves)
        if not legal_moves:
            # Same terminal scoring as _alpha_beta(); avoids indexing an
            # empty move list below.
            if board.is_check():
                return -self.MATE_SCORE + (self.MAX_PLY - depth), None, []
            return 0, None, []

        # The TT is consulted at the root only for a move-ordering hint.
        zobrist_key = self.tt.compute_zobrist_key(board)
        tt_result = self.tt.probe(zobrist_key, depth, alpha, beta)
        tt_move = tt_result[1] if tt_result else None

        ordered_moves = self.move_orderer.order_moves(
            board, legal_moves, depth, tt_move
        )

        best_move = ordered_moves[0]
        best_score = float('-inf')
        best_pv = []
        aborted = False

        for index, move in enumerate(ordered_moves):
            # Poll the clock between root moves. The first move is always
            # searched so the iteration returns a real score.
            if index and self.time_manager.should_stop(depth):
                aborted = True
                break

            board.push(move)
            score, pv = self._alpha_beta(board, depth - 1, -beta, -alpha, 1)
            score = -score
            board.pop()

            if score > best_score:
                best_score = score
                best_move = move
                best_pv = [move] + pv

            if score > alpha:
                alpha = score

        # A partially searched root is not an exact score for this depth, so
        # it is only stored when every root move was examined.
        if not aborted:
            self.tt.store(zobrist_key, depth, best_score, NodeType.EXACT, best_move)

        return best_score, best_move, best_pv

    def _alpha_beta(
        self,
        board: chess.Board,
        depth: int,
        alpha: float,
        beta: float,
        ply: int = 0
    ) -> Tuple[float, List[chess.Move]]:
        """Fast alpha-beta search (negamax form).

        Args:
            board: Position to search; restored to its input state on return.
            depth: Remaining depth; at ``depth <= 0`` quiescence takes over.
            alpha: Lower bound of the search window.
            beta: Upper bound of the search window.
            ply: Distance from the root in half-moves, used only to track
                ``self.sel_depth``.

        Returns:
            ``(score, pv)`` where ``pv`` is the best line found from this
            node (empty for TT hits, draws, terminal and leaf nodes).
        """
        # Selective depth = deepest ply reached from the root.
        self.sel_depth = max(self.sel_depth, ply)

        # Repetition and fifty-move positions are scored as draws.
        if board.is_repetition(2) or board.is_fifty_moves():
            return 0, []

        zobrist_key = self.tt.compute_zobrist_key(board)
        tt_result = self.tt.probe(zobrist_key, depth, alpha, beta)

        # A probe result whose first element is not None is used as a cutoff
        # score; the second element is used as a move-ordering hint.
        if tt_result and tt_result[0] is not None:
            return tt_result[0], []
        tt_move = tt_result[1] if tt_result else None

        if depth <= 0:
            return self._quiescence(board, alpha, beta, 0, ply), []

        legal_moves = list(board.legal_moves)
        if not legal_moves:
            if board.is_check():
                # Checkmate. The offset shrinks as remaining depth grows, so
                # a mate found nearer the root is worse for the mated side.
                return -self.MATE_SCORE + (self.MAX_PLY - depth), []
            return 0, []  # stalemate

        ordered_moves = self.move_orderer.order_moves(
            board, legal_moves, depth, tt_move
        )

        best_score = float('-inf')
        best_pv = []
        node_type = NodeType.UPPER_BOUND  # until some move raises alpha

        for move in ordered_moves:
            board.push(move)
            score, pv = self._alpha_beta(board, depth - 1, -beta, -alpha, ply + 1)
            score = -score
            board.pop()

            if score > best_score:
                best_score = score
                best_pv = [move] + pv

            if score > alpha:
                alpha = score
                node_type = NodeType.EXACT

                # Non-capture moves that raise alpha are recorded as killers.
                # The board has been popped, so is_capture() sees the
                # position before the move.
                if not board.is_capture(move):
                    self.move_orderer.update_killer_move(move, depth)

            if score >= beta:
                node_type = NodeType.LOWER_BOUND
                break

        self.tt.store(
            zobrist_key, depth, best_score, node_type,
            best_pv[0] if best_pv else None
        )

        return best_score, best_pv

    def _quiescence(
        self,
        board: chess.Board,
        alpha: float,
        beta: float,
        qs_depth: int,
        ply: int = 0
    ) -> float:
        """Fast quiescence (captures only).

        Args:
            board: Position to resolve; restored to its input state on return.
            alpha: Lower bound of the search window.
            beta: Upper bound of the search window.
            qs_depth: Number of quiescence plies already played; the search
                stops expanding captures at ``MAX_QS_DEPTH``.
            ply: Distance from the root in half-moves, used only to track
                ``self.sel_depth``.

        Returns:
            ``beta`` on a cutoff, the stand-pat score when the depth cap is
            hit or no captures exist, otherwise the best ``alpha`` found.
        """
        self.nodes_evaluated += 1
        self.sel_depth = max(self.sel_depth, ply)

        # Stand-pat: static evaluation, then endgame adjustment.
        stand_pat = self.evaluator.evaluate_hybrid(board)
        stand_pat = self.endgame_detector.adjust_evaluation(board, stand_pat)

        if stand_pat >= beta:
            return beta
        if alpha < stand_pat:
            alpha = stand_pat

        # Cap the capture sequence length to keep the search fast.
        if qs_depth >= self.MAX_QS_DEPTH:
            return stand_pat

        captures = [m for m in board.legal_moves if board.is_capture(m)]
        if not captures:
            return stand_pat

        captures = self.move_orderer.order_moves(board, captures, 0)

        for move in captures:
            board.push(move)
            score = -self._quiescence(board, -beta, -alpha, qs_depth + 1, ply + 1)
            board.pop()

            if score >= beta:
                return beta
            if score > alpha:
                alpha = score

        return alpha

    def _no_legal_moves(self) -> Dict:
        """Build the result for a position with no legal moves.

        ``best_move`` is the UCI null move ``'0000'``. The ``seldepth``,
        ``pv`` and ``nps`` keys are included so callers can index the same
        core keys as on a full search result.
        """
        return {
            'best_move': '0000',
            'evaluation': 0.0,
            'depth_searched': 0,
            'seldepth': 0,
            'nodes_evaluated': 0,
            'time_taken': 0,
            'pv': [],
            'nps': 0,
        }

    def _single_move(self, board: chess.Board, move: chess.Move) -> Dict:
        """Build the result for a position with exactly one legal move.

        No search is run; the position is scored once with
        ``evaluate_hybrid`` and the forced move is returned as the PV.
        """
        eval_score = self.evaluator.evaluate_hybrid(board)

        return {
            'best_move': move.uci(),
            'evaluation': round(eval_score / 100.0, 2),
            'depth_searched': 0,
            'seldepth': 0,
            'nodes_evaluated': 1,
            'time_taken': 0,
            'pv': [move.uci()],
            'nps': 0,
        }

    def validate_fen(self, fen: str) -> bool:
        """Return True if ``chess.Board`` accepts ``fen``, else False.

        Never raises for bad input, but no longer swallows
        ``KeyboardInterrupt`` / ``SystemExit`` as the bare ``except`` did.
        """
        try:
            chess.Board(fen)
        except Exception:
            # Any construction failure means the FEN is unusable.
            return False
        return True

    def get_model_size(self) -> float:
        """Return the evaluator's model size in MB."""
        return self.evaluator.get_model_size_mb()