| | """ |
| | MazeProcessor - Maze generation, solving, rendering, and video frame generation. |
| | |
| | Mirrors the SudokuProcessor pattern: a single class encapsulating all maze logic |
| | including DFS generation, BFS solving, image/video rendering, path verification, |
| | and text serialization. |
| | """ |
| | import random |
| | from collections import deque |
| | from typing import List, Tuple, Optional, Dict |
| |
|
| | import numpy as np |
| | from PIL import Image, ImageDraw |
| |
|
| | |
# Bit assigned to each wall in the compact per-cell integer encoding.
WALL_MASKS = {"N": 1, "S": 2, "W": 4, "E": 8}

# Wall a neighbouring cell shares with this one, keyed by this cell's wall.
OPPOSITE = {"N": "S", "S": "N", "E": "W", "W": "E"}

# (row delta, col delta) to the neighbour lying behind each wall.
# Key order is significant: neighbour enumeration iterates this dict.
NEIGHBORS = {
    "N": (-1, 0),
    "S": (1, 0),
    "E": (0, 1),
    "W": (0, -1),
}

# UDRL action -> (row delta, col delta, wall crossed by that move).
MOVES = {
    action: (*NEIGHBORS[wall], wall)
    for action, wall in (("U", "N"), ("D", "S"), ("L", "W"), ("R", "E"))
}

# A maze: rows of cells, each cell mapping "N"/"E"/"S"/"W" to "wall present".
Grid = List[List[Dict[str, bool]]]
| |
|
| |
|
class MazeProcessor:
    """Generate, solve, serialise, and render square grid mazes.

    A maze is an n×n ``Grid``: a list of rows of per-cell dicts mapping
    "N"/"E"/"S"/"W" to ``True`` when that wall is present. Coordinates are
    ``(row, col)`` pairs; moving "U" decreases the row and moving "L"
    decreases the column.
    """

    def __init__(self, img_size: int = 512):
        """Store the output image edge length (pixels) and the colour palette."""
        self.img_size = img_size

        # Cells are painted in cell_color on top of a bg_color canvas, so the
        # uncovered background is what shows as walls. wall_color is kept for
        # callers but is not read by render() in this class.
        self.bg_color = "black"
        self.cell_color = "white"
        self.wall_color = "black"
        self.grid_color = (224, 224, 224)
        self.start_color = "yellow"
        self.end_color = "blue"
        self.path_color = "red"

    # ------------------------------------------------------------------
    # Grid construction
    # ------------------------------------------------------------------

    @staticmethod
    def _empty_grid(n: int) -> Grid:
        """Create an n×n grid with all four walls present on every cell."""
        return [
            [{"N": True, "E": True, "S": True, "W": True} for _ in range(n)]
            for _ in range(n)
        ]

    @staticmethod
    def _remove_wall(grid: Grid, r: int, c: int, direction: str) -> None:
        """Remove the wall between (r, c) and its neighbour in *direction*.

        Both sides of the shared wall are cleared.

        Raises:
            IndexError: if (r, c) or the neighbour lies outside the grid.
                Checked before mutating anything, so a failed call leaves
                the grid untouched and a negative index can never wrap
                around to the opposite edge.
        """
        dr, dc = NEIGHBORS[direction]
        nr, nc = r + dr, c + dc
        rows = len(grid)
        if not (0 <= r < rows and 0 <= c < len(grid[r])):
            raise IndexError(f"cell ({r}, {c}) is outside the grid")
        if not (0 <= nr < rows and 0 <= nc < len(grid[nr])):
            raise IndexError(
                f"cell ({r}, {c}) has no neighbour in direction {direction!r}"
            )
        grid[r][c][direction] = False
        grid[nr][nc][OPPOSITE[direction]] = False

    def generate(
        self, size: int, min_path_len: int = 1, max_attempts: int = 200
    ) -> Tuple[Grid, Tuple[int, int], Tuple[int, int], np.ndarray]:
        """
        Generate a perfect maze and a start/end pair whose shortest path
        has at least *min_path_len* cells (the path includes both ends).

        Each attempt carves a fresh maze and draws one random pair of
        distinct cells.

        Returns:
            (grid, start, end, path) where path is an (L, 2) int array.

        Raises:
            ValueError: if *size* < 2 (no two distinct cells exist).
            RuntimeError: if no attempt satisfies *min_path_len*.
        """
        if size < 2:
            raise ValueError(
                f"size must be >= 2 to place distinct start and end cells, "
                f"got {size}"
            )

        nodes = [(r, c) for r in range(size) for c in range(size)]
        for _ in range(max_attempts):
            grid = self._gen_dfs(size)
            start, end = random.sample(nodes, 2)
            path = self.solve_bfs(grid, start, end)
            if path is not None and len(path) >= min_path_len:
                return grid, start, end, path
        raise RuntimeError(
            f"Failed to generate maze (size={size}, min_path_len={min_path_len}) "
            f"after {max_attempts} attempts."
        )

    def _gen_dfs(self, n: int) -> Grid:
        """Carve a perfect maze with an iterative randomised depth-first search."""
        grid = self._empty_grid(n)
        visited = [[False] * n for _ in range(n)]
        sr, sc = random.randrange(n), random.randrange(n)
        visited[sr][sc] = True
        stack = [(sr, sc)]

        while stack:
            r, c = stack[-1]
            candidates = []
            for d, (dr, dc) in NEIGHBORS.items():
                nr, nc = r + dr, c + dc
                if 0 <= nr < n and 0 <= nc < n and not visited[nr][nc]:
                    candidates.append((d, nr, nc))
            if not candidates:
                # Dead end: backtrack.
                stack.pop()
                continue
            d, nr, nc = random.choice(candidates)
            self._remove_wall(grid, r, c, d)
            visited[nr][nc] = True
            stack.append((nr, nc))
        return grid

    # ------------------------------------------------------------------
    # Solving
    # ------------------------------------------------------------------

    def solve_bfs(
        self, grid: Grid, start: Tuple[int, int], end: Tuple[int, int]
    ) -> Optional[np.ndarray]:
        """Breadth-first shortest path from *start* to *end*.

        Returns:
            (L, 2) int ndarray of (row, col) cells including both ends,
            or None when *end* is unreachable.
        """
        # Normalise so list / ndarray coordinates hash and compare correctly.
        start, end = tuple(start), tuple(end)
        n = len(grid)

        # Predecessor map doubles as the visited set; avoids copying the
        # whole path into every queue entry.
        parent = {start: None}
        queue = deque([start])

        while queue:
            node = queue.popleft()
            if node == end:
                path = []
                while node is not None:
                    path.append(node)
                    node = parent[node]
                path.reverse()
                return np.array(path, dtype=int)

            r, c = node
            cell = grid[r][c]
            for d, (dr, dc) in NEIGHBORS.items():
                nxt = (r + dr, c + dc)
                if (
                    0 <= nxt[0] < n
                    and 0 <= nxt[1] < n
                    and not cell[d]
                    and nxt not in parent
                ):
                    parent[nxt] = node
                    queue.append(nxt)
        return None

    # ------------------------------------------------------------------
    # Path <-> action string
    # ------------------------------------------------------------------

    @staticmethod
    def path_to_udrl(path) -> str:
        """Convert a coordinate path to a UDRL action string.

        A row change takes priority over a column change. Consecutive
        identical coordinates describe no movement and emit nothing.
        """
        moves = []
        for (r1, c1), (r2, c2) in zip(path, path[1:]):
            if r2 < r1:
                moves.append("U")
            elif r2 > r1:
                moves.append("D")
            elif c2 < c1:
                moves.append("L")
            elif c2 > c1:
                moves.append("R")
        return "".join(moves)

    def verify_path(
        self, grid: Grid, start: Tuple, end: Tuple, udrl: str, strict: bool = True
    ) -> bool:
        """Verify that *udrl* is a wall-respecting walk from *start* to *end*.

        Characters other than U/D/L/R (commas, spaces, anything else) are
        ignored. With ``strict=True`` the walk must finish exactly on
        *end*; with ``strict=False`` it succeeds as soon as a move lands on
        *end* and the remaining actions are not examined.

        Returns False when a move crosses a wall or leaves the grid, or
        when *start* itself lies outside the grid.
        """
        n = len(grid)
        r, c = start
        if not (0 <= r < n and 0 <= c < n):
            return False

        for ch in udrl:
            if ch not in MOVES:
                continue
            dr, dc, wall = MOVES[ch]
            if grid[r][c][wall]:
                return False
            nr, nc = r + dr, c + dc
            if not (0 <= nr < n and 0 <= nc < n):
                return False
            r, c = nr, nc
            if not strict and (r, c) == end:
                break

        return (r, c) == end

    # ------------------------------------------------------------------
    # Text serialisation
    # ------------------------------------------------------------------

    @staticmethod
    def _cell_mask(cell: Dict[str, bool]) -> int:
        """Pack one cell's wall flags into its WALL_MASKS bitmask."""
        return sum(mask for d, mask in WALL_MASKS.items() if cell[d])

    @staticmethod
    def _cell_from_mask(value: int) -> Dict[str, bool]:
        """Unpack a WALL_MASKS bitmask into a per-wall dict."""
        return {d: bool(value & mask) for d, mask in WALL_MASKS.items()}

    def encode_grid(self, grid: Grid) -> str:
        """Encode grid to compact bitmask text (one int per cell, row-major).

        Cells are space-separated within a row; rows are newline-separated.
        """
        return "\n".join(
            " ".join(str(self._cell_mask(cell)) for cell in row) for row in grid
        )

    def decode_grid(self, text_lines: List[str]) -> Grid:
        """Decode bitmask text lines (one row per line) back to grid dicts."""
        return [
            [self._cell_from_mask(int(token)) for token in line.split()]
            for line in text_lines
        ]

    def save_text(self, filepath, grid: Grid, start: Tuple, end: Tuple) -> None:
        """Save maze to a compact text file.

        Layout: size, then "start_row start_col", then "end_row end_col",
        then the encode_grid() rows.
        """
        n = len(grid)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(f"{n}\n{start[0]} {start[1]}\n{end[0]} {end[1]}\n")
            f.write(self.encode_grid(grid) + "\n")

    def load_text(self, filepath) -> Optional[Dict]:
        """
        Load maze from a text file written by save_text().

        Blank lines are ignored.

        Returns dict with keys: size, start, end, grid (dict-based),
        grid_raw (list[list[int]] bitmask). None on failure (unreadable
        file, malformed numbers, or a grid that is not size × size).
        """
        try:
            with open(filepath, encoding="utf-8") as f:
                lines = [line.strip() for line in f if line.strip()]
            n = int(lines[0])
            sr, sc = map(int, lines[1].split())
            er, ec = map(int, lines[2].split())
            grid_raw = [
                [int(token) for token in line.split()]
                for line in lines[3 : 3 + n]
            ]
            if len(grid_raw) != n or any(len(row) != n for row in grid_raw):
                raise ValueError("grid is not size x size")
        except (OSError, ValueError, IndexError, TypeError):
            return None

        grid = [[self._cell_from_mask(v) for v in row] for row in grid_raw]
        return {
            "size": n,
            "start": (sr, sc),
            "end": (er, ec),
            "grid": grid,
            "grid_raw": grid_raw,
        }

    def fingerprint(self, grid: Grid, start: Tuple, end: Tuple) -> str:
        """Content fingerprint for deduplication.

        Format: "n,sr,sc,er,ec" followed by every cell bitmask in
        row-major order, all joined with "|".
        """
        header = f"{len(grid)},{start[0]},{start[1]},{end[0]},{end[1]}"
        cells = (str(self._cell_mask(cell)) for row in grid for cell in row)
        return "|".join([header, *cells])

    # ------------------------------------------------------------------
    # Rendering
    # ------------------------------------------------------------------

    def _layout(self, n: int):
        """Compute rendering layout parameters for an n×n maze.

        Returns:
            (cell_f, wall_f, half_f, grid_w): float cell pitch in pixels,
            wall thickness (a quarter of the pitch), half the wall
            thickness, and the integer width of the faint grid lines.
        """
        cell_f = float(self.img_size) / n
        wall_f = cell_f / 4.0
        half_f = wall_f / 2.0
        grid_w = max(1, int(cell_f / 16.0))
        return cell_f, wall_f, half_f, grid_w

    def render(
        self,
        grid: Grid,
        start: Tuple[int, int],
        end: Tuple[int, int],
        path: Optional[np.ndarray] = None,
        path_steps: Optional[int] = None,
    ) -> Image.Image:
        """
        Render maze as a PIL image.

        Args:
            grid: The maze grid.
            start, end: Coordinates of start/end cells.
            path: Full solution path (L, 2); omitted -> no path is drawn.
            path_steps: Draw only the first *path_steps* segments (for video).

        Returns:
            PIL.Image (RGB, img_size × img_size).
        """
        n = len(grid)
        cell_f, wall_f, half_f, grid_w = self._layout(n)

        img = Image.new("RGB", (self.img_size, self.img_size), self.bg_color)
        draw = ImageDraw.Draw(img)

        # Pass 1: paint each cell interior (inset by half a wall), and bridge
        # the gap towards the south/east neighbour where that wall is open.
        for r in range(n):
            for c in range(n):
                x1 = c * cell_f + half_f
                y1 = r * cell_f + half_f
                x2 = (c + 1) * cell_f - half_f
                y2 = (r + 1) * cell_f - half_f
                draw.rectangle([(x1, y1), (x2, y2)], fill=self.cell_color)
                cell = grid[r][c]
                if not cell["S"] and r < n - 1:
                    draw.rectangle(
                        [(x1, y2), (x2, y2 + wall_f)], fill=self.cell_color
                    )
                if not cell["E"] and c < n - 1:
                    draw.rectangle(
                        [(x2, y1), (x2 + wall_f, y2)], fill=self.cell_color
                    )

        # Pass 2: faint grid lines across open passages. Done after all fills
        # so a later cell's rectangle cannot paint over an earlier line.
        for r in range(n):
            for c in range(n):
                if r < n - 1 and not grid[r][c]["S"]:
                    y = (r + 1) * cell_f
                    draw.line(
                        [(c * cell_f + half_f, y), ((c + 1) * cell_f - half_f, y)],
                        fill=self.grid_color, width=grid_w,
                    )
                if c < n - 1 and not grid[r][c]["E"]:
                    x = (c + 1) * cell_f
                    draw.line(
                        [(x, r * cell_f + half_f), (x, (r + 1) * cell_f - half_f)],
                        fill=self.grid_color, width=grid_w,
                    )

        # Start / end markers: filled circles centred in their cells.
        def _dot(rc, color):
            rr, cc = rc
            cx = cc * cell_f + cell_f / 2
            cy = rr * cell_f + cell_f / 2
            rad = max(2, int((cell_f - wall_f) * 0.25))
            draw.ellipse([cx - rad, cy - rad, cx + rad, cy + rad], fill=color)

        _dot(start, self.start_color)
        _dot(end, self.end_color)

        # Solution polyline through cell centres, optionally truncated.
        if path is not None and len(path) >= 2:
            # k segments need k + 1 points.
            end_idx = (
                len(path) if path_steps is None
                else min(path_steps + 1, len(path))
            )
            if end_idx >= 2:
                pts = [
                    (c * cell_f + cell_f / 2, r * cell_f + cell_f / 2)
                    for r, c in path[:end_idx]
                ]
                draw.line(
                    pts, fill=self.path_color,
                    width=max(1, int(wall_f)), joint="curve",
                )

        return img

    # ------------------------------------------------------------------
    # Video frames
    # ------------------------------------------------------------------

    def generate_video_frames(
        self,
        grid: Grid,
        start: Tuple[int, int],
        end: Tuple[int, int],
        path: np.ndarray,
        n_start: int = 5,
        m_end: int = 5,
        frames: Optional[int] = None,
    ) -> List[Image.Image]:
        """
        Generate progressive video frames showing the red line growing.

        *frames* controls the number of **content frames** between holds:
            - None            → 1 per path step
            - frames > steps  → slow-motion (steps are held for several frames)
            - frames < steps  → fast-forward (several steps per frame)

        Total length = n_start + content_frames + m_end. A path with no
        steps yields n_start + m_end + 1 path-less frames instead.

        Every returned frame is an independent image object.
        """
        n_steps = len(path) - 1
        blank = self.render(grid, start, end)
        if n_steps <= 0:
            # Copies, not one aliased image: editing a frame must not
            # change the others (matches every other branch).
            return [blank.copy() for _ in range(n_start + m_end + 1)]

        content_frames = n_steps if frames is None else frames
        content_frames = max(1, content_frames)

        # Opening hold on the unsolved maze.
        result: List[Image.Image] = [blank.copy() for _ in range(n_start)]

        if content_frames >= n_steps:
            # Every step is shown; step s occupies the frame slots
            # [(s-1)*F//N, s*F//N), which is exactly one slot when F == N.
            for step in range(1, n_steps + 1):
                f_lo = (step - 1) * content_frames // n_steps
                f_hi = step * content_frames // n_steps
                frame_img = self.render(
                    grid, start, end, path=path, path_steps=step
                )
                result.append(frame_img)
                result.extend(frame_img.copy() for _ in range(f_hi - f_lo - 1))
        else:
            # Fewer frames than steps: sample steps evenly, ending on the last.
            for f in range(content_frames):
                step = (f + 1) * n_steps // content_frames
                result.append(
                    self.render(grid, start, end, path=path, path_steps=step)
                )

        # Closing hold on the fully drawn solution.
        final = self.render(grid, start, end, path=path)
        result.extend(final.copy() for _ in range(m_end))

        return result

    # ------------------------------------------------------------------
    # Path extraction from rendered pixels
    # ------------------------------------------------------------------

    def extract_path_from_pixels(
        self,
        pixels: np.ndarray,
        grid_raw: List[List[int]],
        size: int,
        start: Tuple[int, int],
        pixel_threshold: float = 0.01,
    ) -> str:
        """
        Detect red path in an RGB pixel array and return UDRL.

        Uses **floating-point** cell boundaries matching the renderer to avoid
        misalignment on sizes that don't evenly divide the image (e.g. 24, 48).

        Args:
            pixels: (H, W, 3) RGB array.
            grid_raw: Bitmask grid as list[list[int]].
            size: Maze dimension n.
            start: Start coordinate (r, c).
            pixel_threshold: Min red-pixel fraction to mark a cell.

        Returns:
            UDRL action string; the walk stops at the first cell with no
            unvisited, wall-free, red-marked neighbour.
        """
        px = np.asarray(pixels, dtype=float)
        h, w = px.shape[:2]

        r_ch, g_ch, b_ch = px[:, :, 0], px[:, :, 1], px[:, :, 2]
        # "Red" = bright red channel that clearly dominates green and blue.
        red_mask = (r_ch > 100) & (r_ch > g_ch * 1.2) & (r_ch > b_ch * 1.2)

        cell_h_f = h / size
        cell_w_f = w / size

        # Mark each cell whose inner region holds enough red pixels.
        path_grid = np.zeros((size, size), dtype=bool)
        for r in range(size):
            y0 = int(round(r * cell_h_f))
            y1 = int(round((r + 1) * cell_h_f))
            margin_y = max(1, int((y1 - y0) * 0.15))
            for c in range(size):
                x0 = int(round(c * cell_w_f))
                x1 = int(round((c + 1) * cell_w_f))
                margin_x = max(1, int((x1 - x0) * 0.15))
                # Trim a 15% border so only the cell's interior is sampled.
                sub = red_mask[y0 + margin_y : y1 - margin_y,
                               x0 + margin_x : x1 - margin_x]
                if sub.size > 0 and np.mean(sub) > pixel_threshold:
                    path_grid[r, c] = True

        # Greedy walk from start through marked cells, respecting walls.
        # Probe order R, D, L, U decides ties.
        directions = [(act, MOVES[act]) for act in ("R", "D", "L", "U")]
        visited = {start}
        cr, cc = start
        actions: List[str] = []
        for _ in range(size * size * 2):
            wval = grid_raw[cr][cc]
            for act, (dr, dc, wall) in directions:
                nr, nc = cr + dr, cc + dc
                if not (0 <= nr < size and 0 <= nc < size):
                    continue
                if wval & WALL_MASKS[wall]:
                    continue
                if path_grid[nr, nc] and (nr, nc) not in visited:
                    visited.add((nr, nc))
                    actions.append(act)
                    cr, cc = nr, nc
                    break
            else:
                # No direction advanced the walk: the path ends here.
                break
        return "".join(actions)

    def extract_path_from_image(
        self, img_path: str, grid_raw: List[List[int]], size: int, start: Tuple
    ) -> str:
        """Extract UDRL from an image file (convenience wrapper).

        Best effort: returns "" if the image cannot be read or processed.
        """
        try:
            with Image.open(img_path) as im:
                pixels = np.array(im.convert("RGB"))
            return self.extract_path_from_pixels(pixels, grid_raw, size, start)
        except Exception:
            # Deliberately broad: any failure means "no path found".
            return ""
| |
|
| |
|
if __name__ == "__main__":
    # Smoke test: build one maze, save two renders to the current directory,
    # and check the frame counts of the three video pacing modes.
    proc = MazeProcessor(img_size=512)

    grid, start, end, path = proc.generate(size=8, min_path_len=10)
    n_steps = len(path) - 1
    udrl = proc.path_to_udrl(path)
    print(f"Maze 8×8 | path length {len(path)} | steps {n_steps}")
    print(f"UDRL: {udrl}")
    print(f"Verify: {proc.verify_path(grid, start, end, udrl)}")

    proc.render(grid, start, end).save("test_maze.png")
    proc.render(grid, start, end, path=path).save("test_maze_solution.png")

    hold = 3
    slow = n_steps * 3
    half = max(1, n_steps // 2)
    totals = {}
    # frames=None means one content frame per path step.
    for label, frames in (("none", None), ("slow", slow), ("fast", half)):
        clip = proc.generate_video_frames(
            grid, start, end, path, n_start=hold, m_end=hold, frames=frames
        )
        content = n_steps if frames is None else frames
        assert len(clip) == hold + content + hold
        totals[label] = len(clip)

    print(f"frames=None → {totals['none']} total ({n_steps} content)")
    print(f"frames={slow:<4d} → {totals['slow']} total (slow-mo)")
    print(f"frames={half:<4d} → {totals['fast']} total (fast-fwd)")
    print("All assertions passed ✓")