import threading import random from typing import Tuple import re import copy import os import json import openai import time openai.api_key = os.environ["OPENAI_KEY"] SIZE = 15 RIDDLE_COUNT = 3 MIN_WORD_SIZE = 4 MAX_WORD_SIZE = 10 WORD_LIMIT = 7500 COMPLETE_GAME_TIMEOUT = 60 * 60 INCOMPLETE_GAME_TIMEOUT = 60 * 60 * 24 GUESS_TIMEOUT = 30 with open("words.json", "r") as f: words = json.loads(f.read()) with open("prompt.txt", "r") as f: RIDDLE_PROMPT = f.read() search_text = "\n".join(words) games = {} all_games_lock = threading.Lock() def get_riddle(answer): prompt = RIDDLE_PROMPT.format( answer, ) while True: try: completions = openai.Completion.create( engine="text-davinci-003", prompt=prompt, max_tokens=200, n=1, temperature=0.5 ) riddle = completions["choices"][0]["text"] return riddle except Exception as e: print("OpenAI Error", e, "Retrying...") time.sleep(0.5) class Clue: def __init__(self, answer, location, across, solved): self.answer: str = answer self.location: Tuple[int, int] = location self.across: bool = across self.solved: bool = solved self.riddle: str = "" self.create_time: int self.solver: str = None self.timed_out: bool = False def __repr__(self): return f"{self.answer}: {self.location}, {'Across' if self.across else 'Down'}, {'Solved' if self.solved else 'Unsolved'}" class Game: def __init__(self, room_name, competitive, init_word): self.room_name = room_name self.competitive = competitive self.player_scores = {} self.grid = [[None for i in range(SIZE)] for j in range(SIZE)] self.grid = place_on_grid( self.grid, init_word, (SIZE // 2, SIZE // 2 - len(init_word) // 2), True ) self.clues = [None] * RIDDLE_COUNT self.previous_clues = [] self.lock = threading.Lock() self.complete = False self.last_update_index = 0 self.last_update_time = time.time() self.last_riddle_update_time = None self.pending_request = False games[room_name] = self def update(self): self.last_update_index += 1 self.last_update_time = time.time() def replace_clue(self, index): clue_grid = copy.deepcopy(self.grid) for j, clue in enumerate(self.clues): if clue and index != j: clue_grid = place_on_grid(clue_grid, clue.answer, clue.location, clue.across) if self.clues[index]: self.previous_clues.append(self.clues[index]) clue = find_clue(clue_grid) if clue is None: self.complete = True return clue.create_time = time.time() self.pending_request = True clue.riddle = get_riddle(clue.answer) self.pending_request = False self.last_riddle_update_time = time.time() self.clues[index] = clue def add_player(self, player_name): with self.lock: self.update() if player_name not in self.player_scores: self.player_scores[player_name] = 0 def player_guess(self, player_name, guess): guess = guess.lower() if self.pending_request: return with self.lock: self.update() matched_clues = [ clue for clue in self.clues if not clue.solved and clue.answer == guess ] if len(matched_clues) == 0: return False for clue in matched_clues: clue.solved = True clue.solver = player_name place_on_grid(self.grid, clue.answer, clue.location, clue.across) self.player_scores[player_name] += 1 def place_on_grid(grid, word, location, across): x, y = location if across: grid[x][y : y + len(word)] = word else: for i, letter in enumerate(word): grid[x + i][y] = letter return grid def find_clue(grid) -> Clue: all_coordinate_pairs = [ (i, j) for i in range(SIZE) for j in range(SIZE) if grid[i][j] is not None ] random.shuffle(all_coordinate_pairs) for i, j in all_coordinate_pairs: regexes = [] if (j == 0 or grid[i][j - 1] is None) and ( j + 1 == SIZE or grid[i][j + 1] is None ): running_regex = "" possible_across_regexes = [] for k in range(j, -1, -1): if ( (i != 0 and grid[i - 1][k] is not None) or (i != SIZE - 1 and grid[i + 1][k] is not None) ) and grid[i][k] is None: break valid = k == 0 or grid[i][k - 1] is None running_regex = (grid[i][k] or ".") + running_regex possible_across_regexes.append(((i, k), running_regex, valid)) possible_across_regexes = [p for p in possible_across_regexes if p[2]] for k in range(j + 1, SIZE): if ( (i != 0 and grid[i - 1][k] is not None) or (i != SIZE - 1 and grid[i + 1][k] is not None) ) and grid[i][k] is None: break valid = k == SIZE - 1 or grid[i][k + 1] is None for start, possible_across_regex, _ in possible_across_regexes[:]: if start[1] + len(possible_across_regex) == k: possible_across_regexes.append( (start, possible_across_regex + (grid[i][k] or "."), valid) ) possible_across_regexes = [ (loc, regex, True) for loc, regex, valid in possible_across_regexes if len(regex) >= MIN_WORD_SIZE and valid ] regexes.extend(possible_across_regexes) elif (i == 0 or grid[i - 1][j] is None) and ( i + 1 == SIZE or grid[i + 1][j] is None ): running_regex = "" possible_down_regexes = [] for k in range(i, -1, -1): if ( (j != 0 and grid[k][j - 1] is not None) or (j != SIZE - 1 and grid[k][j + 1] is not None) ) and grid[k][j] is None: break valid = k == 0 or grid[k - 1][j] is None running_regex = (grid[k][j] or ".") + running_regex possible_down_regexes.append(((k, j), running_regex, valid)) possible_down_regexes = [p for p in possible_down_regexes if p[2]] for k in range(i + 1, SIZE): if ( (j != 0 and grid[k][j - 1] is not None) or (j != SIZE - 1 and grid[k][j + 1] is not None) ) and grid[k][j] is None: break valid = k == SIZE - 1 or grid[k + 1][j] is None for start, possible_down_regex, _ in possible_down_regexes[:]: if start[0] + len(possible_down_regex) == k: possible_down_regexes.append( (start, possible_down_regex + (grid[k][j] or "."), valid) ) possible_down_regexes = [ (loc, regex, False) for loc, regex, valid in possible_down_regexes if len(regex) >= MIN_WORD_SIZE and valid ] regexes.extend(possible_down_regexes) random.shuffle(regexes) for loc, regex, across in regexes: matches = re.findall("^" + regex + "$", search_text, re.MULTILINE) if len(matches) > 1: random.shuffle(matches) answer = matches[0] clue = Clue(answer, loc, across, False) return clue return None def new_game(room_name): competitive = room_name != "" with all_games_lock: if room_name in games: return games[room_name] if not competitive: while room_name == "" or room_name in games: room_name = str(random.randint(0, 999999)) init_word = random.choice(words) else: init_word = room_name return Game(room_name, competitive, init_word) def game_thread(): while True: now = time.time() for room_name, game in games.items(): idle_time = now - game.last_update_time if (game.complete and idle_time > COMPLETE_GAME_TIMEOUT) or ( idle_time > INCOMPLETE_GAME_TIMEOUT ): del games[room_name] continue for i, clue in enumerate(game.clues): timed_out = now - clue.create_time > GUESS_TIMEOUT if clue is not None else None if timed_out: game.clues[i].timed_out = True if clue is None or clue.solved or timed_out: game.replace_clue(i) time.sleep(0.1) thread = threading.Thread(target=game_thread) thread.daemon = True thread.start()