Spaces:
Paused
Paused
| """ | |
| DHCP Server Module | |
| Implements a user-space DHCP server that handles: | |
| - DHCP DISCOVER → OFFER → REQUEST → ACK sequence | |
| - IP lease management | |
| - Lease renewals and expiration | |
| """ | |
| import struct | |
| import time | |
| import socket | |
| import threading | |
| from typing import Dict, Optional, Tuple | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| class DHCPMessageType(Enum): | |
| DISCOVER = 1 | |
| OFFER = 2 | |
| REQUEST = 3 | |
| DECLINE = 4 | |
| ACK = 5 | |
| NAK = 6 | |
| RELEASE = 7 | |
| INFORM = 8 | |
| class DHCPLease: | |
| """Represents a DHCP lease""" | |
| mac_address: str | |
| ip_address: str | |
| lease_time: int | |
| lease_start: float | |
| state: str = 'BOUND' | |
| def is_expired(self) -> bool: | |
| return time.time() > (self.lease_start + self.lease_time) | |
| def remaining_time(self) -> int: | |
| remaining = int((self.lease_start + self.lease_time) - time.time()) | |
| return max(0, remaining) | |
| class DHCPPacket: | |
| """DHCP packet parser and builder""" | |
| def __init__(self): | |
| self.op = 0 # Message op code / message type | |
| self.htype = 1 # Hardware address type (Ethernet = 1) | |
| self.hlen = 6 # Hardware address length | |
| self.hops = 0 # Hops | |
| self.xid = 0 # Transaction ID | |
| self.secs = 0 # Seconds elapsed | |
| self.flags = 0 # Flags | |
| self.ciaddr = '0.0.0.0' # Client IP address | |
| self.yiaddr = '0.0.0.0' # Your IP address | |
| self.siaddr = '0.0.0.0' # Server IP address | |
| self.giaddr = '0.0.0.0' # Gateway IP address | |
| self.chaddr = b'\x00' * 16 # Client hardware address | |
| self.sname = b'\x00' * 64 # Server name | |
| self.file = b'\x00' * 128 # Boot file name | |
| self.options = {} # DHCP options | |
| def parse(cls, data: bytes) -> 'DHCPPacket': | |
| """Parse DHCP packet from raw bytes""" | |
| packet = cls() | |
| # Parse fixed fields (first 236 bytes) | |
| if len(data) < 236: | |
| raise ValueError("DHCP packet too short") | |
| fields = struct.unpack('!BBBBIHH4s4s4s4s16s64s128s', data[:236]) | |
| packet.op = fields[0] | |
| packet.htype = fields[1] | |
| packet.hlen = fields[2] | |
| packet.hops = fields[3] | |
| packet.xid = fields[4] | |
| packet.secs = fields[5] | |
| packet.flags = fields[6] | |
| packet.ciaddr = socket.inet_ntoa(fields[7]) | |
| packet.yiaddr = socket.inet_ntoa(fields[8]) | |
| packet.siaddr = socket.inet_ntoa(fields[9]) | |
| packet.giaddr = socket.inet_ntoa(fields[10]) | |
| packet.chaddr = fields[11] | |
| packet.sname = fields[12] | |
| packet.file = fields[13] | |
| # Parse options (after magic cookie) | |
| options_data = data[236:] | |
| if len(options_data) >= 4: | |
| magic = struct.unpack('!I', options_data[:4])[0] | |
| if magic == 0x63825363: # DHCP magic cookie | |
| packet.options = packet._parse_options(options_data[4:]) | |
| return packet | |
| def _parse_options(self, data: bytes) -> Dict[int, bytes]: | |
| """Parse DHCP options""" | |
| options = {} | |
| i = 0 | |
| while i < len(data): | |
| if data[i] == 255: # End option | |
| break | |
| elif data[i] == 0: # Pad option | |
| i += 1 | |
| continue | |
| option_type = data[i] | |
| if i + 1 >= len(data): | |
| break | |
| option_length = data[i + 1] | |
| if i + 2 + option_length > len(data): | |
| break | |
| option_data = data[i + 2:i + 2 + option_length] | |
| options[option_type] = option_data | |
| i += 2 + option_length | |
| return options | |
| def build(self) -> bytes: | |
| """Build DHCP packet as bytes""" | |
| # Build fixed fields | |
| packet_data = struct.pack( | |
| '!BBBBIHH4s4s4s4s16s64s128s', | |
| self.op, self.htype, self.hlen, self.hops, | |
| self.xid, self.secs, self.flags, | |
| socket.inet_aton(self.ciaddr), | |
| socket.inet_aton(self.yiaddr), | |
| socket.inet_aton(self.siaddr), | |
| socket.inet_aton(self.giaddr), | |
| self.chaddr, self.sname, self.file | |
| ) | |
| # Add magic cookie | |
| packet_data += struct.pack('!I', 0x63825363) | |
| # Add options | |
| for option_type, option_data in self.options.items(): | |
| packet_data += struct.pack('!BB', option_type, len(option_data)) | |
| packet_data += option_data | |
| # Add end option | |
| packet_data += b'\xff' | |
| # Pad to minimum size | |
| while len(packet_data) < 300: | |
| packet_data += b'\x00' | |
| return packet_data | |
| def get_mac_address(self) -> str: | |
| """Get client MAC address as string""" | |
| return ':'.join(f'{b:02x}' for b in self.chaddr[:6]) | |
| def get_message_type(self) -> Optional[DHCPMessageType]: | |
| """Get DHCP message type from options""" | |
| if 53 in self.options and len(self.options[53]) == 1: | |
| msg_type = self.options[53][0] | |
| try: | |
| return DHCPMessageType(msg_type) | |
| except ValueError: | |
| return None | |
| return None | |
| class DHCPServer: | |
| """User-space DHCP server implementation""" | |
| def __init__(self, config: Dict): | |
| self.config = config | |
| self.leases: Dict[str, DHCPLease] = {} # MAC -> Lease | |
| self.ip_pool = self._build_ip_pool() | |
| self.running = False | |
| self.server_thread = None | |
| self.lock = threading.Lock() | |
| def _build_ip_pool(self) -> set: | |
| """Build available IP address pool""" | |
| network = self.config['network'] | |
| start_ip = self.config['range_start'] | |
| end_ip = self.config['range_end'] | |
| # Convert IP addresses to integers for range calculation | |
| start_int = struct.unpack('!I', socket.inet_aton(start_ip))[0] | |
| end_int = struct.unpack('!I', socket.inet_aton(end_ip))[0] | |
| pool = set() | |
| for ip_int in range(start_int, end_int + 1): | |
| ip_str = socket.inet_ntoa(struct.pack('!I', ip_int)) | |
| pool.add(ip_str) | |
| return pool | |
| def _get_available_ip(self) -> Optional[str]: | |
| """Get next available IP address""" | |
| with self.lock: | |
| # Remove expired leases | |
| self._cleanup_expired_leases() | |
| # Find available IP | |
| used_ips = {lease.ip_address for lease in self.leases.values()} | |
| available_ips = self.ip_pool - used_ips | |
| if available_ips: | |
| return min(available_ips) # Return lowest available IP | |
| return None | |
| def _cleanup_expired_leases(self): | |
| """Remove expired leases""" | |
| expired_macs = [ | |
| mac for mac, lease in self.leases.items() | |
| if lease.is_expired | |
| ] | |
| for mac in expired_macs: | |
| del self.leases[mac] | |
| def _create_dhcp_offer(self, discover_packet: DHCPPacket) -> DHCPPacket: | |
| """Create DHCP OFFER response""" | |
| mac_address = discover_packet.get_mac_address() | |
| # Check for existing lease | |
| if mac_address in self.leases and not self.leases[mac_address].is_expired: | |
| offered_ip = self.leases[mac_address].ip_address | |
| else: | |
| offered_ip = self._get_available_ip() | |
| if not offered_ip: | |
| return None # No available IPs | |
| # Create OFFER packet | |
| offer = DHCPPacket() | |
| offer.op = 2 # BOOTREPLY | |
| offer.htype = discover_packet.htype | |
| offer.hlen = discover_packet.hlen | |
| offer.xid = discover_packet.xid | |
| offer.yiaddr = offered_ip | |
| offer.siaddr = self.config['gateway'] | |
| offer.chaddr = discover_packet.chaddr | |
| # Add DHCP options | |
| offer.options[53] = bytes([DHCPMessageType.OFFER.value]) # Message type | |
| offer.options[1] = socket.inet_aton('255.255.255.0') # Subnet mask | |
| offer.options[3] = socket.inet_aton(self.config['gateway']) # Router | |
| offer.options[6] = b''.join(socket.inet_aton(dns) for dns in self.config['dns_servers']) # DNS | |
| offer.options[51] = struct.pack('!I', self.config['lease_time']) # Lease time | |
| offer.options[54] = socket.inet_aton(self.config['gateway']) # DHCP server identifier | |
| return offer | |
| def _create_dhcp_ack(self, request_packet: DHCPPacket) -> DHCPPacket: | |
| """Create DHCP ACK response""" | |
| mac_address = request_packet.get_mac_address() | |
| requested_ip = request_packet.ciaddr | |
| # If no requested IP in ciaddr, check option 50 | |
| if requested_ip == '0.0.0.0' and 50 in request_packet.options: | |
| requested_ip = socket.inet_ntoa(request_packet.options[50]) | |
| # Validate request | |
| if not self._validate_request(mac_address, requested_ip): | |
| return self._create_dhcp_nak(request_packet) | |
| # Create or update lease | |
| lease = DHCPLease( | |
| mac_address=mac_address, | |
| ip_address=requested_ip, | |
| lease_time=self.config['lease_time'], | |
| lease_start=time.time() | |
| ) | |
| with self.lock: | |
| self.leases[mac_address] = lease | |
| # Create ACK packet | |
| ack = DHCPPacket() | |
| ack.op = 2 # BOOTREPLY | |
| ack.htype = request_packet.htype | |
| ack.hlen = request_packet.hlen | |
| ack.xid = request_packet.xid | |
| ack.yiaddr = requested_ip | |
| ack.siaddr = self.config['gateway'] | |
| ack.chaddr = request_packet.chaddr | |
| # Add DHCP options | |
| ack.options[53] = bytes([DHCPMessageType.ACK.value]) # Message type | |
| ack.options[1] = socket.inet_aton('255.255.255.0') # Subnet mask | |
| ack.options[3] = socket.inet_aton(self.config['gateway']) # Router | |
| ack.options[6] = b''.join(socket.inet_aton(dns) for dns in self.config['dns_servers']) # DNS | |
| ack.options[51] = struct.pack('!I', self.config['lease_time']) # Lease time | |
| ack.options[54] = socket.inet_aton(self.config['gateway']) # DHCP server identifier | |
| return ack | |
| def _create_dhcp_nak(self, request_packet: DHCPPacket) -> DHCPPacket: | |
| """Create DHCP NAK response""" | |
| nak = DHCPPacket() | |
| nak.op = 2 # BOOTREPLY | |
| nak.htype = request_packet.htype | |
| nak.hlen = request_packet.hlen | |
| nak.xid = request_packet.xid | |
| nak.chaddr = request_packet.chaddr | |
| # Add DHCP options | |
| nak.options[53] = bytes([DHCPMessageType.NAK.value]) # Message type | |
| nak.options[54] = socket.inet_aton(self.config['gateway']) # DHCP server identifier | |
| return nak | |
| def _validate_request(self, mac_address: str, requested_ip: str) -> bool: | |
| """Validate DHCP request""" | |
| # Check if IP is in our pool | |
| if requested_ip not in self.ip_pool: | |
| return False | |
| # Check if IP is available or already assigned to this MAC | |
| with self.lock: | |
| for mac, lease in self.leases.items(): | |
| if lease.ip_address == requested_ip: | |
| if mac != mac_address and not lease.is_expired: | |
| return False # IP already assigned to different MAC | |
| return True | |
| def process_packet(self, packet_data: bytes, client_addr: Tuple[str, int]) -> Optional[bytes]: | |
| """Process incoming DHCP packet and return response""" | |
| try: | |
| packet = DHCPPacket.parse(packet_data) | |
| message_type = packet.get_message_type() | |
| if message_type == DHCPMessageType.DISCOVER: | |
| response = self._create_dhcp_offer(packet) | |
| elif message_type == DHCPMessageType.REQUEST: | |
| response = self._create_dhcp_ack(packet) | |
| elif message_type == DHCPMessageType.RELEASE: | |
| # Handle lease release | |
| mac_address = packet.get_mac_address() | |
| with self.lock: | |
| if mac_address in self.leases: | |
| del self.leases[mac_address] | |
| return None | |
| else: | |
| return None | |
| if response: | |
| return response.build() | |
| except Exception as e: | |
| print(f"Error processing DHCP packet: {e}") | |
| return None | |
| def get_leases(self) -> Dict[str, Dict]: | |
| """Get current lease table""" | |
| with self.lock: | |
| self._cleanup_expired_leases() | |
| return { | |
| mac: { | |
| 'ip_address': lease.ip_address, | |
| 'lease_time': lease.lease_time, | |
| 'lease_start': lease.lease_start, | |
| 'remaining_time': lease.remaining_time, | |
| 'state': lease.state | |
| } | |
| for mac, lease in self.leases.items() | |
| } | |
| def release_lease(self, mac_address: str) -> bool: | |
| """Manually release a lease""" | |
| with self.lock: | |
| if mac_address in self.leases: | |
| del self.leases[mac_address] | |
| return True | |
| return False | |
| def start(self): | |
| """Start DHCP server (placeholder for integration with packet bridge)""" | |
| self.running = True | |
| print(f"DHCP server started - Pool: {self.config['range_start']} - {self.config['range_end']}") | |
| def stop(self): | |
| """Stop DHCP server""" | |
| self.running = False | |
| print("DHCP server stopped") | |