Fucius's picture
Upload 422 files
df6c67d verified
import socket
from socketserver import BaseRequestHandler, TCPServer
from typing import Any, Optional, Tuple, Type
class RoboflowTCPServer(TCPServer):
def __init__(
self,
server_address: Tuple[str, int],
handler_class: Type[BaseRequestHandler],
socket_operations_timeout: Optional[float] = None,
):
TCPServer.__init__(self, server_address, handler_class)
self._socket_operations_timeout = socket_operations_timeout
def get_request(self) -> Tuple[socket.socket, Any]:
connection, address = self.socket.accept()
connection.settimeout(self._socket_operations_timeout)
return connection, address