Spaces:
Sleeping
Sleeping
from __future__ import absolute_import | |
import re | |
from collections import namedtuple | |
from ..exceptions import LocationParseError | |
from ..packages import six | |
# Field names of the ``Url`` namedtuple, in positional order.
url_attrs = [
    "scheme",
    "auth",
    "host",
    "port",
    "path",
    "query",
    "fragment",
]

# Normalization is applied only to HTTP(S) URLs. A URL without a scheme
# (None) is inferred to be http, so None is listed as well.
NORMALIZABLE_SCHEMES = (
    "http",
    "https",
    None,
)
# Most of the patterns below were derived from the 'rfc3986' module:
# https://github.com/python-hyper/rfc3986

# A single percent-encoded octet, e.g. "%2F".
PERCENT_RE = re.compile(r"%[a-fA-F0-9]{2}")

# Matches input that begins with "<scheme>:" or with "/".
SCHEME_RE = re.compile(r"^(?:[a-zA-Z][a-zA-Z0-9+-]*:|/)")

# Captures, in order: scheme, authority, path, query, fragment.
URI_RE = re.compile(
    r"^(?:([a-zA-Z][a-zA-Z0-9+.-]*):)?"
    r"(?://([^\\/?#]*))?"
    r"([^?#]*)"
    r"(?:\?([^#]*))?"
    r"(?:#(.*))?$",
    re.UNICODE | re.DOTALL,
)

# --- IP literal building blocks -------------------------------------------
IPV4_PAT = r"(?:[0-9]{1,3}\.){3}[0-9]{1,3}"
HEX_PAT = "[0-9A-Fa-f]{1,4}"
LS32_PAT = "(?:{hex}:{hex}|{ipv4})".format(hex=HEX_PAT, ipv4=IPV4_PAT)

_subs = {"hex": HEX_PAT, "ls32": LS32_PAT}

# One template per alternative of the IPv6address grammar; the comment above
# each entry is the grammar rule it implements.
_variations = [
    #                            6( h16 ":" ) ls32
    "(?:%(hex)s:){6}%(ls32)s",
    #                       "::" 5( h16 ":" ) ls32
    "::(?:%(hex)s:){5}%(ls32)s",
    # [               h16 ] "::" 4( h16 ":" ) ls32
    "(?:%(hex)s)?::(?:%(hex)s:){4}%(ls32)s",
    # [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32
    "(?:(?:%(hex)s:)?%(hex)s)?::(?:%(hex)s:){3}%(ls32)s",
    # [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32
    "(?:(?:%(hex)s:){0,2}%(hex)s)?::(?:%(hex)s:){2}%(ls32)s",
    # [ *3( h16 ":" ) h16 ] "::"    h16 ":"   ls32
    "(?:(?:%(hex)s:){0,3}%(hex)s)?::%(hex)s:%(ls32)s",
    # [ *4( h16 ":" ) h16 ] "::"              ls32
    "(?:(?:%(hex)s:){0,4}%(hex)s)?::%(ls32)s",
    # [ *5( h16 ":" ) h16 ] "::"              h16
    "(?:(?:%(hex)s:){0,5}%(hex)s)?::%(hex)s",
    # [ *6( h16 ":" ) h16 ] "::"
    "(?:(?:%(hex)s:){0,6}%(hex)s)?::",
]

# Characters accepted inside a zone ID (used within a character class below).
UNRESERVED_PAT = r"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789._!\-~"

# Alternation of every expanded IPv6 template.
IPV6_PAT = "(?:%s)" % "|".join(template % _subs for template in _variations)

# Zone ID introduced by "%25" or a bare "%".
ZONE_ID_PAT = "(?:%25|%)(?:[" + UNRESERVED_PAT + "]|%[a-fA-F0-9]{2})+"

# Bracketed IPv6 address with an optional zone ID.
IPV6_ADDRZ_PAT = r"\[" + IPV6_PAT + r"(?:" + ZONE_ID_PAT + r")?\]"

# Registered name: anything but the listed delimiters, or percent-encoded octets.
REG_NAME_PAT = r"(?:[^\[\]%:/?#]|%[a-fA-F0-9]{2})*"

# Request target: captures path and query; a fragment is matched but not captured.
TARGET_RE = re.compile(r"^(/[^?#]*)(?:\?([^#]*))?(?:#.*)?$")

# --- Anchored, compiled forms ---------------------------------------------
IPV4_RE = re.compile("^" + IPV4_PAT + "$")
IPV6_RE = re.compile("^" + IPV6_PAT + "$")
IPV6_ADDRZ_RE = re.compile("^" + IPV6_ADDRZ_PAT + "$")
# The [2:-2] slice drops the leading r"\[" and trailing r"\]" of the pattern.
BRACELESS_IPV6_ADDRZ_RE = re.compile("^" + IPV6_ADDRZ_PAT[2:-2] + "$")
# Captures the zone ID that sits immediately before the closing bracket.
ZONE_ID_RE = re.compile("(" + ZONE_ID_PAT + r")\]$")

# Captures the host (reg-name, IPv4 or bracketed IPv6) and an optional port
# of up to five digits.
_HOST_PORT_PAT = ("^(%s|%s|%s)(?::([0-9]{0,5}))?$") % (
    REG_NAME_PAT,
    IPV4_PAT,
    IPV6_ADDRZ_PAT,
)
_HOST_PORT_RE = re.compile(_HOST_PORT_PAT, re.UNICODE | re.DOTALL)
# Character classes used when deciding which characters of a URL component
# may be left as-is and which must be percent-encoded.
UNRESERVED_CHARS = set(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "abcdefghijklmnopqrstuvwxyz"
    "0123456789"
    "._-~"
)
SUB_DELIM_CHARS = set("!$&'()*+,;=")

# Each class below extends the previous one with a few extra delimiters.
USERINFO_CHARS = UNRESERVED_CHARS.union(SUB_DELIM_CHARS, ":")
PATH_CHARS = USERINFO_CHARS.union("@", "/")
# Query and fragment share one and the same set object.
FRAGMENT_CHARS = PATH_CHARS.union("?")
QUERY_CHARS = FRAGMENT_CHARS
class Url(namedtuple("Url", url_attrs)):
    """
    Data structure for representing an HTTP URL. Used as a return value for
    :func:`parse_url`.

    The scheme is lower-cased on construction and a non-empty path is forced
    to start with ``/``. The remaining fields are stored exactly as given.
    """

    __slots__ = ()

    def __new__(
        cls,
        scheme=None,
        auth=None,
        host=None,
        port=None,
        path=None,
        query=None,
        fragment=None,
    ):
        # A non-empty path is always stored with a leading slash.
        if path and not path.startswith("/"):
            path = "/" + path
        if scheme is not None:
            scheme = scheme.lower()
        return super(Url, cls).__new__(
            cls, scheme, auth, host, port, path, query, fragment
        )

    # The accessors below are properties: the rest of this module (``__str__``,
    # ``get_host``) and the documented examples read them as plain attributes.

    @property
    def hostname(self):
        """For backwards-compatibility with urlparse. We're nice like that."""
        return self.host

    @property
    def request_uri(self):
        """Absolute path including the query string."""
        uri = self.path or "/"

        if self.query is not None:
            uri += "?" + self.query

        return uri

    @property
    def netloc(self):
        """Network location including host and port.

        The port is appended only when it is truthy; otherwise the bare host
        is returned.
        """
        if self.port:
            return "%s:%d" % (self.host, self.port)
        return self.host

    @property
    def url(self):
        """
        Convert self into a url

        This function should more or less round-trip with :func:`.parse_url`. The
        returned url may not be exactly the same as the url inputted to
        :func:`.parse_url`, but it should be equivalent by the RFC (e.g., urls
        with a blank port will have : removed).

        Example: ::

            >>> U = parse_url('http://google.com/mail/')
            >>> U.url
            'http://google.com/mail/'
            >>> Url('http', 'username:password', 'host.com', 80,
            ... '/path', 'query', 'fragment').url
            'http://username:password@host.com:80/path?query#fragment'
        """
        scheme, auth, host, port, path, query, fragment = self
        url = u""

        # "is not None" is used on purpose: empty strings (and a port of 0)
        # must still be emitted.
        if scheme is not None:
            url += scheme + u"://"
        if auth is not None:
            url += auth + u"@"
        if host is not None:
            url += host
        if port is not None:
            url += u":" + str(port)
        if path is not None:
            url += path
        if query is not None:
            url += u"?" + query
        if fragment is not None:
            url += u"#" + fragment

        return url

    def __str__(self):
        """Return the URL in its string form (same as :attr:`url`)."""
        return self.url
def split_first(s, delims):
    """
    .. deprecated:: 1.25

    Split ``s`` at whichever of the given delimiters occurs earliest.

    Returns a 3-tuple: the text before the delimiter, the text after it, and
    the delimiter that matched. When none of the delimiters occurs, the first
    element is the whole input, the second is an empty string and the third
    is ``None``.

    Example::

        >>> split_first('foo/bar?baz', '?/=')
        ('foo', 'bar?baz', '/')
        >>> split_first('foo/bar?baz', '123')
        ('foo/bar?baz', '', None)

    One ``find`` call is made per delimiter, so the cost grows linearly with
    the number of delimiters.
    """
    earliest = None  # (position, delimiter) of the best candidate so far

    for delim in delims:
        pos = s.find(delim)
        # Strict "<" keeps the first-listed delimiter when positions tie.
        if pos >= 0 and (earliest is None or pos < earliest[0]):
            earliest = (pos, delim)

    if earliest is None:
        return s, "", None

    cut, matched = earliest
    return s[:cut], s[cut + 1 :], matched
def _encode_invalid_chars(component, allowed_chars, encoding="utf-8"):
    """Percent-encode every byte of a URI component that is not allowed,
    without double-encoding a component that is already percent-encoded.

    ``None`` is returned unchanged. Existing ``%xx`` escapes are upper-cased.
    The result is decoded with ``encoding``.
    """
    if component is None:
        return component

    text = six.ensure_text(component)

    def _uppercase_escape(match):
        return match.group(0).upper()

    # Upper-case existing escapes and count them in the same pass.
    text, escape_count = PERCENT_RE.subn(_uppercase_escape, text)

    raw = text.encode("utf-8", "surrogatepass")
    # If every "%" belongs to a valid escape, the component is treated as
    # already percent-encoded and its "%" bytes are passed through.
    keep_percent = raw.count(b"%") == escape_count

    out = bytearray()
    # Iterating a bytearray yields integers on both Python 2 and 3.
    for code in bytearray(raw):
        if keep_percent and code == 0x25:
            out.append(code)
        elif code < 128 and chr(code) in allowed_chars:
            out.append(code)
        else:
            out.extend(("%%%02X" % code).encode("ascii"))

    return out.decode(encoding)
def _remove_path_dot_segments(path):
    """Collapse ``.`` and ``..`` segments out of ``path``.

    See http://tools.ietf.org/html/rfc3986#section-5.2.4 for pseudo-code.
    """
    kept = []
    for part in path.split("/"):
        if part == "..":
            # Step back one segment, if there is one to drop.
            if kept:
                kept.pop()
        elif part != ".":
            # "." is the current directory and is simply skipped; every
            # other segment is kept.
            kept.append(part)

    # An absolute path must stay absolute: make sure the joined result
    # begins with "/" even if the leading empty segment was popped.
    if path[:1] == "/" and (not kept or kept[0] != ""):
        kept = [""] + kept

    # A path that ends with "/." or "/.." refers to a directory, so the
    # result gets a trailing "/".
    if path.endswith("/.") or path.endswith("/.."):
        kept.append("")

    return "/".join(kept)
def _normalize_host(host, scheme):
    """Normalize ``host`` when ``scheme`` is one of ``NORMALIZABLE_SCHEMES``.

    * Bracketed IPv6 literals are lower-cased; a zone ID, if present, is
      rewritten to start with a single ``%`` and its remaining characters
      are percent-encoded as needed.
    * IPv4 literals are returned as-is.
    * Any other host is IDNA-encoded label by label.

    A falsy host is returned unchanged. A host under any other scheme is
    returned unchanged apart from the bytes-to-``str`` conversion.
    """
    if not host:
        return host

    if isinstance(host, six.binary_type):
        host = six.ensure_str(host)

    if scheme not in NORMALIZABLE_SCHEMES:
        return host

    if IPV6_ADDRZ_RE.match(host):
        zone = ZONE_ID_RE.search(host)
        if not zone:
            return host.lower()

        # IPv6 hosts of the form 'a::b%zone' are encoded in a URL as
        # 'a::b%25zone' per RFC 6874. Strip whichever separator is present
        # and put back a bare '%' to form a valid RFC 4007 scoped address.
        zone_start, zone_end = zone.span(1)
        zone_id = host[zone_start:zone_end]
        if zone_id.startswith("%25") and zone_id != "%25":
            separator_len = 3
        else:
            separator_len = 1
        zone_id = "%" + _encode_invalid_chars(
            zone_id[separator_len:], UNRESERVED_CHARS
        )
        return host[:zone_start].lower() + zone_id + host[zone_end:]

    if IPV4_RE.match(host):
        return host

    encoded_labels = [_idna_encode(label) for label in host.split(".")]
    return six.ensure_str(b".".join(encoded_labels))
def _idna_encode(name):
    """Encode a single host label to lower-cased ASCII bytes.

    A label containing any non-ASCII character is encoded with the ``idna``
    package; a pure-ASCII label is lower-cased and encoded directly.

    :param name: One dot-separated label of a host name.
    :returns: The encoded label as bytes.
    :raises LocationParseError: If the label needs IDNA encoding but the
        ``idna`` module cannot be imported, or the label is not a valid
        IDNA label.
    """
    # ASCII ends at code point 127, so 128 (U+0080) already needs IDNA.
    if name and any(ord(x) >= 128 for x in name):
        try:
            from pip._vendor import idna
        except ImportError:
            six.raise_from(
                LocationParseError("Unable to parse URL without the 'idna' module"),
                None,
            )
        try:
            return idna.encode(name.lower(), strict=True, std3_rules=True)
        except idna.IDNAError:
            six.raise_from(
                LocationParseError(u"Name '%s' is not a valid IDNA label" % name), None
            )
    return name.lower().encode("ascii")
def _encode_target(target):
    """Percent-encodes a request target so that there are no invalid characters"""
    # TARGET_RE yields the path and the optional query; a fragment, if any,
    # is matched but not captured and therefore dropped.
    raw_path, raw_query = TARGET_RE.match(target).groups()

    encoded_path = _encode_invalid_chars(raw_path, PATH_CHARS)
    encoded_query = _encode_invalid_chars(raw_query, QUERY_CHARS)

    if encoded_query is None:
        return encoded_path
    return encoded_path + "?" + encoded_query
def parse_url(url):
    """
    Given a url, return a parsed :class:`.Url` namedtuple. Best-effort is
    performed to parse incomplete urls. Fields not provided will be None.
    This parser is RFC 3986 and RFC 6874 compliant.

    The parser logic and helper functions are based heavily on
    work done in the ``rfc3986`` module.

    :param str url: URL to parse into a :class:`.Url` namedtuple.

    Partly backwards-compatible with :mod:`urlparse`.

    Example::

        >>> parse_url('http://google.com/mail/')
        Url(scheme='http', host='google.com', port=None, path='/mail/', ...)
        >>> parse_url('google.com:80')
        Url(scheme=None, host='google.com', port=80, path=None, ...)
        >>> parse_url('/foo?bar')
        Url(scheme=None, host=None, port=None, path='/foo', query='bar', ...)
    """
    if not url:
        # Nothing to parse: every field stays None.
        return Url()

    source_url = url
    # Input that starts with neither "<scheme>:" nor "/" is treated as
    # beginning with an authority.
    if SCHEME_RE.search(url) is None:
        url = "//" + url

    try:
        scheme, authority, path, query, fragment = URI_RE.match(url).groups()
        normalize_uri = scheme is None or scheme.lower() in NORMALIZABLE_SCHEMES

        if scheme:
            scheme = scheme.lower()

        auth = host = port = None
        if authority:
            userinfo, _, host_port = authority.rpartition("@")
            host, port = _HOST_PORT_RE.match(host_port).groups()
            auth = userinfo or None
            if normalize_uri and auth:
                auth = _encode_invalid_chars(auth, USERINFO_CHARS)
            # A blank port ("host:") counts as no port at all.
            port = port or None

        if port is not None:
            port = int(port)
            if port < 0 or port > 65535:
                raise LocationParseError(url)

        host = _normalize_host(host, scheme)

        if normalize_uri:
            if path:
                path = _encode_invalid_chars(
                    _remove_path_dot_segments(path), PATH_CHARS
                )
            if query:
                query = _encode_invalid_chars(query, QUERY_CHARS)
            if fragment:
                fragment = _encode_invalid_chars(fragment, FRAGMENT_CHARS)

    except (ValueError, AttributeError):
        return six.raise_from(LocationParseError(source_url), None)

    # For the sake of backwards compatibility an empty path is reported as
    # "" when a query or fragment follows it, and as None otherwise.
    # TODO: Remove this when we break backwards compatibility.
    if not path:
        has_trailer = query is not None or fragment is not None
        path = "" if has_trailer else None

    # Ensure that each part of the URL is a `str` for backwards
    # compatibility: text input gives text parts, anything else native str.
    coerce = six.ensure_text if isinstance(url, six.text_type) else six.ensure_str

    def _typed(value):
        if value is None:
            return None
        return coerce(value)

    return Url(
        _typed(scheme),
        _typed(auth),
        _typed(host),
        port,
        _typed(path),
        _typed(query),
        _typed(fragment),
    )
def get_host(url):
    """
    Deprecated. Use :func:`parse_url` instead.

    Returns a ``(scheme, hostname, port)`` tuple; the scheme falls back to
    ``"http"`` when the parsed URL has none.
    """
    parsed = parse_url(url)
    scheme = parsed.scheme or "http"
    return scheme, parsed.hostname, parsed.port