Visual-Reasoning / frozenlake /frozenlake_processor.py
Jayce-Ping's picture
Add files using upload-large-folder tool
a6c53e4 verified
"""
FrozenLakeProcessor - FrozenLake puzzle generation, solving, rendering, and evaluation.
Grid cells: S=Start, F=Frozen(safe), H=Hole(death), G=Goal
Table chars: @=Start, _=Frozen, #=Hole, *=Goal
Generation strategy:
- ``generate()``: Pure random + BFS retry. Fine for small grids (≤16).
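- ``generate_guided()``: Lay a long corridor first (the longest path of a
random DFS spanning tree), then fill the remaining cells with probability
``p``. Yields long solution paths even at 32×32+ without exponential
retries; the returned path is valid but not necessarily the shortest one.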
- ``generate_auto()``: Auto-select best strategy based on difficulty.
- ``generate_batch()``: Multiprocessing wrapper for high-throughput.
Solving uses plain BFS (~10× faster than networkx).
"""
import os
import random
import warnings
from collections import deque
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List, Tuple, Optional
import numpy as np
from PIL import Image, ImageDraw
try:
os.environ.setdefault("SDL_AUDIODRIVER", "dummy")
warnings.filterwarnings("ignore", category=UserWarning, module="pygame")
warnings.filterwarnings("ignore", category=DeprecationWarning)
import gymnasium as gym
HAS_GYM = True
except ImportError:
HAS_GYM = False
# Table symbol -> internal grid letter (Start, Frozen, Hole, Goal).
TABLE_TO_GRID = {"@": "S", "_": "F", "#": "H", "*": "G"}
# Inverse mapping: internal grid letter -> table symbol.
GRID_TO_TABLE = {v: k for k, v in TABLE_TO_GRID.items()}
# Move letter -> (row delta, column delta); "U" decreases the row index.
MOVES = {"U": (-1, 0), "D": (1, 0), "L": (0, -1), "R": (0, 1)}
# Move letter -> integer action id passed to the gym environment's ``step()``.
GYM_ACTION_MAP = {"L": 0, "D": 1, "R": 2, "U": 3}
# A grid description: one string per row, one character per cell.
GridDesc = List[str]
class FrozenLakeProcessor:
    """FrozenLake generation, BFS solving, rendering, and evaluation."""

    def __init__(self, img_size: int = 512, path_color: str = "red"):
        """Store the rendering configuration.

        Args:
            img_size: Side length, in pixels, of the square images produced
                by the renderers.
            path_color: Colour used when drawing a solution line. The
                default stays ``"red"``; the pixel-based path extraction in
                this class only recognises red-dominant pixels, so keep the
                default when drawn images must be parsed back.
        """
        self.img_size = img_size
        self.path_color = path_color
# ==================== Generation: Pure Random ====================
def generate(
    self, size: int, p: float = 0.8,
    min_path_len: int = 1, max_attempts: int = 500,
) -> Tuple[GridDesc, List[Tuple[int, int]]]:
    """Sample random layouts until one has a long-enough shortest path.

    Every attempt draws a fresh layout from ``_random_layout(size, p)`` and
    solves it with ``solve``. The first layout whose BFS path uses at least
    ``min_path_len`` moves is returned. Suited to small grids or modest
    length requirements; ``generate_guided()`` targets the harder cases.

    Returns:
        ``(desc, path)`` where ``path`` is the list of ``(row, col)`` cells
        from start to goal.

    Raises:
        RuntimeError: if none of the ``max_attempts`` layouts qualifies.
    """
    for _attempt in range(max_attempts):
        candidate = self._random_layout(size, p)
        route = self.solve(candidate)
        if route is None:
            # Start and goal are not connected in this layout.
            continue
        n_moves = len(route) - 1
        if n_moves >= min_path_len:
            return candidate, route
    raise RuntimeError(
        f"Failed after {max_attempts} attempts "
        f"(size={size}, p={p}, min_path_len={min_path_len})."
    )
@staticmethod
def _random_layout(size: int, p: float = 0.8) -> GridDesc:
    """Return a random ``size`` x ``size`` grid with one start and one goal.

    Two distinct cells are sampled for ``S`` and ``G``; every other cell is
    ``F`` with probability ``p`` and ``H`` otherwise. Nothing here checks
    that the goal is reachable.
    """
    # Row-major list of all coordinates, the same order a nested loop gives.
    coords = [divmod(index, size) for index in range(size * size)]
    start, goal = random.sample(coords, 2)

    def pick(cell):
        # Only ordinary cells consume a random number.
        if cell == start:
            return "S"
        if cell == goal:
            return "G"
        return "F" if random.random() < p else "H"

    return [
        "".join(pick((r, c)) for c in range(size))
        for r in range(size)
    ]
# ==================== Generation: Guided (path-first) ====================
def simplify_path(self, path: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
    """Shorten a cell path by jumping over detours.

    Starting from the first cell, the walk repeatedly jumps to the
    furthest later cell of ``path`` that is orthogonally adjacent to the
    current one (skipping everything in between); when no such shortcut
    exists it simply advances to the next cell. Only cells that already
    belong to ``path`` are used, so the result is a sub-sequence of the
    input that still starts and ends at the same cells.

    Runs in time linear in ``len(path)`` thanks to a cell -> last-index
    lookup (the same result as scanning every later cell for adjacency).

    Args:
        path: Sequence of ``(row, col)`` cells.

    Returns:
        The shortened path; an empty input is returned unchanged.
    """
    if not path:
        return path

    # Last position at which each cell occurs. The greedy rule wants the
    # furthest adjacent cell, i.e. the largest index among the neighbours.
    last_index = {tuple(cell): idx for idx, cell in enumerate(path)}

    simplified = [path[0]]
    curr_idx = 0
    final_idx = len(path) - 1
    while curr_idx < final_idx:
        r, c = path[curr_idx]
        furthest = max(
            last_index.get(neighbour, -1)
            for neighbour in ((r - 1, c), (r + 1, c), (r, c - 1), (r, c + 1))
        )
        # A jump only counts as a shortcut if it skips at least one cell.
        curr_idx = furthest if furthest > curr_idx + 1 else curr_idx + 1
        simplified.append(path[curr_idx])
    return simplified
def generate_guided(
    self, size: int, p: float = 0.8,
    min_path_len: int = 1, max_attempts: int = 100,
) -> Tuple[GridDesc, List[Tuple[int, int]]]:
    """Path-first generation: build a corridor, then shorten and check it.

    Each attempt asks ``_guided_layout`` for a grid together with the walk
    it was built around, removes detours from that walk with
    ``simplify_path``, and accepts the attempt when the shortened path
    still has at least ``min_path_len`` moves. The returned path runs over
    walk cells only, so it is a valid start-to-goal route, but it is not
    necessarily the BFS-shortest route of the grid.

    Raises:
        RuntimeError: if no attempt succeeds within ``max_attempts``.
    """
    for _attempt in range(max_attempts):
        layout, corridor = self._guided_layout(size, p, min_path_len)
        if layout is not None:
            shortened = self.simplify_path(corridor)
            if min_path_len <= len(shortened) - 1:
                return layout, shortened
    raise RuntimeError(
        f"Guided generation failed after {max_attempts} attempts "
        f"(size={size}, p={p}, min_path_len={min_path_len})."
    )
def _guided_layout(
    self, size: int, p: float, min_path_len: int,
) -> Tuple[Optional[GridDesc], Optional[List[Tuple[int, int]]]]:
    """Build a grid around a long corridor taken from a random spanning tree.

    Strategy:
      1. Build a random spanning tree of the grid with an iterative DFS.
      2. Find the tree's longest path (its diameter) with two BFS passes;
         the second pass also records parents so the path can be read back.
      3. If the path is more than twice as long as required, trim a random
         amount from each end (never below ``min_path_len`` moves).
      4. Mark the first/last walk cells as ``S``/``G``, the other walk cells
         as ``F``, and fill every remaining cell with ``F`` with probability
         ``p`` and ``H`` otherwise.

    The walk is always a valid start-to-goal route. It is NOT guaranteed to
    be the shortest one: non-consecutive walk cells may touch each other,
    and off-walk cells that end up frozen can open further shortcuts.

    Returns:
        ``(desc, walk)``, or ``(None, None)`` when the tree diameter has
        fewer than ``min_path_len`` moves.
    """
    dirs = [(0, 1), (0, -1), (1, 0), (-1, 0)]

    # Step 1: random spanning tree via DFS (explicit stack, no recursion).
    adj: dict = {(r, c): [] for r in range(size) for c in range(size)}
    vis = [[False] * size for _ in range(size)]
    sr, sc = random.randrange(size), random.randrange(size)
    vis[sr][sc] = True
    stack = [(sr, sc)]
    while stack:
        r, c = stack[-1]
        nbrs = []
        for dr, dc in dirs:
            nr, nc = r + dr, c + dc
            if 0 <= nr < size and 0 <= nc < size and not vis[nr][nc]:
                nbrs.append((nr, nc))
        if not nbrs:
            stack.pop()
            continue
        nr, nc = random.choice(nbrs)
        vis[nr][nc] = True
        adj[(r, c)].append((nr, nc))
        adj[(nr, nc)].append((r, c))
        stack.append((nr, nc))

    # Step 2: tree diameter via double BFS.
    def _bfs_far(start):
        """Return (furthest node, distances, BFS parents) from ``start``."""
        dist = {start: 0}
        parent = {start: None}
        q = deque([start])
        far = start
        while q:
            node = q.popleft()
            for nb in adj[node]:
                if nb not in dist:
                    dist[nb] = dist[node] + 1
                    parent[nb] = node
                    q.append(nb)
                    if dist[nb] > dist[far]:
                        far = nb
        return far, dist, parent

    end1, _, _ = _bfs_far((sr, sc))
    end2, dist1, parent = _bfs_far(end1)
    if dist1[end2] < min_path_len:
        return None, None

    # Read the unique tree path end1 -> end2 back from the BFS parents.
    walk = []
    cur = end2
    while cur is not None:
        walk.append(cur)
        cur = parent[cur]
    walk.reverse()

    # Step 3: optionally trim both ends when the walk is much longer than
    # needed. Each trim removes at most half of the current excess, so the
    # walk keeps at least ``min_path_len`` moves.
    if len(walk) - 1 > min_path_len * 2:
        excess = len(walk) - 1 - min_path_len
        trim = random.randint(0, excess // 2)
        if trim > 0:
            walk = walk[trim:]
        excess2 = len(walk) - 1 - min_path_len
        trim2 = random.randint(0, excess2 // 2)
        if trim2 > 0:
            walk = walk[: len(walk) - trim2]

    start_pos, end_pos = walk[0], walk[-1]
    walk_set = set(walk)

    # Step 4: fill the grid. Walk cells are always safe; every other cell
    # is frozen with probability ``p`` and a hole otherwise.
    grid = [[""] * size for _ in range(size)]
    for r in range(size):
        for c in range(size):
            if (r, c) == start_pos:
                grid[r][c] = "S"
            elif (r, c) == end_pos:
                grid[r][c] = "G"
            elif (r, c) in walk_set:
                grid[r][c] = "F"
            else:
                grid[r][c] = "F" if random.random() < p else "H"
    return ["".join(row) for row in grid], walk
# ==================== Generation: Auto ====================
def generate_auto(
    self, size: int, p: float = 0.8,
    min_path_len: int = 1, max_attempts: int = 200,
) -> Tuple[GridDesc, List[Tuple[int, int]]]:
    """Choose a generation strategy from the requested path length.

    Requests above a size-based cutoff go straight to ``generate_guided``.
    Everything else tries ``generate`` first and falls back to
    ``generate_guided`` if random generation raises ``RuntimeError``.
    """
    expected_max = size * 1.5
    wants_long_path = min_path_len > expected_max * 0.5
    if not wants_long_path:
        try:
            return self.generate(size, p, min_path_len, max_attempts)
        except RuntimeError:
            # Random sampling ran out of attempts; use the guided builder.
            return self.generate_guided(size, p, min_path_len, max_attempts)
    return self.generate_guided(size, p, min_path_len, max_attempts)
# ==================== Batch (multiprocessing) ====================
@staticmethod
def _generate_one(args: tuple) -> Optional[Tuple[GridDesc, list]]:
    """Multiprocessing worker: build one puzzle from a packed task tuple.

    ``args`` is ``(size, p, min_path_len, seed)``. The ``random`` module is
    seeded with ``seed`` before a fresh processor runs ``generate_auto``.
    Returns the ``(desc, path)`` pair, or ``None`` if generation raised
    ``RuntimeError``.
    """
    size, p, min_path_len, seed = args
    random.seed(seed)
    worker = FrozenLakeProcessor()
    try:
        puzzle = worker.generate_auto(size, p, min_path_len, max_attempts=200)
    except RuntimeError:
        return None
    return puzzle
def generate_batch(
    self, size: int, count: int, p: float = 0.8,
    min_path_len: int = 1, workers: int = 8, base_seed: int = 42,
) -> List[Tuple[GridDesc, List[Tuple[int, int]]]]:
    """Generate up to *count* puzzles in parallel.

    ``2 * count`` tasks are submitted, seeded ``base_seed``,
    ``base_seed + 1``, ... Results are consumed in seed order (not in
    completion order), so the returned list depends only on the arguments
    and not on how the worker processes happen to be scheduled.

    Returns:
        The first ``count`` successful ``(desc, path)`` pairs in seed
        order. The list is shorter than ``count`` if too many tasks failed,
        and empty when ``count <= 0``.
    """
    if count <= 0:
        # Nothing to do; avoid spinning up a process pool.
        return []

    tasks = [(size, p, min_path_len, base_seed + i) for i in range(count * 2)]
    results = []
    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(self._generate_one, t) for t in tasks]
        for future in futures:
            res = future.result()
            if res is None:
                continue
            results.append(res)
            if len(results) >= count:
                # Enough puzzles: drop the tasks that have not started yet.
                executor.shutdown(wait=False, cancel_futures=True)
                break
    return results
# ==================== Solving (plain BFS) ====================
@staticmethod
def solve(desc: GridDesc) -> Optional[List[Tuple[int, int]]]:
    """Return the BFS shortest path from ``S`` to ``G``, avoiding ``H``.

    Neighbours are explored in the order up, down, left, right. The search
    stores one parent pointer per visited cell and rebuilds the path once
    the goal is dequeued, instead of copying the whole path on every
    enqueue.

    Args:
        desc: Grid rows; the column count is taken from the first row.

    Returns:
        The list of ``(row, col)`` cells from start to goal inclusive, or
        ``None`` if the grid is empty, lacks a start or goal, or the goal
        cannot be reached.
    """
    if not desc or not desc[0]:
        return None
    rows, cols = len(desc), len(desc[0])

    start = goal = None
    for r in range(rows):
        for c in range(cols):
            if desc[r][c] == "S":
                start = (r, c)
            elif desc[r][c] == "G":
                goal = (r, c)
    if start is None or goal is None:
        return None

    # parent doubles as the visited set: a cell is known once it has a parent.
    parent = {start: None}
    queue: deque = deque([start])
    while queue:
        cell = queue.popleft()
        if cell == goal:
            path = []
            while cell is not None:
                path.append(cell)
                cell = parent[cell]
            path.reverse()
            return path
        r, c = cell
        for dr, dc in ((-1, 0), (1, 0), (0, -1), (0, 1)):
            nr, nc = r + dr, c + dc
            if 0 <= nr < rows and 0 <= nc < cols and (nr, nc) not in parent:
                if desc[nr][nc] != "H":
                    parent[(nr, nc)] = cell
                    queue.append((nr, nc))
    return None
# ==================== Path ↔ UDRL ====================
@staticmethod
def path_to_udrl(path: List[Tuple[int, int]]) -> str:
    """Translate a cell path into a string of U/D/L/R move letters.

    One letter is produced per consecutive pair of cells. The row change is
    examined first, then the column change; any pair that matches none of
    up, down or left is written as ``"R"``.
    """
    def letter(src, dst):
        (r1, c1), (r2, c2) = src, dst
        if r2 < r1:
            return "U"
        if r2 > r1:
            return "D"
        if c2 < c1:
            return "L"
        return "R"

    return "".join(letter(a, b) for a, b in zip(path, path[1:]))
# ==================== Verification ====================
def verify_path_sim(self, desc: GridDesc, udrl: str) -> bool:
    """Simulate a move string on the grid and report whether it reaches G.

    If ``udrl`` contains the marker ``"Action plan"``, only the text after
    its last occurrence is interpreted. The marker is searched in the raw
    text: stripping spaces first would turn it into ``"Actionplan"`` and
    the preamble's letters would be executed as moves. Characters that are
    not keys of ``MOVES`` (commas, spaces, punctuation, ...) are skipped.

    Returns:
        ``True`` as soon as a move lands on ``G``. ``False`` if a move
        leaves the grid or enters ``H``, if the moves run out first, or if
        the grid is empty or has no start cell.
    """
    if not desc or not desc[0]:
        return False
    rows, cols = len(desc), len(desc[0])
    start = self.find_start(desc)
    if start is None:
        return False
    r, c = start

    plan = udrl.rsplit("Action plan", 1)[-1]
    for ch in plan:
        if ch not in MOVES:
            continue
        dr, dc = MOVES[ch]
        nr, nc = r + dr, c + dc
        if not (0 <= nr < rows and 0 <= nc < cols):
            return False
        if desc[nr][nc] == "H":
            return False
        r, c = nr, nc
        if desc[nr][nc] == "G":
            return True
    return desc[r][c] == "G"
def verify_path_gym(self, desc: GridDesc, udrl: str) -> bool:
    """Replay a move string in the gym FrozenLake environment.

    Falls back to ``verify_path_sim`` when gym is unavailable or when
    anything in the gym code path raises.

    If ``udrl`` contains the marker ``"Action plan"``, only the text after
    its last occurrence is interpreted (searched in the raw text, before
    any whitespace handling). Characters that are not keys of
    ``GYM_ACTION_MAP`` are skipped.

    Returns:
        ``True`` if the episode ends with a positive reward, else ``False``
        (including when the moves run out before the episode ends).
    """
    if not HAS_GYM:
        return self.verify_path_sim(desc, udrl)
    rows, cols = len(desc), len(desc[0])

    plan = udrl.rsplit("Action plan", 1)[-1]
    actions = [GYM_ACTION_MAP[ch] for ch in plan if ch in GYM_ACTION_MAP]
    try:
        env = gym.make(
            "FrozenLake-v1", desc=desc,
            map_name=f"{rows}x{cols}", is_slippery=False, render_mode=None,
            # The registered environment carries a default episode step
            # limit; raise it above the plan length so a long but valid
            # solution is not cut off as "truncated" before reaching G.
            max_episode_steps=len(actions) + 1,
        )
        try:
            env.reset(seed=42)
            success = False
            for action in actions:
                _, reward, terminated, truncated, _ = env.step(action)
                if terminated or truncated:
                    success = reward > 0
                    break
            return success
        finally:
            # Release the environment even if reset/step raised.
            env.close()
    except Exception:
        return self.verify_path_sim(desc, udrl)
# ==================== Table I/O ====================
def encode_table(self, desc: GridDesc) -> str:
    """Render the grid as a pipe-delimited text table.

    The first line is a header with one ``Col i`` label per column; each
    following line is ``| Row r | ... |`` with the cells translated through
    ``GRID_TO_TABLE``. The number of header columns follows the widest row
    rather than the number of rows, so non-square grids get a matching
    header (square grids are unaffected).

    Raises:
        KeyError: if a cell character is not a key of ``GRID_TO_TABLE``.
    """
    n_cols = max((len(row) for row in desc), default=0)
    lines = ["| | " + " | ".join(f"Col {i+1}" for i in range(n_cols)) + " |"]
    for r, row in enumerate(desc):
        mapped = [GRID_TO_TABLE[ch] for ch in row]
        lines.append(f"| Row {r+1} | " + " | ".join(mapped) + " |")
    return "\n".join(lines)
def decode_table(self, text: str) -> Optional[GridDesc]:
    """Parse a pipe-delimited table back into grid rows.

    Blank lines and lines containing ``Col`` or ``---`` are skipped. On
    every other line the first non-empty cell is treated as the row label
    and dropped; the remaining cells are translated through
    ``TABLE_TO_GRID`` and cells that are not known symbols are ignored.

    Returns:
        The decoded rows, or ``None`` if no row could be decoded or any
        exception occurred while parsing.
    """
    try:
        decoded = []
        for raw_line in text.strip().splitlines():
            stripped = raw_line.strip()
            if not stripped:
                continue
            if "Col" in stripped or "---" in stripped:
                continue
            cells = []
            for piece in stripped.split("|"):
                piece = piece.strip()
                if piece:
                    cells.append(piece)
            if len(cells) < 2:
                continue
            symbols = cells[1:]  # cells[0] is the row label
            row_str = "".join(
                TABLE_TO_GRID[sym] for sym in symbols if sym in TABLE_TO_GRID
            )
            if row_str:
                decoded.append(row_str)
        if decoded:
            return decoded
        return None
    except Exception:
        return None
def save_table(self, filepath: str, desc: GridDesc) -> None:
    """Write the table encoding of ``desc`` to ``filepath``.

    The table is encoded before the file is opened, so a grid that cannot
    be encoded raises without creating or truncating ``filepath``.
    """
    table = self.encode_table(desc)
    with open(filepath, "w") as f:
        f.write(table)
def load_table(self, filepath: str) -> Optional[GridDesc]:
    """Read ``filepath`` and decode its table into grid rows.

    Returns whatever ``decode_table`` returns for the file contents, or
    ``None`` if opening, reading or decoding raises.
    """
    try:
        with open(filepath) as table_file:
            contents = table_file.read()
            return self.decode_table(contents)
    except Exception:
        return None
def find_start(self, desc: GridDesc) -> Optional[Tuple[int, int]]:
    """Return ``(row, col)`` of the first ``S`` in row-major order, or ``None``."""
    hits = (
        (r, c)
        for r, row in enumerate(desc)
        for c, ch in enumerate(row)
        if ch == "S"
    )
    return next(hits, None)
def fingerprint(self, desc: GridDesc) -> str:
    """Return all grid rows concatenated into a single string."""
    separator = ""
    return separator.join(desc)
# ==================== Rendering ====================
def render_gym(self, desc: GridDesc) -> Optional[Image.Image]:
    """Render the grid with the gym FrozenLake environment.

    Returns:
        The rendered frame resized to ``img_size`` x ``img_size`` with
        nearest-neighbour sampling, or ``None`` if gym is unavailable or
        any step of the rendering raises.
    """
    if not HAS_GYM:
        return None
    try:
        env = gym.make(
            "FrozenLake-v1", desc=desc,
            is_slippery=False, render_mode="rgb_array",
        )
        try:
            env.reset()
            rgb = env.render()
        finally:
            # Close the environment even when reset/render fails.
            env.close()
        return Image.fromarray(rgb).resize(
            (self.img_size, self.img_size), Image.NEAREST
        )
    except Exception:
        return None
def render_simple(self, desc: GridDesc) -> Image.Image:
    """Draw the grid as flat coloured cells with black grid lines.

    Cell borders are computed from a floating-point cell size and rounded
    per edge, so grid sizes that do not divide ``img_size`` evenly still
    tile the whole image. The grid is treated as square: ``len(desc)``
    gives both the row and the column count.
    """
    n = len(desc)
    step = self.img_size / n
    # Pixel position of every cell boundary, shared by rows and columns.
    edges = [int(round(i * step)) for i in range(n + 1)]

    canvas = Image.new("RGB", (self.img_size, self.img_size), (255, 255, 255))
    pen = ImageDraw.Draw(canvas)
    palette = {
        "S": (0, 0, 255), "F": (200, 220, 255),
        "H": (80, 80, 80), "G": (0, 200, 0),
    }
    fallback = (200, 220, 255)  # unknown characters are drawn like "F"

    for r in range(n):
        top, bottom = edges[r], edges[r + 1] - 1
        for c in range(n):
            left, right = edges[c], edges[c + 1] - 1
            pen.rectangle(
                [left, top, right, bottom],
                fill=palette.get(desc[r][c], fallback),
            )

    for pos in edges:
        pen.line([(pos, 0), (pos, self.img_size)], fill="black", width=1)
        pen.line([(0, pos), (self.img_size, pos)], fill="black", width=1)
    return canvas
def render(self, desc: GridDesc, use_gym: bool = True) -> Image.Image:
    """Render the grid, preferring the gym renderer when requested.

    ``render_gym`` is tried only if ``use_gym`` is true; whenever it is
    skipped or returns ``None`` the result of ``render_simple`` is used.
    """
    gym_image = self.render_gym(desc) if use_gym else None
    if gym_image is None:
        return self.render_simple(desc)
    return gym_image
def draw_solution_line(
    self, image: Image.Image, path: List[Tuple[int, int]], grid_size: int,
) -> Image.Image:
    """Draw the path through the cell centres onto ``image`` and return it.

    ``image`` is modified in place. The line uses ``self.path_color`` and a
    width of a quarter of the cell width (at least one pixel).
    """
    img_w, img_h = image.size
    cw = img_w / grid_size
    ch_ = img_h / grid_size
    centres = []
    for row, col in path:
        centres.append((col * cw + cw / 2, row * ch_ + ch_ / 2))
    line_width = max(1, int(cw / 4))
    pen = ImageDraw.Draw(image)
    pen.line(centres, fill=self.path_color, width=line_width, joint="curve")
    return image
# ==================== Video Frames ====================
def generate_video_frames(
    self, desc: GridDesc, path: List[Tuple[int, int]],
    n_start: int = 5, m_end: int = 5,
    frames: Optional[int] = None, use_gym: bool = True,
) -> List[Image.Image]:
    """Build an animation of the solution line growing along ``path``.

    The sequence is ``n_start`` frames of the bare grid, then the content
    frames, then ``m_end`` frames of the complete solution.

    Args:
        frames: Number of content frames; defaults to one per move and is
            clamped to at least 1. With more frames than moves, each step
            is held for several frames; with fewer, steps are skipped.

    Returns:
        A list of independent images. A path without any move yields
        ``n_start + m_end + 1`` copies of the bare grid.
    """
    size = len(desc)
    n_steps = len(path) - 1
    base_img = self.render(desc, use_gym=use_gym)
    if n_steps <= 0:
        # Independent copies, like every other frame this method returns.
        return [base_img.copy() for _ in range(n_start + m_end + 1)]

    content = max(1, frames if frames is not None else n_steps)
    result = [base_img.copy() for _ in range(n_start)]

    def _partial(steps):
        """Bare grid with the first ``steps`` moves of the path drawn."""
        return self.draw_solution_line(base_img.copy(), path[:steps+1], size)

    if content >= n_steps:
        # Every step is shown; step s is held for (hi - lo) frames, which
        # is exactly one frame when content == n_steps.
        for s in range(1, n_steps + 1):
            lo = (s - 1) * content // n_steps
            hi = s * content // n_steps
            frame = _partial(s)
            result.append(frame)
            result.extend(frame.copy() for _ in range(hi - lo - 1))
    else:
        # Fewer frames than moves: sample evenly, ending on the last move.
        for f in range(content):
            result.append(_partial((f + 1) * n_steps // content))

    if m_end > 0:
        # Render the finished solution once and duplicate it.
        final_frame = _partial(n_steps)
        result.extend(final_frame.copy() for _ in range(m_end))
    return result
# ==================== Red-Path Extraction ====================
def extract_path_from_pixels(
self, pixels: np.ndarray, rows: int, cols: int,
start: Tuple[int, int], desc: Optional[GridDesc] = None,
pixel_threshold: float = 0.01,
) -> str:
"""Detect red path (float-aligned cells to match renderer)."""
img = Image.fromarray(pixels)
w, h = img.size
px = np.array(img, dtype=float)
r_ch, g_ch, b_ch = px[:, :, 0], px[:, :, 1], px[:, :, 2]
red_mask = (r_ch > 100) & (r_ch > g_ch * 1.2) & (r_ch > b_ch * 1.2)
cell_h_f, cell_w_f = h / rows, w / cols
path_grid = np.zeros((rows, cols), dtype=bool)
for r in range(rows):
y0 = int(round(r * cell_h_f))
y1 = int(round((r + 1) * cell_h_f))
for c in range(cols):
x0 = int(round(c * cell_w_f))
x1 = int(round((c + 1) * cell_w_f))
sub = red_mask[y0:y1, x0:x1]
if sub.size > 0 and np.mean(sub) > pixel_threshold:
path_grid[r, c] = True
visited = {start}
cr, cc = start
actions: List[str] = []
for _ in range(rows * cols * 2):
found = False
for act, (dr, dc) in [("R",(0,1)),("D",(1,0)),("L",(0,-1)),("U",(-1,0))]:
nr, nc = cr + dr, cc + dc
if 0 <= nr < rows and 0 <= nc < cols:
if path_grid[nr, nc] and (nr, nc) not in visited:
visited.add((nr, nc))
actions.append(act)
cr, cc = nr, nc
found = True
break
if not found:
break
return "".join(actions)
def extract_path_from_image(self, img_path, rows, cols, start, desc=None):
    """Load an image file and extract its red path as U/D/L/R moves.

    The file is opened in a ``with`` block so its handle is released as
    soon as the pixels have been read. Arguments other than ``img_path``
    are forwarded to ``extract_path_from_pixels``.

    Returns:
        The extracted move string, or ``""`` if loading or extraction
        raises.
    """
    try:
        with Image.open(img_path) as img:
            pixels = np.array(img.convert("RGB"))
        return self.extract_path_from_pixels(pixels, rows, cols, start, desc)
    except Exception:
        return ""
if __name__ == "__main__":
    # Demo / smoke script: benchmarks the generators and checks that
    # generated solutions verify. The final line reports the real outcome
    # of those checks instead of claiming success unconditionally.
    import time

    proc = FrozenLakeProcessor(img_size=512)
    failures = []  # one description per check that did not verify

    # ---- Benchmark: yield rate ----
    print("=== Yield Rate: random vs guided ===")
    for sz in [8, 16, 32]:
        min_len = max(1, int(sz * sz * 0.1))

        random.seed(42)
        t0 = time.perf_counter()
        found_r = 0
        for _ in range(500):
            desc = proc._random_layout(sz, 0.8)
            path = proc.solve(desc)
            if path and (len(path) - 1) >= min_len:
                found_r += 1
        t_rand = time.perf_counter() - t0

        random.seed(42)
        t0 = time.perf_counter()
        found_g = 0
        for _ in range(50):
            try:
                desc, path = proc.generate_guided(sz, 0.8, min_len, max_attempts=5)
                found_g += 1
            except RuntimeError:
                pass
        t_guid = time.perf_counter() - t0

        print(f" Size {sz:2d} (min={min_len:3d}): "
              f"random={found_r}/500 ({found_r/5:.1f}%) {t_rand:.2f}s | "
              f"guided={found_g}/50 ({found_g*2:.0f}%) {t_guid:.2f}s")

    # ---- generate_auto all sizes ----
    print("\n=== generate_auto ===")
    for sz in [8, 16, 32, 64]:
        min_len = max(1, int(sz * sz * 0.1))
        random.seed(42)
        t0 = time.perf_counter()
        desc, path = proc.generate_auto(sz, 0.8, min_len)
        elapsed = time.perf_counter() - t0
        udrl = proc.path_to_udrl(path)
        ok = proc.verify_path_sim(desc, udrl)
        if not ok:
            failures.append(f"generate_auto size={sz}: returned path does not verify")
        print(f" Size {sz:2d}: path={len(path)-1:3d} (min={min_len:3d}) "
              f"verify={ok} {elapsed:.3f}s")

    # ---- Extract round-trip (works for random-mode, guided corridors are too winding) ----
    print("\n=== Extract round-trip ===")
    for sz in [8, 16, 24, 32]:
        random.seed(42 + sz)
        # Use random mode for smaller sizes (natural-looking grids)
        min_len = max(1, sz)
        try:
            desc, path = proc.generate(sz, 0.8, min_len, max_attempts=1000)
        except RuntimeError:
            desc, path = proc.generate_guided(sz, 0.8, min_len)
        img = proc.render(desc, use_gym=False)
        sol = proc.draw_solution_line(img.copy(), path, sz)
        start = proc.find_start(desc)
        extracted = proc.extract_path_from_pixels(np.array(sol), sz, sz, start)
        ok = proc.verify_path_sim(desc, extracted)
        if not ok:
            failures.append(f"extract round-trip size={sz}: extracted path does not verify")
        print(f" Size {sz:2d}: verify={ok} "
              f"(GT={len(path)-1}, extracted={len(extracted)})")

    if failures:
        print(f"\n{len(failures)} check(s) failed:")
        for message in failures:
            print(f" - {message}")
        raise SystemExit(1)
    print("\nAll tests passed ✓")