import os
import warnings
warnings.filterwarnings('ignore')
import subprocess, sys
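# Install runtime dependencies at startup so the script is self-contained (e.g. on a fresh Hugging Face Space).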
os.system("pip install -q gradio")
os.system("pip install -q diffusers")
os.system("pip install -q segment_anything")
os.system("pip install accelerate")
# os.environ["CUDA_VISIBLE_DEVICES"] = "0"
result = subprocess.run(['pip', 'install', '-e', 'GroundingDINO'], check=True)
print(f'pip install GroundingDINO = {result}')
sys.path.insert(0, './GroundingDINO')
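# Make the locally cloned GroundingDINO package importable for the imports below.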
# text_prompt = 'basket'
# image_path = '/kaggle/input/avataar/wall hanging.jpg'
# output_image_path = '/kaggle/working'
'''Importing Libraries'''
import os
import groundingdino.datasets.transforms as T
import numpy as np
import torch
from groundingdino.models import build_model
from groundingdino.util import box_ops
from groundingdino.util.inference import predict
from groundingdino.util.slconfig import SLConfig
from groundingdino.util.utils import clean_state_dict
from huggingface_hub import hf_hub_download
from segment_anything import sam_model_registry
from segment_anything import SamPredictor
from diffusers import StableDiffusionInpaintPipeline
from scipy.ndimage import binary_dilation
import cv2
import matplotlib.pyplot as plt
from PIL import Image
from torchvision.utils import draw_bounding_boxes
from torchvision.utils import draw_segmentation_masks
def load_model_hf(repo_id, filename, ckpt_config_filename, device='cpu'):
'''
    Loads a checkpoint from the Hugging Face Hub; used here to fetch the Grounding DINO config and weights.
'''
cache_config_file = hf_hub_download(repo_id=repo_id, filename=ckpt_config_filename)
args = SLConfig.fromfile(cache_config_file)
model = build_model(args)
args.device = device
cache_file = hf_hub_download(repo_id=repo_id, filename=filename)
checkpoint = torch.load(cache_file, map_location='cpu')
log = model.load_state_dict(clean_state_dict(checkpoint['model']), strict=False)
model.eval()
return model
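# Minimal GroundingDINO preprocessing: only ToTensor is applied; the reference resize/normalize steps are left commented out.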
def transform_image(image) -> torch.Tensor:
transform = T.Compose([
# T.RandomResize([800], max_size=1333),
T.ToTensor(),
# T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])
image_transformed, _ = transform(image, None)
return image_transformed
class CFG:
'''
    Configuration values (model type, checkpoint locations, device) used throughout the script
'''
sam_type = "vit_h"
SAM_MODELS = {
"vit_h": "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth",
"vit_l": "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_l_0b3195.pth",
"vit_b": "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_b_01ec64.pth"
}
device = 'cuda' if torch.cuda.is_available() else 'cpu'
ckpt_repo_id = "ShilongLiu/GroundingDINO"
ckpt_filename = "groundingdino_swinb_cogcoor.pth"
ckpt_config_filename = "GroundingDINO_SwinB.cfg.py"
# image_path = os.path.join(os.getcwd(), 'fruits.jpg')
# image_path = '/kaggle/input/avataar/wall hanging.jpg'
# text_prompt = 'chair'
'''Build models'''
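# SAM weights are fetched from the official Meta AI URLs via torch.hub; GroundingDINO weights and config come from the Hugging Face repo set in CFG.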
def build_sam():
checkpoint_url = CFG.SAM_MODELS[CFG.sam_type]
sam = sam_model_registry[CFG.sam_type]()
state_dict = torch.hub.load_state_dict_from_url(checkpoint_url)
sam.load_state_dict(state_dict, strict=True)
sam.to(device = CFG.device)
sam = SamPredictor(sam)
    print('SAM is built!')
return sam
def build_groundingdino():
ckpt_repo_id = CFG.ckpt_repo_id
ckpt_filename = CFG.ckpt_filename
ckpt_config_filename = CFG.ckpt_config_filename
groundingdino = load_model_hf(ckpt_repo_id, ckpt_filename, ckpt_config_filename)
    print('Grounding DINO is built!')
return groundingdino
model_sam = build_sam()
model_groundingdino = build_groundingdino()
'''Predictions'''
def predict_dino(image_pil, text_prompt, box_threshold, text_threshold):
image_trans = transform_image(image_pil)
boxes, logits, phrases = predict(model = model_groundingdino,
image = image_trans,
caption = text_prompt,
box_threshold = box_threshold,
text_threshold = text_threshold,
device = CFG.device)
W, H = image_pil.size
    boxes = box_ops.box_cxcywh_to_xyxy(boxes) * torch.Tensor([W, H, W, H])  # center (cx, cy, w, h) to corner (x1, y1, x2, y2) pixel coords
    print('DINO prediction done!')
return boxes, logits, phrases
def predict_sam(image_pil, boxes):
image_array = np.asarray(image_pil)
model_sam.set_image(image_array)
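    # Project the pixel-space boxes into the resized coordinate frame SAM expects before prompting.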
transformed_boxes = model_sam.transform.apply_boxes_torch(boxes, image_array.shape[:2])
masks, _, _ = model_sam.predict_torch(
point_coords=None,
point_labels=None,
boxes=transformed_boxes.to(model_sam.device),
multimask_output=False,
)
    print('SAM prediction done!')
return masks.cpu()
def mask_predict(image_pil, text_prompt, box_threshold=0.3, text_threshold=0.25):
boxes, logits, phrases = predict_dino(image_pil, text_prompt, box_threshold, text_threshold)
masks = torch.tensor([])
if len(boxes) > 0:
masks = predict_sam(image_pil, boxes)
masks = masks.squeeze(1)
return masks, boxes, phrases, logits
'''Utils'''
def load_image(image_path):
return Image.open(image_path).convert("RGB")
def draw_image(image_pil, masks, boxes, alpha=0.4):
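    # Note: only the masks are drawn as a red overlay; the boxes argument is accepted but not rendered.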
image = np.asarray(image_pil)
image = torch.from_numpy(image).permute(2, 0, 1)
if len(masks) > 0:
image = draw_segmentation_masks(image, masks=masks, colors=['red'] * len(masks), alpha=alpha)
return image.numpy().transpose(1, 2, 0)
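# Example usage (hypothetical file name and prompt, for illustration only):
# img = load_image('example.jpg')
# masks, boxes, phrases, logits = mask_predict(img, 'chair')
# overlay = draw_image(img, masks, boxes, alpha=0.4)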
# torch.save(masks, 'masks.pt')
'''Visualise segmented results'''
def visualize_results(img1, img2, task, text_prompt=''):
    fig, axes = plt.subplots(1, 2, figsize=(20, 10))
    axes[0].imshow(img1)
    axes[0].set_title('Original Image')
    axes[1].imshow(img2)
    axes[1].set_title(f'{text_prompt} : {task}')
for ax in axes:
ax.axis('off')
# visualize_results(image_pil, output, 'segmented')
# x_units = 200
# y_units = -100
# text_prompt = 'wooden stool'
# image_path = '/kaggle/input/avataar/stool.jpeg'
# output_image_path = '/kaggle/working'
def main_fun(image_pil, x_units, y_units, text_prompt):
# x_units = 200
# y_units = -100
# text_prompt = 'wooden stool'
# image_pil = load_image(image_path)
masks, boxes, phrases, logits = mask_predict(image_pil, text_prompt=text_prompt, box_threshold=0.23, text_threshold=0.25)
output = draw_image(image_pil, masks, boxes, alpha=0.4)
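    # Segmentation overlay (useful for debugging); the name `output` is reassigned to the final composite below.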
    '''Get masked object and background as two separate images'''
    if len(masks) == 0:  # nothing detected for the prompt; avoid indexing an empty mask tensor
        print(f'No "{text_prompt}" detected; returning the original image.')
        return image_pil
    image_np = np.asarray(image_pil)
    mask = np.expand_dims(np.asarray(masks[0]), axis=-1)  # (H, W, 1) boolean object mask
    masked_object = image_np * mask
    background = image_np * ~mask
'''Shifts image by x_units and y_units'''
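    # 2x3 affine matrix for a pure translation: positive x_units moves the object right, positive y_units moves it down.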
M = np.float32([[1, 0, x_units], [0, 1, y_units]])
shifted_image = cv2.warpAffine(masked_object, M, (masked_object.shape[1] , masked_object.shape[0] ), borderMode=cv2.BORDER_CONSTANT, borderValue=(0, 0, 0))
masked_shifted_image = np.where(shifted_image[:, :, 0] != 0, True, False)
    '''Load a Stable Diffusion checkpoint fine-tuned for the inpainting task'''
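    # NOTE: the pipeline is re-loaded on every request; for a long-running demo it could be built once at module level instead.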
    pipe = StableDiffusionInpaintPipeline.from_pretrained(
        # "runwayml/stable-diffusion-inpainting", torch_dtype=torch.float16
        "stabilityai/stable-diffusion-2-inpainting",
        torch_dtype=torch.float16 if CFG.device == 'cuda' else torch.float32)  # fp16 is not supported on CPU
    pipe.to(CFG.device)
    print('Stable Diffusion inpainting model loaded!')
# With Dilation
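    # Dilate the object mask with a 15x15 structuring element so inpainting also covers a thin band around the object's boundary.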
structuring_element = np.ones((15, 15, 1), dtype=bool)
extrapolated_mask = binary_dilation(mask, structure=structuring_element)
mask_as_uint8 = extrapolated_mask.astype(np.uint8) * 255
pil_mask = Image.fromarray(mask_as_uint8.squeeze(), mode='L').resize((1024, 1024))
# # Without Dilation
# pil_background = Image.fromarray(background)
# mask_as_uint8 = mask.astype(np.uint8) * 255
# pil_mask = Image.fromarray(mask_as_uint8.squeeze(), mode='L')
    print('Image inpainting in progress...')
'''Do inpainting on masked locations of original image'''
prompt = 'fill as per background and neighborhood'
inpainted_image = pipe(prompt=prompt, image=image_pil, mask_image=pil_mask).images[0]
    print('Image inpainted!')
# inpainted_image
    '''Get composite of the shifted object and the background-inpainted image'''
pil_shifted_image = Image.fromarray(shifted_image).resize(inpainted_image.size)
np_shifted_image = np.array(pil_shifted_image)
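    # Re-derive the shifted-object mask from the red channel; object pixels whose red value is exactly 0 are treated as background (a simple heuristic).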
masked_shifted_image = np.where(np_shifted_image[:, :, 0] != 0, True, False)
masked_shifted_image = np.expand_dims(masked_shifted_image, axis=-1)
inpainted_shifted = np.array(inpainted_image) * ~masked_shifted_image
shifted_image = cv2.resize(shifted_image, inpainted_image.size)
output = inpainted_shifted + shifted_image
output = Image.fromarray(output)
# visualize_results(image_pil, output, 'shifted')
return output
import gradio as gr
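# Gradio UI: upload an image, name the object to move, pick the x/y shift in pixels, and view the composited result.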
image_blocks = gr.Blocks()
with image_blocks as demo:
with gr.Row():
with gr.Column():
image = gr.Image(sources=['upload'], type="pil", label="Upload")
# with gr.Row(elem_id="prompt-container").style(mobile_collapse=False, equal_height=True):
            text_prompt = gr.Textbox(label="Object to move", placeholder="Object to segment and shift (e.g. 'wooden stool')")
            x_units = gr.Slider(minimum=50, maximum=300, step=1, value=50, label="x_units (horizontal shift in pixels)")
            y_units = gr.Slider(minimum=30, maximum=300, step=1, value=30, label="y_units (vertical shift in pixels)")
# text_prompt = gr.Textbox(lines=1, label="Prompt")
btn = gr.Button(value="Submit")
with gr.Column():
image_out = gr.Image(label="Output")
btn.click(fn=main_fun, inputs=[image, x_units, y_units, text_prompt], outputs=[image_out])
image_blocks.launch()