Spaces:
Sleeping
Sleeping
| from collections import defaultdict | |
| import logging | |
| import bridgebots | |
| from bridgebots import PlayerHand, Rank | |
| from pathlib import Path | |
| from datetime import datetime | |
| import pandas as pd | |
| import tempfile | |
| from dds_util import get_dd_tricks | |
| from typing import List, Dict | |
| import os | |
# Dealer seats; create_pbn_file picks the entry at (board_no - 1) % 4.
DEALER_LIST = ["N", "E", "S", "W"]

# Vulnerability labels; create_pbn_file picks the entry at (board_no - 1) % 16.
VULNERABILITY_LIST = [
    "None", "NS", "EW", "All",
    "NS", "EW", "All", "None",
    "EW", "All", "None", "NS",
    "All", "None", "NS", "EW",
]
# Suit order used by pbn_deal_string when rendering each hand.
SUITS = [
    bridgebots.Suit.SPADES,
    bridgebots.Suit.HEARTS,
    bridgebots.Suit.DIAMONDS,
    bridgebots.Suit.CLUBS,
]
| def _build_record_dict(record_strings: List[str]) -> Dict: | |
| """ | |
| Parse the pbn line by line. When a block section like "Auction" or "Play" is encountered, collect all the content of | |
| the block into a single entry | |
| :param record_strings: List of string lines for a single board | |
| :return: A dictionary mapping keys from the pbn to strings or other useful values (e.g. list of strings for the | |
| bidding record) | |
| """ | |
| record_dict = {} | |
| # Janky while loop to handle non bracketed lines | |
| i = 0 | |
| while i < len(record_strings): | |
| record_string = record_strings[i] | |
| if not (record_string.startswith("[") or record_string.startswith("{")): | |
| i += 1 | |
| continue | |
| if record_string.startswith("{"): | |
| commentary = "" | |
| while i < len(record_strings): | |
| record_string = record_strings[i] | |
| if record_string.startswith("["): | |
| break | |
| commentary += record_string + " " | |
| i += 1 | |
| record_dict["Commentary"] = commentary.strip() | |
| continue | |
| if record_string.startswith("[") and "]" not in record_string: | |
| while "]" not in record_string: | |
| i += 1 | |
| record_string = record_string + record_strings[i] | |
| record_string = record_string.replace("[", "").replace("]", "") | |
| key, value = record_string.split(maxsplit=1) | |
| value = value.replace('"', "") | |
| if key == "Note": | |
| number, message = value.split(":", maxsplit=1) | |
| key = key + "_" + number | |
| value = message | |
| record_dict[key] = value | |
| if key == "Auction": | |
| auction_record = [] | |
| i += 1 | |
| while i < len(record_strings): | |
| auction_str = record_strings[i] | |
| if "[" in auction_str: | |
| break | |
| auction_record.extend(auction_str.split()) | |
| i += 1 | |
| record_dict["bidding_record"] = auction_record | |
| elif key == "Play": | |
| play_record = [] | |
| i += 1 | |
| while i < len(record_strings): | |
| play_str = record_strings[i] | |
| if "[" in play_str or play_str == "*": | |
| break | |
| play_record.append(play_str.split()) | |
| i += 1 | |
| record_dict["play_record"] = play_record | |
| elif key == "OptimumResultTable": | |
| dds_tricks_table = [] | |
| i += 1 | |
| while i < len(record_strings): | |
| dds_str = record_strings[i] | |
| if "[" in dds_str or dds_str == "*": | |
| break | |
| dds_tricks_table.append(dds_str.split()) | |
| i += 1 | |
| record_dict["dds_tricks_table"] = dds_tricks_table | |
| else: | |
| i += 1 | |
| return record_dict | |
def parse_single_pbn_record(record_strings):
    """
    Parse the lines of one PBN board into a deal plus the dictionary of its tags.

    :param record_strings: One string per line of a single PBN deal record
    :return: Tuple of (deal built by ``bridgebots.pbn.from_pbn_deal``, dictionary produced by ``_build_record_dict``)
    :raises ValueError: If the Dealer, Vulnerable or Deal tag is missing, or if building the deal raises a KeyError
    """
    record_dict = _build_record_dict(record_strings)
    try:
        deal = bridgebots.pbn.from_pbn_deal(record_dict["Dealer"], record_dict["Vulnerable"], record_dict["Deal"])
    except KeyError as e:
        # Surface the offending key; callers such as parse_pbn treat ValueError as a malformed record
        raise ValueError(f"Missing or unrecognised Dealer/Vulnerable/Deal field: {e}") from e
    return deal, record_dict
def parse_pbn(file_path):
    """
    Split a PBN file into boards and parse each board, grouping the parsed boards by deal. Only supports PBN v1.0
    See https://www.tistis.nl/pbn/pbn_v10.txt

    Boards that raise KeyError or ValueError while being parsed are logged as warnings and skipped.

    :param file_path: path to a PBN file
    :return: A list of (deal, list of record dictionaries) tuples, one tuple per distinct deal
    """
    # Some PBNs have multiple board records per deal, so collect them under the deal they belong to
    boards_by_deal = defaultdict(list)
    for board_lines in bridgebots.pbn._split_pbn(file_path):
        try:
            deal, record = parse_single_pbn_record(board_lines)
            boards_by_deal[deal].append(record)
        except (KeyError, ValueError) as e:
            logging.warning(f"Malformed record {board_lines}: {e}")
    return list(boards_by_deal.items())
def create_pbn_file(
    data: pd.DataFrame,
    save_path=None,
    date=None,
    board_no=1,
    event='',
    site='',
) -> str:
    """
    Render a single board as PBN text, write it to disk and return the text.

    :param data: Hands to render; forwarded to ``pbn_deal_string``, which reads one column per direction
    :param save_path: Destination file. When falsy, a temporary ``.pbn`` file is created instead; note that its path
    is not returned to the caller
    :param date: Date written to the Date tag. Defaults to the moment of the call
    :param board_no: Board number; also selects the dealer and vulnerability from DEALER_LIST / VULNERABILITY_LIST
    :param event: Value of the Event tag
    :param site: Value of the Site tag
    :return: The PBN text that was written (not the file path)
    """
    # Resolve the default here: a ``datetime.today()`` default in the signature is evaluated only once, at import
    if date is None:
        date = datetime.today()
    # The PBN standard specifies the Date tag as YYYY.MM.DD
    date_print = date.strftime("%Y.%m.%d")

    dealer = DEALER_LIST[(board_no - 1) % 4]
    vulnerability = VULNERABILITY_LIST[(board_no - 1) % 16]
    deal = pbn_deal_string(data, dealer)  # sss.hhh.ddd.ccc sss.hhh.ddd.ccc......

    pbn_text = ''.join([
        "%This PBN was generated by Bridge Hand Scanner\n",
        f'[Event "{event}"]\n',
        f'[Site "{site}"]\n',
        f'[Date "{date_print}"]\n',
        f'[Board "{board_no}"]\n',
        f'[Dealer "{dealer}"]\n',
        f'[Vulnerable "{vulnerability}"]\n',
        f'[Deal "{deal}"]\n',
        pbn_optimum_table(deal),
        '\n',  # End of board
    ])

    # Create the temporary file only once the content exists, so a rendering failure leaves nothing behind
    if not save_path:
        fd, save_path = tempfile.mkstemp(suffix='.pbn')
        os.close(fd)
    with open(save_path, 'w') as f:
        f.write(pbn_text)
    return pbn_text
def merge_pbn(pbn_paths):
    """
    Concatenate several PBN files into one temporary PBN file, ordered by board number.

    The board number of each file is taken from the Board tag of the first record of the first deal returned by
    ``parse_pbn``. If two files share a board number, the later one in ``pbn_paths`` wins.

    :param pbn_paths: Iterable of paths to PBN files
    :return: Path of the newly created temporary ``.pbn`` file
    :raises IndexError: If ``parse_pbn`` finds no valid deal in one of the files
    :raises KeyError: If the first record of a file has no Board tag
    """
    boards = {}
    for pbn_path in pbn_paths:
        first_deal = parse_pbn(pbn_path)[0]
        with open(pbn_path, 'r') as source:
            pbn_text = source.read()
        board_no = int(first_deal[1][0]['Board'])
        boards[board_no] = pbn_text

    # Create the output only after every input was read, so a failure above does not leak a file descriptor
    fd, merged_path = tempfile.mkstemp(suffix='.pbn', text=True)
    with open(fd, 'w') as merged:
        needs_newline = False
        for _, pbn_text in sorted(boards.items()):
            # Separate boards when the text written so far does not already end with a line break
            if needs_newline:
                merged.write('\n')
            merged.write(pbn_text)
            if pbn_text:
                needs_newline = not pbn_text.endswith('\n')
    return merged_path
def pbn_deal_string(df, dealer):
    """
    Build the value of a PBN Deal tag: ``<dealer>:`` followed by four space-separated hands, starting with the
    dealer's hand and continuing through DEALER_LIST in order (wrapping around).

    Each hand is rendered as its suits in SUITS order joined by ".", i.e. ``sss.hhh.ddd.ccc``.

    :param df: Mapping indexed by direction letter; ``df[direction]`` is unpacked into
    ``PlayerHand.from_string_lists``
    :param dealer: One of the entries of DEALER_LIST
    :return: The deal string
    """
    assert dealer in DEALER_LIST, f'Dealer {dealer} is not valid'
    start = DEALER_LIST.index(dealer)
    seating = DEALER_LIST[start:] + DEALER_LIST[:start]

    def render_hand(direction):
        hand = PlayerHand.from_string_lists(*df[direction])
        return '.'.join(
            ''.join(rank.abbreviation() for rank in hand.suits[suit])
            for suit in SUITS
        )

    return f'{dealer}:' + ' '.join(render_hand(direction) for direction in seating)
# pbn_optimum_table maps character i of the double-dummy string to DD_DIRECTIONS[i // 5] ...
DD_DIRECTIONS = ["N", "S", "E", "W"]
# ... and to DD_SUITS[i % 5].
DD_SUITS = ["NT", "S", "H", "D", "C"]
def pbn_optimum_table(pbn_deal_string):
    """
    Render the OptimumResultTable section for a deal.

    The string returned by ``get_dd_tricks`` is read one character at a time: character ``i`` is parsed as a
    hexadecimal digit and reported for declarer ``DD_DIRECTIONS[i // 5]`` in denomination ``DD_SUITS[i % 5]``.

    :param pbn_deal_string: Deal string handed to ``get_dd_tricks``
    :return: The table header line followed by one "<declarer> <denomination> <tricks>" line per character
    :raises ValueError: If ``get_dd_tricks`` raises; the original error is printed first
    """
    try:
        dd_tricks_string = get_dd_tricks(pbn_deal_string)
    except Exception as err:
        print(err)
        raise ValueError(err)

    header = '[OptimumResultTable "Declarer;Denomination\\2R;Result\\2R"]\n'
    rows = []
    for position, hex_digit in enumerate(dd_tricks_string):
        seat_idx, strain_idx = divmod(position, 5)
        rows.append(f'{DD_DIRECTIONS[seat_idx]} {DD_SUITS[strain_idx]} {int(hex_digit, 16)}\n')
    return header + ''.join(rows)