import asyncio
import os
import traceback
from io import BytesIO

import numpy as np
import torch
import uvicorn
from aiohttp import ClientSession, ClientTimeout
from facenet_pytorch import MTCNN, InceptionResnetV1
from fastapi import FastAPI, HTTPException
from PIL import Image, ImageFilter
from pydantic import BaseModel
| |
|
class FaceMainExecutionPytorch:
    """Face verification pipeline built on facenet-pytorch.

    The detector (MTCNN) and the embedding network (InceptionResnetV1) are
    created once in ``__init__``. The coroutines then load two images, crop
    the first detected face from each, embed both crops and compare the
    embeddings by Euclidean distance.

    NOTE(review): blur scoring, detection and the embedding forward pass are
    synchronous calls inside ``async def`` methods, so they occupy the event
    loop while they run.
    """

    # Images whose edge-response variance is below this are rejected as blurry.
    BLUR_THRESHOLD = 30
    # Upper bound, in seconds, for one complete image download.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self):
        # Largest embedding distance still accepted as "same person".
        self.MVL = 0.90
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Keep the detector on the same device as the embedding network.
        self.detector = MTCNN(device=self.device)
        self.resnet = InceptionResnetV1(pretrained="vggface2").eval().to(self.device)

    async def calculate_blurriness(self, img: "Image.Image") -> float:
        """Return the variance of the edge-filtered grayscale image.

        Lower values mean fewer edges, i.e. a blurrier (or flatter) image.
        """
        if img.mode != "L":
            img = img.convert("L")
        edges = np.array(img.filter(ImageFilter.FIND_EDGES), dtype=np.float32)
        # Plain float so callers never have to deal with a NumPy scalar.
        return float(edges.var())

    async def extract_face(self, image, detector, size=(160, 160)):
        """Detect a face in ``image`` and return it cropped and resized.

        Args:
            image: PIL image to search.
            detector: Object exposing ``detect(ndarray) -> (boxes, probs)``.
                ``None`` falls back to ``self.detector``.
            size: ``(width, height)`` of the returned crop.

        Returns:
            The resized face crop, or ``None`` when no usable face is found
            or any step raises.
        """
        try:
            active_detector = self.detector if detector is None else detector
            boxes, _ = active_detector.detect(np.array(image))
            if boxes is None or len(boxes) == 0:
                print("No face detected in the image.")
                return None

            # MTCNN.detect reports corner coordinates (x1, y1, x2, y2), not
            # (x, y, width, height); crop with the corners directly.
            x1, y1, x2, y2 = (int(v) for v in boxes[0][:4])

            # Detections may extend past the picture; clamp to its bounds.
            img_w, img_h = image.size
            x1, y1 = max(x1, 0), max(y1, 0)
            x2, y2 = min(x2, img_w), min(y2, img_h)
            if x2 <= x1 or y2 <= y1:
                print("Detected face box is empty after clamping.")
                return None

            return image.crop((x1, y1, x2, y2)).resize(size)
        except Exception as e:
            print(f"Error extracting face: {e}")
            traceback.print_exc()
            return None

    async def calculate_embedding(self, face_image):
        """Return the embedding of an RGB face crop as a NumPy array.

        Pixels are scaled to ``[-1, 1]`` before being passed to the network.
        Returns ``None`` if anything raises.
        """
        try:
            pixels = torch.tensor(np.array(face_image))
            # HWC uint8 -> 1xCxHxW float in [0, 1]
            face_tensor = pixels.permute(2, 0, 1).unsqueeze(0).float() / 255.0
            face_tensor = ((face_tensor - 0.5) / 0.5).to(self.device)
            # Inference only: skip building the autograd graph.
            with torch.no_grad():
                embedding = self.resnet(face_tensor)
            return embedding.detach().cpu().numpy()
        except Exception as e:
            print(f"Error calculating embedding: {e}")
            traceback.print_exc()
            return None

    async def compare_faces(self, embedding1, embedding2):
        """Return ``True`` when the two embeddings are closer than ``self.MVL``.

        The Euclidean distance is rounded to two decimals before the
        comparison. It is converted to a Python float first so the outcome
        does not depend on NumPy scalar promotion rules, and the result is a
        plain ``bool``.
        """
        distance = round(float(np.linalg.norm(embedding1 - embedding2)), 2)
        print(f"Euclidean Distance: {distance}")
        return distance < self.MVL

    async def get_image_from_url(self, img_path):
        """Load an RGB image from an http(s) URL or a local file path.

        Returns:
            The decoded image, or ``None`` when loading fails or the image
            scores below ``BLUR_THRESHOLD``.
        """
        try:
            # NOTE(security): img_path is caller-supplied. Accepting arbitrary
            # URLs and local paths allows server-side request forgery and
            # probing of local files; restrict the sources if callers are
            # untrusted.
            if img_path.startswith(("http://", "https://")):
                timeout = ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS)
                async with ClientSession(timeout=timeout) as session:
                    async with session.get(img_path) as response:
                        if response.status != 200:
                            raise ValueError(f"HTTP Error: {response.status}")
                        img_data = await response.read()
                # verify() leaves the Image object unusable, so the bytes are
                # decoded a second time for the actual pixels.
                Image.open(BytesIO(img_data)).verify()
                image = Image.open(BytesIO(img_data)).convert("RGB")
            elif os.path.isfile(img_path):
                # convert() returns an independent copy, so the file handle
                # can be released right away.
                with Image.open(img_path) as source:
                    image = source.convert("RGB")
            else:
                raise ValueError("Invalid input path. Must be a valid URL or local file path.")

            blur = await self.calculate_blurriness(image)
            # No truthiness test here: a score of exactly 0.0 (a flat image)
            # must be rejected too.
            if blur < self.BLUR_THRESHOLD:
                print(f"===================>>>>>>>>>> Blurry: {blur}")
                return None
            print('Good Image: ', blur)
            return image
        except Exception as e:
            print(f"Error downloading or loading image: {e}")
            traceback.print_exc()
            return None

    async def compare_faces_from_urls(self, url1, url2):
        """Run the full pipeline on two image locations.

        Returns ``False`` as soon as any stage (load, detect, embed) yields
        nothing for either image, or if anything raises.
        """
        try:
            # Both images are fetched concurrently.
            img1, img2 = await asyncio.gather(
                self.get_image_from_url(url1),
                self.get_image_from_url(url2),
            )
            if img1 is None or img2 is None:
                return False

            face1 = await self.extract_face(img1, self.detector)
            face2 = await self.extract_face(img2, self.detector)
            if face1 is None or face2 is None:
                return False

            embedding1 = await self.calculate_embedding(face1)
            embedding2 = await self.calculate_embedding(face2)
            if embedding1 is None or embedding2 is None:
                return False

            return await self.compare_faces(embedding1, embedding2)
        except Exception as e:
            print(f"Error comparing faces from URLs: {e}")
            traceback.print_exc()
            return False

    async def faceMain(self, BODY):
        """Compare ``BODY['img1']`` with ``BODY['img2']``.

        Returns a plain ``bool``; any error (including a missing key) is
        printed and reported as ``False``.
        """
        try:
            return bool(await self.compare_faces_from_urls(BODY['img1'], BODY['img2']))
        except Exception:
            traceback.print_exc()
            return False
| |
|
| | |
# ASGI application object; routes are registered on it below.
app: FastAPI = FastAPI()
| |
|
class Body(BaseModel):
    """Request payload for ``/compare_faces``: the two image locations."""

    # Each value is handed to FaceMainExecutionPytorch.get_image_from_url.
    img1: str
    img2: str
| |
|
| | |
# Built once at import time so every request reuses the same loaded models.
face_executor: FaceMainExecutionPytorch = FaceMainExecutionPytorch()
| |
|
@app.post("/compare_faces")
async def compare_faces(body: Body):
    """Return ``true`` when both images are judged to show the same face.

    Responds with HTTP 500 if the comparison raises unexpectedly.
    """
    try:
        # Build the payload from the validated fields directly;
        # BaseModel.dict() is deprecated in Pydantic v2.
        payload = {"img1": body.img1, "img2": body.img2}
        result = await face_executor.faceMain(payload)
        print(result)
        return bool(result)
    except Exception as exc:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail="Internal Server Error") from exc
| |
|
if __name__ == "__main__":
    # Serve on the loopback interface only when run as a script.
    try:
        uvicorn.run(app, host="127.0.0.1", port=8888)
    finally:
        # Runs on shutdown or on error: release PyTorch's cached CUDA memory.
        torch.cuda.empty_cache()
| |
|