# HINTECH / core / dhcp_server.py
# Factor Studios
# Upload 73 files
# aaaaa79 verified
"""
DHCP Server Module
Implements a user-space DHCP server that handles:
- DHCP DISCOVER → OFFER → REQUEST → ACK sequence
- IP lease management
- Lease renewals and expiration
"""
import struct
import time
import socket
import threading
from typing import Dict, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
class DHCPMessageType(Enum):
    """DHCP message kinds, keyed by the numeric code carried in option 53."""

    # Client looking for servers.
    DISCOVER = 1
    # Server proposing an address.
    OFFER = 2
    # Client asking for (or renewing) an address.
    REQUEST = 3
    # Client refusing the address it was given.
    DECLINE = 4
    # Server confirming the lease.
    ACK = 5
    # Server refusing the request.
    NAK = 6
    # Client giving its address back.
    RELEASE = 7
    # Client asking for configuration only.
    INFORM = 8
@dataclass
class DHCPLease:
    """One address binding handed out to a client.

    ``lease_start`` is a ``time.time()`` timestamp and ``lease_time`` is a
    duration in seconds; the lease ends at their sum.
    """

    mac_address: str
    ip_address: str
    lease_time: int
    lease_start: float
    state: str = 'BOUND'

    def _deadline(self) -> float:
        """Return the wall-clock timestamp at which this lease ends."""
        return self.lease_start + self.lease_time

    @property
    def is_expired(self) -> bool:
        """True once the current time is strictly past the lease end."""
        return self._deadline() < time.time()

    @property
    def remaining_time(self) -> int:
        """Whole seconds left on the lease, never negative."""
        seconds_left = int(self._deadline() - time.time())
        return seconds_left if seconds_left > 0 else 0
class DHCPPacket:
    """A DHCP message that can be decoded from, and encoded to, wire bytes.

    The fixed header is 236 bytes; it is followed by a 4-byte magic cookie
    and a sequence of (code, length, value) options.
    """

    def __init__(self):
        """Create an empty packet with Ethernet defaults and no options."""
        self.op = 0                    # Message op code / message type
        self.htype = 1                 # Hardware address type (Ethernet = 1)
        self.hlen = 6                  # Hardware address length
        self.hops = 0                  # Hops
        self.xid = 0                   # Transaction ID
        self.secs = 0                  # Seconds elapsed
        self.flags = 0                 # Flags
        self.ciaddr = '0.0.0.0'        # Client IP address
        self.yiaddr = '0.0.0.0'        # Your IP address
        self.siaddr = '0.0.0.0'        # Server IP address
        self.giaddr = '0.0.0.0'        # Gateway IP address
        self.chaddr = b'\x00' * 16     # Client hardware address
        self.sname = b'\x00' * 64      # Server name
        self.file = b'\x00' * 128      # Boot file name
        self.options = {}              # DHCP options

    @classmethod
    def parse(cls, data: bytes) -> 'DHCPPacket':
        """Decode raw bytes into a packet.

        Options are decoded only when the bytes after the fixed header
        start with the DHCP magic cookie; otherwise ``options`` stays empty.

        Raises:
            ValueError: If ``data`` is shorter than the 236-byte header.
        """
        packet = cls()
        if len(data) < 236:
            raise ValueError("DHCP packet too short")

        (op, htype, hlen, hops, xid, secs, flags,
         ciaddr, yiaddr, siaddr, giaddr,
         chaddr, sname, boot_file) = struct.unpack(
            '!BBBBIHH4s4s4s4s16s64s128s', data[:236]
        )

        packet.op, packet.htype, packet.hlen, packet.hops = op, htype, hlen, hops
        packet.xid, packet.secs, packet.flags = xid, secs, flags
        packet.ciaddr = socket.inet_ntoa(ciaddr)
        packet.yiaddr = socket.inet_ntoa(yiaddr)
        packet.siaddr = socket.inet_ntoa(siaddr)
        packet.giaddr = socket.inet_ntoa(giaddr)
        packet.chaddr, packet.sname, packet.file = chaddr, sname, boot_file

        trailer = data[236:]
        # 0x63825363 is the DHCP magic cookie that introduces the options.
        if len(trailer) >= 4 and trailer[:4] == b'\x63\x82\x53\x63':
            packet.options = packet._parse_options(trailer[4:])
        return packet

    def _parse_options(self, data: bytes) -> Dict[int, bytes]:
        """Decode the option area into a ``{code: value}`` mapping.

        Code 0 is single-byte padding and code 255 ends the list. Decoding
        stops quietly at the first option whose length byte or value would
        run past the end of ``data``; options decoded before it are kept.
        """
        decoded = {}
        total = len(data)
        pos = 0
        while pos < total:
            code = data[pos]
            if code == 255:  # End option
                break
            if code == 0:  # Pad option
                pos += 1
                continue
            if pos + 1 >= total:
                break  # no room for the length byte
            value_start = pos + 2
            value_end = value_start + data[pos + 1]
            if value_end > total:
                break  # value is truncated
            decoded[code] = data[value_start:value_end]
            pos = value_end
        return decoded

    def build(self) -> bytes:
        """Encode the packet to bytes, zero-padded to at least 300 bytes."""
        header = struct.pack(
            '!BBBBIHH4s4s4s4s16s64s128s',
            self.op, self.htype, self.hlen, self.hops,
            self.xid, self.secs, self.flags,
            socket.inet_aton(self.ciaddr),
            socket.inet_aton(self.yiaddr),
            socket.inet_aton(self.siaddr),
            socket.inet_aton(self.giaddr),
            self.chaddr, self.sname, self.file
        )
        # Header, magic cookie, then each option as code/length/value.
        pieces = [header, struct.pack('!I', 0x63825363)]
        for code, value in self.options.items():
            pieces.append(struct.pack('!BB', code, len(value)))
            pieces.append(value)
        pieces.append(b'\xff')  # End option
        return b''.join(pieces).ljust(300, b'\x00')

    def get_mac_address(self) -> str:
        """Return the first six bytes of ``chaddr`` as ``aa:bb:cc:dd:ee:ff``."""
        return ':'.join(format(octet, '02x') for octet in self.chaddr[:6])

    def get_message_type(self) -> Optional['DHCPMessageType']:
        """Return the message type from option 53.

        Returns ``None`` when the option is absent, is not exactly one byte
        long, or holds a code that ``DHCPMessageType`` does not define.
        """
        if 53 not in self.options:
            return None
        raw = self.options[53]
        if len(raw) != 1:
            return None
        try:
            return DHCPMessageType(raw[0])
        except ValueError:
            return None
class DHCPServer:
    """User-space DHCP server: address pool, lease table and reply building.

    The class performs no socket I/O. Callers pass raw packets to
    ``process_packet`` and transmit whatever bytes it returns.

    Config keys read by this class: ``range_start``, ``range_end``,
    ``gateway``, ``dns_servers`` and ``lease_time``.
    """

    def __init__(self, config: Dict):
        """Create a server with an empty lease table.

        Args:
            config: Settings dictionary; see the class docstring for the
                keys that are read.
        """
        self.config = config
        self.leases: Dict[str, 'DHCPLease'] = {}  # MAC -> Lease
        self.ip_pool = self._build_ip_pool()
        self.running = False
        self.server_thread = None
        # Guards self.leases. threading.Lock is not re-entrant, so the
        # *_locked helpers below expect the caller to already hold it.
        self.lock = threading.Lock()

    def _build_ip_pool(self) -> set:
        """Return every address from ``range_start`` to ``range_end`` inclusive.

        The result is empty when ``range_start`` is numerically above
        ``range_end``.
        """
        first = struct.unpack('!I', socket.inet_aton(self.config['range_start']))[0]
        last = struct.unpack('!I', socket.inet_aton(self.config['range_end']))[0]
        return {
            socket.inet_ntoa(struct.pack('!I', value))
            for value in range(first, last + 1)
        }

    def _get_available_ip(self) -> Optional[str]:
        """Return the numerically lowest free pool address, or ``None``.

        Expired leases are dropped first, so their addresses count as free.
        """
        with self.lock:
            return self._pick_free_ip_locked()

    def _pick_free_ip_locked(self) -> Optional[str]:
        """Choose the lowest free address; the caller must hold ``self.lock``."""
        self._cleanup_expired_leases()
        in_use = {lease.ip_address for lease in self.leases.values()}
        free = self.ip_pool - in_use
        if not free:
            return None
        # Compare packed big-endian bytes, not dotted strings: as text,
        # '10.0.0.10' sorts before '10.0.0.2'.
        return min(free, key=socket.inet_aton)

    def _cleanup_expired_leases(self):
        """Delete expired leases; the caller must hold ``self.lock``."""
        expired_macs = [
            mac for mac, lease in self.leases.items()
            if lease.is_expired
        ]
        for mac in expired_macs:
            del self.leases[mac]

    def _build_reply(self, request: 'DHCPPacket', message_type: 'DHCPMessageType',
                     assigned_ip: Optional[str] = None) -> 'DHCPPacket':
        """Build a BOOTREPLY that mirrors the request's header fields.

        Args:
            request: Client packet; htype, hlen, xid and chaddr are copied.
            message_type: Value to place in option 53.
            assigned_ip: Address for ``yiaddr``. When given, the subnet
                mask, router, DNS and lease-time options are added too;
                when ``None`` (NAK) only options 53 and 54 are set.
        """
        reply = DHCPPacket()
        reply.op = 2  # BOOTREPLY
        reply.htype = request.htype
        reply.hlen = request.hlen
        reply.xid = request.xid
        reply.chaddr = request.chaddr

        gateway = self.config['gateway']
        packed_gateway = socket.inet_aton(gateway)

        reply.options[53] = bytes([message_type.value])  # Message type
        if assigned_ip is not None:
            reply.yiaddr = assigned_ip
            reply.siaddr = gateway
            reply.options[1] = socket.inet_aton('255.255.255.0')  # Subnet mask
            reply.options[3] = packed_gateway  # Router
            reply.options[6] = b''.join(
                socket.inet_aton(dns) for dns in self.config['dns_servers']
            )  # DNS
            reply.options[51] = struct.pack('!I', self.config['lease_time'])  # Lease time
        reply.options[54] = packed_gateway  # DHCP server identifier
        return reply

    def _create_dhcp_offer(self, discover_packet: 'DHCPPacket') -> Optional['DHCPPacket']:
        """Build an OFFER for a DISCOVER, or ``None`` if the pool is exhausted.

        A client that still holds an unexpired lease is offered the same
        address again. The offered address is not reserved here; a lease is
        only recorded when the matching REQUEST is acknowledged.
        """
        mac_address = discover_packet.get_mac_address()
        # Look up the existing lease and pick a fallback under one lock hold
        # so the lease table cannot change in between.
        with self.lock:
            current = self.leases.get(mac_address)
            if current is not None and not current.is_expired:
                offered_ip = current.ip_address
            else:
                offered_ip = self._pick_free_ip_locked()
        if not offered_ip:
            return None  # No available IPs
        return self._build_reply(discover_packet, DHCPMessageType.OFFER, offered_ip)

    def _create_dhcp_ack(self, request_packet: 'DHCPPacket') -> 'DHCPPacket':
        """Answer a REQUEST with an ACK, or with a NAK if it cannot be granted.

        The requested address is ``ciaddr``; when that is ``0.0.0.0`` the
        address from option 50 is used instead. On success the lease for
        the client's MAC is created or replaced.
        """
        mac_address = request_packet.get_mac_address()
        requested_ip = request_packet.ciaddr

        # If no requested IP in ciaddr, check option 50
        if requested_ip == '0.0.0.0' and 50 in request_packet.options:
            raw_ip = request_packet.options[50]
            if len(raw_ip) != 4:
                # inet_ntoa would raise on this; refuse the request instead.
                return self._create_dhcp_nak(request_packet)
            requested_ip = socket.inet_ntoa(raw_ip)

        # Validate and record the lease in one critical section so two
        # clients racing for the same address cannot both be granted it.
        with self.lock:
            granted = self._is_request_valid_locked(mac_address, requested_ip)
            if granted:
                self.leases[mac_address] = DHCPLease(
                    mac_address=mac_address,
                    ip_address=requested_ip,
                    lease_time=self.config['lease_time'],
                    lease_start=time.time()
                )
        if not granted:
            return self._create_dhcp_nak(request_packet)
        return self._build_reply(request_packet, DHCPMessageType.ACK, requested_ip)

    def _create_dhcp_nak(self, request_packet: 'DHCPPacket') -> 'DHCPPacket':
        """Build a NAK carrying only the message type and server identifier."""
        return self._build_reply(request_packet, DHCPMessageType.NAK)

    def _validate_request(self, mac_address: str, requested_ip: str) -> bool:
        """Return True if ``mac_address`` may be given ``requested_ip``.

        The address must be in the pool and must not be held under an
        unexpired lease by a different MAC.
        """
        with self.lock:
            return self._is_request_valid_locked(mac_address, requested_ip)

    def _is_request_valid_locked(self, mac_address: str, requested_ip: str) -> bool:
        """Validation logic; the caller must hold ``self.lock``."""
        if requested_ip not in self.ip_pool:
            return False
        return not any(
            lease.ip_address == requested_ip
            and mac != mac_address
            and not lease.is_expired
            for mac, lease in self.leases.items()
        )

    def process_packet(self, packet_data: bytes, client_addr: Tuple[str, int]) -> Optional[bytes]:
        """Handle one incoming packet and return the encoded reply, if any.

        DISCOVER yields an OFFER, REQUEST yields an ACK or NAK, and RELEASE
        drops the client's lease without a reply. Every other message type
        is ignored. Any exception raised while handling the packet is
        printed and ``None`` is returned.

        Args:
            packet_data: Raw DHCP message bytes.
            client_addr: Sender address; not used by this method.
        """
        try:
            packet = DHCPPacket.parse(packet_data)
            message_type = packet.get_message_type()

            if message_type == DHCPMessageType.DISCOVER:
                response = self._create_dhcp_offer(packet)
            elif message_type == DHCPMessageType.REQUEST:
                response = self._create_dhcp_ack(packet)
            elif message_type == DHCPMessageType.RELEASE:
                self.release_lease(packet.get_mac_address())
                return None
            else:
                return None

            if response is not None:
                return response.build()
        except Exception as e:
            # Boundary for untrusted network input: report and drop.
            print(f"Error processing DHCP packet: {e}")
        return None

    def get_leases(self) -> Dict[str, Dict]:
        """Return a snapshot of the unexpired leases, keyed by MAC address."""
        with self.lock:
            self._cleanup_expired_leases()
            return {
                mac: {
                    'ip_address': lease.ip_address,
                    'lease_time': lease.lease_time,
                    'lease_start': lease.lease_start,
                    'remaining_time': lease.remaining_time,
                    'state': lease.state
                }
                for mac, lease in self.leases.items()
            }

    def release_lease(self, mac_address: str) -> bool:
        """Drop the lease for ``mac_address``; return whether one existed."""
        with self.lock:
            return self.leases.pop(mac_address, None) is not None

    def start(self):
        """Start DHCP server (placeholder for integration with packet bridge)"""
        self.running = True
        print(f"DHCP server started - Pool: {self.config['range_start']} - {self.config['range_end']}")

    def stop(self):
        """Stop DHCP server"""
        self.running = False
        print("DHCP server stopped")