|
from typing import Optional, Union |
|
from urllib.parse import urlparse |
|
|
|
import dns.asyncbackend |
|
import dns.asyncquery |
|
import dns.inet |
|
import dns.message |
|
import dns.query |
|
|
|
|
|
class Nameserver: |
|
def __init__(self): |
|
pass |
|
|
|
def __str__(self): |
|
raise NotImplementedError |
|
|
|
def kind(self) -> str: |
|
raise NotImplementedError |
|
|
|
def is_always_max_size(self) -> bool: |
|
raise NotImplementedError |
|
|
|
def answer_nameserver(self) -> str: |
|
raise NotImplementedError |
|
|
|
def answer_port(self) -> int: |
|
raise NotImplementedError |
|
|
|
def query( |
|
self, |
|
request: dns.message.QueryMessage, |
|
timeout: float, |
|
source: Optional[str], |
|
source_port: int, |
|
max_size: bool, |
|
one_rr_per_rrset: bool = False, |
|
ignore_trailing: bool = False, |
|
) -> dns.message.Message: |
|
raise NotImplementedError |
|
|
|
async def async_query( |
|
self, |
|
request: dns.message.QueryMessage, |
|
timeout: float, |
|
source: Optional[str], |
|
source_port: int, |
|
max_size: bool, |
|
backend: dns.asyncbackend.Backend, |
|
one_rr_per_rrset: bool = False, |
|
ignore_trailing: bool = False, |
|
) -> dns.message.Message: |
|
raise NotImplementedError |
|
|
|
|
|
class AddressAndPortNameserver(Nameserver): |
|
def __init__(self, address: str, port: int): |
|
super().__init__() |
|
self.address = address |
|
self.port = port |
|
|
|
def kind(self) -> str: |
|
raise NotImplementedError |
|
|
|
def is_always_max_size(self) -> bool: |
|
return False |
|
|
|
def __str__(self): |
|
ns_kind = self.kind() |
|
return f"{ns_kind}:{self.address}@{self.port}" |
|
|
|
def answer_nameserver(self) -> str: |
|
return self.address |
|
|
|
def answer_port(self) -> int: |
|
return self.port |
|
|
|
|
|
class Do53Nameserver(AddressAndPortNameserver): |
|
def __init__(self, address: str, port: int = 53): |
|
super().__init__(address, port) |
|
|
|
def kind(self): |
|
return "Do53" |
|
|
|
def query( |
|
self, |
|
request: dns.message.QueryMessage, |
|
timeout: float, |
|
source: Optional[str], |
|
source_port: int, |
|
max_size: bool, |
|
one_rr_per_rrset: bool = False, |
|
ignore_trailing: bool = False, |
|
) -> dns.message.Message: |
|
if max_size: |
|
response = dns.query.tcp( |
|
request, |
|
self.address, |
|
timeout=timeout, |
|
port=self.port, |
|
source=source, |
|
source_port=source_port, |
|
one_rr_per_rrset=one_rr_per_rrset, |
|
ignore_trailing=ignore_trailing, |
|
) |
|
else: |
|
response = dns.query.udp( |
|
request, |
|
self.address, |
|
timeout=timeout, |
|
port=self.port, |
|
source=source, |
|
source_port=source_port, |
|
raise_on_truncation=True, |
|
one_rr_per_rrset=one_rr_per_rrset, |
|
ignore_trailing=ignore_trailing, |
|
ignore_errors=True, |
|
ignore_unexpected=True, |
|
) |
|
return response |
|
|
|
async def async_query( |
|
self, |
|
request: dns.message.QueryMessage, |
|
timeout: float, |
|
source: Optional[str], |
|
source_port: int, |
|
max_size: bool, |
|
backend: dns.asyncbackend.Backend, |
|
one_rr_per_rrset: bool = False, |
|
ignore_trailing: bool = False, |
|
) -> dns.message.Message: |
|
if max_size: |
|
response = await dns.asyncquery.tcp( |
|
request, |
|
self.address, |
|
timeout=timeout, |
|
port=self.port, |
|
source=source, |
|
source_port=source_port, |
|
backend=backend, |
|
one_rr_per_rrset=one_rr_per_rrset, |
|
ignore_trailing=ignore_trailing, |
|
) |
|
else: |
|
response = await dns.asyncquery.udp( |
|
request, |
|
self.address, |
|
timeout=timeout, |
|
port=self.port, |
|
source=source, |
|
source_port=source_port, |
|
raise_on_truncation=True, |
|
backend=backend, |
|
one_rr_per_rrset=one_rr_per_rrset, |
|
ignore_trailing=ignore_trailing, |
|
ignore_errors=True, |
|
ignore_unexpected=True, |
|
) |
|
return response |
|
|
|
|
|
class DoHNameserver(Nameserver): |
|
def __init__( |
|
self, |
|
url: str, |
|
bootstrap_address: Optional[str] = None, |
|
verify: Union[bool, str] = True, |
|
want_get: bool = False, |
|
): |
|
super().__init__() |
|
self.url = url |
|
self.bootstrap_address = bootstrap_address |
|
self.verify = verify |
|
self.want_get = want_get |
|
|
|
def kind(self): |
|
return "DoH" |
|
|
|
def is_always_max_size(self) -> bool: |
|
return True |
|
|
|
def __str__(self): |
|
return self.url |
|
|
|
def answer_nameserver(self) -> str: |
|
return self.url |
|
|
|
def answer_port(self) -> int: |
|
port = urlparse(self.url).port |
|
if port is None: |
|
port = 443 |
|
return port |
|
|
|
def query( |
|
self, |
|
request: dns.message.QueryMessage, |
|
timeout: float, |
|
source: Optional[str], |
|
source_port: int, |
|
max_size: bool = False, |
|
one_rr_per_rrset: bool = False, |
|
ignore_trailing: bool = False, |
|
) -> dns.message.Message: |
|
return dns.query.https( |
|
request, |
|
self.url, |
|
timeout=timeout, |
|
source=source, |
|
source_port=source_port, |
|
bootstrap_address=self.bootstrap_address, |
|
one_rr_per_rrset=one_rr_per_rrset, |
|
ignore_trailing=ignore_trailing, |
|
verify=self.verify, |
|
post=(not self.want_get), |
|
) |
|
|
|
async def async_query( |
|
self, |
|
request: dns.message.QueryMessage, |
|
timeout: float, |
|
source: Optional[str], |
|
source_port: int, |
|
max_size: bool, |
|
backend: dns.asyncbackend.Backend, |
|
one_rr_per_rrset: bool = False, |
|
ignore_trailing: bool = False, |
|
) -> dns.message.Message: |
|
return await dns.asyncquery.https( |
|
request, |
|
self.url, |
|
timeout=timeout, |
|
source=source, |
|
source_port=source_port, |
|
bootstrap_address=self.bootstrap_address, |
|
one_rr_per_rrset=one_rr_per_rrset, |
|
ignore_trailing=ignore_trailing, |
|
verify=self.verify, |
|
post=(not self.want_get), |
|
) |
|
|
|
|
|
class DoTNameserver(AddressAndPortNameserver): |
|
def __init__( |
|
self, |
|
address: str, |
|
port: int = 853, |
|
hostname: Optional[str] = None, |
|
verify: Union[bool, str] = True, |
|
): |
|
super().__init__(address, port) |
|
self.hostname = hostname |
|
self.verify = verify |
|
|
|
def kind(self): |
|
return "DoT" |
|
|
|
def query( |
|
self, |
|
request: dns.message.QueryMessage, |
|
timeout: float, |
|
source: Optional[str], |
|
source_port: int, |
|
max_size: bool = False, |
|
one_rr_per_rrset: bool = False, |
|
ignore_trailing: bool = False, |
|
) -> dns.message.Message: |
|
return dns.query.tls( |
|
request, |
|
self.address, |
|
port=self.port, |
|
timeout=timeout, |
|
one_rr_per_rrset=one_rr_per_rrset, |
|
ignore_trailing=ignore_trailing, |
|
server_hostname=self.hostname, |
|
verify=self.verify, |
|
) |
|
|
|
async def async_query( |
|
self, |
|
request: dns.message.QueryMessage, |
|
timeout: float, |
|
source: Optional[str], |
|
source_port: int, |
|
max_size: bool, |
|
backend: dns.asyncbackend.Backend, |
|
one_rr_per_rrset: bool = False, |
|
ignore_trailing: bool = False, |
|
) -> dns.message.Message: |
|
return await dns.asyncquery.tls( |
|
request, |
|
self.address, |
|
port=self.port, |
|
timeout=timeout, |
|
one_rr_per_rrset=one_rr_per_rrset, |
|
ignore_trailing=ignore_trailing, |
|
server_hostname=self.hostname, |
|
verify=self.verify, |
|
) |
|
|
|
|
|
class DoQNameserver(AddressAndPortNameserver): |
|
def __init__( |
|
self, |
|
address: str, |
|
port: int = 853, |
|
verify: Union[bool, str] = True, |
|
server_hostname: Optional[str] = None, |
|
): |
|
super().__init__(address, port) |
|
self.verify = verify |
|
self.server_hostname = server_hostname |
|
|
|
def kind(self): |
|
return "DoQ" |
|
|
|
def query( |
|
self, |
|
request: dns.message.QueryMessage, |
|
timeout: float, |
|
source: Optional[str], |
|
source_port: int, |
|
max_size: bool = False, |
|
one_rr_per_rrset: bool = False, |
|
ignore_trailing: bool = False, |
|
) -> dns.message.Message: |
|
return dns.query.quic( |
|
request, |
|
self.address, |
|
port=self.port, |
|
timeout=timeout, |
|
one_rr_per_rrset=one_rr_per_rrset, |
|
ignore_trailing=ignore_trailing, |
|
verify=self.verify, |
|
server_hostname=self.server_hostname, |
|
) |
|
|
|
async def async_query( |
|
self, |
|
request: dns.message.QueryMessage, |
|
timeout: float, |
|
source: Optional[str], |
|
source_port: int, |
|
max_size: bool, |
|
backend: dns.asyncbackend.Backend, |
|
one_rr_per_rrset: bool = False, |
|
ignore_trailing: bool = False, |
|
) -> dns.message.Message: |
|
return await dns.asyncquery.quic( |
|
request, |
|
self.address, |
|
port=self.port, |
|
timeout=timeout, |
|
one_rr_per_rrset=one_rr_per_rrset, |
|
ignore_trailing=ignore_trailing, |
|
verify=self.verify, |
|
server_hostname=self.server_hostname, |
|
) |
|
|