Spaces:
Runtime error
Runtime error
| """ | |
| Display Module - Output System | |
| This module handles the final output of rendered frames, supporting multiple | |
| output methods including WebSocket to browser, GUI windows, and image files. | |
| """ | |
| import asyncio | |
| import json | |
| import base64 | |
| import time | |
| import numpy as np | |
| from typing import Optional, Dict, Any, Callable | |
| from io import BytesIO | |
| import threading | |
| try: | |
| import websockets | |
| WEBSOCKETS_AVAILABLE = True | |
| except ImportError: | |
| WEBSOCKETS_AVAILABLE = False | |
| print("Warning: websockets not available. WebSocket display will not work.") | |
| try: | |
| import tkinter as tk | |
| from tkinter import Canvas | |
| from PIL import Image, ImageTk | |
| TKINTER_AVAILABLE = True | |
| except ImportError: | |
| TKINTER_AVAILABLE = False | |
| print("Warning: tkinter or PIL not available. GUI display will not work.") | |
| try: | |
| from PIL import Image | |
| PIL_AVAILABLE = True | |
| except ImportError: | |
| PIL_AVAILABLE = False | |
| print("Warning: PIL not available. Image saving will not work.") | |
class DisplayMode:
    """String identifiers for the supported display back-ends.

    The values are plain strings; ``DisplayManager`` compares the
    ``display_type`` it is given against these constants.
    """

    # Frames are pushed to browsers over a WebSocket connection.
    WEBSOCKET = "websocket"

    # Frames are drawn in a Tkinter window.
    GUI = "gui"

    # Frames are written to image files.
    FILE = "file"

    # Frames are printed to the terminal as ASCII art.
    CONSOLE = "console"
class WebSocketDisplay:
    """WebSocket-based display that sends frames to a web browser.

    Each frame is PNG-encoded, wrapped in a base64 ``data:`` URL and
    broadcast as one JSON message to every connected client.
    """

    def __init__(self, host: str = "localhost", port: int = 8765):
        """Store the bind address; the server is not started here.

        Args:
            host: Interface the server binds to.
            port: TCP port the server listens on.
        """
        self.host = host
        self.port = port
        self.server = None       # set by start_server()
        self.clients = set()     # connections that are currently open
        self.is_running = False

    async def _handle_client(self, websocket, path=None):
        """Track one client connection until it closes.

        ``path`` is optional on purpose: older ``websockets`` releases call
        the handler as ``handler(websocket, path)`` while newer ones pass
        only the connection, so a mandatory second parameter makes every
        connection fail with a ``TypeError``.
        """
        self.clients.add(websocket)
        print(f"Client connected: {websocket.remote_address}")
        try:
            await websocket.wait_closed()
        finally:
            # discard(): send_frame() may already have dropped this client.
            self.clients.discard(websocket)
            print(f"Client disconnected: {websocket.remote_address}")

    async def start_server(self):
        """Start the WebSocket server.

        Raises:
            RuntimeError: If the ``websockets`` package is not installed.
        """
        if not WEBSOCKETS_AVAILABLE:
            raise RuntimeError("WebSocket support not available")
        if self.is_running:
            # Already listening; binding the same port again would fail.
            return

        self.server = await websockets.serve(self._handle_client, self.host, self.port)
        self.is_running = True
        print(f"WebSocket server started on ws://{self.host}:{self.port}")

    async def stop_server(self):
        """Stop the WebSocket server if one was started."""
        if self.server:
            self.server.close()
            await self.server.wait_closed()
            self.server = None
            self.is_running = False
            print("WebSocket server stopped")

    @staticmethod
    def _frame_to_image(frame_data: np.ndarray):
        """Convert a frame array to a PIL image.

        3-channel arrays become RGB, 4-channel arrays RGBA; any other 3-D
        array is treated as single-channel (channel 0 replicated to RGB).
        Arrays that are not 3-D are handed to PIL as 8-bit grayscale.
        """
        pixels = frame_data.astype(np.uint8)
        if pixels.ndim != 3:
            return Image.fromarray(pixels, 'L')

        channels = pixels.shape[2]
        if channels == 3:
            return Image.fromarray(pixels, 'RGB')
        if channels == 4:
            return Image.fromarray(pixels, 'RGBA')
        rgb_data = np.stack([pixels[:, :, 0]] * 3, axis=-1)
        return Image.fromarray(rgb_data, 'RGB')

    async def send_frame(self, frame_data: np.ndarray, frame_id: int = 0):
        """Send a frame to all connected clients.

        Does nothing when no client is connected or PIL is unavailable.
        Errors are reported on stdout instead of being raised.

        Args:
            frame_data: Pixel array to encode.
            frame_id: Identifier copied into the outgoing message.
        """
        if not self.clients or not PIL_AVAILABLE:
            return

        try:
            image = self._frame_to_image(frame_data)

            # Encode as PNG, then base64 so it fits in a JSON string.
            buffer = BytesIO()
            image.save(buffer, format='PNG')
            img_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')

            # Serialize once; every client receives the same text.
            payload = json.dumps({
                "type": "frame",
                "frame_id": frame_id,
                "width": image.width,
                "height": image.height,
                "data": f"data:image/png;base64,{img_base64}",
                "timestamp": time.time()
            })

            # Snapshot the set: handlers may add/remove clients while we await.
            recipients = tuple(self.clients)
            results = await asyncio.gather(
                *(client.send(payload) for client in recipients),
                return_exceptions=True
            )

            # Report failed sends and stop sending to those clients.
            for client, result in zip(recipients, results):
                if isinstance(result, Exception):
                    self.clients.discard(client)
                    address = getattr(client, "remote_address", client)
                    print(f"Error sending frame to {address}: {result}")
        except Exception as e:
            print(f"Error sending frame via WebSocket: {e}")
class GUIDisplay:
    """Tkinter-based GUI display window.

    The Tk window is created and run in a background thread. Frames handed
    to ``show_frame()`` are converted to PIL images in the caller's thread
    and queued; the Tk thread paints the most recent one on its periodic
    tick, so widgets are only touched by the thread that created them.
    """

    def __init__(self, title: str = "vGPU Display", width: int = 800, height: int = 600):
        """Record the window settings; the window itself is created by start().

        Raises:
            RuntimeError: If tkinter or PIL could not be imported.
        """
        if not TKINTER_AVAILABLE:
            raise RuntimeError("GUI display not available (tkinter/PIL missing)")

        self.title = title
        self.width = width
        self.height = height
        self.window = None
        self.canvas = None
        self.is_running = False
        self.update_callback = None
        self.gui_thread = None

        self._pending_image = None               # newest frame not yet painted
        self._pending_lock = threading.Lock()    # guards _pending_image
        self._photo = None                       # keeps the shown PhotoImage alive
        self._image_item = None                  # canvas item id of the shown image

    def start(self):
        """Start the GUI display in a separate thread."""
        thread_alive = self.gui_thread is not None and self.gui_thread.is_alive()
        if self.is_running or thread_alive:
            return

        def run_gui():
            self.window = tk.Tk()
            self.window.title(self.title)
            self.window.geometry(f"{self.width}x{self.height}")

            self.canvas = Canvas(self.window, width=self.width, height=self.height, bg='black')
            self.canvas.pack()

            self.is_running = True

            # Periodic tick: run the user callback, paint, reschedule.
            def update():
                if self.is_running and self.update_callback:
                    try:
                        self.update_callback()
                    except Exception as e:
                        # Keep the tick alive if the callback fails.
                        print(f"Error in GUI update callback: {e}")
                if self.is_running:
                    self._paint_pending()
                    self.window.after(16, update)  # ~60 FPS
                else:
                    # stop() was requested from another thread.
                    self.window.quit()

            update()

            self.window.protocol("WM_DELETE_WINDOW", self.stop)
            try:
                self.window.mainloop()
            finally:
                # Release Tk objects in the thread that created them.
                self.is_running = False
                self._photo = None
                self._image_item = None
                self.canvas = None
                try:
                    self.window.destroy()
                except tk.TclError:
                    pass
                self.window = None

        self.gui_thread = threading.Thread(target=run_gui, daemon=True)
        self.gui_thread.start()

    def stop(self):
        """Stop the GUI display.

        From the GUI thread (e.g. the window-close handler) the main loop
        is quit directly. From any other thread only the flag is cleared
        and the GUI thread quits itself on its next tick.
        """
        self.is_running = False
        window = self.window
        if window is not None and threading.current_thread() is self.gui_thread:
            window.quit()

    def _paint_pending(self):
        """Draw the queued frame, if any. Must run in the GUI thread."""
        with self._pending_lock:
            image, self._pending_image = self._pending_image, None
        if image is None:
            return

        try:
            photo = ImageTk.PhotoImage(image, master=self.window)
            if self._image_item is None:
                self._image_item = self.canvas.create_image(
                    self.width // 2, self.height // 2, image=photo
                )
            else:
                self.canvas.itemconfig(self._image_item, image=photo)
            # Keep a reference to prevent garbage collection
            self._photo = photo
        except Exception as e:
            print(f"Error displaying frame in GUI: {e}")

    def show_frame(self, frame_data: np.ndarray):
        """Queue a frame for display in the GUI window.

        Returns immediately when the window is not running. Arrays with
        three or more channels use the first three as RGB; fewer channels
        replicate channel 0; non-3-D arrays are treated as grayscale.
        Errors are reported on stdout instead of being raised.
        """
        if not self.is_running or not self.canvas:
            return

        try:
            if len(frame_data.shape) == 3:
                channels = frame_data.shape[2]
                if channels >= 3:
                    image = Image.fromarray(frame_data[:, :, :3].astype(np.uint8), 'RGB')
                else:
                    # Convert single channel to RGB
                    rgb_data = np.stack([frame_data[:, :, 0]] * 3, axis=-1)
                    image = Image.fromarray(rgb_data.astype(np.uint8), 'RGB')
            else:
                # Grayscale
                image = Image.fromarray(frame_data.astype(np.uint8), 'L')

            # Image.Resampling only exists on newer Pillow releases.
            resample = getattr(Image, "Resampling", Image).LANCZOS
            image = image.resize((self.width, self.height), resample)

            # Hand over to the GUI thread; an unpainted older frame is replaced.
            with self._pending_lock:
                self._pending_image = image
        except Exception as e:
            print(f"Error displaying frame in GUI: {e}")

    def set_update_callback(self, callback: Callable):
        """Set a callback function to be called periodically."""
        self.update_callback = callback
class FileDisplay:
    """File-based display that saves frames as image files."""

    def __init__(self, output_dir: str = "./frames", format: str = "png"):
        """Remember the target directory/format and create the directory.

        Args:
            output_dir: Directory that receives the image files.
            format: File extension for auto-generated names (lower-cased).
        """
        import os

        self.output_dir = output_dir
        self.format = format.lower()
        self.frame_counter = 0   # index used for the next auto-generated name

        # Create output directory
        os.makedirs(output_dir, exist_ok=True)

    def _resolve_path(self, filename: Optional[str]) -> str:
        """Return the output path, generating a numbered name if needed.

        The frame counter advances only when a name is auto-generated.
        """
        if filename is None:
            filename = f"frame_{self.frame_counter:06d}.{self.format}"
            self.frame_counter += 1
        return f"{self.output_dir}/{filename}"

    @staticmethod
    def _frame_to_image(frame_data: np.ndarray):
        """Convert a frame array to a PIL image.

        3-channel arrays become RGB, 4-channel arrays RGBA; any other 3-D
        array is treated as single-channel (channel 0 replicated to RGB).
        Arrays that are not 3-D are handed to PIL as 8-bit grayscale.
        """
        pixels = frame_data.astype(np.uint8)
        if pixels.ndim != 3:
            return Image.fromarray(pixels, 'L')

        channels = pixels.shape[2]
        if channels == 3:
            return Image.fromarray(pixels, 'RGB')
        if channels == 4:
            return Image.fromarray(pixels, 'RGBA')
        rgb_data = np.stack([pixels[:, :, 0]] * 3, axis=-1)
        return Image.fromarray(rgb_data, 'RGB')

    def save_frame(self, frame_data: np.ndarray, filename: Optional[str] = None):
        """Save a frame to a file.

        Args:
            frame_data: Pixel array to write.
            filename: File name inside ``output_dir``; when omitted a
                numbered ``frame_NNNNNN.<format>`` name is generated.

        Returns:
            True if the image was written, False if PIL is unavailable or
            saving failed (the error is printed, not raised).
        """
        if not PIL_AVAILABLE:
            print("Error: PIL not available for saving images")
            return False

        try:
            filepath = self._resolve_path(filename)
            image = self._frame_to_image(frame_data)

            # JPEG has no alpha channel; PIL refuses to write RGBA as JPEG,
            # so drop alpha instead of failing on every 4-channel frame.
            if image.mode == 'RGBA' and filepath.lower().endswith((".jpg", ".jpeg")):
                image = image.convert('RGB')

            # Save image
            image.save(filepath)
            print(f"Frame saved: {filepath}")
            return True
        except Exception as e:
            print(f"Error saving frame: {e}")
            return False
class ConsoleDisplay:
    """Console-based display that shows ASCII art representation."""

    def __init__(self, width: int = 80, height: int = 24):
        """Set the character grid size.

        Args:
            width: Number of characters per output row.
            height: Number of output rows.
        """
        self.width = width
        self.height = height
        self.ascii_chars = " .:-=+*#%@"   # palette, ordered dark to bright

    def _to_ascii_rows(self, frame_data: np.ndarray) -> list:
        """Convert a frame to ``height`` strings of ``width`` characters."""
        # Imported lazily so the module loads without scipy installed.
        from scipy import ndimage

        # Work in float so integer input cannot wrap during interpolation.
        pixels = np.asarray(frame_data, dtype=float)

        if pixels.ndim == 3:
            if pixels.shape[2] >= 3:
                # Convert RGB to grayscale (extra channels are ignored).
                gray = np.dot(pixels[..., :3], [0.299, 0.587, 0.114])
            else:
                # Fewer than 3 channels: use the first as luminance.
                gray = pixels[..., 0]
        else:
            gray = pixels

        # Resize to console dimensions
        resized = ndimage.zoom(
            gray, (self.height / gray.shape[0], self.width / gray.shape[1])
        )

        # Spline interpolation can overshoot the input range; clip so the
        # palette index can never be negative or past the last character.
        levels = np.clip(resized, 0.0, 255.0) / 255.0
        indices = (levels * (len(self.ascii_chars) - 1)).astype(int)

        palette = np.array(list(self.ascii_chars))
        return ["".join(row) for row in palette[indices]]

    def show_frame(self, frame_data: np.ndarray):
        """Display frame as ASCII art in console.

        Pixel values are interpreted on a 0-255 scale. Errors are reported
        on stdout instead of being raised.
        """
        try:
            ascii_frame = self._to_ascii_rows(frame_data)

            # Clear screen and display
            print("\033[2J\033[H")  # Clear screen and move cursor to top
            for row in ascii_frame:
                print(row)
        except Exception as e:
            print(f"Error displaying ASCII frame: {e}")
class DisplayManager:
    """Manages multiple display outputs and coordinates frame updates."""

    def __init__(self, vram=None):
        """Create an empty manager.

        Args:
            vram: Object providing ``get_framebuffer(framebuffer_id)``; the
                returned framebuffer's ``pixel_buffer`` is what gets shown.
        """
        self.vram = vram
        self.displays = {}
        self.active_framebuffer = None
        self.frame_counter = 0
        self.fps_target = 60
        self.last_frame_time = 0

        # Statistics
        self.frames_displayed = 0
        self.total_display_time = 0.0

        self._last_frame_interval = None   # seconds between the two latest frames
        self._pending_tasks = set()        # strong refs to fire-and-forget tasks

    def add_display(self, name: str, display_type: str, **kwargs):
        """Add a display output.

        Args:
            name: Key under which the display is registered.
            display_type: One of the ``DisplayMode`` constants.
            **kwargs: Forwarded to the display's constructor.

        Returns:
            The newly created display object.

        Raises:
            ValueError: If ``display_type`` is not a known mode.
        """
        if display_type == DisplayMode.WEBSOCKET:
            display = WebSocketDisplay(**kwargs)
        elif display_type == DisplayMode.GUI:
            display = GUIDisplay(**kwargs)
        elif display_type == DisplayMode.FILE:
            display = FileDisplay(**kwargs)
        elif display_type == DisplayMode.CONSOLE:
            display = ConsoleDisplay(**kwargs)
        else:
            raise ValueError(f"Unknown display type: {display_type}")

        self.displays[name] = {
            "display": display,
            "type": display_type,
            "enabled": True
        }

        return display

    def remove_display(self, name: str):
        """Remove a display output, shutting it down where applicable.

        Unknown names are ignored. Works both inside and outside a running
        event loop.
        """
        display_info = self.displays.pop(name, None)
        if display_info is None:
            return

        display = display_info["display"]
        if display_info["type"] == DisplayMode.WEBSOCKET:
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                # No loop is running in this thread, so a task cannot be
                # scheduled; request the close synchronously instead.
                server = getattr(display, "server", None)
                if server:
                    try:
                        server.close()
                    except Exception as e:
                        print(f"Error stopping display {name}: {e}")
                display.is_running = False
            else:
                task = asyncio.create_task(display.stop_server())
                # The loop holds only weak references to tasks; keep one
                # until the task finishes so it cannot be collected early.
                self._pending_tasks.add(task)
                task.add_done_callback(self._pending_tasks.discard)
        elif display_info["type"] == DisplayMode.GUI:
            display.stop()

    def set_active_framebuffer(self, framebuffer_id: str):
        """Set the active framebuffer to display."""
        self.active_framebuffer = framebuffer_id

    async def update_displays(self):
        """Update all active displays with the current framebuffer.

        Does nothing when no VRAM, no active framebuffer, or no matching
        framebuffer is available. A failure in one display is printed and
        does not stop the others.
        """
        if not self.vram or not self.active_framebuffer:
            return

        started = time.perf_counter()

        # Get framebuffer data
        framebuffer = self.vram.get_framebuffer(self.active_framebuffer)
        if not framebuffer:
            return

        frame_data = framebuffer.pixel_buffer

        # Iterate over a snapshot: displays may be added or removed by
        # other tasks while this coroutine is suspended in an await.
        for name, display_info in list(self.displays.items()):
            if not display_info["enabled"]:
                continue

            display = display_info["display"]
            display_type = display_info["type"]

            try:
                if display_type == DisplayMode.WEBSOCKET:
                    await display.send_frame(frame_data, self.frame_counter)
                elif display_type == DisplayMode.GUI:
                    display.show_frame(frame_data)
                elif display_type == DisplayMode.FILE:
                    display.save_frame(frame_data)
                elif display_type == DisplayMode.CONSOLE:
                    display.show_frame(frame_data)
            except Exception as e:
                print(f"Error updating display {name}: {e}")

        # Update statistics
        self.frame_counter += 1
        self.frames_displayed += 1
        self.total_display_time += time.perf_counter() - started

        now = time.time()
        if self.last_frame_time > 0:
            self._last_frame_interval = now - self.last_frame_time
        self.last_frame_time = now

    def enable_display(self, name: str, enabled: bool = True):
        """Enable or disable a specific display."""
        if name in self.displays:
            self.displays[name]["enabled"] = enabled

    def get_stats(self) -> Dict[str, Any]:
        """Get display manager statistics.

        ``current_fps`` is derived from the time between the two most
        recent frames (0 until two frames have been shown), so it does not
        depend on when this method is called.
        """
        avg_display_time = self.total_display_time / max(1, self.frames_displayed)

        if self._last_frame_interval is None:
            current_fps = 0
        else:
            current_fps = 1.0 / max(0.001, self._last_frame_interval)

        return {
            "frames_displayed": self.frames_displayed,
            "total_display_time": self.total_display_time,
            "avg_display_time": avg_display_time,
            "current_fps": current_fps,
            "target_fps": self.fps_target,
            "active_displays": sum(1 for d in self.displays.values() if d["enabled"]),
            "total_displays": len(self.displays),
            "active_framebuffer": self.active_framebuffer
        }
if __name__ == "__main__":
    # Test the display system
    async def test_display():
        """Render a short animation to every available display back-end.

        Requires the project-local ``vram`` and ``render`` modules. The
        WebSocket server and GUI window are shut down even if rendering
        fails part-way through.
        """
        from vram import VRAM
        from render import Renderer

        print("Testing Display System...")

        # Create VRAM and renderer
        vram = VRAM(memory_size_gb=1)
        renderer = Renderer(vram)

        # Create display manager
        display_manager = DisplayManager(vram)

        # Create a test framebuffer
        fb_id = vram.create_framebuffer(400, 300, 3)
        display_manager.set_active_framebuffer(fb_id)

        ws_display = None
        gui_display = None
        try:
            # Add displays
            if WEBSOCKETS_AVAILABLE:
                ws_display = display_manager.add_display("websocket", DisplayMode.WEBSOCKET)
                await ws_display.start_server()

            if TKINTER_AVAILABLE:
                gui_display = display_manager.add_display("gui", DisplayMode.GUI, width=400, height=300)
                gui_display.start()

            display_manager.add_display("file", DisplayMode.FILE, output_dir="./test_frames")
            display_manager.add_display("console", DisplayMode.CONSOLE, width=40, height=20)

            # Render some test content
            renderer.clear(fb_id, (64, 128, 255))
            renderer.draw_rect(fb_id, 50, 50, 100, 80, (255, 0, 0))
            renderer.draw_circle(fb_id, 200, 150, 40, (0, 255, 0), filled=True)

            # Update displays
            await display_manager.update_displays()

            # Animate for about one second
            for i in range(60):  # 1 second at 60 FPS
                # Clear and draw animated content
                renderer.clear(fb_id, (32, 64, 128))

                # Moving rectangle
                x = 50 + int(50 * np.sin(i * 0.1))
                renderer.draw_rect(fb_id, x, 50, 50, 50, (255, 255, 0))

                # Rotating line effect
                center_x, center_y = 200, 150
                for j in range(8):
                    angle = (i + j * 8) * 0.1
                    end_x = center_x + int(40 * np.cos(angle))
                    end_y = center_y + int(40 * np.sin(angle))
                    renderer.draw_line(fb_id, center_x, center_y, end_x, end_y, (0, 255, 255))

                # Update displays
                await display_manager.update_displays()
                await asyncio.sleep(1/60)  # 60 FPS

            # Print statistics
            stats = display_manager.get_stats()
            print(f"Display Manager stats: {stats}")
        finally:
            # Cleanup runs on failure too, so the server socket and the
            # GUI thread are not left behind.
            if ws_display is not None:
                await ws_display.stop_server()
            if gui_display is not None:
                gui_display.stop()

        print("Display system test completed!")

    # Run the test
    asyncio.run(test_display())