|
import chess |
|
import chess.pgn |
|
import csv |
|
import sys |
|
|
|
def process_pgn_stream(output_file): |
|
with open(output_file, 'a', newline='') as csv_file: |
|
csv_writer = csv.writer(csv_file) |
|
csv_writer.writerow(['transcript']) |
|
|
|
games_seen = 0 |
|
games_added = 0 |
|
while True: |
|
game = chess.pgn.read_game(sys.stdin) |
|
if game is None: |
|
break |
|
games_seen += 1 |
|
|
|
|
|
if ( |
|
game.headers['Result'] == '1-0' and |
|
'Rated' in game.headers['Event'] and |
|
1500 < int(game.headers['WhiteElo']) < 2400 and |
|
1400 < int(game.headers['BlackElo']) < 2800 |
|
): |
|
board = chess.Board() |
|
moves = [] |
|
move_number = 1 |
|
for move in game.mainline_moves(): |
|
if board.turn == chess.WHITE: |
|
moves.append(f"{move_number}.") |
|
move_number += 1 |
|
san = board.san(move) |
|
moves.append(san + " ") |
|
board.push(board.parse_san(san)) |
|
|
|
if board.is_game_over() and board.result() == "1-0": |
|
transcript = ''.join(moves) |
|
csv_writer.writerow([transcript.rstrip()]) |
|
games_added += 1 |
|
if games_added % 100 == 0: |
|
print(f"Added {games_added} of {games_seen} games.", file=sys.stderr) |
|
|
|
|
|
output_file = './lichess_transcripts_phase2_stable.csv' |
|
process_pgn_stream(output_file) |