Spaces:
Paused
Paused
| """ | |
| Socket Translator Module | |
| Bridges virtual connections to real host sockets: | |
| - Map virtual connections to host sockets/HTTP clients | |
| - Bidirectional data streaming | |
| - Connection lifecycle management | |
| - Protocol translation (TCP/UDP to host sockets) | |
| """ | |
| import socket | |
| import threading | |
| import time | |
| import asyncio | |
| import aiohttp | |
| import ssl | |
| from typing import Dict, Optional, Callable, Tuple, Any | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| import urllib.parse | |
| import json | |
| from .tcp_engine import TCPConnection | |
class ConnectionType(Enum):
    """Kinds of host-side transport a virtual connection can be mapped to.

    Each member's value is its own name as a string, which is what
    ``SocketConnection.to_dict`` exposes under ``'connection_type'``.
    """

    # Raw socket transports.
    TCP_SOCKET = 'TCP_SOCKET'
    UDP_SOCKET = 'UDP_SOCKET'

    # Request/response transports handled by an HTTP client.
    HTTP_CLIENT = 'HTTP_CLIENT'
    HTTPS_CLIENT = 'HTTPS_CLIENT'
| class SocketConnection: | |
| """Represents a socket connection""" | |
| connection_id: str | |
| connection_type: ConnectionType | |
| virtual_connection: Optional[TCPConnection] | |
| host_socket: Optional[socket.socket] | |
| remote_host: str | |
| remote_port: int | |
| created_time: float | |
| last_activity: float | |
| bytes_sent: int = 0 | |
| bytes_received: int = 0 | |
| is_connected: bool = False | |
| error_count: int = 0 | |
| def update_activity(self, bytes_transferred: int = 0, direction: str = 'sent'): | |
| """Update connection activity""" | |
| self.last_activity = time.time() | |
| if direction == 'sent': | |
| self.bytes_sent += bytes_transferred | |
| else: | |
| self.bytes_received += bytes_transferred | |
| def to_dict(self) -> Dict: | |
| """Convert to dictionary""" | |
| return { | |
| 'connection_id': self.connection_id, | |
| 'connection_type': self.connection_type.value, | |
| 'remote_host': self.remote_host, | |
| 'remote_port': self.remote_port, | |
| 'created_time': self.created_time, | |
| 'last_activity': self.last_activity, | |
| 'bytes_sent': self.bytes_sent, | |
| 'bytes_received': self.bytes_received, | |
| 'is_connected': self.is_connected, | |
| 'error_count': self.error_count, | |
| 'duration': time.time() - self.created_time | |
| } | |
class HTTPRequest:
    """An HTTP/1.x request: method, path, headers and raw body bytes."""

    def __init__(self, method: str = 'GET', path: str = '/',
                 headers: Optional[Dict[str, str]] = None, body: bytes = b''):
        """Store the request parts.

        Args:
            method: HTTP method; stored upper-cased.
            path: Request target.
            headers: Header mapping; copied so that later changes to the
                caller's dict do not leak into this request.
            body: Raw body bytes.
        """
        self.method = method.upper()
        self.path = path
        self.headers = dict(headers) if headers else {}
        self.body = body
        self.version = 'HTTP/1.1'

    @classmethod
    def parse(cls, data: bytes) -> Optional['HTTPRequest']:
        """Parse a raw HTTP request.

        Only the head (request line and headers) is decoded as text. The
        body is everything after the first blank line, kept as the original
        bytes so binary payloads survive unchanged. If no blank line is
        present the body is empty.

        Args:
            data: Raw request bytes.

        Returns:
            The parsed request with lower-cased header names, or ``None``
            when the request line has fewer than three parts or ``data`` is
            not bytes-like.
        """
        try:
            head, _, body = data.partition(b'\r\n\r\n')
            lines = head.decode('utf-8', errors='ignore').split('\r\n')

            request_line = lines[0].split()
            if len(request_line) < 3:
                return None
            method, path, version = request_line[:3]

            headers = {}
            for line in lines[1:]:
                name, separator, value = line.partition(':')
                if separator:
                    headers[name.strip().lower()] = value.strip()

            request = cls(method, path, headers, body)
            request.version = version
            return request
        except (AttributeError, TypeError, ValueError):
            return None

    def to_bytes(self) -> bytes:
        """Serialize the request to raw bytes.

        Default ``host`` and ``user-agent`` headers are added when absent,
        and ``content-length`` is added when there is a body and no such
        header. Presence is checked case-insensitively, and the defaults go
        into a copy, so ``self.headers`` is not modified.
        """
        headers = dict(self.headers)
        present = {name.lower() for name in headers}

        if 'host' not in present:
            headers['host'] = 'localhost'
        if 'user-agent' not in present:
            headers['user-agent'] = 'VirtualISP/1.0'
        if self.body and 'content-length' not in present:
            headers['content-length'] = str(len(self.body))

        request_line = f"{self.method} {self.path} {self.version}\r\n"
        header_block = ''.join(f"{name}: {value}\r\n" for name, value in headers.items())
        head = request_line + header_block + '\r\n'
        return head.encode('utf-8') + self.body
class HTTPResponse:
    """An HTTP/1.x response: status, reason, headers and raw body bytes."""

    def __init__(self, status_code: int = 200, reason: str = 'OK',
                 headers: Optional[Dict[str, str]] = None, body: bytes = b''):
        """Store the response parts.

        Args:
            status_code: Numeric status code.
            reason: Reason phrase for the status line.
            headers: Header mapping; copied so that later changes to the
                caller's dict do not leak into this response.
            body: Raw body bytes.
        """
        self.status_code = status_code
        self.reason = reason
        self.headers = dict(headers) if headers else {}
        self.body = body
        self.version = 'HTTP/1.1'

    @classmethod
    def parse(cls, data: bytes) -> Optional['HTTPResponse']:
        """Parse a raw HTTP response.

        Only the head (status line and headers) is decoded as text. The
        body is everything after the first blank line, kept as the original
        bytes so binary payloads survive unchanged. If no blank line is
        present the body is empty. A status line without a reason phrase is
        accepted and yields an empty ``reason``.

        Args:
            data: Raw response bytes.

        Returns:
            The parsed response with lower-cased header names, or ``None``
            when the status line is incomplete, the status code is not an
            integer, or ``data`` is not bytes-like.
        """
        try:
            head, _, body = data.partition(b'\r\n\r\n')
            lines = head.decode('utf-8', errors='ignore').split('\r\n')

            status_line = lines[0].split(' ', 2)
            if len(status_line) < 2:
                return None
            version = status_line[0]
            status_code = int(status_line[1])
            reason = status_line[2] if len(status_line) == 3 else ''

            headers = {}
            for line in lines[1:]:
                name, separator, value = line.partition(':')
                if separator:
                    headers[name.strip().lower()] = value.strip()

            response = cls(status_code, reason, headers, body)
            response.version = version
            return response
        except (AttributeError, TypeError, ValueError):
            return None

    def to_bytes(self) -> bytes:
        """Serialize the response to raw bytes.

        ``content-length`` is added when there is a body and no such header,
        and a default ``server`` header is added when absent. Presence is
        checked case-insensitively, and the defaults go into a copy, so
        ``self.headers`` is not modified.
        """
        headers = dict(self.headers)
        present = {name.lower() for name in headers}

        if self.body and 'content-length' not in present:
            headers['content-length'] = str(len(self.body))
        if 'server' not in present:
            headers['server'] = 'VirtualISP/1.0'

        status_line = f"{self.version} {self.status_code} {self.reason}\r\n"
        header_block = ''.join(f"{name}: {value}\r\n" for name, value in headers.items())
        head = status_line + header_block + '\r\n'
        return head.encode('utf-8') + self.body
class SocketTranslator:
    """Bridge virtual connections onto real host sockets and HTTP clients.

    Each virtual connection is mapped to a ``SocketConnection`` held in
    ``self.connections``. Plain TCP traffic is relayed over a host socket
    with one receiver thread per connection. Traffic classified as HTTP is
    re-issued through an HTTP client: aiohttp on a background event loop
    once ``start()`` has been called, urllib otherwise. Data received from
    the remote side is handed to the virtual connection's
    ``on_data_received`` callback.
    """

    # Request-line prefixes recognised by _is_http_request.
    _HTTP_METHOD_PREFIXES = (
        b'GET ', b'POST ', b'PUT ', b'DELETE ',
        b'HEAD ', b'OPTIONS ', b'PATCH ', b'TRACE ',
    )
    _CLEANUP_INTERVAL = 30      # seconds between idle sweeps
    _CLEANUP_RETRY_DELAY = 5    # seconds to wait after a failed sweep
    _JOIN_TIMEOUT = 5           # seconds stop() waits for each worker thread

    def __init__(self, config: Dict):
        """Read settings from ``config`` and set up empty state.

        Recognised keys (with defaults): ``connect_timeout`` (10),
        ``read_timeout`` (30), ``max_connections`` (1000) and
        ``buffer_size`` (8192).
        """
        self.config = config
        self.connections: Dict[str, 'SocketConnection'] = {}
        self.lock = threading.Lock()

        # Configuration
        self.connect_timeout = config.get('connect_timeout', 10)
        self.read_timeout = config.get('read_timeout', 30)
        self.max_connections = config.get('max_connections', 1000)
        self.buffer_size = config.get('buffer_size', 8192)

        # HTTP client session and the loop that owns it
        self.http_session = None
        self.loop = None
        self._loop_thread = None

        # Statistics
        self.stats = {
            'total_connections': 0,
            'active_connections': 0,
            'failed_connections': 0,
            'bytes_transferred': 0,
            'http_requests': 0,
            'tcp_connections': 0,
            'udp_connections': 0,
        }

        # Background tasks
        self.running = False
        self.cleanup_thread = None
        # Set by stop() so the cleanup thread wakes up at once instead of
        # finishing a long sleep.
        self._stop_event = threading.Event()

    def _bump_stat(self, key: str, amount: int = 1):
        """Add ``amount`` to a counter in ``self.stats`` under the lock."""
        with self.lock:
            self.stats[key] += amount

    async def _init_http_session(self):
        """Create the shared aiohttp session used for async HTTP requests."""
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=10,
            ttl_dns_cache=300,
            use_dns_cache=True,
        )
        timeout = aiohttp.ClientTimeout(
            total=self.read_timeout,
            connect=self.connect_timeout,
        )
        self.http_session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={'User-Agent': 'VirtualISP/1.0'},
        )

    def _is_http_request(self, data: bytes) -> bool:
        """Return True when ``data`` begins with a known HTTP method and a space."""
        try:
            first_line = data.split(b'\r\n', 1)[0]
            return first_line.startswith(self._HTTP_METHOD_PREFIXES)
        except (AttributeError, TypeError):
            # Not bytes-like, so it cannot be an HTTP request.
            return False

    def _determine_connection_type(self, remote_host: str, remote_port: int,
                                   data: bytes = b'') -> 'ConnectionType':
        """Pick a transport from the destination port and the first bytes.

        Port 80, or any payload that looks like an HTTP request, selects
        ``HTTP_CLIENT``. Otherwise port 443 selects ``HTTPS_CLIENT``.
        Everything else is ``TCP_SOCKET``.
        """
        if remote_port == 80 or (data and self._is_http_request(data)):
            return ConnectionType.HTTP_CLIENT
        if remote_port == 443:
            return ConnectionType.HTTPS_CLIENT
        return ConnectionType.TCP_SOCKET

    def create_connection(self, virtual_conn: 'TCPConnection', remote_host: str, remote_port: int,
                          initial_data: bytes = b'') -> Optional['SocketConnection']:
        """Register and establish a host-side connection for ``virtual_conn``.

        Args:
            virtual_conn: Virtual connection; its ``connection_id`` forms
                part of the new connection id.
            remote_host: Destination host.
            remote_port: Destination port.
            initial_data: First payload, used both to choose the transport
                and as the first data sent.

        Returns:
            The new ``SocketConnection``, or ``None`` when the connection
            limit is reached or the connection could not be established.
        """
        connection_id = f"{virtual_conn.connection_id}->{remote_host}:{remote_port}"
        conn_type = self._determine_connection_type(remote_host, remote_port, initial_data)

        now = time.time()
        socket_conn = SocketConnection(
            connection_id=connection_id,
            connection_type=conn_type,
            virtual_connection=virtual_conn,
            host_socket=None,
            remote_host=remote_host,
            remote_port=remote_port,
            created_time=now,
            last_activity=now,
        )

        # Check the limit and insert in one critical section, so concurrent
        # callers cannot both pass the check and exceed max_connections.
        with self.lock:
            if len(self.connections) >= self.max_connections:
                return None
            replaced = self.connections.get(connection_id)
            self.connections[connection_id] = socket_conn
        if replaced is not None:
            # The previous holder of this id is no longer reachable through
            # the registry; release its socket instead of leaking it.
            self._release_socket(replaced)

        is_http = conn_type in (ConnectionType.HTTP_CLIENT, ConnectionType.HTTPS_CLIENT)
        if is_http:
            success = self._create_http_connection(socket_conn, initial_data)
        else:
            success = self._create_tcp_connection(socket_conn, initial_data)

        with self.lock:
            if success:
                self.stats['total_connections'] += 1
                self.stats['http_requests' if is_http else 'tcp_connections'] += 1
            else:
                self.stats['failed_connections'] += 1
                if self.connections.get(connection_id) is socket_conn:
                    del self.connections[connection_id]
            self.stats['active_connections'] = len(self.connections)

        return socket_conn if success else None

    def _create_tcp_connection(self, socket_conn: 'SocketConnection', initial_data: bytes) -> bool:
        """Open the host socket, send ``initial_data`` and start the receiver.

        Returns:
            True on success. On any failure the socket is closed, the
            connection's ``error_count`` is incremented and False is returned.
        """
        sock = None
        try:
            # create_connection resolves the host and tries each address
            # (IPv4 and IPv6), applying the timeout to every attempt.
            sock = socket.create_connection(
                (socket_conn.remote_host, socket_conn.remote_port),
                timeout=self.connect_timeout,
            )
            sock.settimeout(self.read_timeout)
            socket_conn.host_socket = sock
            socket_conn.is_connected = True

            if initial_data:
                # sendall, because send() may write only part of the buffer.
                sock.sendall(initial_data)
                socket_conn.update_activity(len(initial_data), 'sent')

            thread = threading.Thread(
                target=self._tcp_receive_loop,
                args=(socket_conn,),
                daemon=True,
            )
            thread.start()
            return True
        except Exception as e:
            print(f"Failed to create TCP connection to {socket_conn.remote_host}:{socket_conn.remote_port}: {e}")
            socket_conn.error_count += 1
            socket_conn.is_connected = False
            socket_conn.host_socket = None
            if sock is not None:
                try:
                    sock.close()
                except OSError:
                    pass
            return False

    def _create_http_connection(self, socket_conn: 'SocketConnection', initial_data: bytes) -> bool:
        """Parse ``initial_data`` as an HTTP request and issue it upstream.

        The request runs on the background event loop when one exists and is
        not closed; otherwise it is performed synchronously with urllib.

        Returns:
            True when the request was scheduled (async) or completed (sync);
            False when the data is not a parsable request or dispatch failed.
        """
        try:
            http_request = HTTPRequest.parse(initial_data)
            if not http_request:
                return False

            http_request.headers['host'] = socket_conn.remote_host

            loop = self.loop
            if loop and not loop.is_closed():
                asyncio.run_coroutine_threadsafe(
                    self._handle_http_request(socket_conn, http_request),
                    loop,
                )
                dispatched = True
            else:
                dispatched = self._handle_http_request_sync(socket_conn, http_request)

            if dispatched:
                # send_data() refuses connections that are not marked
                # connected, so follow-up requests need this flag.
                socket_conn.is_connected = True
            return dispatched
        except Exception as e:
            print(f"Failed to create HTTP connection to {socket_conn.remote_host}:{socket_conn.remote_port}: {e}")
            socket_conn.error_count += 1
            return False

    @staticmethod
    def _build_url(socket_conn: 'SocketConnection', path: str) -> str:
        """Build the upstream URL for ``path`` on the connection's remote."""
        scheme = 'https' if socket_conn.connection_type == ConnectionType.HTTPS_CLIENT else 'http'
        host = socket_conn.remote_host
        if ':' in host and not host.startswith('['):
            # An IPv6 literal must be bracketed inside a URL.
            host = f"[{host}]"
        return f"{scheme}://{host}:{socket_conn.remote_port}{path}"

    @staticmethod
    def _relay_headers(upstream_headers, body: bytes, decoded: bool) -> Dict[str, str]:
        """Return upstream response headers adjusted to match ``body``.

        The HTTP client has already removed chunked framing, so
        ``transfer-encoding`` is always dropped. When there is a body,
        ``content-length`` is replaced by the real length, and
        ``content-encoding`` is dropped if the client decompressed the
        payload (``decoded``).
        """
        relayed = {}
        for name, value in dict(upstream_headers).items():
            lowered = name.lower()
            if lowered == 'transfer-encoding':
                continue
            if body and lowered == 'content-length':
                continue
            if body and decoded and lowered == 'content-encoding':
                continue
            relayed[name] = value
        if body:
            relayed['content-length'] = str(len(body))
        return relayed

    async def _handle_http_request(self, socket_conn: 'SocketConnection', http_request: 'HTTPRequest'):
        """Perform the request with aiohttp and relay the response.

        On failure a ``500 Internal Server Error`` response carrying the
        error text is sent to the virtual connection instead.
        """
        virtual = socket_conn.virtual_connection
        try:
            if not self.http_session:
                await self._init_http_session()

            url = self._build_url(socket_conn, http_request.path)

            async with self.http_session.request(
                method=http_request.method,
                url=url,
                headers=http_request.headers,
                data=http_request.body,
            ) as response:
                response_body = await response.read()

                http_response = HTTPResponse(
                    status_code=response.status,
                    reason=response.reason or 'OK',
                    # aiohttp decompresses the body by default, so the
                    # encoding and length headers no longer describe it.
                    headers=self._relay_headers(response.headers, response_body, decoded=True),
                    body=response_body,
                )

                response_data = http_response.to_bytes()
                if virtual and virtual.on_data_received:
                    virtual.on_data_received(response_data)

                socket_conn.update_activity(len(response_data), 'received')
                self._bump_stat('bytes_transferred', len(response_data))
        except Exception as e:
            print(f"HTTP request failed: {e}")
            socket_conn.error_count += 1

            error_response = HTTPResponse(
                status_code=500,
                reason='Internal Server Error',
                body=f"Error: {str(e)}".encode('utf-8'),
            )
            response_data = error_response.to_bytes()
            if virtual and virtual.on_data_received:
                virtual.on_data_received(response_data)

    def _handle_http_request_sync(self, socket_conn: 'SocketConnection', http_request: 'HTTPRequest') -> bool:
        """Perform the request with urllib and relay the response (fallback).

        Responses with an HTTP error status are relayed like any other
        response instead of being treated as a transport failure.

        Returns:
            True when a response was obtained and relayed, False otherwise.
        """
        import urllib.request
        import urllib.error

        try:
            url = self._build_url(socket_conn, http_request.path)
            req = urllib.request.Request(
                url,
                data=http_request.body if http_request.body else None,
                headers=http_request.headers,
                method=http_request.method,
            )

            try:
                response = urllib.request.urlopen(req, timeout=self.read_timeout)
            except urllib.error.HTTPError as http_error:
                # 4xx/5xx: the exception object is itself the response.
                response = http_error

            with response:
                response_body = response.read()
                if isinstance(response, urllib.error.HTTPError):
                    status_code = response.code
                else:
                    status_code = response.getcode()

                http_response = HTTPResponse(
                    status_code=status_code,
                    reason=getattr(response, 'reason', None) or 'OK',
                    # urllib removes chunked framing but does not decompress.
                    headers=self._relay_headers(response.headers, response_body, decoded=False),
                    body=response_body,
                )

            response_data = http_response.to_bytes()
            virtual = socket_conn.virtual_connection
            if virtual and virtual.on_data_received:
                virtual.on_data_received(response_data)

            socket_conn.update_activity(len(response_data), 'received')
            self._bump_stat('bytes_transferred', len(response_data))
            return True
        except Exception as e:
            print(f"Sync HTTP request failed: {e}")
            socket_conn.error_count += 1
            return False

    def _tcp_receive_loop(self, socket_conn: 'SocketConnection'):
        """Relay data from the host socket to the virtual connection.

        Runs in its own thread until the remote side closes, an error
        occurs, or the connection is marked disconnected. It then
        unregisters the connection and closes its socket.
        """
        sock = socket_conn.host_socket
        if not sock:
            return

        try:
            while socket_conn.is_connected:
                try:
                    data = sock.recv(self.buffer_size)
                except socket.timeout:
                    # Idle period; loop again so is_connected is re-checked.
                    continue
                if not data:
                    break

                virtual = socket_conn.virtual_connection
                if virtual and virtual.on_data_received:
                    virtual.on_data_received(data)

                socket_conn.update_activity(len(data), 'received')
                self._bump_stat('bytes_transferred', len(data))
        except Exception as e:
            # An error after a local close is the expected way to wake up.
            if socket_conn.is_connected:
                print(f"TCP receive error: {e}")
        finally:
            # Unregister only if the registry still refers to this
            # connection; the id may have been reused in the meantime.
            self._close_connection(socket_conn.connection_id, expected=socket_conn)
            self._release_socket(socket_conn)

    def send_data(self, connection_id: str, data: bytes) -> bool:
        """Send ``data`` over the connection identified by ``connection_id``.

        For HTTP connections the data is treated as a new request. For TCP
        connections it is written in full to the host socket.

        Returns:
            True when the data was sent or the request dispatched; False
            when the connection is unknown, not connected, has no socket, or
            sending failed. A failed send also closes the connection.
        """
        with self.lock:
            socket_conn = self.connections.get(connection_id)
        if not socket_conn or not socket_conn.is_connected:
            return False

        try:
            if socket_conn.connection_type in (ConnectionType.HTTP_CLIENT, ConnectionType.HTTPS_CLIENT):
                return self._create_http_connection(socket_conn, data)

            sock = socket_conn.host_socket
            if not sock:
                return False
            sock.sendall(data)
            socket_conn.update_activity(len(data), 'sent')
            self._bump_stat('bytes_transferred', len(data))
            return True
        except Exception as e:
            print(f"Failed to send data: {e}")
            socket_conn.error_count += 1
            self._close_connection(connection_id, expected=socket_conn)
            return False

    @staticmethod
    def _release_socket(socket_conn: 'SocketConnection'):
        """Mark the connection disconnected and close its socket, if any.

        Safe to call more than once.
        """
        socket_conn.is_connected = False
        sock = socket_conn.host_socket
        if sock is None:
            return
        try:
            # shutdown wakes a receiver thread blocked in recv().
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # already closed or never fully connected
        try:
            sock.close()
        except OSError:
            pass

    def _close_connection(self, connection_id: str, expected: Optional['SocketConnection'] = None):
        """Unregister a connection and close its socket.

        Args:
            connection_id: Id of the connection to close.
            expected: When given, the connection is closed only if the
                registry entry is this exact object.
        """
        with self.lock:
            socket_conn = self.connections.get(connection_id)
            if socket_conn is None:
                return
            if expected is not None and socket_conn is not expected:
                return
            del self.connections[connection_id]
            self.stats['active_connections'] = len(self.connections)
        self._release_socket(socket_conn)

    def close_connection(self, connection_id: str) -> bool:
        """Close a connection on request; always returns True."""
        self._close_connection(connection_id)
        return True

    def get_connection(self, connection_id: str) -> Optional['SocketConnection']:
        """Return the connection registered under ``connection_id``, or None."""
        with self.lock:
            return self.connections.get(connection_id)

    def get_connections(self) -> Dict[str, Dict]:
        """Return ``to_dict()`` snapshots of all connections, keyed by id."""
        with self.lock:
            return {
                conn_id: conn.to_dict()
                for conn_id, conn in self.connections.items()
            }

    def get_stats(self) -> Dict:
        """Return a copy of the statistics with a fresh active-connection count."""
        with self.lock:
            stats = self.stats.copy()
            stats['active_connections'] = len(self.connections)
        return stats

    def _cleanup_loop(self):
        """Close connections idle for more than twice ``read_timeout``.

        Runs until ``running`` is cleared. The wait between sweeps is
        interrupted as soon as ``stop()`` sets the stop event.
        """
        while self.running:
            delay = self._CLEANUP_INTERVAL
            try:
                current_time = time.time()
                idle_limit = self.read_timeout * 2
                with self.lock:
                    expired_connections = [
                        conn_id
                        for conn_id, conn in self.connections.items()
                        if current_time - conn.last_activity > idle_limit
                    ]
                for conn_id in expired_connections:
                    self._close_connection(conn_id)
            except Exception as e:
                print(f"Socket translator cleanup error: {e}")
                delay = self._CLEANUP_RETRY_DELAY
            self._stop_event.wait(delay)

    def _run_event_loop(self):
        """Thread target: run the asyncio loop until ``stop()`` halts it."""
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def start(self):
        """Start the event-loop thread and the cleanup thread.

        Calling ``start()`` while already running does nothing.
        """
        if self.running:
            return
        self.running = True
        self._stop_event.clear()

        try:
            # A loop that is never run would silently swallow every
            # coroutine scheduled on it, so give it a dedicated thread.
            self.loop = asyncio.new_event_loop()
            self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
            self._loop_thread.start()

            self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
            self.cleanup_thread.start()

            print("Socket translator started")
        except Exception as e:
            print(f"Failed to start socket translator: {e}")

    def stop(self):
        """Close all connections, the HTTP session, the loop and the workers."""
        self.running = False
        self._stop_event.set()

        with self.lock:
            connection_ids = list(self.connections)
        for conn_id in connection_ids:
            self._close_connection(conn_id)

        loop = self.loop
        if loop is not None and not loop.is_closed():
            if self.http_session:
                # The session belongs to the loop thread; close it there and
                # wait so the loop is not stopped before the close runs.
                closing = asyncio.run_coroutine_threadsafe(self.http_session.close(), loop)
                try:
                    closing.result(timeout=self.connect_timeout)
                except Exception as e:
                    print(f"Failed to close HTTP session: {e}")
                self.http_session = None

            loop.call_soon_threadsafe(loop.stop)
            if self._loop_thread:
                self._loop_thread.join(timeout=self._JOIN_TIMEOUT)
            if not loop.is_running():
                loop.close()

        if self.cleanup_thread:
            self.cleanup_thread.join(timeout=self._JOIN_TIMEOUT)

        print("Socket translator stopped")