File size: 5,694 Bytes
ddb7519 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
#!/usr/bin/env python3
"""Compress images in a folder to a maximum megapixel size."""
import argparse
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from glob import iglob
from multiprocessing import cpu_count
from queue import Queue
from PIL import Image, ImageFile, ImageOps
# Prevent errors from halting the script.
ImageFile.LOAD_TRUNCATED_IMAGES = True
Image.warnings.simplefilter("error", Image.DecompressionBombWarning)
VERSION = "2.0"
SHORT_DESCRIPTION = "Compress images in a directory."
SUPPORTED_EXTENSIONS = [".jpg", ".jpeg", ".png", ".webp"]
def get_args(**parser_kwargs):
"""Get command-line options."""
parser = argparse.ArgumentParser(**parser_kwargs)
parser.add_argument(
"--img_dir",
type=str,
default="input",
help="path to image directory (default: 'input')",
)
parser.add_argument(
"--out_dir",
type=str,
default=None,
help="path to output directory (default: IMG_DIR)",
)
parser.add_argument(
"--max_mp",
type=float,
default=1.5,
help="maximum megapixels (default: 1.5)",
)
parser.add_argument(
"--quality",
type=int,
default=95,
help="save quality (default: 95, range: 0-100, suggested: 90+)",
)
parser.add_argument(
"--overwrite",
action="store_true",
default=False,
help="overwrite files in output directory",
)
parser.add_argument(
"--noresize",
action="store_true",
default=False,
help="do not resize, just fix orientation",
)
parser.add_argument(
"--delete",
action="store_true",
default=False,
help="delete original files after processing",
)
args = parser.parse_args()
args.out_dir = args.out_dir or args.img_dir
args.max_mp = args.max_mp * 1024000
return args
def images(img_dir):
"""Return each image in the input directory."""
for file in iglob(f"{img_dir}/*.*"):
if file.lower().endswith(tuple(SUPPORTED_EXTENSIONS)):
yield file
def inline(msg, newline=False):
"""Print a message on the same line."""
msg = f"\r{msg}"
msg += " " * (79 - len(msg))
print(msg, end="\n" if newline else "", flush=True)
def launch_workers(queue, args):
"""Launch a pool of workers."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = [loop.create_task(worker(queue, args)) for _ in range(10)]
loop.run_until_complete(asyncio.wait(tasks))
async def open_img(path):
"""Open an image."""
loop = asyncio.get_running_loop()
try:
return await loop.run_in_executor(None, Image.open, path)
except Exception as err:
inline(f"[!] Error Opening: {path} - {err}", True)
return None
def oversize(img, max_mp):
"""Check if an image is larger than the maximum size."""
return (img.width * img.height) > max_mp
async def process(image, args):
"""Process an image."""
outfile = image.replace(args.img_dir, args.out_dir).replace(
os.path.splitext(image)[1], ".webp"
)
if args.overwrite or not os.path.exists(outfile):
img = await open_img(image)
if img:
newimg = transpose(img)
if not args.noresize and oversize(newimg, args.max_mp):
newimg = shrink(newimg, args)
if newimg != img:
await save_img(newimg, outfile, args)
if args.delete and outfile != image:
os.remove(image)
def slow_save(path, args, img):
"""Save an image."""
try:
img.save(path, "webp", quality=args.quality)
inline(f"[+] Compressed: {path}")
except Exception as err:
inline(f"[!] Error Saving: {path} - {err}", True)
async def save_img(img, path, args):
"""Save an image."""
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, slow_save, path, args, img)
def scan_path(queue, args):
"""Scan the input directory for images."""
inline("[*] Scanning for images...", True)
for image in images(args.img_dir):
inline(f"[+] {image}")
queue.put(image)
def shrink(img, args):
"""Shrink an image."""
hw = img.size
ratio = args.max_mp / (hw[0]*hw[1])
newhw = (int(hw[0]*ratio**0.5), int(hw[1]*ratio**0.5))
try:
return img.resize(newhw, Image.BICUBIC)
except Exception as err:
inline(f"[!] Error Shrinking: {img.filename} - {err}", True)
return img
def start_compression(queue, args):
"""Start the compression process."""
inline("[*] Compressing images...", True)
inline("[-] (scanning...)")
with ThreadPoolExecutor() as executor:
workers = {
executor.submit(launch_workers, queue, args): None
for _ in range(cpu_count())
}
for _ in as_completed(workers):
pass
inline("[!] Done!", True)
def transpose(img):
"""Transpose an image."""
try:
return ImageOps.exif_transpose(img)
except Exception as err:
inline(f"[!] Error Transposing: {img.filename} - {err}", True)
return img
async def worker(queue, args):
"""Handle images from the queue until they're gone."""
while not queue.empty():
image = queue.get()
await process(image, args)
def main():
"""Run the program."""
queue = Queue()
args = get_args(description=SHORT_DESCRIPTION)
inline(f"[>] Image Compression Utility v{VERSION}", True)
scan_path(queue, args)
start_compression(queue, args)
if __name__ == "__main__":
main()
|