Spaces:
Sleeping
Sleeping
| """ | |
| Nexus-Core Search Engine | |
| PVS with advanced pruning techniques | |
| Research Implementation: | |
| - Principal Variation Search (Marsland, 1986) | |
| - Null Move Pruning (Donninger, 1993) | |
| - Late Move Reductions (Heinz, 2000) | |
| - Quiescence Search (Harris, 1975) | |
| """ | |
| import chess | |
| import time | |
| import logging | |
| from typing import Optional, Tuple, List, Dict | |
| from .evaluate import NexusCoreEvaluator | |
| from .transposition import TranspositionTable, NodeType | |
| from .move_ordering import MoveOrderer | |
| from .time_manager import TimeManager | |
| from .endgame import EndgameDetector | |
| logger = logging.getLogger(__name__) | |
class NexusCoreEngine:
    """Nexus-Core chess engine with 13.2M parameter neural network"""

    MATE_SCORE = 100000  # magnitude of a checkmate score (centipawn scale)
    MAX_PLY = 100        # search-ply bound used for mate-distance scoring

    # Pruning parameters
    NULL_MOVE_REDUCTION = 2   # depth reduction R applied to the null-move search
    NULL_MOVE_MIN_DEPTH = 3   # minimum remaining depth before trying a null move
    LMR_MIN_DEPTH = 3         # minimum remaining depth before reducing late moves
    LMR_MOVE_THRESHOLD = 4    # moves searched at a node before LMR kicks in
    ASPIRATION_WINDOW = 50    # half-width of the aspiration window (centipawns)

    def __init__(self, model_path: str, num_threads: int = 2):
        """Initialize engine components: evaluator, transposition table,
        move ordering, time management and endgame detection."""
        self.evaluator = NexusCoreEvaluator(model_path, num_threads)
        self.tt = TranspositionTable(size_mb=128)  # 128MB TT
        self.move_orderer = MoveOrderer()
        self.time_manager = TimeManager()
        self.endgame_detector = EndgameDetector()
        # Statistics — reset at the start of every get_best_move() call
        self.nodes_evaluated = 0
        self.depth_reached = 0
        self.sel_depth = 0
        self.principal_variation = []
        logger.info("🎯 Nexus-Core Engine initialized")
        logger.info(f" Model: {self.evaluator.get_model_size_mb():.2f} MB")
        logger.info(f" TT Size: 128 MB")

    def get_best_move(
        self,
        fen: str,
        depth: int = 5,
        time_limit: int = 3000
    ) -> Dict:
        """
        Main search entry point: iterative deepening with aspiration windows.

        Args:
            fen: Position in FEN
            depth: Max search depth
            time_limit: Time limit in ms

        Returns:
            Dictionary with best_move and stats
        """
        board = chess.Board(fen)
        # Reset stats
        self.nodes_evaluated = 0
        self.depth_reached = 0
        self.sel_depth = 0
        self.principal_variation = []
        # Time management
        time_limit_sec = time_limit / 1000.0
        self.time_manager.start_search(time_limit_sec, time_limit_sec)
        # Age history/TT so stale data from previous searches decays
        self.move_orderer.age_history(0.95)
        self.tt.increment_age()
        # Special cases: no move to search, or a forced move
        legal_moves = list(board.legal_moves)
        if len(legal_moves) == 0:
            return self._no_legal_moves_result()
        if len(legal_moves) == 1:
            return self._single_move_result(board, legal_moves[0])
        # Iterative deepening
        best_move = legal_moves[0]
        best_score = float('-inf')
        alpha = float('-inf')
        beta = float('inf')
        for current_depth in range(1, depth + 1):
            if self.time_manager.should_stop(current_depth):
                break
            # Aspiration windows for depth >= 4 (skipped near mate scores,
            # where a narrow window is meaningless)
            if current_depth >= 4 and abs(best_score) < self.MATE_SCORE - 1000:
                alpha = best_score - self.ASPIRATION_WINDOW
                beta = best_score + self.ASPIRATION_WINDOW
            else:
                alpha = float('-inf')
                beta = float('inf')
            # Search
            score, move, pv = self._search_root(
                board, current_depth, alpha, beta
            )
            # Aspiration fail-low/fail-high: re-search with a full window
            if score <= alpha or score >= beta:
                score, move, pv = self._search_root(
                    board, current_depth, float('-inf'), float('inf')
                )
            # Update best result for this iteration
            if move:
                best_move = move
                best_score = score
                self.depth_reached = current_depth
                self.principal_variation = pv
                logger.info(
                    f"Depth {current_depth}: {move.uci()} "
                    f"({score:+.2f}) | Nodes: {self.nodes_evaluated}"
                )
        return {
            'best_move': best_move.uci(),
            'evaluation': round(best_score / 100.0, 2),
            'depth_searched': self.depth_reached,
            'seldepth': self.sel_depth,
            'nodes_evaluated': self.nodes_evaluated,
            'time_taken': int(self.time_manager.elapsed() * 1000),
            'pv': [m.uci() for m in self.principal_variation],
            'nps': int(self.nodes_evaluated / max(self.time_manager.elapsed(), 0.001)),
            'tt_stats': self.tt.get_stats(),
            'move_ordering_stats': self.move_orderer.get_stats()
        }

    def _search_root(
        self,
        board: chess.Board,
        depth: int,
        alpha: float,
        beta: float
    ) -> Tuple[float, Optional[chess.Move], List[chess.Move]]:
        """Root node search (PVS: full window on the first move,
        null window + re-search on the rest)."""
        legal_moves = list(board.legal_moves)
        # TT probe — only the stored move is used at the root (for ordering)
        zobrist_key = self.tt.compute_zobrist_key(board)
        tt_result = self.tt.probe(zobrist_key, depth, alpha, beta)
        tt_move = tt_result[1] if tt_result else None
        # Order moves
        ordered_moves = self.move_orderer.order_moves(
            board, legal_moves, depth, tt_move
        )
        best_move = ordered_moves[0]
        best_score = float('-inf')
        best_pv = []
        for i, move in enumerate(ordered_moves):
            board.push(move)
            if i == 0:
                # First move: full window, expected PV node
                score, pv = self._pvs(
                    board, depth - 1, -beta, -alpha, True
                )
                score = -score
            else:
                # Later moves: cheap null-window scout first
                score, _ = self._pvs(
                    board, depth - 1, -alpha - 1, -alpha, False
                )
                score = -score
                if alpha < score < beta:
                    # Scout failed high inside the window: re-search fully
                    score, pv = self._pvs(
                        board, depth - 1, -beta, -alpha, True
                    )
                    score = -score
                else:
                    pv = []
            board.pop()
            if score > best_score:
                best_score = score
                best_move = move
                best_pv = [move] + pv
            if score > alpha:
                alpha = score
            if self.time_manager.should_stop(depth):
                break
        # Store in TT
        self.tt.store(zobrist_key, depth, best_score, NodeType.EXACT, best_move)
        return best_score, best_move, best_pv

    def _pvs(
        self,
        board: chess.Board,
        depth: int,
        alpha: float,
        beta: float,
        do_null: bool
    ) -> Tuple[float, List[chess.Move]]:
        """Principal Variation Search with null-move pruning and LMR.

        Returns (score from the side-to-move's perspective, PV move list).
        """
        # NOTE(review): sel_depth is derived as MAX_PLY - remaining depth, not
        # from the actual ply from root — verify this matches the intended
        # "seldepth" reporting convention.
        self.sel_depth = max(self.sel_depth, self.MAX_PLY - depth)
        # Mate distance pruning: never accept a mate slower than one found
        alpha = max(alpha, -self.MATE_SCORE + (self.MAX_PLY - depth))
        beta = min(beta, self.MATE_SCORE - (self.MAX_PLY - depth) - 1)
        if alpha >= beta:
            return alpha, []
        # Draw detection
        if board.is_repetition(2) or board.is_fifty_moves():
            return 0, []
        # TT probe
        zobrist_key = self.tt.compute_zobrist_key(board)
        tt_result = self.tt.probe(zobrist_key, depth, alpha, beta)
        if tt_result and tt_result[0] is not None:
            return tt_result[0], []
        tt_move = tt_result[1] if tt_result else None
        # Horizon reached: resolve tactics with quiescence search
        if depth <= 0:
            return self._quiescence(board, alpha, beta, 0), []
        # Null move pruning — skipped in check and in pawn-only endgames
        # (zugzwang positions), where the null-move assumption is unsound
        if (do_null and
                depth >= self.NULL_MOVE_MIN_DEPTH and
                not board.is_check() and
                self._has_non_pawn_material(board)):
            board.push(chess.Move.null())
            score, _ = self._pvs(
                board, depth - 1 - self.NULL_MOVE_REDUCTION,
                -beta, -beta + 1, False
            )
            score = -score
            board.pop()
            if score >= beta:
                return beta, []
        # Generate moves
        legal_moves = list(board.legal_moves)
        if not legal_moves:
            if board.is_check():
                # Checkmate: prefer shorter mates via depth-adjusted score
                return -self.MATE_SCORE + (self.MAX_PLY - depth), []
            return 0, []  # stalemate
        ordered_moves = self.move_orderer.order_moves(
            board, legal_moves, depth, tt_move
        )
        # Main search
        best_score = float('-inf')
        best_pv = []
        node_type = NodeType.UPPER_BOUND
        moves_searched = 0
        for move in ordered_moves:
            # Classify the move BEFORE pushing it: is_capture()/gives_check()
            # are defined relative to the position the move is played from.
            # (Bug fix: the previous code queried board.is_capture(board.peek())
            # after the push, i.e. against the wrong position.)
            is_capture = board.is_capture(move)
            gives_check = board.gives_check(move)
            board.push(move)
            # Late move reductions: reduce quiet late moves by one ply
            reduction = 0
            if (moves_searched >= self.LMR_MOVE_THRESHOLD and
                    depth >= self.LMR_MIN_DEPTH and
                    not gives_check and
                    not is_capture):
                reduction = 1
            # PVS
            if moves_searched == 0:
                score, pv = self._pvs(
                    board, depth - 1, -beta, -alpha, True
                )
                score = -score
            else:
                score, _ = self._pvs(
                    board, depth - 1 - reduction, -alpha - 1, -alpha, True
                )
                score = -score
                if alpha < score < beta:
                    # Null-window (possibly reduced) search failed high:
                    # re-search at full depth and window
                    score, pv = self._pvs(
                        board, depth - 1, -beta, -alpha, True
                    )
                    score = -score
                else:
                    pv = []
            board.pop()
            moves_searched += 1
            if score > best_score:
                best_score = score
                best_pv = [move] + pv
            if score > alpha:
                alpha = score
                node_type = NodeType.EXACT
                # Reward quiet moves that raise alpha (history/killer heuristics)
                if not is_capture:
                    self.move_orderer.update_history(move, depth, True)
                    self.move_orderer.update_killer_move(move, depth)
            if score >= beta:
                node_type = NodeType.LOWER_BOUND
                break  # beta cutoff
        self.tt.store(zobrist_key, depth, best_score, node_type, best_pv[0] if best_pv else None)
        return best_score, best_pv

    def _quiescence(
        self,
        board: chess.Board,
        alpha: float,
        beta: float,
        qs_depth: int
    ) -> float:
        """Quiescence search: extend through captures/checks so the static
        evaluation is only taken on tactically quiet positions."""
        self.nodes_evaluated += 1
        # Stand-pat: assume the side to move can at least keep the static eval
        stand_pat = self.evaluator.evaluate_hybrid(board)
        stand_pat = self.endgame_detector.adjust_evaluation(board, stand_pat)
        if stand_pat >= beta:
            return beta
        if alpha < stand_pat:
            alpha = stand_pat
        # Depth limit guards against check-extension explosions
        if qs_depth >= 8:
            return stand_pat
        # Tactical moves only: captures and checks
        tactical_moves = [
            move for move in board.legal_moves
            if board.is_capture(move) or board.gives_check(move)
        ]
        if not tactical_moves:
            return stand_pat
        tactical_moves = self.move_orderer.order_moves(board, tactical_moves, 0)
        for move in tactical_moves:
            board.push(move)
            score = -self._quiescence(board, -beta, -alpha, qs_depth + 1)
            board.pop()
            if score >= beta:
                return beta
            if score > alpha:
                alpha = score
        return alpha

    def _has_non_pawn_material(self, board: chess.Board) -> bool:
        """True if the side to move has any piece other than pawns/king.
        Used to disable null-move pruning in zugzwang-prone endgames."""
        for piece_type in [chess.KNIGHT, chess.BISHOP, chess.ROOK, chess.QUEEN]:
            if board.pieces(piece_type, board.turn):
                return True
        return False

    def _no_legal_moves_result(self) -> Dict:
        """Result dict for a position with no legal moves (mate/stalemate)."""
        return {
            'best_move': '0000',
            'evaluation': 0.0,
            'depth_searched': 0,
            'nodes_evaluated': 0,
            'time_taken': 0,
            'pv': []  # keep the result shape consistent with the other paths
        }

    def _single_move_result(self, board: chess.Board, move: chess.Move) -> Dict:
        """Result dict for a forced move — no search needed, just a static eval
        of the current position."""
        eval_score = self.evaluator.evaluate_hybrid(board)
        return {
            'best_move': move.uci(),
            'evaluation': round(eval_score / 100.0, 2),
            'depth_searched': 0,
            'nodes_evaluated': 1,
            'time_taken': 0,
            'pv': [move.uci()]
        }

    def validate_fen(self, fen: str) -> bool:
        """Return True if `fen` parses as a valid position.

        chess.Board raises ValueError on malformed FEN; catching only that
        (instead of a bare `except:`) avoids swallowing KeyboardInterrupt etc.
        """
        try:
            chess.Board(fen)
            return True
        except ValueError:
            return False

    def get_model_size(self) -> float:
        """Get model size in MB (delegates to the evaluator)."""
        return self.evaluator.get_model_size_mb()