Rafs-an09002 commited on
Commit
da6ae37
·
verified ·
1 Parent(s): 71b16fa

Create engine/search.py

Browse files
Files changed (1) hide show
  1. engine/search.py +294 -0
engine/search.py ADDED
@@ -0,0 +1,294 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Nexus-Core Search Engine
3
+ Efficient alpha-beta with essential optimizations
4
+ - Basic transposition table
5
+ - Simple move ordering (MVV-LVA)
6
+ - Quiescence search
7
+ """
8
+
9
+ import onnxruntime as ort
10
+ import numpy as np
11
+ import chess
12
+ import time
13
+ import logging
14
+ from pathlib import Path
15
+ from typing import Optional, Dict, Tuple, List
16
+
17
+ logger = logging.getLogger(__name__)
18
+
19
+
20
+ class NexusCoreEngine:
21
+ """
22
+ Lightweight chess engine for Nexus-Core
23
+ Optimized for speed over strength
24
+ """
25
+
26
+ PIECE_VALUES = {
27
+ chess.PAWN: 100,
28
+ chess.KNIGHT: 320,
29
+ chess.BISHOP: 330,
30
+ chess.ROOK: 500,
31
+ chess.QUEEN: 900,
32
+ chess.KING: 0
33
+ }
34
+
35
+ def __init__(self, model_path: str, num_threads: int = 2):
36
+ """Initialize engine"""
37
+
38
+ self.model_path = Path(model_path)
39
+ if not self.model_path.exists():
40
+ raise FileNotFoundError(f"Model not found: {model_path}")
41
+
42
+ # Load ONNX model
43
+ sess_options = ort.SessionOptions()
44
+ sess_options.intra_op_num_threads = num_threads
45
+ sess_options.inter_op_num_threads = num_threads
46
+ sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
47
+
48
+ logger.info(f"Loading Nexus-Core from {model_path}...")
49
+ self.session = ort.InferenceSession(
50
+ str(self.model_path),
51
+ sess_options=sess_options,
52
+ providers=['CPUExecutionProvider']
53
+ )
54
+
55
+ self.input_name = self.session.get_inputs()[0].name
56
+ self.output_name = self.session.get_outputs()[0].name
57
+
58
+ # Simple transposition table (dict-based, 100K entries)
59
+ self.tt_cache = {}
60
+ self.max_tt_size = 100000
61
+
62
+ # Statistics
63
+ self.nodes_evaluated = 0
64
+
65
+ logger.info("✅ Nexus-Core engine ready")
66
+
67
+ def fen_to_tensor(self, fen: str) -> np.ndarray:
68
+ """Convert FEN to 12-channel tensor"""
69
+ board = chess.Board(fen)
70
+ tensor = np.zeros((1, 12, 8, 8), dtype=np.float32)
71
+
72
+ piece_to_channel = {
73
+ chess.PAWN: 0, chess.KNIGHT: 1, chess.BISHOP: 2,
74
+ chess.ROOK: 3, chess.QUEEN: 4, chess.KING: 5
75
+ }
76
+
77
+ for square, piece in board.piece_map().items():
78
+ rank, file = divmod(square, 8)
79
+ channel = piece_to_channel[piece.piece_type]
80
+ if piece.color == chess.BLACK:
81
+ channel += 6
82
+ tensor[0, channel, rank, file] = 1.0
83
+
84
+ return tensor
85
+
86
+ def evaluate(self, board: chess.Board) -> float:
87
+ """Neural network evaluation"""
88
+ self.nodes_evaluated += 1
89
+
90
+ # Check cache (simple FEN-based)
91
+ fen_key = board.fen().split(' ')[0]
92
+ if fen_key in self.tt_cache:
93
+ return self.tt_cache[fen_key]
94
+
95
+ # Run inference
96
+ input_tensor = self.fen_to_tensor(board.fen())
97
+ output = self.session.run([self.output_name], {self.input_name: input_tensor})
98
+
99
+ # Value output (tanh normalized)
100
+ eval_score = float(output[0][0][0]) * 400.0 # Scale to centipawns
101
+
102
+ # Flip for black
103
+ if board.turn == chess.BLACK:
104
+ eval_score = -eval_score
105
+
106
+ # Cache result
107
+ if len(self.tt_cache) < self.max_tt_size:
108
+ self.tt_cache[fen_key] = eval_score
109
+
110
+ return eval_score
111
+
112
+ def order_moves(self, board: chess.Board, moves: List[chess.Move]) -> List[chess.Move]:
113
+ """
114
+ Simple move ordering
115
+ 1. Captures (MVV-LVA)
116
+ 2. Checks
117
+ 3. Other moves
118
+ """
119
+ scored_moves = []
120
+
121
+ for move in moves:
122
+ score = 0
123
+
124
+ # Captures
125
+ if board.is_capture(move):
126
+ victim = board.piece_at(move.to_square)
127
+ attacker = board.piece_at(move.from_square)
128
+ if victim and attacker:
129
+ victim_val = self.PIECE_VALUES.get(victim.piece_type, 0)
130
+ attacker_val = self.PIECE_VALUES.get(attacker.piece_type, 1)
131
+ score = (victim_val * 10 - attacker_val) * 100
132
+
133
+ # Promotions
134
+ if move.promotion == chess.QUEEN:
135
+ score += 9000
136
+
137
+ # Checks
138
+ board.push(move)
139
+ if board.is_check():
140
+ score += 5000
141
+ board.pop()
142
+
143
+ scored_moves.append((score, move))
144
+
145
+ scored_moves.sort(key=lambda x: x[0], reverse=True)
146
+ return [move for _, move in scored_moves]
147
+
148
+ def quiescence(self, board: chess.Board, alpha: float, beta: float, depth: int = 2) -> float:
149
+ """Quiescence search (captures only)"""
150
+
151
+ stand_pat = self.evaluate(board)
152
+
153
+ if stand_pat >= beta:
154
+ return beta
155
+ if alpha < stand_pat:
156
+ alpha = stand_pat
157
+
158
+ if depth == 0:
159
+ return stand_pat
160
+
161
+ # Only captures
162
+ captures = [m for m in board.legal_moves if board.is_capture(m)]
163
+ if not captures:
164
+ return stand_pat
165
+
166
+ captures = self.order_moves(board, captures)
167
+
168
+ for move in captures:
169
+ board.push(move)
170
+ score = -self.quiescence(board, -beta, -alpha, depth - 1)
171
+ board.pop()
172
+
173
+ if score >= beta:
174
+ return beta
175
+ if score > alpha:
176
+ alpha = score
177
+
178
+ return alpha
179
+
180
+ def alpha_beta(
181
+ self,
182
+ board: chess.Board,
183
+ depth: int,
184
+ alpha: float,
185
+ beta: float,
186
+ start_time: float,
187
+ time_limit: float
188
+ ) -> Tuple[float, Optional[chess.Move]]:
189
+ """Alpha-beta search"""
190
+
191
+ # Time check
192
+ if time.time() - start_time > time_limit:
193
+ return self.evaluate(board), None
194
+
195
+ # Terminal nodes
196
+ if board.is_game_over():
197
+ if board.is_checkmate():
198
+ return -10000, None
199
+ return 0, None
200
+
201
+ # Leaf nodes
202
+ if depth == 0:
203
+ return self.quiescence(board, alpha, beta), None
204
+
205
+ legal_moves = list(board.legal_moves)
206
+ if not legal_moves:
207
+ return 0, None
208
+
209
+ ordered_moves = self.order_moves(board, legal_moves)
210
+
211
+ best_move = ordered_moves[0]
212
+ best_score = float('-inf')
213
+
214
+ for move in ordered_moves:
215
+ board.push(move)
216
+ score, _ = self.alpha_beta(board, depth - 1, -beta, -alpha, start_time, time_limit)
217
+ score = -score
218
+ board.pop()
219
+
220
+ if score > best_score:
221
+ best_score = score
222
+ best_move = move
223
+
224
+ alpha = max(alpha, score)
225
+ if alpha >= beta:
226
+ break
227
+
228
+ return best_score, best_move
229
+
230
+ def get_best_move(self, fen: str, depth: int = 4, time_limit: int = 3000) -> Dict:
231
+ """Main search entry"""
232
+
233
+ board = chess.Board(fen)
234
+ self.nodes_evaluated = 0
235
+
236
+ time_limit_sec = time_limit / 1000.0
237
+ start_time = time.time()
238
+
239
+ # Special cases
240
+ legal_moves = list(board.legal_moves)
241
+ if len(legal_moves) == 0:
242
+ return {'best_move': '0000', 'evaluation': 0.0, 'depth_searched': 0, 'nodes_evaluated': 0}
243
+
244
+ if len(legal_moves) == 1:
245
+ return {
246
+ 'best_move': legal_moves[0].uci(),
247
+ 'evaluation': round(self.evaluate(board) / 100.0, 2),
248
+ 'depth_searched': 0,
249
+ 'nodes_evaluated': 1,
250
+ 'time_taken': 0
251
+ }
252
+
253
+ # Iterative deepening
254
+ best_move = legal_moves[0]
255
+ best_score = float('-inf')
256
+
257
+ for current_depth in range(1, depth + 1):
258
+ if time.time() - start_time > time_limit_sec * 0.9:
259
+ break
260
+
261
+ try:
262
+ score, move = self.alpha_beta(
263
+ board, current_depth,
264
+ float('-inf'), float('inf'),
265
+ start_time, time_limit_sec
266
+ )
267
+
268
+ if move:
269
+ best_move = move
270
+ best_score = score
271
+
272
+ except Exception as e:
273
+ logger.warning(f"Search error: {e}")
274
+ break
275
+
276
+ time_taken = int((time.time() - start_time) * 1000)
277
+
278
+ return {
279
+ 'best_move': best_move.uci(),
280
+ 'evaluation': round(best_score / 100.0, 2),
281
+ 'depth_searched': current_depth,
282
+ 'nodes_evaluated': self.nodes_evaluated,
283
+ 'time_taken': time_taken
284
+ }
285
+
286
+ def validate_fen(self, fen: str) -> bool:
287
+ try:
288
+ chess.Board(fen)
289
+ return True
290
+ except:
291
+ return False
292
+
293
+ def get_model_size(self) -> float:
294
+ return self.model_path.stat().st_size / (1024 * 1024)