|
import chess |
|
import chess.pgn |
|
import csv |
|
import os |
|
|
|
# Number of games handled by a previous run. When non-zero, process_pgn_file
# skips writing the CSV header and seeks proportionally into the PGN file.
start_at = 0

# Total number of games expected in the input; used to turn `start_at` into a
# file offset and to report progress.
# NOTE(review): presumably the game count of the 2022-07 Lichess dump - confirm.
total_games = 92055571
|
def _elo_in_range(headers, key, low, high):
    """Return True when header *key* is an integer strictly between *low* and *high*."""
    try:
        rating = int(headers.get(key, ""))
    except ValueError:
        # Missing tag or a non-numeric placeholder such as "?": not a match.
        return False
    return low < rating < high


def _is_wanted_game(headers):
    """Return True for rated games won by White within the accepted Elo bands."""
    return (
        headers.get('Result') == '1-0'
        and 'Rated' in headers.get('Event', '')
        and _elo_in_range(headers, 'WhiteElo', 1500, 2400)
        and _elo_in_range(headers, 'BlackElo', 1400, 2800)
    )


def _build_transcript(game):
    """Replay the main line and return its transcript, or None if White did not win on the board.

    The transcript has the form ``1.e4 e5 2.Nf3 Nc6`` (move number directly
    followed by White's move, a space after every move, no trailing space).
    Moves are replayed from the standard initial position.
    """
    board = chess.Board()
    parts = []
    move_number = 1
    for move in game.mainline_moves():
        if board.turn == chess.WHITE:
            parts.append(f"{move_number}.")
            move_number += 1
        parts.append(board.san(move) + " ")
        # Push the move itself; re-parsing the SAN we just produced is redundant.
        board.push(move)

    # result() yields "*" while the game is not over, so this single check
    # also covers the "is the game finished" condition.
    if board.result() != "1-0":
        return None
    return ''.join(parts).rstrip()


def process_pgn_file(input_file, output_file):
    """Append SAN transcripts of selected games from a PGN file to a CSV file.

    A game is selected when its headers say it is a rated game won by White
    ('Result' == '1-0', 'Rated' in 'Event', 1500 < WhiteElo < 2400,
    1400 < BlackElo < 2800) and replaying its main line ends in a position
    whose result is "1-0". Games whose Elo tags are missing or non-numeric
    are skipped instead of aborting the run.

    Relies on the module-level ``start_at`` and ``total_games``: a header row
    is written only when ``start_at`` is 0, and reading starts at the byte
    offset ``file_size * start_at / total_games``.

    Args:
        input_file: Path of the PGN file to read.
        output_file: Path of the CSV file to append to (one 'transcript' column).

    Returns:
        None. Progress is printed after every 100 added games.
    """
    with open(input_file, 'r', encoding='utf-8') as pgn_file, \
            open(output_file, 'a', newline='') as csv_file:
        csv_writer = csv.writer(csv_file)
        if start_at == 0:
            csv_writer.writerow(['transcript'])

        # Resume heuristic: jump to the fraction of the file that corresponds
        # to the fraction of games already handled.
        # NOTE(review): the offset is approximate and will usually land in the
        # middle of a game record - confirm the parser's handling is acceptable.
        file_size = os.fstat(pgn_file.fileno()).st_size
        pgn_file.seek(int(file_size * (start_at / total_games)))

        games_seen = 0
        games_added = 0
        while True:
            game = chess.pgn.read_game(pgn_file)
            if game is None:
                break
            games_seen += 1

            if not _is_wanted_game(game.headers):
                continue

            transcript = _build_transcript(game)
            if transcript is None:
                continue

            csv_writer.writerow([transcript])
            games_added += 1
            if games_added % 100 == 0:
                print(f"Added {games_added} of {games_seen} games. {(games_seen+start_at)/float(total_games):.2%} complete.")
|
|
|
|
|
# Default locations of the Lichess PGN dump to read and the CSV to append to.
input_file = './lichess_db_standard_rated_2022-07.pgn'
output_file = './lichess_transcripts_phase2_stable.csv'

# Only start the (long-running) extraction when executed as a script, so that
# importing this module does not trigger file processing.
if __name__ == "__main__":
    process_pgn_file(input_file, output_file)