Spaces:
Sleeping
Sleeping
from collections import defaultdict | |
import logging | |
import bridgebots | |
from bridgebots import PlayerHand, Rank | |
from pathlib import Path | |
from datetime import datetime | |
import pandas as pd | |
import tempfile | |
from dds_util import get_dd_tricks | |
from typing import List, Dict | |
import os | |
# Dealer seat per board; create_pbn_file indexes this with (board_no - 1) % 4.
DEALER_LIST = list("NESW")
# Vulnerability per board; create_pbn_file indexes this with (board_no - 1) % 16.
VULNERABILITY_LIST = [
    "None", "NS", "EW", "All",
    "NS", "EW", "All", "None",
    "EW", "All", "None", "NS",
    "All", "None", "NS", "EW",
]
# Order in which pbn_deal_string renders the suits of each hand.
SUITS = [
    bridgebots.Suit.SPADES,
    bridgebots.Suit.HEARTS,
    bridgebots.Suit.DIAMONDS,
    bridgebots.Suit.CLUBS,
]
def _build_record_dict(record_strings: List[str]) -> Dict: | |
""" | |
Parse the pbn line by line. When a block section like "Auction" or "Play" is encountered, collect all the content of | |
the block into a single entry | |
:param record_strings: List of string lines for a single board | |
:return: A dictionary mapping keys from the pbn to strings or other useful values (e.g. list of strings for the | |
bidding record) | |
""" | |
record_dict = {} | |
# Janky while loop to handle non bracketed lines | |
i = 0 | |
while i < len(record_strings): | |
record_string = record_strings[i] | |
if not (record_string.startswith("[") or record_string.startswith("{")): | |
i += 1 | |
continue | |
if record_string.startswith("{"): | |
commentary = "" | |
while i < len(record_strings): | |
record_string = record_strings[i] | |
if record_string.startswith("["): | |
break | |
commentary += record_string + " " | |
i += 1 | |
record_dict["Commentary"] = commentary.strip() | |
continue | |
if record_string.startswith("[") and "]" not in record_string: | |
while "]" not in record_string: | |
i += 1 | |
record_string = record_string + record_strings[i] | |
record_string = record_string.replace("[", "").replace("]", "") | |
key, value = record_string.split(maxsplit=1) | |
value = value.replace('"', "") | |
if key == "Note": | |
number, message = value.split(":", maxsplit=1) | |
key = key + "_" + number | |
value = message | |
record_dict[key] = value | |
if key == "Auction": | |
auction_record = [] | |
i += 1 | |
while i < len(record_strings): | |
auction_str = record_strings[i] | |
if "[" in auction_str: | |
break | |
auction_record.extend(auction_str.split()) | |
i += 1 | |
record_dict["bidding_record"] = auction_record | |
elif key == "Play": | |
play_record = [] | |
i += 1 | |
while i < len(record_strings): | |
play_str = record_strings[i] | |
if "[" in play_str or play_str == "*": | |
break | |
play_record.append(play_str.split()) | |
i += 1 | |
record_dict["play_record"] = play_record | |
elif key == "OptimumResultTable": | |
dds_tricks_table = [] | |
i += 1 | |
while i < len(record_strings): | |
dds_str = record_strings[i] | |
if "[" in dds_str or dds_str == "*": | |
break | |
dds_tricks_table.append(dds_str.split()) | |
i += 1 | |
record_dict["dds_tricks_table"] = dds_tricks_table | |
else: | |
i += 1 | |
return record_dict | |
def parse_single_pbn_record(record_strings):
    """
    Turn the lines of one PBN deal record into a deal and the dictionary of its parsed tags
    :param record_strings: One string per line of a single PBN deal record
    :return: Tuple of (deal built by bridgebots.pbn.from_pbn_deal, dictionary produced by _build_record_dict)
    :raises ValueError: If building the deal raises KeyError, e.g. because the "Dealer", "Vulnerable" or "Deal" tag
    is missing from the record
    """
    parsed = _build_record_dict(record_strings)
    required_tags = ("Dealer", "Vulnerable", "Deal")
    try:
        # The tag lookups stay inside the try so that a missing tag is reported as a ValueError
        deal = bridgebots.pbn.from_pbn_deal(*[parsed[tag] for tag in required_tags])
    except KeyError as missing:
        raise ValueError("Missing deal fields and no previous_deal provided") from missing
    return deal, parsed
def parse_pbn(file_path):
    """
    Split a PBN file into boards and parse each board into a deal plus a dictionary of its tags. Only supports
    PBN v1.0. See https://www.tistis.nl/pbn/pbn_v10.txt
    :param file_path: path to a PBN file
    :return: A list of (deal, list of record dictionaries) tuples, one tuple per distinct deal, in the order the
    deals were first encountered. Records that fail to parse are logged as warnings and skipped
    """
    # Some PBNs have multiple board records per deal, so group the records under their deal
    grouped = {}
    for board_lines in bridgebots.pbn._split_pbn(file_path):
        try:
            deal, board_record = parse_single_pbn_record(board_lines)
            grouped.setdefault(deal, []).append(board_record)
        except (KeyError, ValueError) as e:
            logging.warning(f"Malformed record {board_lines}: {e}")
    return list(grouped.items())
def create_pbn_file(
    data: pd.DataFrame,
    save_path=None,
    date=None,
    board_no=1,
    event='',
    site='',
) -> str:
    """
    Write a single-board PBN file for the given hands and return the PBN text
    :param data: Hands of the board; passed unchanged to pbn_deal_string, which reads one entry per seat
    ('N', 'E', 'S', 'W')
    :param save_path: Where to write the file. If falsy, a temporary ".pbn" file is created instead (note that its
    path is not returned)
    :param date: datetime used for the Date tag. Defaults to the date at call time
    :param board_no: 1-based board number; it determines the dealer and the vulnerability
    :param event: Value of the Event tag
    :param site: Value of the Site tag
    :return: The full text that was written to the file
    :raises ValueError: Propagated from pbn_optimum_table when the double dummy computation fails
    """
    # Resolve the default here: a default of datetime.today() in the signature would be frozen at import time
    if date is None:
        date = datetime.today()
    # The PBN standard specifies the Date tag as "YYYY.MM.DD"
    date_print = date.strftime("%Y.%m.%d")

    dealer = DEALER_LIST[(board_no - 1) % 4]
    vulnerability = VULNERABILITY_LIST[(board_no - 1) % 16]
    deal = pbn_deal_string(data, dealer)  # sss.hhh.ddd.ccc sss.hhh.ddd.ccc......

    content = ''.join([
        "%This PBN was generated by Bridge Hand Scanner\n",
        f'[Event "{event}"]\n',
        f'[Site "{site}"]\n',
        f'[Date "{date_print}"]\n',
        f'[Board "{board_no}"]\n',
        f'[Dealer "{dealer}"]\n',
        f'[Vulnerable "{vulnerability}"]\n',
        f'[Deal "{deal}"]\n',
        pbn_optimum_table(deal),
        '\n',  # End of board
    ])

    # Create the temporary file only once the content exists, so a failure above leaves no stray file behind
    if not save_path:
        fd, save_path = tempfile.mkstemp(suffix='.pbn')
        os.close(fd)
    with open(save_path, 'w') as f:
        f.write(content)
    return content
def merge_pbn(pbn_paths):
    """
    Concatenate several PBN files into one temporary PBN file, ordered by board number
    :param pbn_paths: Iterable of paths to PBN files. The board number of each file is taken from the "Board" tag
    of its first parsed record; if two files share a board number, the later one replaces the earlier one
    :return: Path of the newly created temporary ".pbn" file
    :raises IndexError: If a file yields no parsable record
    :raises KeyError: If the first record of a file has no "Board" tag
    """
    board_dict = {}
    for pbn_path in pbn_paths:
        result = parse_pbn(pbn_path)[0]
        with open(pbn_path, 'r') as f2:
            pbn_str = f2.read()
        board_no = int(result[1][0]['Board'])
        board_dict[board_no] = pbn_str

    # Create the output file only after every input was read, so a parse error cannot leak the descriptor
    fd, fn = tempfile.mkstemp(suffix='.pbn', text=True)
    with open(fd, 'w') as f:
        for _, pbn_str in sorted(board_dict.items()):
            f.write(pbn_str)
            # Terminate every board with a newline so the next board never gets glued onto its last line
            if not pbn_str.endswith("\n"):
                f.write("\n")
    return fn
def pbn_deal_string(df, dealer):
    """
    Render four hands as the value of a PBN Deal tag, e.g. "N:sss.hhh.ddd.ccc sss.hhh.ddd.ccc ..."
    :param df: Mapping from seat ('N', 'E', 'S', 'W') to the arguments unpacked into PlayerHand.from_string_lists
    :param dealer: Seat of the dealer; the hands are listed starting with this seat, following DEALER_LIST order
    :return: "<dealer>:" followed by the four hands separated by spaces, each hand listing its suits in SUITS order
    separated by dots
    """
    assert dealer in DEALER_LIST, f'Dealer {dealer} is not valid'
    first_seat = DEALER_LIST.index(dealer)
    seat_count = len(DEALER_LIST)
    hands = []
    for offset in range(seat_count):
        # Walk the seats cyclically, beginning with the dealer
        seat = DEALER_LIST[(first_seat + offset) % seat_count]
        hand = PlayerHand.from_string_lists(*df[seat])
        holdings = []
        for suit in SUITS:
            holdings.append(''.join(rank.abbreviation() for rank in hand.suits[suit]))
        hands.append('.'.join(holdings))
    return f'{dealer}:' + ' '.join(hands)
# Decoding order for the flat double dummy trick string in pbn_optimum_table:
# character i belongs to declarer DD_DIRECTIONS[i // 5] and denomination DD_SUITS[i % 5].
DD_DIRECTIONS = list("NSEW")
DD_SUITS = "NT S H D C".split()
def pbn_optimum_table(pbn_deal_string):
    """
    Build the PBN OptimumResultTable section for a deal
    :param pbn_deal_string: Deal in PBN notation, passed unchanged to get_dd_tricks
    :return: The OptimumResultTable tag line followed by one "<declarer> <denomination> <tricks>" line per character
    of the double dummy trick string
    :raises ValueError: If get_dd_tricks raises any exception (the exception is printed first)
    """
    try:
        dd_tricks_string = get_dd_tricks(pbn_deal_string)
    except Exception as e:
        print(e)
        raise ValueError(e)
    rows = ['[OptimumResultTable "Declarer;Denomination\\2R;Result\\2R"]\n']
    # Each character is read as one hexadecimal digit; presumably get_dd_tricks returns 20 of them (4 declarers x
    # 5 denominations) - confirm against dds_util
    for position, hex_digit in enumerate(dd_tricks_string):
        declarer_idx, denomination_idx = divmod(position, 5)
        rows.append(f'{DD_DIRECTIONS[declarer_idx]} {DD_SUITS[denomination_idx]} {int(hex_digit, 16)}\n')
    return ''.join(rows)