from collections import Counter
from collections import defaultdict
from datetime import datetime
import logging
from pathlib import Path
import tempfile
from typing import Dict, List, Optional

import bridgebots
import pandas as pd

from dds_util import get_result_table

# Dealer for board n is DEALER_LIST[(n - 1) % 4].
DEALER_LIST = ['N', 'E', 'S', 'W']
# Vulnerability for board n is VULNERABILITY_LIST[(n - 1) % 16].
VULNERABILITY_LIST = [
    "None", "NS", "EW", "All",
    "NS", "EW", "All", "None",
    "EW", "All", "None", "NS",
    "All", "None", "NS", "EW",
]
SUITS = [bridgebots.Suit.SPADES, bridgebots.Suit.HEARTS, bridgebots.Suit.DIAMONDS, bridgebots.Suit.CLUBS]


def _read_section(record_strings, start, stop_on_star):
    """
    Collect the body of a block section (the entries following a section tag).

    :param record_strings: The entries of a single board
    :param start: Index of the first entry after the section tag
    :param stop_on_star: When True, an entry equal to "*" also ends the section
    :return: (body entries, index of the entry that ended the section or len(record_strings))
    """
    body = []
    i = start
    while i < len(record_strings):
        entry = record_strings[i]
        if "[" in entry or (stop_on_star and entry == "*"):
            break
        body.append(entry)
        i += 1
    return body, i


def _build_record_dict(record_strings: List[str]) -> Dict:
    """
    Parse the pbn line by line. When a block section like "Auction" or "Play" is encountered, collect all the content
    of the block into a single entry

    :param record_strings: List of string lines for a single board
    :return: A dictionary mapping keys from the pbn to strings or other useful values (e.g. list of strings for the
        bidding record). Section bodies are stored under "bidding_record", "play_record" and "dds_tricks_table";
        brace commentary is stored under "Commentary" (a later commentary block overwrites an earlier one).
    :raises ValueError: If a tag is never closed with "]", has no value, or a Note value has no ":" separator
    """
    record_dict = {}
    # Index-based loop because sections and multi-line tags consume a variable number of entries
    i = 0
    while i < len(record_strings):
        record_string = record_strings[i]
        if not record_string.startswith(("[", "{")):
            # Anything outside a tag or commentary (e.g. "%" header lines) is ignored
            i += 1
            continue

        if record_string.startswith("{"):
            # Commentary runs until the next entry that starts a tag
            commentary_parts = []
            while i < len(record_strings) and not record_strings[i].startswith("["):
                commentary_parts.append(record_strings[i])
                i += 1
            record_dict["Commentary"] = " ".join(commentary_parts).strip()
            continue

        # A tag may be spread over several entries; glue them together until it is closed.
        while "]" not in record_string:
            i += 1
            if i >= len(record_strings):
                # ValueError (not IndexError) so that parse_pbn reports this as a malformed record
                raise ValueError(f"Unterminated tag: {record_string!r}")
            record_string += record_strings[i]

        record_string = record_string.replace("[", "").replace("]", "")
        key, value = record_string.split(maxsplit=1)
        value = value.replace('"', "")
        if key == "Note":
            # Notes look like '1:message'; store each one under its own numbered key
            number, message = value.split(":", maxsplit=1)
            key = key + "_" + number
            value = message
        record_dict[key] = value

        if key == "Auction":
            body, i = _read_section(record_strings, i + 1, stop_on_star=False)
            record_dict["bidding_record"] = [call for entry in body for call in entry.split()]
        elif key == "Play":
            body, i = _read_section(record_strings, i + 1, stop_on_star=True)
            record_dict["play_record"] = [entry.split() for entry in body]
        elif key == "OptimumResultTable":
            body, i = _read_section(record_strings, i + 1, stop_on_star=True)
            dds_tricks_table = [entry.split() for entry in body]
            logging.debug("Parsed OptimumResultTable: %s", dds_tricks_table)
            record_dict["dds_tricks_table"] = dds_tricks_table
        else:
            i += 1
    return record_dict


def parse_single_pbn_record(record_strings):
    """
    :param record_strings: One string per line of a single PBN deal record
    :return: Tuple of the Deal built by bridgebots.pbn.from_pbn_deal and the raw record dictionary produced by
        _build_record_dict
    :raises ValueError: If the record has no "Dealer", "Vulnerable" or "Deal" tag
    """
    record_dict = _build_record_dict(record_strings)
    try:
        deal = bridgebots.pbn.from_pbn_deal(record_dict["Dealer"], record_dict["Vulnerable"], record_dict["Deal"])
    except KeyError as e:
        raise ValueError("Missing deal fields and no previous_deal provided") from e
    return deal, record_dict


def parse_pbn(file_path):
    """
    Split PBN file into boards then decompose those boards into a Deal and a record dictionary each.
    Only supports PBN v1.0
    See https://www.tistis.nl/pbn/pbn_v10.txt

    :param file_path: path to a PBN file
    :return: A list of (deal, [record_dict, ...]) tuples, one per distinct deal. Records that raise KeyError or
        ValueError while being parsed are logged as warnings and skipped.
    """
    records_strings = bridgebots.pbn._split_pbn(file_path)
    # Maintain a mapping from deal to board records to create a single entry per deal.
    # Some PBNs have multiple board records per deal.
    records = defaultdict(list)
    for record_strings in records_strings:
        try:
            deal, board_record = parse_single_pbn_record(record_strings)
            records[deal].append(board_record)
        except (KeyError, ValueError) as e:
            logging.warning("Malformed record %s: %s", record_strings, e)
    return list(records.items())


def create_single_pbn_string(
    data: pd.DataFrame,
    date: Optional[datetime] = None,
    board_no=1,
    event='',
    site='',
) -> str:
    """
    Render a single board as PBN text.

    :param data: DataFrame whose columns after the first each hold one hand; the entries of a column are joined
        with "." (presumably one holding per suit, in N E S W column order — confirm against the caller)
    :param date: Date written to the Date tag. Defaults to the date at call time.
    :param board_no: 1-based board number; determines the dealer and the vulnerability
    :param event: Value of the Event tag
    :param site: Value of the Site tag
    :return: The PBN text for the board, including whatever get_result_table returns for the deal
    """
    if date is None:
        # Resolved per call; a default of datetime.today() in the signature is frozen at import time
        date = datetime.today()
    # NOTE(review): this writes dd.mm.yy; the PBN standard specifies "YYYY.MM.DD" for the Date tag.
    # Left unchanged because existing consumers may rely on the current output — confirm and fix.
    date_print = date.strftime("%d") + "." + date.strftime("%m") + "." + date.strftime("%y")
    dealer = DEALER_LIST[(board_no - 1) % 4]
    vulnerability = VULNERABILITY_LIST[(board_no - 1) % 16]

    # sss.hhh.ddd.ccc sss.hhh.ddd.ccc ...
    deal = 'N:' + ' '.join('.'.join(data[col]) for col in data.columns[1:])
    dd_tricks = get_result_table(deal)

    parts = [
        "%This PBN was generated by Bridge Hand Scanner\n",
        f'[Event "{event}"]\n',
        f'[Site "{site}"]\n',
        f'[Date "{date_print}"]\n',
        f'[Board "{board_no}"]\n',
        f'[Dealer "{dealer}"]\n',
        f'[Vulnerable "{vulnerability}"]\n',
        f'[Deal "{deal}"]\n',
        dd_tricks,
        '\n',
    ]
    return ''.join(parts)


def merge_pbn(pbn_paths):
    """
    Concatenate several PBN files into one temporary file, ordered by board number.

    The board number of each file is taken from the "Board" tag of its first parsed record; if two files report
    the same board number the later one wins. File contents are copied verbatim, separated by a newline.

    :param pbn_paths: Iterable of paths to PBN files
    :return: Path of the newly created temporary ".pbn" file (the caller is responsible for deleting it)
    :raises IndexError: If a file yields no parsable record
    """
    board_dict = {}
    for pbn_path in pbn_paths:
        result = parse_pbn(pbn_path)[0]
        with open(pbn_path, 'r') as f2:
            pbn_str = f2.read()
        board_no = int(result[1][0]['Board'])
        board_dict[board_no] = pbn_str

    # Create the temp file only once every input has been read, so a failure above
    # cannot leak an open descriptor or leave an orphan file behind.
    fd, fn = tempfile.mkstemp(suffix='.pbn', text=True)
    with open(fd, 'w') as f:
        f.write('\n'.join(board_dict[board_no] for board_no in sorted(board_dict)))
    return fn


def validate_pbn(pbn_string):
    """
    Check that a PBN record describes a complete deck dealt 13 cards to each hand.

    NOTE(review): the argument is forwarded unchanged to parse_single_pbn_record, which indexes it element by
    element; whether callers pass one string or a list of lines is not visible here.

    :param pbn_string: The PBN record to validate
    :return: None when the record is valid
    :raises ValueError: If parsing trips an assertion, or if any card is duplicated or missing, or any hand does
        not hold exactly 13 cards. Other parsing errors propagate unchanged.
    """
    try:
        deal, _record = parse_single_pbn_record(pbn_string)
    except AssertionError as e:
        raise ValueError('Everyone should have 13 cards') from e

    hands = deal.hands
    duplicated = set()
    missing = set()
    for suit in bridgebots.Suit:
        # All ranks of this suit across every hand, flattened into one list
        ranks = [rank for direction in hands for rank in hands[direction].suits[suit]]
        duplicated.update(bridgebots.Card(suit, rank) for rank, count in Counter(ranks).items() if count > 1)
        held = set(ranks)
        missing.update(bridgebots.Card(suit, rank) for rank in bridgebots.Rank if rank not in held)

    err_msg = ''
    if duplicated:
        err_msg += f'Duplicated cards: {duplicated}. '
    if missing:
        err_msg += f'Missing cards: {missing}. '
    for direction in hands:
        num_cards = len(hands[direction].cards)
        if num_cards != 13:
            err_msg += f'{direction.name} has {num_cards} cards. '
    if err_msg:
        raise ValueError(err_msg)


def parse_dds_table(raw_list):
    """
    Group a stream of token fragments into rows of three tokens.

    Each non-empty element of raw_list contributes its first item to the token being built; an empty element ends
    the current token. Every three completed tokens form one row. A token still being built when the input ends
    is completed too, so the last row is not lost when the input lacks a trailing empty element. A final row with
    fewer than three tokens is discarded.

    :param raw_list: Sequence of sequences (presumably the "dds_tricks_table" entry of a record parsed character
        by character — confirm against the caller)
    :return: List of rows, each a list of three token strings
    """
    table = []
    row = []
    token = ''

    def _flush():
        # Move the finished token into the row and emit the row once it is full
        nonlocal row, token
        if token:
            row.append(token)
            token = ''
        if len(row) == 3:
            table.append(row)
            row = []

    for fragment in raw_list:
        if len(fragment) > 0:
            token += fragment[0]
        else:
            _flush()
    _flush()
    return table