|
"""upload_docs |
|
|
|
Implements a Distutils 'upload_docs' subcommand (upload documentation to |
|
sites other than PyPi such as devpi). |
|
""" |
|
|
|
from base64 import standard_b64encode |
|
from distutils import log |
|
from distutils.errors import DistutilsOptionError |
|
import os |
|
import socket |
|
import zipfile |
|
import tempfile |
|
import shutil |
|
import itertools |
|
import functools |
|
import http.client |
|
import urllib.parse |
|
|
|
from .._importlib import metadata |
|
from ..warnings import SetuptoolsDeprecationWarning |
|
|
|
from .upload import upload |
|
|
|
|
|
def _encode(s): |
|
return s.encode('utf-8', 'surrogateescape') |
|
|
|
|
|
class upload_docs(upload): |
|
|
|
|
|
DEFAULT_REPOSITORY = 'https://pypi.python.org/pypi/' |
|
|
|
description = 'Upload documentation to sites other than PyPi such as devpi' |
|
|
|
user_options = [ |
|
('repository=', 'r', |
|
"url of repository [default: %s]" % upload.DEFAULT_REPOSITORY), |
|
('show-response', None, |
|
'display full response text from server'), |
|
('upload-dir=', None, 'directory to upload'), |
|
] |
|
boolean_options = upload.boolean_options |
|
|
|
def has_sphinx(self): |
|
return bool( |
|
self.upload_dir is None |
|
and metadata.entry_points(group='distutils.commands', name='build_sphinx') |
|
) |
|
|
|
sub_commands = [('build_sphinx', has_sphinx)] |
|
|
|
def initialize_options(self): |
|
upload.initialize_options(self) |
|
self.upload_dir = None |
|
self.target_dir = None |
|
|
|
def finalize_options(self): |
|
log.warn( |
|
"Upload_docs command is deprecated. Use Read the Docs " |
|
"(https://readthedocs.org) instead.") |
|
upload.finalize_options(self) |
|
if self.upload_dir is None: |
|
if self.has_sphinx(): |
|
build_sphinx = self.get_finalized_command('build_sphinx') |
|
self.target_dir = dict(build_sphinx.builder_target_dirs)['html'] |
|
else: |
|
build = self.get_finalized_command('build') |
|
self.target_dir = os.path.join(build.build_base, 'docs') |
|
else: |
|
self.ensure_dirname('upload_dir') |
|
self.target_dir = self.upload_dir |
|
self.announce('Using upload directory %s' % self.target_dir) |
|
|
|
def create_zipfile(self, filename): |
|
zip_file = zipfile.ZipFile(filename, "w") |
|
try: |
|
self.mkpath(self.target_dir) |
|
for root, dirs, files in os.walk(self.target_dir): |
|
if root == self.target_dir and not files: |
|
tmpl = "no files found in upload directory '%s'" |
|
raise DistutilsOptionError(tmpl % self.target_dir) |
|
for name in files: |
|
full = os.path.join(root, name) |
|
relative = root[len(self.target_dir):].lstrip(os.path.sep) |
|
dest = os.path.join(relative, name) |
|
zip_file.write(full, dest) |
|
finally: |
|
zip_file.close() |
|
|
|
def run(self): |
|
SetuptoolsDeprecationWarning.emit( |
|
"Deprecated command", |
|
""" |
|
upload_docs is deprecated and will be removed in a future version. |
|
Instead, use tools like devpi and Read the Docs; or lower level tools like |
|
httpie and curl to interact directly with your hosting service API. |
|
""", |
|
due_date=(2023, 9, 26), |
|
) |
|
|
|
|
|
for cmd_name in self.get_sub_commands(): |
|
self.run_command(cmd_name) |
|
|
|
tmp_dir = tempfile.mkdtemp() |
|
name = self.distribution.metadata.get_name() |
|
zip_file = os.path.join(tmp_dir, "%s.zip" % name) |
|
try: |
|
self.create_zipfile(zip_file) |
|
self.upload_file(zip_file) |
|
finally: |
|
shutil.rmtree(tmp_dir) |
|
|
|
@staticmethod |
|
def _build_part(item, sep_boundary): |
|
key, values = item |
|
title = '\nContent-Disposition: form-data; name="%s"' % key |
|
|
|
if not isinstance(values, list): |
|
values = [values] |
|
for value in values: |
|
if isinstance(value, tuple): |
|
title += '; filename="%s"' % value[0] |
|
value = value[1] |
|
else: |
|
value = _encode(value) |
|
yield sep_boundary |
|
yield _encode(title) |
|
yield b"\n\n" |
|
yield value |
|
if value and value[-1:] == b'\r': |
|
yield b'\n' |
|
|
|
@classmethod |
|
def _build_multipart(cls, data): |
|
""" |
|
Build up the MIME payload for the POST data |
|
""" |
|
boundary = '--------------GHSKFJDLGDS7543FJKLFHRE75642756743254' |
|
sep_boundary = b'\n--' + boundary.encode('ascii') |
|
end_boundary = sep_boundary + b'--' |
|
end_items = end_boundary, b"\n", |
|
builder = functools.partial( |
|
cls._build_part, |
|
sep_boundary=sep_boundary, |
|
) |
|
part_groups = map(builder, data.items()) |
|
parts = itertools.chain.from_iterable(part_groups) |
|
body_items = itertools.chain(parts, end_items) |
|
content_type = 'multipart/form-data; boundary=%s' % boundary |
|
return b''.join(body_items), content_type |
|
|
|
def upload_file(self, filename): |
|
with open(filename, 'rb') as f: |
|
content = f.read() |
|
meta = self.distribution.metadata |
|
data = { |
|
':action': 'doc_upload', |
|
'name': meta.get_name(), |
|
'content': (os.path.basename(filename), content), |
|
} |
|
|
|
credentials = _encode(self.username + ':' + self.password) |
|
credentials = standard_b64encode(credentials).decode('ascii') |
|
auth = "Basic " + credentials |
|
|
|
body, ct = self._build_multipart(data) |
|
|
|
msg = "Submitting documentation to %s" % (self.repository) |
|
self.announce(msg, log.INFO) |
|
|
|
|
|
|
|
|
|
schema, netloc, url, params, query, fragments = \ |
|
urllib.parse.urlparse(self.repository) |
|
assert not params and not query and not fragments |
|
if schema == 'http': |
|
conn = http.client.HTTPConnection(netloc) |
|
elif schema == 'https': |
|
conn = http.client.HTTPSConnection(netloc) |
|
else: |
|
raise AssertionError("unsupported schema " + schema) |
|
|
|
data = '' |
|
try: |
|
conn.connect() |
|
conn.putrequest("POST", url) |
|
content_type = ct |
|
conn.putheader('Content-type', content_type) |
|
conn.putheader('Content-length', str(len(body))) |
|
conn.putheader('Authorization', auth) |
|
conn.endheaders() |
|
conn.send(body) |
|
except socket.error as e: |
|
self.announce(str(e), log.ERROR) |
|
return |
|
|
|
r = conn.getresponse() |
|
if r.status == 200: |
|
msg = 'Server response (%s): %s' % (r.status, r.reason) |
|
self.announce(msg, log.INFO) |
|
elif r.status == 301: |
|
location = r.getheader('Location') |
|
if location is None: |
|
location = 'https://pythonhosted.org/%s/' % meta.get_name() |
|
msg = 'Upload successful. Visit %s' % location |
|
self.announce(msg, log.INFO) |
|
else: |
|
msg = 'Upload failed (%s): %s' % (r.status, r.reason) |
|
self.announce(msg, log.ERROR) |
|
if self.show_response: |
|
print('-' * 75, r.read(), '-' * 75) |
|
|