Spaces:
Runtime error
Runtime error
#!/usr/bin/env python | |
import os | |
import io | |
import pathlib | |
import argparse | |
import filetype | |
import numpy as np | |
from imwatermark import WatermarkEncoder, WatermarkDecoder | |
from PIL import Image | |
from PIL.ExifTags import TAGS | |
from PIL.TiffImagePlugin import ImageFileDirectory_v2 | |
from util import log, Map | |
import piexif | |
import piexif.helper | |
# Watermark codec settings used by both the encoder and the decoder below:
# `type` is passed as the payload kind and `method` as the algorithm name.
options = Map({'method': 'dwtDctSvd', 'type': 'bytes'})
def get_exif(image):
    """Collect EXIF metadata from a PIL image as a ``{tag_name: value}`` dict.

    Two best-effort passes are merged; on duplicate names the Pillow value
    wins:

    1. piexif: entries of the ``Exif`` IFD found in ``image.info["exif"]``
       that ``piexif.helper.UserComment.load`` can decode, keyed by the
       matching ``piexif.ExifIFD`` attribute name.
    2. Pillow: every tag returned by ``image.getexif()`` whose id has a name
       in ``PIL.ExifTags.TAGS``.

    Failures in either pass are swallowed on purpose: an image without
    (or with unreadable) EXIF data simply yields fewer entries, down to an
    empty dict.
    """
    # using piexif
    res1 = {}
    try:
        entries = piexif.load(image.info["exif"]).get("Exif", {})
        # Tag id -> attribute name, built once instead of re-scanning the
        # class attributes for every entry. setdefault keeps the first name
        # seen for an id, which matches a first-match lookup.
        names = {}
        for name, tag in vars(piexif.ExifIFD).items():
            if isinstance(tag, int):
                names.setdefault(tag, name)
    except Exception:  # best-effort: no exif blob / unparsable blob
        entries, names = {}, {}
    for tag, raw in entries.items():
        # Decode each entry on its own: a value that is not a UserComment
        # payload (or an unknown tag id) must only skip that entry, not
        # discard every entry that follows it.
        try:
            res1[names[tag]] = piexif.helper.UserComment.load(raw)
        except Exception:
            continue
    # using pillow
    res2 = {}
    try:
        res2 = {TAGS[k]: v for k, v in image.getexif().items() if k in TAGS}
    except Exception:  # best-effort: object without usable exif support
        pass
    return {**res1, **res2}
def set_exif(d: dict):
    """Serialize a ``{tag_name: value}`` dict into an EXIF byte blob.

    Tag names are translated to numeric ids through the inverse of
    ``PIL.ExifTags.TAGS`` (a name missing from that table raises
    ``KeyError``). The entries are written with ``ImageFileDirectory_v2``
    and the result is returned prefixed with the ``Exif\\x00\\x00`` header.
    """
    # Inverse of TAGS: tag name -> numeric tag id.
    name_to_id = dict(zip(TAGS.values(), TAGS.keys()))
    directory = ImageFileDirectory_v2()
    for name in d:
        directory[name_to_id[name]] = d[name]
    buffer = io.BytesIO()
    directory.save(buffer)
    return b'Exif\x00\x00' + buffer.getvalue()
def get_watermark(image, params):
    """Decode the invisible watermark stored in `image` and return it as text.

    The image is converted to a numpy array and decoded with a
    ``WatermarkDecoder`` configured from ``options.type`` and
    ``params.length``, using ``options.method``. The decoded bytes are
    returned as an ASCII string; bytes that are not valid ASCII are dropped.
    """
    pixels = np.asarray(image)
    payload = WatermarkDecoder(options.type, params.length).decode(pixels, options.method)
    return payload.decode(encoding='ascii', errors='ignore')
def set_watermark(image, params):
    """Embed ``params.wm`` into `image` and return the watermarked PIL image.

    The payload is always exactly ``params.length // 8`` bytes: the text is
    ASCII-encoded (non-ASCII characters are dropped), then right-padded with
    spaces or truncated to that size. It is embedded with a
    ``WatermarkEncoder`` using ``options.type`` and ``options.method``, and
    the encoded array is converted back with ``Image.fromarray``.
    """
    data = np.asarray(image)
    length = params.length // 8
    # Encode first, pad/truncate second: sizing the text before encoding lets
    # dropped non-ASCII characters shrink the payload below `length`, so it
    # would no longer match the bit length the decoder is configured with.
    payload = str(params.wm).encode(encoding='ascii', errors='ignore').ljust(length)[:length]
    encoder = WatermarkEncoder()
    encoder.set_watermark(options.type, payload)
    encoded = encoder.encode(data, options.method)
    return Image.fromarray(encoded)
def watermark(params, file):
    """Read or write the invisible watermark of a single image file.

    params: parsed CLI arguments; this function reads ``command``
        (``'read'`` or ``'write'``), ``strip``, ``output``, ``verify`` and
        ``wm``, and passes the object on to get_watermark/set_watermark.
    file: path of the image to process.

    Problems (missing file, not an image, unreadable image, fewer than
    256*256 pixels, unknown command) are logged with ``log.error`` and the
    file is skipped. On success the file name, resolution, watermark and
    exif data are logged with ``log.info``. Nothing is returned.

    In ``write`` mode the watermarked image is saved over the input file, or
    below ``params.output`` when that is set, carrying the original exif data
    unless ``params.strip`` is set. With ``params.verify`` the saved file is
    reopened and the watermark decoded from it; otherwise ``params.wm`` is
    reported as the watermark.
    """
    if not os.path.exists(file):
        log.error({ 'watermark': 'file not found' })
        return
    if not filetype.is_image(file):
        log.error({ 'watermark': 'file is not an image' })
        return
    try:
        source = Image.open(file)
    except OSError:
        # One unreadable file must not abort a whole batch run.
        log.error({ 'watermark': 'file cannot be opened as an image' })
        return

    # Everything that needs the source pixels happens inside the `with`, so
    # the file handle is released before the file may be overwritten in place.
    with source:
        width, height = source.size
        if width * height < 256 * 256:
            log.error({ 'watermark': 'image too small' })
            return
        exif = get_exif(source)
        if params.command == 'read':
            fn = file  # report the file being processed, not the whole input list
            wm = get_watermark(source, params)
        elif params.command == 'write':
            metadata = b'' if params.strip else set_exif(exif)
            marked = set_watermark(source, params)
        else:
            log.error({ 'watermark': 'unknown command' })
            return

    if params.command == 'write':
        if params.output != '':
            # os.path.join drops the output folder when the second part is
            # absolute, which would silently overwrite the source file.
            relative = os.path.basename(file) if os.path.isabs(file) else file
            fn = os.path.join(params.output, relative)
            # Inputs found in sub-folders need their target folder too.
            pathlib.Path(fn).parent.mkdir(parents=True, exist_ok=True)
        else:
            fn = file
        marked.save(fn, exif=metadata)
        if params.verify:
            with Image.open(fn) as saved:
                wm = get_watermark(saved, params)
        else:
            wm = params.wm

    log.info({ 'file': fn })
    log.info({ 'resolution': f'{width}x{height}' })
    log.info({ 'watermark': wm })
    log.info({ 'exif': None if params.strip else exif })
# Command-line entry point: parse the arguments, then process every input.
# A file is handled directly, a directory is walked recursively, and anything
# else is reported instead of being silently ignored.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description = 'image watermarking')
    parser.add_argument('command', choices = ['read', 'write'])
    parser.add_argument('--wm', type=str, required=False, default='sdnext', help='watermark string')
    parser.add_argument('--strip', default=False, action='store_true', help = "strip existing exif data")
    parser.add_argument('--verify', default=False, action='store_true', help = "verify watermark during write")
    parser.add_argument('--length', type=int, default=32, help="watermark length in bits")
    parser.add_argument('--output', type=str, required=False, default='', help='folder to store images, default is overwrite in-place')
    parser.add_argument('input', type=str, nargs='*')
    args = parser.parse_args()
    # log.info({ 'watermark args': vars(args), 'options': options })
    for arg in args.input:
        if os.path.isfile(arg):
            watermark(args, arg)
        elif os.path.isdir(arg):
            for root, _dirs, files in os.walk(arg):
                for f in files:
                    watermark(args, os.path.join(root, f))
        else:
            # Neither a file nor a directory (typically a mistyped path).
            log.error({ 'watermark': 'file not found', 'file': arg })