from sentence_transformers import SentenceTransformer, util


class ImageSimilarity(object):
    def __init__(self, minimum_commutative_image_diff):
        self.minimum_commutative_image_diff = minimum_commutative_image_diff

    def check(self, pil_images):
        results = []
        # Load the OpenAI CLIP model
        print('Loading CLIP Model...')
        model = SentenceTransformer('clip-ViT-B-32')
        print("Images:", len(pil_images))
        # Encode every PIL image into a CLIP embedding in one batched pass
        encoded_image = model.encode([image["pil"] for image in pil_images], batch_size=128, convert_to_tensor=True, show_progress_bar=True)
        # Run paraphrase mining over the embeddings. This compares every image
        # against every other image and returns a list of (score, image_id1,
        # image_id2) triplets sorted by cosine similarity score, highest first.
        processed_images = util.paraphrase_mining_embeddings(encoded_image)
        # NUM_SIMILAR_IMAGES = 10

        # =================
        # DUPLICATES
        # =================
        # print('Finding duplicate images...')
        # Filter the list for duplicates. Results are triplets (score, image_id1, image_id2)
        # sorted in decreasing order. A duplicate image will have a score of 1.00,
        # though it may be 0.9999 due to lossy image compression (.jpg).
        # duplicates = [image for image in processed_images if image[0] >= 0.999]
        # Output the top X duplicate images
        # for score, image_id1, image_id2 in duplicates[0:NUM_SIMILAR_IMAGES]:
        #     print("\nScore: {:.3f}%".format(score * 100))
        #     print(pil_images[image_id1])
        #     print(pil_images[image_id2])

        # =================
        # NEAR DUPLICATES
        # =================
        print('Finding near duplicate images...')
        # Use a threshold parameter to decide whether two images count as similar.
        # The threshold runs from 0 to 1.00; setting it lower produces larger
        # clusters of less similar images. A threshold of 1.00 means the two
        # images are exactly the same. Since we are finding near-duplicate
        # images, we can set it at 0.99 or any number 0 < X < 1.00.
        # threshold = 0.99
        # near_duplicates = [image for image in processed_images if image[0] < threshold]
        for score, image_id1, image_id2 in processed_images:
            results.append({
                'score': score,
                'image1': pil_images[image_id1]["key"],
                'image2': pil_images[image_id2]["key"]
            })
            # print("\nScore: {:.3f}%".format(score * 100))
            # print(pil_images[image_id1]["key"])
            # print(pil_images[image_id2]["key"])
        return results
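

# Minimal usage sketch, not part of the class above. Assumptions: the image
# file names here are hypothetical placeholders, and each entry passed to
# check() is a dict with a "pil" PIL.Image and a "key" identifier, matching
# how check() reads its input. Since paraphrase_mining_embeddings() returns
# pairs sorted by descending cosine similarity, a simple threshold filter is
# enough to keep only the near-duplicate pairs.
if __name__ == '__main__':
    from PIL import Image

    # Hypothetical file names; substitute real images.
    paths = ['cat_1.jpg', 'cat_2.jpg', 'dog.jpg']
    pil_images = [{'key': p, 'pil': Image.open(p)} for p in paths]

    # minimum_commutative_image_diff is stored but not used by check(),
    # so any value works for this sketch.
    checker = ImageSimilarity(minimum_commutative_image_diff=0.99)

    for pair in checker.check(pil_images):
        # Mirror the 0.99 near-duplicate threshold suggested in the comments.
        if pair['score'] >= 0.99:
            print("{:.4f}  {}  {}".format(pair['score'], pair['image1'], pair['image2']))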