import gradio as gr
import os
import io
import base64
import math
import random
import requests
import torch

from collections import OrderedDict
from PIL import Image, ImageEnhance

from transformers import AutoModelForImageClassification, BlipImageProcessor
from diffusers import DiffusionPipeline, AutoencoderKL
from huggingface_hub import hf_hub_download
from safetensors import safe_open

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
class BZHStableSignatureDemo(object):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.pipe = DiffusionPipeline.from_pretrained("stabilityai/sdxl-turbo", torch_dtype=torch.float16, variant="fp16").to("cuda")
        # disable invisible-watermark
        self.pipe.watermark = None
        # save the original VAE
        decoders = OrderedDict([("no watermark", self.pipe.vae)])
        # load the patched VAEs
        for name in ("weak", "medium", "strong", "extreme"):
            vae = AutoencoderKL.from_pretrained(f"imatag/stable-signature-bzh-sdxl-vae-{name}", torch_dtype=torch.float16).to("cuda")
            decoders[name] = vae
        self.decoders = decoders
        # load the proxy detector
        self.detector_image_processor = BlipImageProcessor.from_pretrained("imatag/stable-signature-bzh-detector-resnet18")
        self.detector_model = AutoModelForImageClassification.from_pretrained("imatag/stable-signature-bzh-detector-resnet18")
        calibration = hf_hub_download("imatag/stable-signature-bzh-detector-resnet18", filename="calibration.safetensors")
        with safe_open(calibration, framework="pt") as f:
            self.calibration_logits = f.get_tensor("logits")
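    # Note: each patched VAE is a drop-in replacement for the SDXL-Turbo decoder,
    # so the same swap should work in any diffusers pipeline (sketch, not
    # exercised by this demo):
    #   pipe = DiffusionPipeline.from_pretrained("stabilityai/sdxl-turbo", torch_dtype=torch.float16, variant="fp16")
    #   pipe.vae = AutoencoderKL.from_pretrained("imatag/stable-signature-bzh-sdxl-vae-medium", torch_dtype=torch.float16)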
    def generate(self, mode, seed, prompt):
        # seed a dedicated generator rather than the global RNG, so generation
        # is reproducible for a given (seed, prompt) pair
        generator = torch.Generator(device=device).manual_seed(seed)
        # swap in the patched VAE for the selected watermarking strength
        vae = self.decoders[mode]
        self.pipe.vae = vae
        output = self.pipe(prompt, num_inference_steps=4, guidance_scale=0.0, generator=generator, output_type="pil")
        return output.images[0]
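    # Hypothetical direct usage, bypassing the UI:
    #   img = backend.generate("medium", 42, "sailing ship in storm by Rembrandt")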
    def attack(self, img, jpeg_compression, downscale, crop, saturation, brightness, contrast):
        img = img.convert("RGB")
        # geometric attacks
        if downscale != 1:
            size = img.size
            size = (int(size[0] / downscale), int(size[1] / downscale))
            img = img.resize(size, Image.Resampling.LANCZOS)
        if crop != 0:
            width, height = img.size
            area = width * height
            log_rmin = math.log(0.5)
            log_rmax = math.log(2.0)
            target_area = area * (1 - crop)
            for _ in range(10):
                # sample an aspect ratio log-uniformly in [0.5, 2.0]
                aspect_ratio = math.exp(random.random() * (log_rmax - log_rmin) + log_rmin)
                w = int(round(math.sqrt(target_area * aspect_ratio)))
                h = int(round(math.sqrt(target_area / aspect_ratio)))
                if 0 < w <= width and 0 < h <= height:
                    # random.randint is inclusive on both ends, so the upper
                    # bound must be height - h (resp. width - w) to keep the
                    # crop box inside the image
                    top = random.randint(0, height - h)
                    left = random.randint(0, width - w)
                    img = img.crop((left, top, left + w, top + h))
                    break
        # photometric attacks
        converter = ImageEnhance.Color(img)
        img = converter.enhance(saturation)
        converter = ImageEnhance.Brightness(img)
        img = converter.enhance(brightness)
        converter = ImageEnhance.Contrast(img)
        img = converter.enhance(contrast)
        # JPEG compression attack
        mf = io.BytesIO()
        img.save(mf, format='JPEG', quality=jpeg_compression)
        filesize = mf.tell()
        mf.seek(0)
        img = Image.open(mf)
        image_info = "resolution: %dx%d, JPEG file size: %d bytes" % (img.size[0], img.size[1], filesize)
        return img, image_info
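    # The random crop above follows the area-preserving sampling scheme of
    # torchvision's RandomResizedCrop: it keeps a (1 - crop) fraction of the
    # image area under a random aspect ratio, retrying up to 10 times until the
    # sampled box fits inside the image.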
    def detect_api(self, img):
        # send the image to the detection API as lossless PNG, so no further
        # compression is applied on top of the attacks above
        mf = io.BytesIO()
        img.save(mf, format='PNG')
        b64 = base64.b64encode(mf.getvalue())
        data = {
            'image': b64.decode('utf8')
        }
        headers = {}
        api_key = os.getenv('BZH_API_KEY')
        if api_key:
            headers['x-api-key'] = api_key
        response = requests.post('https://bzh.imatag.com/bzh/api/v1.0/detect',
                                 json=data, headers=headers)
        response.raise_for_status()
        data = response.json()
        pvalue = data['p-value']
        return pvalue
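    # The same endpoint can be exercised outside Python; an equivalent request
    # (assuming an image.png on disk and an optional API key) would look like:
    #   curl -X POST https://bzh.imatag.com/bzh/api/v1.0/detect \
    #        -H "Content-Type: application/json" -H "x-api-key: $BZH_API_KEY" \
    #        -d "{\"image\": \"$(base64 -w0 image.png)\"}"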
    def detect_proxy(self, img):
        img = img.convert("RGB")
        inputs = self.detector_image_processor(img, return_tensors="pt")
        with torch.no_grad():
            logit = self.detector_model(**inputs).logits[..., 0]
        # empirical p-value: rank of the observed logit within the sorted
        # calibration logits; the +1 ensures the p-value is never exactly zero
        # (the smallest reportable value is 1/N)
        pvalue = (1 + torch.searchsorted(self.calibration_logits, logit)) / self.calibration_logits.shape[0]
        pvalue = pvalue.item()
        return pvalue
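    # Worked example (hypothetical numbers): with N = 1000 sorted calibration
    # logits and an observed logit smaller than all of them, searchsorted
    # returns 0 and pvalue = (1 + 0) / 1000 = 1e-3; the smallest p-value this
    # proxy can report is therefore 1/N.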
    def detect(self, img, detection_method):
        if detection_method == "API":
            pvalue = self.detect_api(img)
        else:
            pvalue = self.detect_proxy(img)
        result = "No watermark detected."
        # rpv = 10 ** int(math.log10(pvalue))  # rounded p-value, for optional display
        if pvalue < 1e-3:
            result = "Watermark detected with low confidence"  # (p-value<%.0e)" % rpv
        if pvalue < 1e-6:
            result = "Watermark detected with high confidence"  # (p-value<%.0e)" % rpv
        # map the p-value to a 0..1 confidence score for the Label widget
        score = min(int(-math.log10(pvalue)), 10)
        return {result: score / 10}
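    # Gradio's Label component accepts a {label: confidence} mapping, so the
    # dictionary returned above renders the detection verdict with a bar
    # proportional to -log10(p-value), capped at 10 decades.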
def interface():
    prompt = "sailing ship in storm by Rembrandt"
    backend = BZHStableSignatureDemo()
    decoders = list(backend.decoders.keys())

    with gr.Blocks() as demo:
        gr.Markdown("""# Watermarked SDXL-Turbo demo
This demo, brought to you by [IMATAG](https://www.imatag.com/), presents watermarking of images generated with [StableDiffusion XL Turbo](https://huggingface.co/stabilityai/sdxl-turbo).
Following the method presented in [StableSignature](https://ai.meta.com/blog/stable-signature-watermarking-generative-ai/),
the VAE decoder of StableDiffusion is fine-tuned to produce images that include a specific invisible watermark. We combined
this method with a demo version of [IMATAG](https://www.imatag.com/)'s in-house decoder. The watermarking system operates in zero-bit mode for improved robustness.""")
        gr.Markdown("""## 1. Generate
Select a watermarking strength and generate images with StableDiffusion-XL Turbo from a prompt and seed as usual.""")
        with gr.Row():
            inp = gr.Textbox(label="Prompt", value=prompt)
            seed = gr.Number(label="Seed", precision=0)
            mode = gr.Dropdown(choices=decoders, label="Watermark strength", value="medium")
        with gr.Row():
            btn1 = gr.Button("Generate")
        with gr.Row():
            watermarked_image = gr.Image(type="pil", width=512, height=512, sources=[], interactive=False)
gr.Markdown("""## 2. Edit
With these controls you may alter the generated image before detection. You may also upload your own edited image instead.""")
        with gr.Row():
            with gr.Column():
                with gr.Row():
                    downscale = gr.Slider(1, 3, value=1, step=0.1, label="Downscale ratio")
                    crop = gr.Slider(0, 0.9, value=0, step=0.01, label="Random crop ratio")
                with gr.Row():
                    brightness = gr.Slider(0, 2, value=1, step=0.1, label="Brightness")
                    contrast = gr.Slider(0, 2, value=1, step=0.1, label="Contrast")
                with gr.Row():
                    saturation = gr.Slider(0, 2, value=1, step=0.1, label="Color saturation")
                    jpeg_compression = gr.Slider(value=100, step=5, label="JPEG quality")
                btn2 = gr.Button("Edit")
        with gr.Row():
            attacked_image = gr.Image(type="pil", width=512, sources=['upload', 'clipboard'])
        with gr.Row():
            image_info_label = gr.Label(label="Image info")
gr.Markdown("""## 3. Detect
Detect the watermark on the altered image. Watermark may not be detected if the image is altered too strongly.
You may choose to detect with our fast [proxy model](https://huggingface.co/imatag/stable-signature-bzh-detector-resnet18), or via API for improved robustness.
""")
        with gr.Row():
            detection_method = gr.Dropdown(choices=["proxy model", "API"], label="Detection method", value="proxy model")
            btn3 = gr.Button("Detect")
        with gr.Row():
            detection_label = gr.Label(label="Detection info")

        btn1.click(fn=backend.generate, inputs=[mode, seed, inp], outputs=[watermarked_image], api_name="generate")
        btn2.click(fn=backend.attack, inputs=[watermarked_image, jpeg_compression, downscale, crop, saturation, brightness, contrast], outputs=[attacked_image, image_info_label], api_name="attack")
        btn3.click(fn=backend.detect, inputs=[attacked_image, detection_method], outputs=[detection_label], api_name="detect")

    return demo
if __name__ == '__main__':
    demo = interface()
    demo.launch()
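# The three buttons are also exposed as API endpoints (api_name="generate",
# "attack" and "detect"), so a deployed instance can be driven programmatically,
# e.g. with gradio_client (sketch; the Space name below is an assumption):
#   from gradio_client import Client
#   client = Client("imatag/stable-signature-bzh")
#   image_path = client.predict("medium", 42, "sailing ship in storm by Rembrandt", api_name="/generate")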