import socket from socketserver import BaseRequestHandler, TCPServer from typing import Any, Optional, Tuple, Type class RoboflowTCPServer(TCPServer): def __init__( self, server_address: Tuple[str, int], handler_class: Type[BaseRequestHandler], socket_operations_timeout: Optional[float] = None, ): TCPServer.__init__(self, server_address, handler_class) self._socket_operations_timeout = socket_operations_timeout def get_request(self) -> Tuple[socket.socket, Any]: connection, address = self.socket.accept() connection.settimeout(self._socket_operations_timeout) return connection, address