Spaces:
Paused
Paused
| import os | |
| import tempfile | |
| import uuid | |
| from fastapi import File | |
| from paddleocr import PaddleOCR | |
| import spacy | |
| from ultralytics import YOLO | |
| from PIL import Image | |
| import boto3 | |
| import time | |
| from botocore.exceptions import NoCredentialsError | |
| from dotenv import load_dotenv | |
| from colorthief import ColorThief | |
| import logging | |
| from roboflow import Roboflow | |
# Configure logging
logging.basicConfig(
    level=logging.INFO,  # Set the logging level
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',  # Log message format
)
# Create a logger
logger = logging.getLogger(__name__)

# Load variables from a local .env file into the process environment so the
# os.getenv() calls below can see them.
load_dotenv()

# OCR engine used by read_text_img().
ocr_model = PaddleOCR(lang='en')
# nlp_ner = spacy.load("en_pipeline")

# YOLO weights: `detector` is used by licence_dect(), `vehicle` by the
# vehicle/plate detection functions.
detector = YOLO('best.pt')
vehicle = YOLO('yolov8x.pt')

# SECURITY(review): this API key is committed in source. Prefer supplying
# ROBOFLOW_API_KEY through the environment/.env; the literal fallback is kept
# only so existing deployments keep working. Rotate the key and drop the
# fallback once the variable is set everywhere.
rf = Roboflow(api_key=os.getenv("ROBOFLOW_API_KEY", "P4usj8uPwcbnflvyJIAB"))
project = rf.workspace("ntchindagiscard").project("id_card_annotation")
# Hosted model used by crop_images_from_detections().
model = project.version(2).model

# AWS credentials and target bucket; these become the default arguments of
# upload_to_s3().
aws_access_key_id = os.getenv('AWS_ACCESS_KEY')
aws_secret_access_key = os.getenv('AWS_SECRET_KEY')
bucket_name = 'vvims'
def read_text_img(img_path: str) -> str:
    """Read text from an image with the module-level OCR model.

    Args:
        img_path: Path to the image from which text will be extracted.

    Returns:
        The recognised text fragments concatenated in the order the OCR
        engine returned them, each followed by a single space (so a
        non-empty result ends with a trailing space). An empty string is
        returned when the engine reports no text.
    """
    result = ocr_model.ocr(img_path)
    # The engine may hand back nothing at all (None / empty list) or a first
    # entry that is empty; indexing result[0] blindly would raise in the
    # former case.
    if not result or not result[0]:
        return ''
    # Each entry is read as entry[1][0]; presumably (box, (text, confidence))
    # -- confirm against the installed PaddleOCR version.
    return ''.join(entry[1][0] + ' ' for entry in result[0])
def set_image_dpi(file_path):
    """Shrink an image to at most 1024 px wide and re-save it in place at 300 DPI.

    The image is never enlarged: the scale factor is capped at 1. The file at
    ``file_path`` is overwritten with the resized image.

    Args:
        file_path: Path of the image file to rewrite.
    """
    # Close the source handle before overwriting the same path.
    with Image.open(file_path) as im:
        width, height = im.size
        factor = min(1, 1024.0 / width)
        # Clamp to 1 px so an extremely thin image cannot round down to a
        # zero-sized dimension, which resize() rejects.
        size = max(1, int(factor * width)), max(1, int(factor * height))
        im_resized = im.resize(size, Image.LANCZOS)
    im_resized.save(file_path, dpi=(300, 300))
def crop_images_from_detections(image_path):
    """Run the Roboflow model on an image and OCR every detected region.

    Each detection is cropped out of the source image, written to
    ``uploads/detections.jpg`` (overwritten on every iteration), normalised
    with ``set_image_dpi`` and passed to ``read_text_img``.

    Args:
        image_path: Path of the image to analyse.

    Returns:
        A dict mapping each detection's ``class`` to the text read from its
        crop. When several detections share a class, the last one wins.
    """
    results = {}
    detections = model.predict(image_path, confidence=20, overlap=50)

    crop_path = "uploads/detections.jpg"
    # Saving fails if the target directory is missing.
    os.makedirs(os.path.dirname(crop_path), exist_ok=True)

    # Open the source image once (and close it afterwards) instead of
    # reopening it for every detection.
    with Image.open(image_path) as image:
        for detection in detections:
            # Detections carry a centre point plus width/height; convert to
            # the (left, upper, right, lower) box that crop() expects.
            half_w = detection['width'] / 2
            half_h = detection['height'] / 2
            box = (
                detection['x'] - half_w,
                detection['y'] - half_h,
                detection['x'] + half_w,
                detection['y'] + half_h,
            )
            image.crop(box).save(crop_path)
            set_image_dpi(crop_path)
            results[detection['class']] = read_text_img(crop_path)
    return results
# Function to upload a file to S3
def upload_to_s3(
        file_path,
        bucket_name=bucket_name,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name='eu-north-1'):
    """Upload a local file to S3 under a unique key and return its URL.

    The object key is ``<uuid4>-<file name>`` so repeated uploads of the same
    file never collide.

    Args:
        file_path: Path of the local file to upload.
        bucket_name: Target bucket (defaults to the module-level bucket).
        aws_access_key_id: AWS access key (defaults to the module-level value).
        aws_secret_access_key: AWS secret key (defaults to the module-level value).
        region_name: AWS region used for the client and for the returned URL.

    Returns:
        The URL of the uploaded object, or ``None`` when the local file does
        not exist or no credentials are available. Any other error raised by
        the upload is not caught here.
    """
    # Create an S3 client
    s3 = boto3.client('s3',
                      aws_access_key_id=aws_access_key_id,
                      aws_secret_access_key=aws_secret_access_key,
                      region_name=region_name)
    # Prefix the base file name with a random UUID to make the key unique
    file_name = os.path.basename(file_path)
    unique_file_name = f"{uuid.uuid4()}-{file_name}"
    print(f"[*] ---- File path ====> {file_path}")
    try:
        # Upload the file
        s3.upload_file(file_path, bucket_name, unique_file_name)
    except FileNotFoundError:
        print("The file was not found")
        return None
    except NoCredentialsError:
        print("Credentials not available")
        return None
    # Construct the file URL
    return f"https://{bucket_name}.s3.{region_name}.amazonaws.com/{unique_file_name}"
def _plate_and_color(path):
    """Return ``{"plate": ..., "color": ...}`` for the image stored at *path*."""
    return {
        "plate": licence_dect(path),
        "color": ColorThief(path).get_color(quality=1),
    }


def detect_licensePlate(img: str) -> list:
    """Detect vehicles in an image and read each one's plate and dominant colour.

    Every detected box is cropped to ``license/cars.jpg`` (overwritten per
    box), then handed to ``licence_dect`` and ``ColorThief``. When the first
    result contains no detections, the whole image is analysed instead and a
    single entry with an empty ``type`` is returned.

    Args:
        img: Path of the image to analyse.

    Returns:
        A list of ``{"type": <class name>, "info": {"plate": ..., "color": ...}}``
        dicts, one per detection whose class is car, bus, truck or motorcycle.
    """
    vehicle_types = ['car', 'bus', 'truck', 'motorcycle']
    with Image.open(img) as image:
        # NOTE(review): Ultralytics documents `classes=` (integer ids) for
        # class filtering; confirm that `cls=` has the intended effect here.
        results = vehicle(source=img, cls=vehicle_types, conf=0.7)
        names = vehicle.names
        final = []
        print("Results from car detetctions,:", results)
        if len(results[0]) >= 1:
            crop_path = os.path.join('license', 'cars.jpg')
            for result in results:
                # .cpu() is a no-op on CPU tensors and makes .numpy() valid
                # when inference ran on a GPU.
                labels = [names[int(c)] for c in result.boxes.cls.cpu().numpy()]
                boxes = result.boxes.xyxy.cpu().numpy()
                for label, box in zip(labels, boxes):
                    x1, y1, x2, y2 = box[:4]
                    image.crop((x1, y1, x2, y2)).save(crop_path)
                    info = _plate_and_color(crop_path)
                    if label in vehicle_types:
                        final.append({"type": label, "info": info})
        else:
            # No detections: fall back to analysing the whole image.
            final = [{"type": "", "info": _plate_and_color(img)}]
    print(final)
    return final
def licence_dect(img: str) -> "str | None":
    """Locate licence plates in an image and OCR them.

    Each detected box is cropped to ``license/carplate.jpg`` (overwritten per
    box) and read with ``read_text_img``.

    Args:
        img: Path of the image to analyse.

    Returns:
        The text read from the last detected plate, or ``None`` when no plate
        was detected or when cropping/OCR failed.
    """
    plate_path = os.path.join('license', 'carplate.jpg')
    plate_text = None
    with Image.open(img) as image:
        results = detector(img)
        try:
            for result in results:
                # .cpu() is a no-op on CPU tensors and makes .numpy() valid
                # when inference ran on a GPU.
                for box in result.boxes.xyxy.cpu().numpy():
                    x1, y1, x2, y2 = box[:4]
                    image.crop((x1, y1, x2, y2)).save(plate_path)
                    plate_text = read_text_img(plate_path)
        except Exception:
            # Best-effort by design: callers treat None as "no plate". Log
            # the failure instead of hiding it.
            logging.getLogger(__name__).exception(
                "Licence plate detection failed for %s", img
            )
            return None
    return plate_text
def lookup_user_metadata(index, encoder, serial):
    """Query the ``ns1`` namespace for the two nearest vectors with a given serial.

    Args:
        index: Object exposing a ``query(**kwargs)`` method whose response has
            a ``matches`` attribute.
        encoder: Query vector passed as ``vector``.
        serial: Value the ``serial`` metadata field must equal.

    Returns:
        ``{"matches": [...]}`` where each item holds the ``id``, ``score`` and
        ``metadata`` of one match, in response order.
    """
    query_args = {
        "namespace": "ns1",
        "vector": encoder,
        "top_k": 2,
        "include_values": True,
        "include_metadata": True,
        "filter": {"serial": {"$eq": serial}},
    }
    response = index.query(**query_args)

    # Flatten the response objects into plain dicts.
    matches = []
    for hit in response.matches:
        matches.append({
            "id": hit.id,
            "score": hit.score,
            "metadata": hit.metadata,
        })
    return {"matches": matches}
def lookup_user(index, encoding_list):
    """Query the ``ns1`` namespace for the single nearest vector.

    Args:
        index: Object exposing a ``query(**kwargs)`` method whose response has
            a ``matches`` attribute.
        encoding_list: Query vector passed as ``vector``.

    Returns:
        ``{"matches": [...]}`` where each item holds the ``id``, ``score`` and
        ``metadata`` of one match, in response order.
    """
    response = index.query(
        namespace="ns1",
        vector=encoding_list,
        top_k=1,
        include_metadata=True,
    )

    # Turn the response into plain dicts so it can be serialised.
    serialisable = []
    for hit in response.matches:
        entry = {"id": hit.id, "score": hit.score, "metadata": hit.metadata}
        serialisable.append(entry)
    return {"matches": serialisable}
def vehicle_dect(img: str) -> "list | None":
    """Detect vehicles in an image and collect type, colour and plate for each.

    Every detected box is cropped to ``license/cars.jpg`` (overwritten per
    box), then handed to ``licence_dect`` and ``ColorThief``.

    Args:
        img: Path of the image to analyse.

    Returns:
        A list of ``{"type": <class name>, "car_data": {"color": ..., "plate": ...}}``
        dicts, one per detected box (empty when nothing is detected), or
        ``None`` when processing the detections failed.
    """
    with Image.open(img) as image:
        # NOTE(review): Ultralytics documents `classes=` (integer ids) for
        # class filtering; confirm that `cls=` has the intended effect here.
        results = vehicle(source=img, cls=['car', 'bus', 'truck', 'motorcycle'], conf=0.7)
        names = vehicle.names
        classes = []
        car_data = []
        crop_path = os.path.join('license', 'cars.jpg')
        try:
            for result in results:
                logger.info("Classes %s", result.boxes.cls)
                # .cpu() is a no-op on CPU tensors and makes .numpy() valid
                # when inference ran on a GPU.
                classes.extend(names[int(c)] for c in result.boxes.cls.cpu().numpy())
                for box in result.boxes.xyxy.cpu().numpy():
                    x1, y1, x2, y2 = box[:4]
                    image.crop((x1, y1, x2, y2)).save(crop_path)
                    num_plate = licence_dect(crop_path)
                    dominant_color = ColorThief(crop_path).get_color(quality=1)
                    # Append one record per box. Rebinding the list to a dict
                    # made the later positional lookup fail on every detection.
                    car_data.append({"color": dominant_color, "plate": num_plate})
            logger.info("[*]---- Color detecttion : %s", car_data)
            logger.info("[*]---- Classes detecttion : %s", classes)
            final = [
                {"type": label, "car_data": data}
                for label, data in zip(classes, car_data)
            ]
            logger.info("[*]---- Final detecttion : %s", final)
            return final
        except Exception:
            # Best-effort by design: keep returning None on failure, but
            # leave a trace in the logs.
            logger.exception("Vehicle detection failed for %s", img)
            return None