import json
import logging
import hashlib
from dataclasses import dataclass, field, InitVar, asdict
from typing import List, Dict, Iterable, Optional, Union
from datetime import datetime, date

import pandas as pd
from google.cloud import bigquery

# Number of groups a player id can hash into, and how many consecutive
# groups share one partner family (40 / 10 -> 4 families).
_GROUP_COUNT = 40
_GROUP_SIZE = 10

# Partner family for each block of ``_GROUP_SIZE`` groups. Kept as a dict
# (not a tuple) so that float / numpy integer group values still resolve.
_GROUP_BASE_PARTNERS = {
    0: "phoenix",
    1: "pegasus",
    2: "dragon",
    3: "griffin",
}

# Weekly partner stages are handed out for this many weeks from the start date.
_EVENT_START_DATE = date(2023, 12, 4)
_EVENT_WEEKS = 4

# Flags stored for every achievement inside ``rewards_status``.
_REWARD_FLAGS = ("is_completed", "is_issued")


@dataclass
class Player:
    """State of a single player: group, partners, badges, rewards and totals.

    ``rewards_status`` is held either as a dict
    (``{achievement_key: {"is_completed": bool, "is_issued": bool}}``) or as
    the JSON string of that dict; the BigQuery schema returned by
    ``get_big_query_schema`` declares the column as STRING.

    Init-only arguments (not stored as fields):
        available_achievements: Achievement keys to seed ``rewards_status``
            with when ``init`` is True.
        init: True when the player is newly created; False when the instance
            is rebuilt from previously stored values.
    """

    player_backend_user_id: str
    player_nickname: Optional[str] = field(default=None)
    player_group: Optional[int] = field(default=None)
    partners: List[str] = field(default_factory=list)
    badges: List[str] = field(default_factory=list)
    rewards_status: Union[str, Dict[str, Dict[str, Union[bool, str]]]] = field(
        default_factory=dict
    )
    adventure_logs: List[str] = field(default_factory=list)
    total_active_days: int = field(default=0)
    total_gained_scores: int = field(default=0)
    total_finished_contents: int = field(default=0)
    created_at_date: date = field(default_factory=date.today)
    updated_at_date: date = field(default_factory=date.today)
    available_achievements: InitVar[Optional[list]] = field(default=None)
    init: InitVar[bool] = field(default=False)

    def __post_init__(self, available_achievements: Optional[List], init: bool):
        """Finish construction for a new (``init=True``) or restored player."""
        # Restored rows presumably come from pandas/BigQuery, where REPEATED
        # columns arrive as array-likes -- verify against the caller. Plain
        # lists are kept as-is, so default-constructed players work too.
        self.badges = self._as_list(self.badges)
        self.partners = self._as_list(self.partners)
        self.adventure_logs = self._as_list(self.adventure_logs)

        if init:
            # Newly created player: assign the group and starter partner,
            # write the first log entry and seed the rewards table.
            self.assign_group()
            self.add_adventure_log(
                "你開始了星空探險隊的冒險!, {date}".format(date=datetime.now().date())
            )
            if available_achievements is not None:
                self.deserialize_rewards_status()
                self.rewards_status.update(
                    {
                        achievement_key: {
                            "is_completed": False,
                            "is_issued": False,
                        }
                        for achievement_key in available_achievements
                    }
                )
                self.serialize_rewards_status()
        else:
            self.assign_weekly_partner()

    @staticmethod
    def _as_list(values) -> list:
        """Return ``values`` as a plain list (``None`` becomes ``[]``)."""
        if values is None:
            return []
        if isinstance(values, list):
            return values
        if hasattr(values, "tolist"):
            return values.tolist()
        return list(values)

    # Serialization and deserialization concern the content type of
    # rewards_status: it has to be serialized before inserting into BigQuery.
    def serialize_rewards_status(self):
        """Store ``rewards_status`` as a JSON string (no-op if already one)."""
        if not isinstance(self.rewards_status, (str, bytes, bytearray)):
            self.rewards_status = json.dumps(self.rewards_status)

    def deserialize_rewards_status(self):
        """Store ``rewards_status`` as a dict (no-op if already decoded)."""
        if isinstance(self.rewards_status, (str, bytes, bytearray)):
            self.rewards_status = json.loads(self.rewards_status)

    def add_partner(self, partner: str):
        """Append ``partner`` unless it is empty or already present."""
        if partner and partner not in self.partners:
            self.partners.append(partner)
            logging.info(
                f"Player {self.player_backend_user_id} has a new partner: {partner}"
            )

    def add_badge(self, badge: str):
        """Append ``badge`` unless it is empty or already present."""
        if badge and badge not in self.badges:
            self.badges.append(badge)
            logging.info(
                f"Player {self.player_backend_user_id} earned a new badge: {badge}"
            )

    def add_adventure_log(self, log: str):
        """Append ``log`` to the adventure logs (duplicates are allowed)."""
        self.adventure_logs.append(log)
        logging.info(
            f"Player {self.player_backend_user_id} has a new adventure log: {log}"
        )

    def update_total_gained_scores(self, gained_scores: int):
        """Overwrite the total gained scores with ``gained_scores``."""
        self.total_gained_scores = gained_scores

    def update_total_finished_contents(self, finished_contents_count: int):
        """Overwrite the total finished contents with the given count."""
        self.total_finished_contents = finished_contents_count

    def update_total_active_days(self, active_days: int):
        """Overwrite the total active days with ``active_days``."""
        self.total_active_days = active_days

    def update_rewards_status(
        self,
        key: str,
        value: bool,
        target: Union[str, Iterable[str]] = _REWARD_FLAGS,
    ):
        """Set one or more flags of the achievement ``key`` to ``value``.

        Args:
            key: Achievement key inside ``rewards_status``.
            value: New value for the selected flag(s).
            target: A single flag name, or an iterable of flag names. The
                default updates both ``"is_completed"`` and ``"is_issued"``.

        Raises:
            KeyError: If ``key`` is not present in ``rewards_status``.
        """
        # A list cannot be used as a dict key, so every target is applied
        # one by one; a bare string is treated as a single flag name.
        flags = [target] if isinstance(target, str) else list(target)
        self.deserialize_rewards_status()
        try:
            status = self.rewards_status[key]
            for flag in flags:
                status[flag] = value
        finally:
            # Always return to the serialized form, even on failure.
            self.serialize_rewards_status()

    def hash_user_id(self):
        """Map the backend user id to a stable group number in ``[0, 40)``."""
        hashed = hashlib.sha256(self.player_backend_user_id.encode()).hexdigest()
        return int(hashed, 16) % _GROUP_COUNT

    def assign_group(self):
        """Derive ``player_group`` from the user id and add the stage-1 partner."""
        self.player_group = self.hash_user_id()
        base_partner = _GROUP_BASE_PARTNERS[self.player_group // _GROUP_SIZE]
        self.add_partner(f"{base_partner}_1")

    def assign_weekly_partner(self):
        """Add the partner stage matching the current event week, if any.

        Does nothing when the player has no group or when today falls outside
        the ``_EVENT_WEEKS`` weeks that follow ``_EVENT_START_DATE``.
        """
        if self.player_group is None or pd.isna(self.player_group):
            return
        weeks_elapsed = (datetime.now().date() - _EVENT_START_DATE).days // 7
        if weeks_elapsed not in range(0, _EVENT_WEEKS):
            return
        base_partner = _GROUP_BASE_PARTNERS[self.player_group // _GROUP_SIZE]
        # +1 so that stages start from 1; add_partner skips duplicates.
        self.add_partner(f"{base_partner}_{weeks_elapsed + 1}")

    def display_player_info(self):
        """Log the player's fields at INFO level."""
        logging.info(f"Player Backend User ID: {self.player_backend_user_id}")
        logging.info(f"Player Group: {self.player_group}")
        logging.info(f"Partners: {self.partners}")
        logging.info(f"Badges: {self.badges}")
        logging.info(f"Adventure Logs: {self.adventure_logs}")
        logging.info(f"Rewards Status: {self.rewards_status}")
        logging.info(f"Total Gained Scores: {self.total_gained_scores}")
        logging.info(f"Total Finished Contents: {self.total_finished_contents}")
        logging.info(f"Total Active Days: {self.total_active_days}")
        logging.info(f"Created At: {self.created_at_date}")
        logging.info(f"Updated At: {self.updated_at_date}")

    def get_incomplete_rewards(self):
        """Return the achievement keys whose ``"is_completed"`` flag is falsy.

        ``rewards_status`` is left in its serialized (JSON string) form.
        """
        self.deserialize_rewards_status()
        try:
            return [
                key
                for key, status in self.rewards_status.items()
                if not status["is_completed"]
            ]
        finally:
            self.serialize_rewards_status()

    @staticmethod
    def get_big_query_schema():
        """Return the BigQuery schema fields for a stored player row."""
        # NOTE(review): player_nickname is a dataclass field (and is part of
        # to_dict()) but has no column here -- confirm this is intended.
        return [
            bigquery.SchemaField("player_backend_user_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("player_group", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("partners", "STRING", mode="REPEATED"),
            bigquery.SchemaField("badges", "STRING", mode="REPEATED"),
            bigquery.SchemaField("rewards_status", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("adventure_logs", "STRING", mode="REPEATED"),
            bigquery.SchemaField("total_active_days", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("total_gained_scores", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("total_finished_contents", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("created_at_date", "DATE", mode="REQUIRED"),
            bigquery.SchemaField("updated_at_date", "DATE", mode="REQUIRED"),
        ]

    @staticmethod
    def from_dict(series: pd.Series) -> "Player":
        """
        Creates a Player instance from a pandas Series.

        Every entry of the series is passed to the constructor as a keyword
        argument, so its labels must be valid ``Player`` init parameters.

        Args:
            series (pd.Series): A pandas Series with player attributes.

        Returns:
            Player: A Player instance.
        """
        data = series.copy()
        return Player(**data)

    def to_dict(self) -> Dict:
        """
        Converts a Player instance to a dictionary.

        Returns:
            Dict: A dictionary representation of the Player instance, with
            the date fields rendered as ``YYYY-MM-DD`` strings.
        """
        data = asdict(self)
        # Convert date objects to strings; values that are already strings
        # (or missing) are left untouched.
        for date_field in ("created_at_date", "updated_at_date"):
            value = data.get(date_field)
            if isinstance(value, date):
                data[date_field] = value.strftime("%Y-%m-%d")
        return data