import cv2 import requests from PIL import Image import PIL from PIL import ImageDraw from matplotlib import pyplot as plt import matplotlib from matplotlib import rcParams import os import tempfile from io import BytesIO from pathlib import Path import argparse import random import numpy as np import torch import matplotlib.cm as cm import pandas as pd from transformers import OwlViTProcessor, OwlViTForObjectDetection from transformers.image_utils import ImageFeatureExtractionMixin from SuperGluePretrainedNetwork.models.matching import Matching from SuperGluePretrainedNetwork.models.utils import (compute_pose_error, compute_epipolar_error, estimate_pose, error_colormap, AverageTimer, pose_auc, read_image, rotate_intrinsics, rotate_pose_inplane, scale_intrinsics) torch.set_grad_enabled(False) mixin = ImageFeatureExtractionMixin() model = OwlViTForObjectDetection.from_pretrained("google/owlvit-base-patch32") processor = OwlViTProcessor.from_pretrained("google/owlvit-base-patch32") # Use GPU if available if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") import requests from PIL import Image, ImageDraw from io import BytesIO import matplotlib.pyplot as plt import numpy as np import torch import cv2 import tempfile def detect_and_crop2(target_image_path, query_image_path, model, processor, mixin, device, threshold=0.5, nms_threshold=0.3, visualize=True): # Open target image image = Image.open(target_image_path).convert('RGB') image_size = model.config.vision_config.image_size + 5 image = mixin.resize(image, image_size) target_sizes = torch.Tensor([image.size[::-1]]) # Open query image query_image = Image.open(query_image_path).convert('RGB') image_size = model.config.vision_config.image_size + 5 query_image = mixin.resize(query_image, image_size) # Process input and query image inputs = processor(images=image, query_images=query_image, return_tensors="pt").to(device) # Get predictions with torch.no_grad(): outputs = model.image_guided_detection(**inputs) # Convert predictions to CPU img = cv2.cvtColor(np.array(image), cv2.COLOR_BGR2RGB) outputs.logits = outputs.logits.cpu() outputs.target_pred_boxes = outputs.target_pred_boxes.cpu() # Post process the predictions results = processor.post_process_image_guided_detection(outputs=outputs, threshold=threshold, nms_threshold=nms_threshold, target_sizes=target_sizes) boxes, scores = results[0]["boxes"], results[0]["scores"] # If no boxes, return an empty list if len(boxes) == 0 and visualize: print(f"No boxes detected for image: {target_image_path}") fig, ax = plt.subplots(figsize=(6, 6)) ax.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB)) ax.set_title("Original Image") ax.axis("off") plt.show() return [] # Filter boxes img_with_all_boxes = img.copy() filtered_boxes = [] filtered_scores = [] img_width, img_height = img.shape[1], img.shape[0] for box, score in zip(boxes, scores): x1, y1, x2, y2 = [int(i) for i in box.tolist()] if x1 < 0 or y1 < 0 or x2 < 0 or y2 < 0: continue if (x2 - x1) / img_width >= 0.94 and (y2 - y1) / img_height >= 0.94: continue filtered_boxes.append([x1, y1, x2, y2]) filtered_scores.append(score) # Draw boxes on original image draw = ImageDraw.Draw(image) for box in filtered_boxes: draw.rectangle(box, outline="red",width=3) cropped_images = [] for box in filtered_boxes: x1, y1, x2, y2 = box cropped_img = img[y1:y2, x1:x2] if cropped_img.size != 0: cropped_images.append(cropped_img) if visualize: # Visualization if not filtered_boxes: fig, ax = plt.subplots(figsize=(6, 6)) ax.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB)) ax.set_title("Original Image") ax.axis("off") plt.show() else: fig, axs = plt.subplots(1, len(cropped_images) + 2, figsize=(15, 5)) axs[0].imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB)) axs[0].set_title("Original Image") axs[0].axis("off") for i, (box, score) in enumerate(zip(filtered_boxes, filtered_scores)): x1, y1, x2, y2 = box cropped_img = img[y1:y2, x1:x2] font = cv2.FONT_HERSHEY_SIMPLEX text = f"{score:.2f}" cv2.putText(cropped_img, text, (5, cropped_img.shape[0]-10), font, 0.5, (255,0,0), 1, cv2.LINE_AA) axs[i+2].imshow(cv2.cvtColor(cropped_img, cv2.COLOR_BGR2RGB)) axs[i+2].set_title("Score: " + text) axs[i+2].axis("off") plt.tight_layout() plt.show() return cropped_images, image # return original image with boxes drawn def save_array_to_temp_image(arr): # Convert the array to an image img = Image.fromarray(arr) # Create a temporary file for the image temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.png', dir=tempfile.gettempdir()) temp_file_name = temp_file.name temp_file.close() # We close it because we're not writing to it directly, PIL will handle the writing # Save the image to the temp file img.save(temp_file_name) return temp_file_name ''' def process_resize(w: int, h: int, resize_dims: list) -> tuple: if len(resize_dims) == 1 and resize_dims[0] > -1: scale = resize_dims[0] / max(h, w) w_new, h_new = int(round(w * scale)), int(round(h * scale)) return w_new, h_new return w, h ''' def plot_image_pair(imgs, dpi=100, size=6, pad=.5): n = len(imgs) assert n == 2, 'number of images must be two' figsize = (size*n, size*3/4) if size is not None else None _, ax = plt.subplots(1, n, figsize=figsize, dpi=dpi) for i in range(n): ax[i].imshow(imgs[i], cmap=plt.get_cmap('gray'), vmin=0, vmax=255) ax[i].get_yaxis().set_ticks([]) ax[i].get_xaxis().set_ticks([]) for spine in ax[i].spines.values(): # remove frame spine.set_visible(False) plt.tight_layout(pad=pad) def plot_keypoints(kpts0, kpts1, color='w', ps=2): ax = plt.gcf().axes ax[0].scatter(kpts0[:, 0], kpts0[:, 1], c=color, s=ps) ax[1].scatter(kpts1[:, 0], kpts1[:, 1], c=color, s=ps) def plot_matches(kpts0, kpts1, color, lw=1.5, ps=4): fig = plt.gcf() ax = fig.axes fig.canvas.draw() transFigure = fig.transFigure.inverted() fkpts0 = transFigure.transform(ax[0].transData.transform(kpts0)) fkpts1 = transFigure.transform(ax[1].transData.transform(kpts1)) fig.lines = [matplotlib.lines.Line2D( (fkpts0[i, 0], fkpts1[i, 0]), (fkpts0[i, 1], fkpts1[i, 1]), zorder=1, transform=fig.transFigure, c=color[i], linewidth=lw) for i in range(len(kpts0))] ax[0].scatter(kpts0[:, 0], kpts0[:, 1], c=color, s=ps) ax[1].scatter(kpts1[:, 0], kpts1[:, 1], c=color, s=ps) def unified_matching_plot2(image0, image1, kpts0, kpts1, mkpts0, mkpts1, color, text, path=None, show_keypoints=False, fast_viz=False, opencv_display=False, opencv_title='matches', small_text=[]): # Set the background color for the plot plt.figure(facecolor='#eeeeee') plot_image_pair([image0, image1]) # Elegant points and lines for matches if show_keypoints: plot_keypoints(kpts0, kpts1, color='k', ps=4) plot_keypoints(kpts0, kpts1, color='w', ps=2) plot_matches(mkpts0, mkpts1, color, lw=1) fig = plt.gcf() # Add text fig.text( 0.01, 0.01, '\n'.join(small_text), transform=fig.axes[0].transAxes, fontsize=10, va='bottom', ha='left', color='#333333', fontweight='bold', fontname='Helvetica', bbox=dict(facecolor='white', alpha=0.7, edgecolor='none', boxstyle="round,pad=0.3")) fig.text( 0.01, 0.99, '\n'.join(text), transform=fig.axes[0].transAxes, fontsize=15, va='top', ha='left', color='#333333', fontweight='bold', fontname='Helvetica', bbox=dict(facecolor='white', alpha=0.7, edgecolor='none', boxstyle="round,pad=0.3")) # Optional: remove axis for a cleaner look plt.axis('off') # Convert the figure to an OpenCV image buf = BytesIO() plt.savefig(buf, format='png', bbox_inches='tight', pad_inches=0) buf.seek(0) img_arr = np.frombuffer(buf.getvalue(), dtype=np.uint8) buf.close() img = cv2.imdecode(img_arr, 1) img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # Close the figure to free memory plt.close(fig) return img def create_image_pyramid2(image_path, longest_side, scales=[0.25, 0.5, 1.0]): original_image = cv2.imread(image_path) oh, ow, _ = original_image.shape # Determine the scaling factor based on the longest side if oh > ow: output_height = longest_side output_width = int((ow / oh) * longest_side) else: output_width = longest_side output_height = int((oh / ow) * longest_side) output_size = (output_width, output_height) pyramid = [] for scale in scales: # Resize based on the scale factor resized = cv2.resize(original_image, None, fx=scale, fy=scale) rh, rw, _ = resized.shape if scale < 1.0: # downsampling # Calculate the amount of padding required dy_top = max((output_size[1] - rh) // 2, 0) dy_bottom = output_size[1] - rh - dy_top dx_left = max((output_size[0] - rw) // 2, 0) dx_right = output_size[0] - rw - dx_left # Create padded image padded = cv2.copyMakeBorder(resized, dy_top, dy_bottom, dx_left, dx_right, cv2.BORDER_CONSTANT, value=[255, 255, 255]) pyramid.append(padded) elif scale > 1.0: # upsampling # We need to crop the image to fit the desired output size dy = (rh - output_size[1]) // 2 dx = (rw - output_size[0]) // 2 cropped = resized[dy:dy+output_size[1], dx:dx+output_size[0]] pyramid.append(cropped) else: # scale == 1.0 pyramid.append(resized) return pyramid # Example usage # pyramid = create_image_pyramid('path_to_image.jpg', 800) def image_matching(query_img, target_img, image_dims=[640*2], scale_factors=[0.33,0.66,1], visualize=True, k_thresh=None, m_thresh=None, write=False): image1, inp1, scales1 = read_image(target_img, device, [640*2], 0, True) query_pyramid = create_image_pyramid2(query_img, image_dims[0], scale_factors) all_valid = [] all_inliers = [] all_return_imgs = [] max_matches_img = None max_matches = -1 for idx, query_level in enumerate(query_pyramid): temp_file_path = "temp_level_{}.png".format(idx) cv2.imwrite(temp_file_path, query_level) image0, inp0, scales0 = read_image(temp_file_path, device, [640*2], 0, True) if image0 is None or image1 is None: print('Problem reading image pair: {} {}'.format(query_img, target_img)) else: # Matching pred = matching({'image0': inp0, 'image1': inp1}) pred = {k: v[0] for k, v in pred.items()} kpts0, kpts1 = pred['keypoints0'], pred['keypoints1'] matches, conf = pred['matches0'], pred['matching_scores0'] valid = matches > -1 mkpts0 = kpts0[valid] mkpts1 = kpts1[matches[valid]] mconf = conf[valid] #color = cm.jet(mconf)[:len(mkpts0)] # Ensure consistent size color = cm.jet(mconf.detach().numpy())[:len(mkpts0)] all_valid.append(np.sum( valid.tolist() )) # Convert torch tensors to numpy arrays. mkpts0_np = mkpts0.cpu().numpy() mkpts1_np = mkpts1.cpu().numpy() try: # Use RANSAC to find the homography matrix. H, inliers = cv2.findHomography(mkpts0_np, mkpts1_np, cv2.RANSAC, 5.0) except: H = 0 inliers = 0 print ("Not enough points for homography") # Convert inliers from shape (N, 1) to shape (N,) and count them. num_inliers = np.sum(inliers) all_inliers.append(num_inliers) # Visualization text = [ 'Engagify Image Matching', 'Keypoints: {}:{}'.format(len(kpts0), len(kpts1)), 'Scaling Factor: {}'.format( scale_factors[idx]), 'Matches: {}'.format(len(mkpts0)), 'Inliers: {}'.format(num_inliers), ] k_thresh = matching.superpoint.config['keypoint_threshold'] m_thresh = matching.superglue.config['match_threshold'] small_text = [ 'Keypoint Threshold: {:.4f}'.format(k_thresh), 'Match Threshold: {:.2f}'.format(m_thresh), ] visualized_img = None # To store the visualized image if visualize: ret_img = unified_matching_plot2( image0, image1, kpts0, kpts1, mkpts0, mkpts1, color, text, 'Test_Level_{}'.format(idx), True, False, True, 'Matches_Level_{}'.format(idx), small_text) all_return_imgs.append(ret_img) # Storing image with most matches #if len(mkpts0) > max_matches: # max_matches = len(mkpts0) # max_matches_img = 'Matches_Level_{}'.format(idx) avg_valid = np.sum(all_valid) / len(scale_factors) avg_inliers = np.sum(all_inliers) / len(scale_factors) # Convert the image with the most matches to base64 encoded format # with open(max_matches_img, "rb") as image_file: # encoded_string = base64.b64encode(image_file.read()).decode() return {'valid':all_valid, 'inliers':all_inliers, 'visualized_image':all_return_imgs} #, encoded_string # Usage: #results = image_matching('Samples/Poster/poster_event_small_22.jpg', 'Samples/Images/16.jpeg', visualize=True) #print (results) def image_matching_no_pyramid(query_img, target_img, visualize=True, write=False): image1, inp1, scales1 = read_image(target_img, device, [640*2], 0, True) image0, inp0, scales0 = read_image(query_img, device, [640*2], 0, True) if image0 is None or image1 is None: print('Problem reading image pair: {} {}'.format(query_img, target_img)) return None # Matching pred = matching({'image0': inp0, 'image1': inp1}) pred = {k: v[0] for k, v in pred.items()} kpts0, kpts1 = pred['keypoints0'], pred['keypoints1'] matches, conf = pred['matches0'], pred['matching_scores0'] valid = matches > -1 mkpts0 = kpts0[valid] mkpts1 = kpts1[matches[valid]] mconf = conf[valid] #color = cm.jet(mconf)[:len(mkpts0)] # Ensure consistent size color = cm.jet(mconf.detach().numpy())[:len(mkpts0)] valid_count = np.sum(valid.tolist()) # Convert torch tensors to numpy arrays. mkpts0_np = mkpts0.cpu().numpy() mkpts1_np = mkpts1.cpu().numpy() try: # Use RANSAC to find the homography matrix. H, inliers = cv2.findHomography(mkpts0_np, mkpts1_np, cv2.RANSAC, 5.0) except: H = 0 inliers = 0 print("Not enough points for homography") # Convert inliers from shape (N, 1) to shape (N,) and count them. num_inliers = np.sum(inliers) # Visualization text = [ 'Engagify Image Matching', 'Keypoints: {}:{}'.format(len(kpts0), len(kpts1)), 'Matches: {}'.format(len(mkpts0)), 'Inliers: {}'.format(num_inliers), ] k_thresh = matching.superpoint.config['keypoint_threshold'] m_thresh = matching.superglue.config['match_threshold'] small_text = [ 'Keypoint Threshold: {:.4f}'.format(k_thresh), 'Match Threshold: {:.2f}'.format(m_thresh), ] visualized_img = None # To store the visualized image if visualize: visualized_img = unified_matching_plot2( image0, image1, kpts0, kpts1, mkpts0, mkpts1, color, text, 'Test_Match', True, False, True, 'Matches', small_text) return { 'valid': [valid_count], 'inliers': [num_inliers], 'visualized_image': [visualized_img] } # Usage: #results = image_matching_no_pyramid('Samples/Poster/poster_event_small_22.jpg', 'Samples/Images/16.jpeg', visualize=True) # Load the SuperPoint and SuperGlue models. device = 'cuda' if torch.cuda.is_available() and not opt.force_cpu else 'cpu' print('Running inference on device \"{}\"'.format(device)) config = { 'superpoint': { 'nms_radius': 4, 'keypoint_threshold': 0.005, 'max_keypoints': 1024 }, 'superglue': { 'weights': 'outdoor', 'sinkhorn_iterations': 20, 'match_threshold': 0.2, } } matching = Matching(config).eval().to(device) from PIL import Image def stitch_images(images): """Stitches a list of images vertically.""" if not images: # Return a placeholder image if the images list is empty return Image.new('RGB', (100, 100), color='gray') max_width = max([img.width for img in images]) total_height = sum(img.height for img in images) composite = Image.new('RGB', (max_width, total_height)) y_offset = 0 for img in images: composite.paste(img, (0, y_offset)) y_offset += img.height return composite def check_object_in_image3(query_image, target_image, threshold=50, scale_factor=[0.33,0.66,1]): decision_on = [] # Convert cv2 images to PIL images and add them to a list images_to_return = [] cropped_images, bbox_image = detect_and_crop2(target_image_path=target_image, query_image_path=query_image, model=model, processor=processor, mixin=mixin, device=device, visualize=False) temp_files = [save_array_to_temp_image(i) for i in cropped_images] crop_results = [image_matching_no_pyramid(query_image, i, visualize=True) for i in temp_files] cropped_visuals = [] cropped_inliers = [] for result in crop_results: # Add visualized images to the temporary list for img in result['visualized_image']: cropped_visuals.append(Image.fromarray(img)) for inliers_ in result['inliers']: cropped_inliers.append(inliers_) # Stitch the cropped visuals into one image images_to_return.append(stitch_images(cropped_visuals)) pyramid_results = image_matching(query_image, target_image, visualize=True, scale_factors=scale_factor) pyramid_visuals = [Image.fromarray(img) for img in pyramid_results['visualized_image']] # Stitch the pyramid visuals into one image images_to_return.append(stitch_images(pyramid_visuals)) # Check inliers and determine if the object is present print (cropped_inliers) is_present = any(value > threshold for value in cropped_inliers) if is_present == True: decision_on.append('Object Detection') is_present = any(value > threshold for value in pyramid_results["inliers"]) if is_present == True: decision_on.append('Pyramid Max Point') if is_present == False: decision_on.append("Neither, It Failed All Tests") # Return results as a dictionary return { 'is_present': is_present, 'images': images_to_return, 'scale factors': scale_factor, 'object detection inliers': cropped_inliers, 'pyramid_inliers' : pyramid_results["inliers"], 'bbox_image':bbox_image, 'decision_on':decision_on, } # Example call: #result = check_object_in_image3('Samples/Poster/poster_event_small.jpg', 'Samples/Images/True_Image_3423234.jpeg', 50) # Accessing the results: #print(result['is_present']) # prints True/False #print(result['images']) # is a list of 2 stitched images. import gradio as gr import cv2 from PIL import Image def gradio_interface(query_image_path, target_image_path, threshold): result = check_object_in_image3(query_image_path, target_image_path, threshold) # Depending on how many images are in the list, you can return them like this: return result['bbox_image'], result['images'][0], result['object detection inliers'], result['scale factors'], result['pyramid_inliers'], result['images'][1], str(result['is_present']), result['decision_on'] # Define the Gradio interface interface = gr.Interface( fn=gradio_interface, # function to be called on button press inputs=[ gr.components.Image(label="Query Image (Drop the Image you want to detect here)", type="filepath"), gr.components.Image(label="Target Image (Drop the Image youd like to search here)", type="filepath"), gr.components.Slider(minimum=0, maximum=200, value=50, step=5, label="Enter the Inlier Threshold"), ], outputs=[ gr.components.Image(label='Filtered Regions of Interest (Candidates)'), gr.components.Image(label="Cropped Visuals from Image Guided Object Detection "), gr.components.Text(label='Inliers detected for Image Guided Object Detection '), gr.components.Text(label='Scale Factors Used for Pyramid (Results below, In Order)'), gr.components.Text(label='Inliers detected for Pyramid Search (In Order)'), gr.components.Image(label="Pyramid Visuals"), gr.components.Textbox(label="Object Present?"), gr.components.Textbox(label="Decision Taken Based on?"), ], theme=gr.themes.Monochrome(), title="Engajify's Image Specific Image Recognition + Matching Tool", description="[Author: Ibrahim Hasani] \n " " This tool leverages Transformer, Deep Learning, and Traditional Computer Vision techniques to determine if a specified object " "(given by the query image) is present within a target image. \n" "1. Image-Guided Object Detection where we detect potential regions of interest. (Owl-Vit-Google). \n" "2. Pyramid Search that looks at various scales of the target image. Results provide " "visual representations of the matching process and a final verdict on the object's presence.\n" "3. SuperPoint (MagicLeap) + SuperGlue + Homography to extract inliers, which are thresholded for decision making." ) interface.launch()