Spaces:
Sleeping
Sleeping
File size: 5,148 Bytes
960cd20 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
import logging
import os
import hashlib
import tarfile
import urllib.request
import zipfile
from tqdm import tqdm
from pathlib import Path
from logger import logger
from py7zr import SevenZipFile
class TqdmUpTo(tqdm):
    """A ``tqdm`` progress bar that can be moved to an absolute position.

    NOTE(review): the ``(b, bsize, tsize)`` signature looks like it is meant
    to be used as a block-count progress callback; nothing in the visible
    code calls it - confirm against callers.
    """

    def update_to(self, b=1, bsize=1, tsize=None):
        """Move the bar so that it shows ``b * bsize`` completed units.

        Args:
            b: Number of blocks transferred so far.
            bsize: Size of each block.
            tsize: Total size; when not ``None`` it replaces ``self.total``.
        """
        if tsize is not None:
            self.total = tsize
        # ``update`` expects an increment, so turn the absolute position into
        # a delta relative to the bar's current counter ``self.n``.
        position = b * bsize
        self.update(position - self.n)
def _download_file(url, dest_path):
    """Download ``url`` into ``dest_path``, resuming a partial file if possible.

    If ``dest_path`` already holds data, a ``Range`` request is sent for the
    remaining bytes. The new data is appended only when the server answers
    with ``206 Partial Content``; any other answer carries the whole
    resource, so the file is rewritten from scratch instead of being
    corrupted by appending a full copy to the partial one.

    Args:
        url: Address of the resource to fetch.
        dest_path: Local path the bytes are written to.

    Raises:
        Whatever ``urllib.request.urlopen`` or the file writes raise (for
        example ``urllib.error.URLError``/``HTTPError`` or ``OSError``);
        nothing is caught here.
    """
    logging.info(f"Downloading: {url}")
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }
    existing_size = os.path.getsize(dest_path) if os.path.exists(dest_path) else 0
    if existing_size:
        headers['Range'] = f'bytes={existing_size}-'

    request = urllib.request.Request(url, headers=headers)
    # The context manager guarantees the connection is released even when
    # reading or writing fails part-way through.
    with urllib.request.urlopen(request) as response:
        # urlopen has already followed any redirects, so this response is the
        # final resource; issuing a second request for response.geturl()
        # would only repeat the transfer and abandon this connection.
        final_url = response.geturl()

        # Only a 206 reply means the body starts at ``existing_size``.
        resumed = bool(existing_size) and getattr(response, 'status', None) == 206
        mode = 'ab' if resumed else 'wb'

        # Content-Length is optional (e.g. chunked transfer encoding); tqdm
        # accepts total=None and then shows a bar without a known end.
        content_length = response.headers.get('Content-Length')
        total_size = int(content_length) if content_length is not None else None

        with open(dest_path, mode) as file, tqdm(total=total_size, unit='B', unit_scale=True, unit_divisor=1024,
                                                 miniters=1, desc=final_url.split('/')[-1]) as t:
            chunk_size = 1024 * 1024  # 1MB
            while True:
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                file.write(chunk)
                t.update(len(chunk))
def verify_md5(file_path, expected_md5):
    """Check that the MD5 digest of a file matches an expected hex string.

    The file is hashed in 1 MiB chunks so that large downloads are never
    loaded into memory at once. The comparison ignores letter case and
    surrounding whitespace in ``expected_md5``.

    Args:
        file_path: Path of the file to hash (``str`` or ``pathlib.Path``).
        expected_md5: Expected digest as a hexadecimal string.

    Returns:
        ``(True, "")`` on a match, otherwise ``(False, message)`` where
        ``message`` shows the computed and the expected digest.
    """
    digest = hashlib.md5()
    with open(file_path, 'rb') as stream:
        for chunk in iter(lambda: stream.read(1024 * 1024), b''):
            digest.update(chunk)
    md5 = digest.hexdigest()
    # hexdigest() is lower-case; normalise the expected value the same way.
    if md5 != expected_md5.strip().lower():
        return False, f"MD5 mismatch: {md5} != {expected_md5}"
    return True, ""
def verify_sha256(file_path, expected_sha256):
    """Check that the SHA-256 digest of a file matches an expected hex string.

    The file is hashed in 1 MiB chunks so that large downloads are never
    loaded into memory at once. The comparison ignores letter case and
    surrounding whitespace in ``expected_sha256``.

    Args:
        file_path: Path of the file to hash (``str`` or ``pathlib.Path``).
        expected_sha256: Expected digest as a hexadecimal string.

    Returns:
        ``(True, "")`` on a match, otherwise ``(False, message)`` where
        ``message`` shows the computed and the expected digest.
    """
    digest = hashlib.sha256()
    with open(file_path, 'rb') as stream:
        for chunk in iter(lambda: stream.read(1024 * 1024), b''):
            digest.update(chunk)
    sha256 = digest.hexdigest()
    # hexdigest() is lower-case; normalise the expected value the same way.
    if sha256 != expected_sha256.strip().lower():
        return False, f"SHA256 mismatch: {sha256} != {expected_sha256}"
    return True, ""
def extract_file(file_path, destination=None):
    """
    Extract a compressed file based on its extension.
    If destination is not specified, it will be extracted to its parent directory.

    Supported suffixes are ``.zip``, ``.tar.gz``, ``.tar.bz2`` and ``.7z``.
    Any other suffix is logged as an error and nothing is extracted.

    Args:
        file_path: Archive to unpack (``str`` or ``pathlib.Path``).
        destination: Directory to extract into, or ``None`` for the
            archive's parent directory.
    """
    # Normalise to str so the suffix checks also work for Path arguments.
    archive = os.fspath(file_path)
    if destination is None:
        destination = Path(archive).parent
    logging.info(f"Extracting to {destination}")

    if archive.endswith('.zip'):
        with zipfile.ZipFile(archive, 'r') as zip_ref:
            zip_ref.extractall(destination)
    elif archive.endswith(('.tar.gz', '.tar.bz2')):
        tar_mode = 'r:gz' if archive.endswith('.tar.gz') else 'r:bz2'
        # Archives come from the network: where the interpreter supports
        # extraction filters, use the 'data' filter so members cannot escape
        # ``destination`` through absolute paths, '..' components or links.
        extract_kwargs = {'filter': 'data'} if hasattr(tarfile, 'data_filter') else {}
        with tarfile.open(archive, tar_mode) as tar_ref:
            tar_ref.extractall(destination, **extract_kwargs)
    elif archive.endswith('.7z'):
        with SevenZipFile(archive, mode='r') as z:
            z.extractall(destination)
    else:
        logging.error(f"Unsupported compression format for file {file_path}")
# Suffixes that download_file treats as archives to unpack.
_ARCHIVE_SUFFIXES = ('.zip', '.tar.gz', '.tar.bz2', '.7z')


def _verify_checksums(target_path, expected_md5, expected_sha256):
    """Verify ``target_path`` against each digest given; delete it on mismatch.

    Returns ``(True, "")`` when every supplied digest matches (or none was
    supplied), otherwise ``(False, message)`` after removing the file.
    """
    if expected_md5 is not None:
        success, message = verify_md5(Path(target_path), expected_md5)
        if not success:
            os.remove(target_path)
            return False, message
    if expected_sha256 is not None:
        success, message = verify_sha256(Path(target_path), expected_sha256)
        if not success:
            os.remove(target_path)
            return False, message
    return True, ""


def download_file(urls, target_path, extract_destination=None, expected_md5=None, expected_sha256=None):
    """Fetch a file from the first working URL, verify it and unpack archives.

    If ``target_path`` already exists it is only verified (and, when
    ``extract_destination`` is set and the file is an archive, extracted and
    removed); nothing is downloaded. Otherwise each URL is tried in order
    until one succeeds. Data is written to ``target_path + ".part"`` and
    renamed only after a download completes, so an interrupted transfer is
    never mistaken for a finished file on the next call, while the partial
    data stays available for a resumed attempt.

    Args:
        urls: Iterable of candidate URLs, tried in order. A single URL
            string is also accepted.
        target_path: Where the downloaded file is stored (``str`` or
            ``pathlib.Path``).
        extract_destination: Directory archives are extracted into. After a
            fresh download, archives are extracted even when this is
            ``None`` (``extract_file`` then picks the directory).
        expected_md5: Optional MD5 hex digest the file must match.
        expected_sha256: Optional SHA-256 hex digest the file must match.

    Returns:
        ``(success, message)``. On a checksum mismatch the file is deleted
        and ``(False, message)`` is returned.
    """
    # Normalise once so the suffix checks and the ".part" name work for Path.
    target_path = os.fspath(target_path)
    if isinstance(urls, str):
        # A bare string would otherwise be iterated character by character.
        urls = [urls]
    is_archive = target_path.endswith(_ARCHIVE_SUFFIXES)

    if os.path.exists(target_path):
        success, message = _verify_checksums(target_path, expected_md5, expected_sha256)
        if not success:
            return False, message
        # If it's a compressed file and the target_path already exists, skip the download
        if extract_destination and is_archive:
            extract_file(target_path, extract_destination)
            os.remove(target_path)
        return True, "File already exists and verified successfully!"

    partial_path = target_path + ".part"
    is_download = False
    for url in urls:
        try:
            _download_file(url, partial_path)
            is_download = True
            break
        except Exception as error:
            # Any failure just means "try the next mirror"; keep it visible.
            logger.error(f"downloading from URL {url}: {error}")
    if not is_download:
        return False, "Error downloading from all provided URLs."

    # Publish the finished download under its real name.
    os.replace(partial_path, target_path)

    success, message = _verify_checksums(target_path, expected_md5, expected_sha256)
    if not success:
        return False, message

    # If it's a compressed file, extract it
    if is_archive:
        extract_file(target_path, extract_destination)
        os.remove(target_path)
    return True, "File downloaded, verified, and extracted successfully!"
if __name__ == "__main__":
    # Example invocation: fill in the placeholders below before running.
    URLS = [
        "YOUR_PRIMARY_URL_HERE",
        "YOUR_FIRST_BACKUP_URL_HERE",
        # ... you can add more backup URLs as needed
    ]
    TARGET_PATH = ""
    EXPECTED_MD5 = ""
    EXTRACT_DESTINATION = ""
    # Pass the optional values by keyword: download_file's third positional
    # parameter is extract_destination, not expected_md5. Empty placeholders
    # are mapped to None so that an unset checksum is skipped rather than
    # compared against "".
    success, message = download_file(
        URLS,
        TARGET_PATH,
        extract_destination=EXTRACT_DESTINATION or None,
        expected_md5=EXPECTED_MD5 or None,
    )
    print(message)
|