# crossword / game_manager.py
# Ali Abid
# changes
# defc452
import threading
import random
from typing import Tuple
import re
import copy
import os
import json
import openai
import time
# The OpenAI key is taken from the environment; a missing OPENAI_KEY raises
# KeyError as soon as this module is imported.
openai.api_key = os.environ["OPENAI_KEY"]

# --- Board / gameplay tuning -------------------------------------------------
SIZE = 15  # the grid is SIZE x SIZE cells
RIDDLE_COUNT = 3  # number of clue slots each game keeps
MIN_WORD_SIZE = 4  # shortest answer find_clue will propose
MAX_WORD_SIZE = 10  # NOTE(review): not referenced in the visible code
WORD_LIMIT = 7500  # NOTE(review): not referenced in the visible code

# --- Timeouts, in seconds (compared against time.time() differences) ---------
COMPLETE_GAME_TIMEOUT = 60 * 60
INCOMPLETE_GAME_TIMEOUT = 60 * 60 * 24
GUESS_TIMEOUT = 30

# --- Static data loaded once at import ---------------------------------------
with open("words.json", "r") as f:
    words = json.load(f)
with open("prompt.txt", "r") as f:
    RIDDLE_PROMPT = f.read()

# One word per line so a MULTILINE regex can match whole words with ^...$.
search_text = "\n".join(words)

# --- Shared state ------------------------------------------------------------
games = {}  # room name -> Game
all_games_lock = threading.Lock()
def get_riddle(answer):
    """Return riddle text generated by OpenAI for the word ``answer``.

    ``answer`` is substituted into RIDDLE_PROMPT and sent to the
    ``text-davinci-003`` completion engine. Any exception raised while
    requesting or unpacking the completion is printed and the request is
    retried after half a second, so this call blocks until one succeeds.
    """
    riddle_prompt = RIDDLE_PROMPT.format(answer)
    while True:
        try:
            response = openai.Completion.create(
                engine="text-davinci-003",
                prompt=riddle_prompt,
                max_tokens=200,
                n=1,
                temperature=0.5,
            )
            # Only one completion is requested (n=1), so take the first choice.
            return response["choices"][0]["text"]
        except Exception as e:
            print("OpenAI Error", e, "Retrying...")
            time.sleep(0.5)
class Clue:
    """One crossword entry: its answer, where it sits on the grid and its status.

    Args:
        answer: The word to be guessed.
        location: (row, column) of the first letter on the grid.
        across: True when the word runs left-to-right, False for top-to-bottom.
        solved: Whether the clue has already been solved.
    """

    def __init__(self, answer, location, across, solved):
        self.answer: str = answer
        self.location: Tuple[int, int] = location
        self.across: bool = across
        self.solved: bool = solved
        # Riddle text shown to players; empty until one has been generated.
        self.riddle: str = ""
        # A bare annotation would not create the attribute, so reading it
        # would raise AttributeError. Default to the construction time
        # (time.time() returns a float, not an int).
        self.create_time: float = time.time()
        # Name of the player who solved the clue; None while unsolved.
        self.solver: str = None
        # Set once the clue has been open for longer than the guess timeout.
        self.timed_out: bool = False

    def __repr__(self):
        direction = "Across" if self.across else "Down"
        status = "Solved" if self.solved else "Unsolved"
        return f"{self.answer}: {self.location}, {direction}, {status}"
class Game:
    """State of one crossword room: letter grid, active clues and player scores.

    Creating a Game places ``init_word`` across the middle row of an empty
    SIZE x SIZE grid and registers the instance in the module-level ``games``
    dict under ``room_name``.

    Args:
        room_name: Key under which the game is stored in ``games``.
        competitive: Flag stored on the game; not interpreted by this class.
        init_word: Seed word written onto the grid.
    """

    def __init__(self, room_name, competitive, init_word):
        self.room_name = room_name
        self.competitive = competitive
        self.player_scores = {}
        self.grid = [[None for _ in range(SIZE)] for _ in range(SIZE)]
        # Seed word goes on the middle row, centred horizontally.
        self.grid = place_on_grid(
            self.grid, init_word, (SIZE // 2, SIZE // 2 - len(init_word) // 2), True
        )
        # Clue slots start empty; replace_clue fills them one at a time.
        self.clues = [None] * RIDDLE_COUNT
        self.previous_clues = []
        self.lock = threading.Lock()
        self.complete = False
        self.last_update_index = 0
        self.last_update_time = time.time()
        self.last_riddle_update_time = None
        # True while a riddle is being fetched; guesses are ignored meanwhile.
        self.pending_request = False
        games[room_name] = self

    def update(self):
        """Record that the game state changed (bump the counter and timestamp)."""
        self.last_update_index += 1
        self.last_update_time = time.time()

    def replace_clue(self, index):
        """Retire the clue in slot ``index`` and try to put a fresh one there.

        The search grid is the solved grid plus the answers of the other
        active clues, so a new clue cannot collide with one still in play.
        If no new clue can be found the game is marked complete and the slot
        is left unchanged.
        """
        clue_grid = copy.deepcopy(self.grid)
        for j, clue in enumerate(self.clues):
            if clue and index != j:
                clue_grid = place_on_grid(clue_grid, clue.answer, clue.location, clue.across)

        retired = self.clues[index]
        # When no replacement is found the slot keeps its old clue and this
        # method is called for it again, so archive each clue object once only.
        if retired and not any(retired is old for old in self.previous_clues):
            self.previous_clues.append(retired)

        clue = find_clue(clue_grid)
        if clue is None:
            self.complete = True
            return

        clue.create_time = time.time()
        self.pending_request = True
        try:
            clue.riddle = get_riddle(clue.answer)
        finally:
            # Always clear the flag, otherwise a failed request would make
            # player_guess ignore every guess from then on.
            self.pending_request = False
        self.last_riddle_update_time = time.time()
        self.clues[index] = clue

    def add_player(self, player_name):
        """Register ``player_name`` with a score of 0 unless already present."""
        with self.lock:
            self.update()
            if player_name not in self.player_scores:
                self.player_scores[player_name] = 0

    def player_guess(self, player_name, guess):
        """Apply a guess: solve every open clue whose answer equals it.

        The guess is lower-cased before comparison. Each matching clue is
        marked solved, credited to ``player_name``, written onto the grid and
        earns the player one point.

        Returns:
            False when no open clue matched. None when the guess was ignored
            because a riddle request is pending, and also None after a
            successful match (callers cannot tell those two apart by the
            return value).
        """
        guess = guess.lower()
        if self.pending_request:
            return
        with self.lock:
            self.update()
            matched_clues = [
                clue
                for clue in self.clues
                # Slots are None until the background thread has filled them.
                if clue is not None and not clue.solved and clue.answer == guess
            ]
            if not matched_clues:
                return False
            for clue in matched_clues:
                clue.solved = True
                clue.solver = player_name
                place_on_grid(self.grid, clue.answer, clue.location, clue.across)
                # Tolerate players that were never registered via add_player.
                self.player_scores[player_name] = self.player_scores.get(player_name, 0) + 1
def place_on_grid(grid, word, location, across):
    """Write ``word`` into ``grid`` starting at ``location`` and return the grid.

    ``location`` is a (row, column) pair. With ``across`` true the letters
    replace a slice of that row; otherwise one letter is written per row,
    going down the column. The grid is modified in place and the same object
    is returned.
    """
    row, col = location
    if not across:
        for r, letter in enumerate(word, start=row):
            grid[r][col] = letter
        return grid
    # Slice assignment: replaces the cells from col up to col + len(word).
    grid[row][col : col + len(word)] = list(word)
    return grid
def find_clue(grid) -> Clue:
    """Pick a random word that can be laid through an existing letter on ``grid``.

    Every filled cell is tried in random order. For a cell with no letter
    directly left and right, candidate across spans through it are built;
    otherwise, if it has no letter directly above and below, candidate down
    spans are built. Each candidate is a regex made of the letters already on
    the grid and "." for empty cells, and is matched against whole lines of
    ``search_text``.

    Returns:
        An unsolved Clue for the first candidate that has more than one
        matching word (the word is chosen at random among the matches), or
        None when no cell yields such a candidate.
    """
    all_coordinate_pairs = [
        (i, j) for i in range(SIZE) for j in range(SIZE) if grid[i][j] is not None
    ]
    random.shuffle(all_coordinate_pairs)
    for i, j in all_coordinate_pairs:
        # Candidates for this cell: (start location, regex, across flag).
        regexes = []
        if (j == 0 or grid[i][j - 1] is None) and (
            j + 1 == SIZE or grid[i][j + 1] is None
        ):
            # --- Across candidates through (i, j) ---
            running_regex = ""
            possible_across_regexes = []
            # Walk left from the cell, collecting every possible start column.
            for k in range(j, -1, -1):
                # Stop at an empty cell that has a letter directly above or
                # below it.
                if (
                    (i != 0 and grid[i - 1][k] is not None)
                    or (i != SIZE - 1 and grid[i + 1][k] is not None)
                ) and grid[i][k] is None:
                    break
                # A start is valid only at the left edge or after an empty cell.
                valid = k == 0 or grid[i][k - 1] is None
                running_regex = (grid[i][k] or ".") + running_regex
                possible_across_regexes.append(((i, k), running_regex, valid))
            possible_across_regexes = [p for p in possible_across_regexes if p[2]]
            # Walk right, extending every candidate that currently ends at k.
            for k in range(j + 1, SIZE):
                if (
                    (i != 0 and grid[i - 1][k] is not None)
                    or (i != SIZE - 1 and grid[i + 1][k] is not None)
                ) and grid[i][k] is None:
                    break
                # An end is valid only at the right edge or before an empty cell.
                valid = k == SIZE - 1 or grid[i][k + 1] is None
                # Iterate over a copy because the list is appended to below.
                for start, possible_across_regex, _ in possible_across_regexes[:]:
                    # len(regex) is used as the span length in cells, which
                    # relies on each cell contributing exactly one character.
                    if start[1] + len(possible_across_regex) == k:
                        possible_across_regexes.append(
                            (start, possible_across_regex + (grid[i][k] or "."), valid)
                        )
            # Keep long-enough, valid spans; the third field now means "across".
            possible_across_regexes = [
                (loc, regex, True)
                for loc, regex, valid in possible_across_regexes
                if len(regex) >= MIN_WORD_SIZE and valid
            ]
            regexes.extend(possible_across_regexes)
        elif (i == 0 or grid[i - 1][j] is None) and (
            i + 1 == SIZE or grid[i + 1][j] is None
        ):
            # --- Down candidates through (i, j): same procedure, transposed ---
            running_regex = ""
            possible_down_regexes = []
            # Walk up from the cell, collecting every possible start row.
            for k in range(i, -1, -1):
                # Stop at an empty cell that has a letter directly left or right.
                if (
                    (j != 0 and grid[k][j - 1] is not None)
                    or (j != SIZE - 1 and grid[k][j + 1] is not None)
                ) and grid[k][j] is None:
                    break
                valid = k == 0 or grid[k - 1][j] is None
                running_regex = (grid[k][j] or ".") + running_regex
                possible_down_regexes.append(((k, j), running_regex, valid))
            possible_down_regexes = [p for p in possible_down_regexes if p[2]]
            # Walk down, extending every candidate that currently ends at k.
            for k in range(i + 1, SIZE):
                if (
                    (j != 0 and grid[k][j - 1] is not None)
                    or (j != SIZE - 1 and grid[k][j + 1] is not None)
                ) and grid[k][j] is None:
                    break
                valid = k == SIZE - 1 or grid[k + 1][j] is None
                for start, possible_down_regex, _ in possible_down_regexes[:]:
                    if start[0] + len(possible_down_regex) == k:
                        possible_down_regexes.append(
                            (start, possible_down_regex + (grid[k][j] or "."), valid)
                        )
            # Keep long-enough, valid spans; False in the third field means "down".
            possible_down_regexes = [
                (loc, regex, False)
                for loc, regex, valid in possible_down_regexes
                if len(regex) >= MIN_WORD_SIZE and valid
            ]
            regexes.extend(possible_down_regexes)
        random.shuffle(regexes)
        for loc, regex, across in regexes:
            # Grid letters are inserted into the pattern unescaped.
            matches = re.findall("^" + regex + "$", search_text, re.MULTILINE)
            # NOTE(review): patterns with exactly one matching word are
            # skipped; confirm whether "> 1" rather than ">= 1" is intended.
            if len(matches) > 1:
                random.shuffle(matches)
                answer = matches[0]
                clue = Clue(answer, loc, across, False)
                return clue
    return None
def new_game(room_name):
    """Return the game for ``room_name``, creating it if it does not exist.

    An empty ``room_name`` requests a non-competitive game: a random numeric
    room name that is not already taken is generated and the grid is seeded
    with a random word. A non-empty name requests a competitive game stored
    under that name; the lower-cased name seeds the grid when it is purely
    alphabetic and no longer than SIZE, otherwise a random word is used.

    The lookup and creation happen under ``all_games_lock``.
    """
    competitive = room_name != ""
    with all_games_lock:
        if room_name in games:
            return games[room_name]
        if not competitive:
            while room_name == "" or room_name in games:
                room_name = str(random.randint(0, 999999))
            init_word = random.choice(words)
        else:
            # Guesses are lower-cased before being compared with answers, so
            # seed the grid with lower-case letters as well.
            init_word = room_name.lower()
            # The seed is written onto the grid and its letters later become
            # part of regex patterns. A name longer than the board would
            # corrupt the row, and punctuation such as "(" would make the
            # pattern invalid, so fall back to a dictionary word.
            if not init_word.isalpha() or len(init_word) > SIZE:
                init_word = random.choice(words)
        return Game(room_name, competitive, init_word)
def game_thread():
    """Background loop that expires idle games and refreshes stale clues.

    Roughly every 0.1 s, for each registered game:
      * remove it from ``games`` when it has been idle longer than
        COMPLETE_GAME_TIMEOUT (if complete) or INCOMPLETE_GAME_TIMEOUT;
      * otherwise, for each clue slot, mark the clue timed out once it is
        older than GUESS_TIMEOUT, and replace the clue when the slot is
        empty, solved or timed out.

    Never returns.
    """
    while True:
        now = time.time()
        # Iterate over a snapshot: this loop deletes from `games` and
        # new_game() inserts into it from other threads, and either would
        # raise "dictionary changed size during iteration" on the live dict.
        with all_games_lock:
            snapshot = list(games.items())
        for room_name, game in snapshot:
            idle_time = now - game.last_update_time
            if (game.complete and idle_time > COMPLETE_GAME_TIMEOUT) or (
                idle_time > INCOMPLETE_GAME_TIMEOUT
            ):
                with all_games_lock:
                    # Only remove the entry if it is still this game.
                    if games.get(room_name) is game:
                        del games[room_name]
                continue
            for i, clue in enumerate(game.clues):
                timed_out = clue is not None and now - clue.create_time > GUESS_TIMEOUT
                if timed_out:
                    clue.timed_out = True
                if clue is None or clue.solved or timed_out:
                    game.replace_clue(i)
        time.sleep(0.1)
# Run the maintenance loop in a daemon thread, started at import time.
thread = threading.Thread(target=game_thread, daemon=True)
thread.start()