Spaces:
Running
Running
| import subprocess | |
| import os | |
| from typing import Dict, Any, Optional, List | |
| from openspace.utils.logging import Logger | |
| from PIL import Image | |
| import pyautogui | |
| try: | |
| import pyatspi | |
| from pyatspi import Accessible, StateType, STATE_SHOWING | |
| import Xlib | |
| from Xlib import display, X | |
| LINUX_LIBS_AVAILABLE = True | |
| except ImportError: | |
| LINUX_LIBS_AVAILABLE = False | |
| logger = Logger.get_logger(__name__) | |
| class LinuxAdapter: | |
| def __init__(self): | |
| if not LINUX_LIBS_AVAILABLE: | |
| logger.warning("Linux libraries are not fully installed, some features may not be available") | |
| self.available = LINUX_LIBS_AVAILABLE | |
| def capture_screenshot_with_cursor(self, output_path: str) -> bool: | |
| """ | |
| Use pyautogui + pyxcursor to capture screenshot (including cursor) | |
| Args: | |
| output_path: Output file path | |
| Returns: | |
| Whether the screenshot is successful | |
| """ | |
| try: | |
| # Use pyautogui to capture screenshot | |
| screenshot = pyautogui.screenshot() | |
| # Try to add cursor | |
| try: | |
| # Import pyxcursor (should be in the same directory) | |
| import sys | |
| import os | |
| sys.path.insert(0, os.path.dirname(__file__)) | |
| from pyxcursor import Xcursor | |
| cursor_obj = Xcursor() | |
| imgarray = cursor_obj.getCursorImageArrayFast() | |
| cursor_img = Image.fromarray(imgarray) | |
| cursor_x, cursor_y = pyautogui.position() | |
| screenshot.paste(cursor_img, (cursor_x, cursor_y), cursor_img) | |
| logger.info("Linux screenshot successfully (with cursor)") | |
| except Exception as e: | |
| logger.warning(f"Failed to add cursor to screenshot: {e}") | |
| logger.info("Linux screenshot successfully (without cursor)") | |
| screenshot.save(output_path) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Linux screenshot failed: {e}") | |
| return False | |
| def activate_window(self, window_name: str, strict: bool = False, by_class: bool = False) -> Dict[str, Any]: | |
| """ | |
| Activate window (Linux uses wmctrl) | |
| Args: | |
| window_name: Window name | |
| strict: Whether to strictly match | |
| by_class: Whether to match by class name | |
| Returns: | |
| Result dictionary | |
| """ | |
| try: | |
| # Build wmctrl command | |
| flags = f"-{'x' if by_class else ''}{'F' if strict else ''}a" | |
| cmd = ["wmctrl", flags, window_name] | |
| subprocess.run(cmd, check=True, timeout=5) | |
| logger.info(f"Linux window activated successfully: {window_name}") | |
| return {'status': 'success', 'message': 'Window activated'} | |
| except subprocess.CalledProcessError as e: | |
| logger.warning(f"wmctrl command execution failed: {e}") | |
| return {'status': 'error', 'message': f'Window {window_name} not found or wmctrl failed'} | |
| except FileNotFoundError: | |
| logger.error("wmctrl not installed, please install: sudo apt install wmctrl") | |
| return {'status': 'error', 'message': 'wmctrl not installed'} | |
| except Exception as e: | |
| logger.error(f"Linux window activation failed: {e}") | |
| return {'status': 'error', 'message': str(e)} | |
| def close_window(self, window_name: str, strict: bool = False, by_class: bool = False) -> Dict[str, Any]: | |
| """ | |
| Close window (Linux uses wmctrl) | |
| Args: | |
| window_name: Window name | |
| strict: Whether to strictly match | |
| by_class: Whether to match by class name | |
| Returns: | |
| Result dictionary | |
| """ | |
| try: | |
| # Build wmctrl command | |
| flags = f"-{'x' if by_class else ''}{'F' if strict else ''}c" | |
| cmd = ["wmctrl", flags, window_name] | |
| subprocess.run(cmd, check=True, timeout=5) | |
| logger.info(f"Linux window closed successfully: {window_name}") | |
| return {'status': 'success', 'message': 'Window closed'} | |
| except subprocess.CalledProcessError as e: | |
| logger.warning(f"wmctrl command execution failed: {e}") | |
| return {'status': 'error', 'message': f'Window {window_name} not found or wmctrl failed'} | |
| except FileNotFoundError: | |
| logger.error("wmctrl not installed") | |
| return {'status': 'error', 'message': 'wmctrl not installed'} | |
| except Exception as e: | |
| logger.error(f"Linux window close failed: {e}") | |
| return {'status': 'error', 'message': str(e)} | |
| def get_accessibility_tree(self, max_depth: int = 10, max_width: int = 50) -> Dict[str, Any]: | |
| """ | |
| Get Linux accessibility tree (using AT-SPI) | |
| Args: | |
| max_depth: Maximum depth | |
| max_width: Maximum number of child elements per level | |
| Returns: | |
| Accessibility tree data | |
| """ | |
| if not LINUX_LIBS_AVAILABLE: | |
| return {'error': 'Linux accessibility libraries not available'} | |
| try: | |
| # Get desktop root node | |
| desktop = pyatspi.Registry.getDesktop(0) | |
| # Serialize accessibility tree | |
| tree = self._serialize_atspi_element( | |
| desktop, | |
| depth=0, | |
| max_depth=max_depth, | |
| max_width=max_width | |
| ) | |
| return { | |
| 'tree': tree, | |
| 'platform': 'Linux' | |
| } | |
| except Exception as e: | |
| logger.error(f"Linux get accessibility tree failed: {e}") | |
| return {'error': str(e)} | |
| def _serialize_atspi_element( | |
| self, | |
| element: Accessible, | |
| depth: int = 0, | |
| max_depth: int = 10, | |
| max_width: int = 50 | |
| ) -> Optional[Dict[str, Any]]: | |
| """ | |
| Serialize AT-SPI element to dictionary | |
| Args: | |
| element: AT-SPI accessible element | |
| depth: Current depth | |
| max_depth: Maximum depth | |
| max_width: Maximum width | |
| Returns: | |
| Serialized dictionary | |
| """ | |
| if depth > max_depth: | |
| return None | |
| try: | |
| result = { | |
| 'depth': depth, | |
| 'role': element.getRoleName(), | |
| 'name': element.name, | |
| } | |
| # Get states | |
| try: | |
| states = element.getState().get_states() | |
| result['states'] = [StateType._enum_lookup[st].split('_', 1)[1].lower() | |
| for st in states if st in StateType._enum_lookup] | |
| except: | |
| result['states'] = [] | |
| # Get attributes | |
| try: | |
| attributes = element.get_attributes() | |
| if attributes: | |
| result['attributes'] = dict(attributes) | |
| except: | |
| result['attributes'] = {} | |
| # Get position and size (if visible) | |
| if STATE_SHOWING in element.getState().get_states(): | |
| try: | |
| component = element.queryComponent() | |
| bbox = component.getExtents(pyatspi.XY_SCREEN) | |
| result['position'] = {'x': bbox[0], 'y': bbox[1]} | |
| result['size'] = {'width': bbox[2], 'height': bbox[3]} | |
| except: | |
| pass | |
| # Get text content | |
| try: | |
| text_obj = element.queryText() | |
| text = text_obj.getText(0, text_obj.characterCount) | |
| if text: | |
| result['text'] = text.replace("\ufffc", "").replace("\ufffd", "") | |
| except: | |
| pass | |
| # Recursively get child elements | |
| result['children'] = [] | |
| try: | |
| child_count = min(element.childCount, max_width) | |
| for i in range(child_count): | |
| try: | |
| child = element.getChildAtIndex(i) | |
| child_data = self._serialize_atspi_element( | |
| child, | |
| depth + 1, | |
| max_depth, | |
| max_width | |
| ) | |
| if child_data: | |
| result['children'].append(child_data) | |
| except Exception as e: | |
| logger.debug(f"Cannot serialize child element {i}: {e}") | |
| continue | |
| except Exception as e: | |
| logger.debug(f"Cannot get child elements: {e}") | |
| return result | |
| except Exception as e: | |
| logger.debug(f"Failed to serialize element (depth={depth}): {e}") | |
| return None | |
| def get_screen_size(self) -> Dict[str, int]: | |
| """ | |
| Get screen size | |
| Returns: | |
| Screen size dictionary | |
| """ | |
| try: | |
| if LINUX_LIBS_AVAILABLE: | |
| d = display.Display() | |
| screen = d.screen() | |
| return { | |
| 'width': screen.width_in_pixels, | |
| 'height': screen.height_in_pixels | |
| } | |
| else: | |
| # Use pyautogui as fallback | |
| size = pyautogui.size() | |
| return {'width': size.width, 'height': size.height} | |
| except Exception as e: | |
| logger.error(f"Failed to get screen size: {e}") | |
| return {'width': 1920, 'height': 1080} # Default value | |
| def list_windows(self) -> List[Dict[str, Any]]: | |
| """ | |
| List all windows | |
| Returns: | |
| Window list | |
| """ | |
| try: | |
| result = subprocess.run( | |
| ['wmctrl', '-l'], | |
| capture_output=True, | |
| text=True, | |
| check=True | |
| ) | |
| windows = [] | |
| for line in result.stdout.strip().split('\n'): | |
| if line: | |
| parts = line.split(None, 3) | |
| if len(parts) >= 4: | |
| windows.append({ | |
| 'id': parts[0], | |
| 'desktop': parts[1], | |
| 'hostname': parts[2], | |
| 'title': parts[3] | |
| }) | |
| return windows | |
| except FileNotFoundError: | |
| logger.error("wmctrl not installed") | |
| return [] | |
| except Exception as e: | |
| logger.error(f"List windows failed: {e}") | |
| return [] | |
| def get_terminal_output(self) -> Optional[str]: | |
| """ | |
| Get terminal output (GNOME Terminal) | |
| Returns: | |
| Terminal output content | |
| """ | |
| if not LINUX_LIBS_AVAILABLE: | |
| return None | |
| try: | |
| desktop = pyatspi.Registry.getDesktop(0) | |
| # Find gnome-terminal-server | |
| for app in desktop: | |
| if app.getRoleName() == "application" and app.name == "gnome-terminal-server": | |
| for frame in app: | |
| if frame.getRoleName() == "frame" and frame.getState().contains(pyatspi.STATE_ACTIVE): | |
| # Find terminal component | |
| for component in self._find_terminals(frame): | |
| try: | |
| text_obj = component.queryText() | |
| output = text_obj.getText(0, text_obj.characterCount) | |
| return output.rstrip() if output else None | |
| except: | |
| continue | |
| return None | |
| except Exception as e: | |
| logger.error(f"Failed to get terminal output: {e}") | |
| return None | |
| def _find_terminals(self, element) -> List[Accessible]: | |
| """Recursively find terminal components""" | |
| terminals = [] | |
| try: | |
| if element.getRoleName() == "terminal": | |
| terminals.append(element) | |
| for i in range(element.childCount): | |
| child = element.getChildAtIndex(i) | |
| terminals.extend(self._find_terminals(child)) | |
| except: | |
| pass | |
| return terminals | |
| def set_wallpaper(self, image_path: str) -> Dict[str, Any]: | |
| """ | |
| Set desktop wallpaper (GNOME) | |
| Args: | |
| image_path: Image path | |
| Returns: | |
| Result dictionary | |
| """ | |
| try: | |
| image_path = os.path.expanduser(image_path) | |
| image_path = os.path.abspath(image_path) | |
| if not os.path.exists(image_path): | |
| return {'status': 'error', 'message': f'Image not found: {image_path}'} | |
| # Use gsettings to set wallpaper (GNOME) | |
| subprocess.run([ | |
| 'gsettings', 'set', | |
| 'org.gnome.desktop.background', | |
| 'picture-uri', | |
| f'file://{image_path}' | |
| ], check=True, timeout=5) | |
| logger.info(f"Linux wallpaper set successfully: {image_path}") | |
| return {'status': 'success', 'message': 'Wallpaper set successfully'} | |
| except Exception as e: | |
| logger.error(f"Linux set wallpaper failed: {e}") | |
| return {'status': 'error', 'message': str(e)} | |
| def get_system_info(self) -> Dict[str, Any]: | |
| """ | |
| Get Linux system information | |
| Returns: | |
| System information dictionary | |
| """ | |
| try: | |
| # Get distribution information | |
| try: | |
| with open('/etc/os-release', 'r') as f: | |
| os_info = {} | |
| for line in f: | |
| if '=' in line: | |
| key, value = line.strip().split('=', 1) | |
| os_info[key] = value.strip('"') | |
| distro = os_info.get('PRETTY_NAME', 'Unknown Linux') | |
| except: | |
| distro = 'Unknown Linux' | |
| # Get kernel version | |
| kernel = subprocess.run( | |
| ['uname', '-r'], | |
| capture_output=True, | |
| text=True | |
| ).stdout.strip() | |
| return { | |
| 'platform': 'Linux', | |
| 'distro': distro, | |
| 'kernel': kernel, | |
| 'available': self.available | |
| } | |
| except Exception as e: | |
| logger.error(f"Failed to get system information: {e}") | |
| return { | |
| 'platform': 'Linux', | |
| 'error': str(e) | |
| } | |
| def start_recording(self, output_path: str) -> Dict[str, Any]: | |
| try: | |
| try: | |
| subprocess.run(['ffmpeg', '-version'], | |
| capture_output=True, | |
| check=True, | |
| timeout=5) | |
| except (subprocess.CalledProcessError, FileNotFoundError): | |
| return { | |
| 'status': 'error', | |
| 'message': 'ffmpeg not installed. Install with: sudo apt install ffmpeg' | |
| } | |
| try: | |
| if LINUX_LIBS_AVAILABLE: | |
| from Xlib import display as xdisplay | |
| d = xdisplay.Display() | |
| screen_width = d.screen().width_in_pixels | |
| screen_height = d.screen().height_in_pixels | |
| else: | |
| # use pyautogui as fallback | |
| size = pyautogui.size() | |
| screen_width = size.width | |
| screen_height = size.height | |
| except: | |
| screen_width, screen_height = 1920, 1080 | |
| command = [ | |
| 'ffmpeg', | |
| '-y', | |
| '-f', 'x11grab', | |
| '-draw_mouse', '1', | |
| '-s', f'{screen_width}x{screen_height}', | |
| '-i', ':0.0', | |
| '-c:v', 'libx264', | |
| '-preset', 'ultrafast', | |
| '-r', '30', | |
| output_path | |
| ] | |
| process = subprocess.Popen( | |
| command, | |
| stdout=subprocess.DEVNULL, | |
| stderr=subprocess.PIPE, | |
| text=True | |
| ) | |
| import time | |
| time.sleep(1) | |
| if process.poll() is not None: | |
| error_output = process.stderr.read() if process.stderr else "Unknown error" | |
| return { | |
| 'status': 'error', | |
| 'message': f'Failed to start recording: {error_output}' | |
| } | |
| logger.info(f"Linux recording started: {output_path}") | |
| return { | |
| 'status': 'success', | |
| 'message': 'Recording started', | |
| 'process': process | |
| } | |
| except Exception as e: | |
| logger.error(f"Linux start recording failed: {e}") | |
| return { | |
| 'status': 'error', | |
| 'message': str(e) | |
| } | |
| def stop_recording(self, process) -> Dict[str, Any]: | |
| try: | |
| import signal | |
| if not process or process.poll() is not None: | |
| return { | |
| 'status': 'error', | |
| 'message': 'No recording in progress' | |
| } | |
| process.send_signal(signal.SIGINT) | |
| try: | |
| process.wait(timeout=15) | |
| except subprocess.TimeoutExpired: | |
| logger.warning("ffmpeg did not respond to SIGINT, killing process") | |
| process.kill() | |
| process.wait() | |
| logger.info("Linux recording stopped successfully") | |
| return { | |
| 'status': 'success', | |
| 'message': 'Recording stopped' | |
| } | |
| except Exception as e: | |
| logger.error(f"Linux stop recording failed: {e}") | |
| return { | |
| 'status': 'error', | |
| 'message': str(e) | |
| } | |
| def get_running_applications(self) -> List[Dict[str, str]]: | |
| """ | |
| Get list of all running applications | |
| Returns: | |
| Application list | |
| """ | |
| try: | |
| import psutil | |
| apps = [] | |
| seen_names = set() | |
| for proc in psutil.process_iter(['pid', 'name', 'exe', 'cmdline']): | |
| try: | |
| pinfo = proc.info | |
| name = pinfo['name'] | |
| exe = pinfo['exe'] | |
| # Skip kernel processes and system daemons | |
| if not exe or name.startswith('['): | |
| continue | |
| # Skip duplicates | |
| if name in seen_names: | |
| continue | |
| seen_names.add(name) | |
| apps.append({ | |
| 'name': name, | |
| 'pid': pinfo['pid'], | |
| 'path': exe or '', | |
| 'cmdline': ' '.join(pinfo.get('cmdline', [])) | |
| }) | |
| except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): | |
| pass | |
| return apps | |
| except ImportError: | |
| logger.warning("psutil not installed, cannot get running applications") | |
| return [] | |
| except Exception as e: | |
| logger.error(f"Failed to get running applications list: {e}") | |
| return [] |