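# NOTE: "Spaces: Paused" is web-page residue captured together with this file
# (presumably a hosting status banner); it is not part of the program.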
import gradio as gr | |
import torch | |
from diffusers import AutoPipelineForInpainting | |
from PIL import Image | |
from transformers import ( | |
AutoModelForCausalLM, | |
AutoTokenizer, | |
BlipForConditionalGeneration, | |
BlipProcessor, | |
OwlViTForObjectDetection, | |
OwlViTProcessor, | |
SamModel, | |
SamProcessor, | |
) | |
def delete_model(model):
    """Release the accelerator memory held by ``model``.

    The model is moved to the CPU, unreachable objects are garbage-collected,
    and PyTorch's CUDA caching allocator is asked to release its unused
    blocks.

    Args:
        model: Any object exposing a ``.to(device)`` method (the callers in
            this file pass transformers models and a diffusers pipeline).

    Note:
        ``del model`` inside this function would only unbind the local
        parameter name and free nothing, so it is intentionally omitted; the
        caller must drop its own reference for the CPU copy to be reclaimed.
    """
    import gc

    model.to("cpu")
    # Collect reference cycles first so any CUDA tensors they still pin are
    # actually freed before the allocator cache is emptied.
    gc.collect()
    torch.cuda.empty_cache()
def run_language_model(edit_prompt, device):
    """Extract the object to replace and its replacement from an edit prompt.

    A small chat model (``Qwen/Qwen1.5-0.5B-Chat``) is few-shot prompted to
    rewrite a free-form instruction such as "change the dog with cat" into
    the form "dog, cat"; the reply is then split at the first comma.

    Args:
        edit_prompt: The user's editing instruction.
        device: Device the tokenized prompt is moved to before generation.

    Returns:
        A ``(to_replace, replace_with)`` tuple of stripped strings.

    Raises:
        ValueError: If the model's reply does not contain two non-empty,
            comma-separated parts.
    """
    language_model_id = "Qwen/Qwen1.5-0.5B-Chat"
    language_model = AutoModelForCausalLM.from_pretrained(
        language_model_id, device_map="auto"
    )
    try:
        tokenizer = AutoTokenizer.from_pretrained(language_model_id)
        messages = [
            {
                "role": "system",
                "content": "Follow the examples and return the expected output",
            },
            {"role": "user", "content": "swap mountain and lion"},  # example 1
            {"role": "assistant", "content": "mountain, lion"},  # example 1
            {"role": "user", "content": "change the dog with cat"},  # example 2
            {"role": "assistant", "content": "dog, cat"},  # example 2
            {"role": "user", "content": "replace the human with a boat"},  # example 3
            {"role": "assistant", "content": "human, boat"},  # example 3
            {"role": "user", "content": edit_prompt},
        ]
        text = tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        model_inputs = tokenizer([text], return_tensors="pt").to(device)
        # Unpack the whole encoding so the attention mask is passed along
        # with the input ids.
        generated_ids = language_model.generate(**model_inputs, max_new_tokens=512)
        # Keep only the newly generated tokens (drop the echoed prompt).
        generated_ids = [
            output_ids[len(input_ids):]
            for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
        ]
        response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    finally:
        # Free the model even when generation or decoding fails.
        delete_model(language_model)

    # Split at the first comma and tolerate missing/extra whitespace, so
    # replies like "dog,cat" or "dog, cat\n" are still accepted.
    to_replace, _, replace_with = response.partition(",")
    to_replace = to_replace.strip()
    replace_with = replace_with.strip()
    if not to_replace or not replace_with:
        raise ValueError(
            "Could not extract '<object>, <replacement>' from the language "
            f"model reply: {response!r}"
        )
    return (to_replace, replace_with)
def run_image_captioner(image, device):
    """Generate a text caption for ``image`` with the BLIP base captioner.

    Args:
        image: Image handed to the BLIP processor.
        device: Device on which the model and its inputs are placed.

    Returns:
        The decoded caption string, with special tokens removed.
    """
    checkpoint = "Salesforce/blip-image-captioning-base"
    captioner = BlipForConditionalGeneration.from_pretrained(checkpoint)
    captioner = captioner.to(device)
    blip_processor = BlipProcessor.from_pretrained(checkpoint)

    encoded = blip_processor(image, return_tensors="pt").to(device)
    # Inference only: no gradients are needed for caption generation.
    with torch.no_grad():
        token_ids = captioner.generate(**encoded, max_new_tokens=200)

    caption = blip_processor.decode(token_ids[0], skip_special_tokens=True)
    # Release the captioner before the next model in the pipeline is loaded.
    delete_model(captioner)
    return caption
def run_segmentation(image, object_to_segment, device):
    """Segment the object described by ``object_to_segment`` in ``image``.

    OWL-ViT first localizes the object from the text query; the detected
    boxes are then passed to SAM as box prompts to obtain masks.

    Args:
        image: PIL image to segment.
        object_to_segment: Text description of the object to locate.
        device: Device on which both models and their inputs are placed.

    Returns:
        The value returned by the SAM image processor's
        ``post_process_masks`` for this image.

    Raises:
        ValueError: If OWL-ViT finds no box for ``object_to_segment``.
    """
    # OWL-ViT for object detection
    owl_vit_model_id = "google/owlvit-base-patch32"
    od_processor = OwlViTProcessor.from_pretrained(owl_vit_model_id)
    od_model = OwlViTForObjectDetection.from_pretrained(owl_vit_model_id).to(device)
    text_queries = [object_to_segment]
    od_inputs = od_processor(
        text=text_queries, images=image, return_tensors="pt"
    ).to(device)
    with torch.no_grad():
        od_outputs = od_model(**od_inputs)
    # PIL reports size as (width, height) but the post-processor expects
    # (height, width), so the tuple has to be reversed.
    target_sizes = torch.tensor([image.size[::-1]]).to(device)
    results = od_processor.post_process_object_detection(
        od_outputs, threshold=0.1, target_sizes=target_sizes
    )[0]
    boxes = results["boxes"].tolist()
    delete_model(od_model)

    if not boxes:
        # Fail with a clear message instead of handing SAM an empty prompt.
        raise ValueError(
            f"No object matching {object_to_segment!r} was detected in the image."
        )

    # SAM for image segmentation
    sam_model_id = "facebook/sam-vit-base"
    seg_model = SamModel.from_pretrained(sam_model_id).to(device)
    sam_processor = SamProcessor.from_pretrained(sam_model_id)
    input_boxes = [boxes]  # one list of boxes for the single input image
    sam_inputs = sam_processor(
        image, input_boxes=input_boxes, return_tensors="pt"
    ).to(device)
    with torch.no_grad():
        sam_outputs = seg_model(**sam_inputs)
    masks = sam_processor.image_processor.post_process_masks(
        sam_outputs.pred_masks.cpu(),
        sam_inputs["original_sizes"].cpu(),
        sam_inputs["reshaped_input_sizes"].cpu(),
    )
    delete_model(seg_model)
    return masks
def run_inpainting(image, replaced_caption, masks, device):
    """Inpaint the masked region of ``image`` guided by ``replaced_caption``.

    Args:
        image: Image to edit.
        replaced_caption: Text prompt describing the desired result.
        masks: Nested mask container; ``masks[0][0][0, :, :]`` is used as the
            inpainting mask (presumably image 0, box 0, first mask candidate
            of the SAM output -- confirm against ``run_segmentation``).
        device: Device the inpainting pipeline is moved to.

    Returns:
        The first image produced by the inpainting pipeline.
    """
    inpainter = AutoPipelineForInpainting.from_pretrained(
        "diffusers/stable-diffusion-xl-1.0-inpainting-0.1",
        torch_dtype=torch.float16,
        variant="fp16",
    )
    inpainter = inpainter.to(device)

    # Same three-line negative prompt as before, spelled with explicit
    # newlines instead of a triple-quoted literal.
    unwanted = (
        "lowres, bad anatomy, bad hands,\n"
        "text, error, missing fingers, extra digit, fewer digits,\n"
        "cropped, worst quality, low quality"
    )

    # Turn the selected mask tensor into a PIL image for the pipeline.
    mask_array = masks[0][0][0, :, :].numpy()
    mask_picture = Image.fromarray(mask_array)

    result = inpainter(
        prompt=replaced_caption,
        image=image,
        mask_image=mask_picture,
        negative_prompt=unwanted,
        guidance_scale=7.5,
        strength=0.6,
    )
    edited = result.images[0]

    delete_model(inpainter)
    return edited
def run_open_gen_fill(image, edit_prompt):
    """Replace an object in ``image`` as described by ``edit_prompt``.

    Steps: parse the prompt into (object to replace, replacement), caption
    the image, substitute the replacement into the caption, segment the
    object to replace, then inpaint the masked region with the new caption
    as the prompt.

    Args:
        image: PIL image to edit; it is resized to 512x512 before processing.
        edit_prompt: Free-form instruction, e.g. "change the dog with cat".

    Returns:
        The image returned by ``run_inpainting``.

    Raises:
        ValueError: If ``image`` is ``None`` or ``edit_prompt`` is blank.
    """
    # Validate up front: the UI passes ``None`` when no image was uploaded,
    # which would otherwise fail later with an opaque AttributeError.
    if image is None:
        raise ValueError("An input image is required.")
    if not edit_prompt or not edit_prompt.strip():
        raise ValueError("An editing prompt is required.")

    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Resize the image to (512, 512)
    image = image.resize((512, 512))

    # Run the language model to extract the objects to be swapped from
    # the edit prompt
    to_replace, replace_with = run_language_model(
        edit_prompt=edit_prompt, device=device
    )

    # Caption the input image
    caption = run_image_captioner(image, device=device)

    # Replace the object in the caption with the new object. If the caption
    # never mentions the object, append the replacement so the inpainting
    # prompt still names what should be generated.
    if to_replace in caption:
        replaced_caption = caption.replace(to_replace, replace_with)
    else:
        replaced_caption = f"{caption}, {replace_with}"

    # Segment the `to_replace` object from the input image
    masks = run_segmentation(image, to_replace, device=device)

    # Diffusion pipeline for inpainting
    return run_inpainting(
        image=image, replaced_caption=replaced_caption, masks=masks, device=device
    )
def setup_gradio_interface():
    """Build the Gradio Blocks UI for Open Generative Fill.

    The layout is a single row with two columns: the input image, the edit
    prompt and a "Run" button on one side, the output image on the other.
    Clicking "Run" calls ``run_open_gen_fill`` with the image and prompt and
    shows its result in the output image.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks() as block:
        # Closing tags fixed: the heading previously repeated the opening
        # tags instead of closing them.
        gr.Markdown("<h1><center>Open Generative Fill</center></h1>")
        with gr.Row():
            with gr.Column():
                input_image_placeholder = gr.Image(type="pil", label="Input Image")
                edit_prompt_placeholder = gr.Textbox(label="Enter the editing prompt")
                run_button_placeholder = gr.Button(value="Run")
            with gr.Column():
                output_image_placeholder = gr.Image(type="pil", label="Output Image")
        # The inputs are passed positionally as (image, edit_prompt), which
        # matches the handler's signature, so no wrapper lambda is needed.
        run_button_placeholder.click(
            fn=run_open_gen_fill,
            inputs=[input_image_placeholder, edit_prompt_placeholder],
            outputs=[output_image_placeholder],
        )
    return block
if __name__ == "__main__":
    # Script entry point: build the UI, enable the request queue with
    # max_size=5, then launch without a share link or API page and with
    # errors shown in the UI.
    app = setup_gradio_interface()
    app.queue(max_size=5)
    app.launch(share=False, show_api=False, show_error=True)