test / cli /image-watermark.py
bilegentile's picture
Upload folder using huggingface_hub
c19ca42 verified
#!/usr/bin/env python
import os
import io
import pathlib
import argparse
import filetype
import numpy as np
from imwatermark import WatermarkEncoder, WatermarkDecoder
from PIL import Image
from PIL.ExifTags import TAGS
from PIL.TiffImagePlugin import ImageFileDirectory_v2
from util import log, Map
import piexif
import piexif.helper
# shared settings for the imwatermark encoder/decoder used below:
# 'method' is the algorithm name passed to encode()/decode(), 'type' is the payload type of the watermark
options = Map({ 'method': 'dwtDctSvd', 'type': 'bytes' })
def get_exif(image):
    """Collect EXIF metadata from an image on a best-effort basis.

    Two readers are combined: piexif is used to decode comment-style values
    stored in the "Exif" IFD, and Pillow's `getexif()` supplies the tags it
    knows by name. When both produce the same key, the Pillow value wins.

    Args:
        image: object exposing `info` (mapping that may hold raw "exif" bytes)
            and `getexif()`, i.e. a PIL image.

    Returns:
        dict mapping tag names to values; empty when nothing could be read.
    """
    # using piexif
    res1 = {}
    try:
        exif_ifd = piexif.load(image.info["exif"]).get("Exif", {})
        # build the id -> name table once instead of scanning two lists for every tag
        names = {}
        for name, tag_id in vars(piexif.ExifIFD).items():
            if isinstance(tag_id, int):
                names.setdefault(tag_id, name)
    except Exception: # missing or unparsable exif payload: nothing to decode
        exif_ifd, names = {}, {}
    for tag_id, value in exif_ifd.items():
        try:
            res1[names[tag_id]] = piexif.helper.UserComment.load(value)
        except Exception:
            # unknown tag or value that is not an encoded comment: skip only this tag
            # so that a later, decodable tag is not lost together with it
            continue
    # using pillow
    res2 = {}
    try:
        res2 = { TAGS[k]: v for k, v in image.getexif().items() if k in TAGS }
    except Exception: # reading exif is optional, keep whatever was collected so far
        pass
    return {**res1, **res2}
def set_exif(d: dict):
    """Serialize a mapping of EXIF tag names to values into raw EXIF bytes.

    Args:
        d: mapping whose keys are tag names as listed in `PIL.ExifTags.TAGS`.

    Returns:
        bytes made of the `Exif\\x00\\x00` marker followed by the saved directory.

    Raises:
        KeyError: if a key of `d` is not a known tag name.
    """
    tag_ids = {name: tag_id for tag_id, name in TAGS.items()} # reverse lookup: tag name -> numeric id
    directory = ImageFileDirectory_v2()
    for name, value in d.items():
        directory[tag_ids[name]] = value
    buffer = io.BytesIO()
    directory.save(buffer)
    return b'Exif\x00\x00' + buffer.getvalue()
def get_watermark(image, params):
    """Decode the watermark embedded in an image and return it as text.

    Args:
        image: image convertible with `np.asarray` (a PIL image in this script).
        params: object exposing `length`, the watermark length handed to the decoder.

    Returns:
        The decoded payload as a string; bytes that are not valid ASCII are dropped.
    """
    pixels = np.asarray(image)
    payload = WatermarkDecoder(options.type, params.length).decode(pixels, options.method)
    return payload.decode(encoding='ascii', errors='ignore')
def set_watermark(image, params):
    """Embed `params.wm` into an image and return the watermarked copy.

    Args:
        image: image convertible with `np.asarray` (a PIL image in this script).
        params: object exposing `wm` (watermark text) and `length` (watermark
            length in bits; the payload occupies `length // 8` bytes).

    Returns:
        A new PIL image built from the encoded pixel data.
    """
    data = np.asarray(image)
    length = params.length // 8
    # encode first, then pad/truncate the bytes: characters dropped by errors='ignore'
    # would otherwise leave the payload shorter than the length the decoder expects
    payload = str(params.wm).encode(encoding='ascii', errors='ignore').ljust(length)[:length]
    encoder = WatermarkEncoder()
    encoder.set_watermark(options.type, payload)
    encoded = encoder.encode(data, options.method)
    return Image.fromarray(encoded)
def _output_path(output, file):
    """Return the path the watermarked copy of `file` is written to for a given `output` folder."""
    if output == '':
        return file # no output folder requested: overwrite in-place
    relative = file
    if os.path.isabs(file):
        # os.path.join discards everything before an absolute component, which would
        # ignore the output folder and overwrite the source, so strip the root first
        source = pathlib.Path(file)
        relative = str(source.relative_to(source.anchor))
    return os.path.join(output, relative)


def watermark(params, file):
    """Read the watermark from an image file, or write one into it, and log the result.

    Args:
        params: parsed command-line arguments; reads `command` ('read' or 'write'),
            `wm`, `length`, `strip`, `verify` and `output`.
        file: path of the image to process.

    Returns:
        None. Problems (missing file, not an image, image too small, unknown
        command) are logged and the file is skipped.
    """
    if not os.path.exists(file):
        log.error({ 'watermark': 'file not found' })
        return
    if not filetype.is_image(file):
        log.error({ 'watermark': 'file is not an image' })
        return
    if params.command not in ('read', 'write'):
        log.error({ 'watermark': 'unknown command' })
        return
    # the context manager releases the source file handle on every exit path
    with Image.open(file) as image:
        if image.width * image.height < 256 * 256:
            log.error({ 'watermark': 'image too small' })
            return
        exif = get_exif(image)
        if params.command == 'read':
            fn = file # report the file being processed, not the whole input list
            resolution = f'{image.width}x{image.height}'
            wm = get_watermark(image, params)
        else:
            metadata = b'' if params.strip else set_exif(exif)
            marked = set_watermark(image, params)
    if params.command == 'write':
        fn = _output_path(params.output, file)
        if params.output != '':
            # inputs found in sub-folders keep their relative path, so create the full parent chain
            pathlib.Path(fn).parent.mkdir(parents = True, exist_ok = True)
        marked.save(fn, exif=metadata)
        resolution = f'{marked.width}x{marked.height}'
        if params.verify:
            # decode from the file on disk to confirm the watermark survived saving
            with Image.open(fn) as saved:
                wm = get_watermark(saved, params)
        else:
            wm = params.wm
    log.info({ 'file': fn })
    log.info({ 'resolution': resolution })
    log.info({ 'watermark': wm })
    log.info({ 'exif': None if params.strip else exif })
if __name__ == '__main__':
    # command-line entry point: process every input file, walking folders recursively
    parser = argparse.ArgumentParser(description = 'image watermarking')
    parser.add_argument('command', choices = ['read', 'write'])
    parser.add_argument('--wm', type=str, required=False, default='sdnext', help='watermark string')
    parser.add_argument('--strip', default=False, action='store_true', help = "strip existing exif data")
    parser.add_argument('--verify', default=False, action='store_true', help = "verify watermark during write")
    parser.add_argument('--length', type=int, default=32, help="watermark length in bits")
    parser.add_argument('--output', type=str, required=False, default='', help='folder to store images, default is overwrite in-place')
    parser.add_argument('input', type=str, nargs='*')
    args = parser.parse_args()
    # log.info({ 'watermark args': vars(args), 'options': options })
    for arg in args.input:
        if os.path.isfile(arg):
            watermark(args, arg)
        elif os.path.isdir(arg):
            for root, _dirs, files in os.walk(arg):
                for f in files:
                    watermark(args, os.path.join(root, f))
        else:
            # report inputs that cannot be processed instead of skipping them silently
            log.error({ 'watermark': 'input is not a file or folder', 'input': arg })