Spaces:
Running
Running
import os | |
import tempfile | |
import uuid | |
from fastapi import File | |
from paddleocr import PaddleOCR | |
import spacy | |
from ultralytics import YOLO | |
from PIL import Image | |
import boto3 | |
import time | |
from botocore.exceptions import NoCredentialsError | |
from dotenv import load_dotenv | |
from colorthief import ColorThief | |
import logging | |
from roboflow import Roboflow | |
# Configure logging once at import time so the module logger below emits
# timestamped, levelled records.
logging.basicConfig(
    level=logging.INFO,  # Set the logging level
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',  # Log message format
)

# Create a logger
logger = logging.getLogger(__name__)

# Load variables from a .env file into the environment before any
# os.getenv() call below reads them.
load_dotenv()

# Models are instantiated once at import time and shared by the functions
# defined in this module.
ocr_model = PaddleOCR(lang='en')
# nlp_ner = spacy.load("en_pipeline")
detector = YOLO('best.pt')
vehicle = YOLO('yolov8x.pt')

# Roboflow-hosted model used by crop_images_from_detections().
# The API key is read from ROBOFLOW_API_KEY, like the AWS credentials below.
# NOTE(review): the literal fallback only keeps existing deployments working;
# a key committed to source should be rotated and the fallback removed.
rf = Roboflow(api_key=os.getenv('ROBOFLOW_API_KEY', "P4usj8uPwcbnflvyJIAB"))
project = rf.workspace("ntchindagiscard").project("id_card_annotation")
model = project.version(2).model

# Defaults consumed by upload_to_s3().
aws_access_key_id = os.getenv('AWS_ACCESS_KEY')
aws_secret_access_key = os.getenv('AWS_SECRET_KEY')
bucket_name = 'vvims'
def read_text_img(img_path: str) -> str:
    """
    Read text from an image using the module-level OCR model.

    Args:
        - img_path: Path to the image from which the text will be extracted

    Returns:
        - text: Every extracted fragment followed by a single space, in the
          order the OCR model returned them; an empty string when nothing
          was extracted.
    """
    result = ocr_model.ocr(img_path)
    # Guard against an empty/None result as well as an empty first entry,
    # so "no text found" yields '' instead of failing on result[0].
    if not result or not result[0]:
        return ''
    # res[1][0] is the fragment that gets concatenated.  The trailing space
    # after each fragment is kept so the output format is unchanged.
    return ''.join(res[1][0] + ' ' for res in result[0])
def set_image_dpi(file_path):
    """
    Rewrite an image in place, at most 1024 px wide, saved with dpi=(300, 300).

    The scale factor is capped at 1, so images that are 1024 px wide or
    narrower keep their dimensions but are still re-saved.

    Args:
        - file_path: Path of the image to rewrite; the file is overwritten.
    """
    # The context manager releases the source file handle; the resized copy
    # is an independent image, so it can be saved after the source is closed.
    with Image.open(file_path) as im:
        length_x, width_y = im.size
        factor = min(1, float(1024.0 / length_x))
        # Clamp to 1 px so a very wide, very short image cannot round down
        # to a zero-sized dimension.
        size = max(1, int(factor * length_x)), max(1, int(factor * width_y))
        im_resized = im.resize(size, Image.LANCZOS)
    im_resized.save(file_path, dpi=(300, 300))
def crop_images_from_detections(image_path):
    """
    Run the module-level Roboflow model on an image and OCR every detection.

    Each detection is cropped out of the source image, written to a scratch
    file, normalised with set_image_dpi() and passed to read_text_img().

    Args:
        - image_path: Path to the image to analyse

    Returns:
        - results: dict mapping each detection's 'class' to the text read
          from its crop. When several detections share a class, the last
          one processed wins.
    """
    results = {}
    detections = model.predict(image_path, confidence=20, overlap=50)

    # Single scratch file reused for every crop; make sure its folder exists.
    path = "uploads/detections.jpg"
    os.makedirs(os.path.dirname(path), exist_ok=True)

    # Open the source image once for all detections and close it afterwards.
    with Image.open(image_path) as image:
        for detection in detections:
            # The code treats 'x'/'y' as the box centre: the corners are
            # derived by offsetting half the width/height on each side.
            half_width = detection['width'] / 2
            half_height = detection['height'] / 2
            box = (
                detection['x'] - half_width,
                detection['y'] - half_height,
                detection['x'] + half_width,
                detection['y'] + half_height,
            )
            image.crop(box).save(path)
            set_image_dpi(path)
            results[detection['class']] = read_text_img(path)
    return results
# Function to upload a file to S3 | |
def upload_to_s3(
        file_path,
        bucket_name=bucket_name,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name='eu-north-1'):
    """
    Upload a local file to S3 under a collision-free key and return its URL.

    Args:
        - file_path: Path of the local file to upload
        - bucket_name: Destination bucket (defaults to the module setting)
        - aws_access_key_id: AWS access key (defaults to the module setting)
        - aws_secret_access_key: AWS secret key (defaults to the module setting)
        - region_name: AWS region used for the client and for the URL

    Returns:
        - file_url: "https://<bucket>.s3.<region>.amazonaws.com/<key>" on
          success, or None when the file is missing or no credentials are
          available.
    """
    # Create an S3 client
    s3 = boto3.client('s3',
                      aws_access_key_id=aws_access_key_id,
                      aws_secret_access_key=aws_secret_access_key,
                      region_name=region_name)

    # Prefix the base name with a random UUID so repeated uploads of the
    # same file never overwrite each other.
    file_name = os.path.basename(file_path)
    unique_file_name = f"{uuid.uuid4()}-{file_name}"
    logger.info("[*] ---- File path ====> %s", file_path)

    try:
        # Upload the file
        s3.upload_file(file_path, bucket_name, unique_file_name)
    except FileNotFoundError:
        logger.error("The file was not found")
        return None
    except NoCredentialsError:
        logger.error("Credentials not available")
        return None

    # Construct the file URL
    return f"https://{bucket_name}.s3.{region_name}.amazonaws.com/{unique_file_name}"
def detect_licensePlate(img: str) -> list:
    """
    Detect vehicles in an image and read each one's plate and dominant colour.

    When the vehicle model finds at least one box, every box labelled car,
    bus, truck or motorcycle is cropped, passed to licence_dect() for the
    plate text and to ColorThief for the dominant colour. When it finds
    nothing, the whole image is treated as a single untyped vehicle.

    Args:
        - img: Path to the image to analyse

    Returns:
        - final: list of {"type": <label>, "info": {"plate": ..., "color": ...}}
          entries; "type" is "" for the whole-image fallback.
    """
    vehicle_types = ('car', 'bus', 'truck', 'motorcycle')
    # NOTE(review): Ultralytics documents `classes` (integer ids) as the
    # predict-time class filter; the `cls` argument is kept as written and
    # the label check below is what restricts the output - confirm intent.
    results = vehicle(source=img, cls=['car', 'bus', 'truck', 'motorcycle'], conf=0.7)
    names = vehicle.names
    logger.info("Results from car detections: %s", results)

    final = []
    if len(results[0]) >= 1:
        # Scratch file reused for every vehicle crop.
        img_path = os.path.join('license', 'cars.jpg')
        os.makedirs(os.path.dirname(img_path), exist_ok=True)
        with Image.open(img) as image:
            for result in results:
                # .cpu() makes the conversion work when inference ran on GPU.
                labels = [names[int(c)] for c in result.boxes.cls.cpu().numpy()]
                boxes = result.boxes.xyxy.cpu().numpy()
                for label, box in zip(labels, boxes):
                    # Skip non-vehicle boxes before doing any crop/OCR work;
                    # they never reach the output.
                    if label not in vehicle_types:
                        continue
                    x1, y1, x2, y2 = box[0], box[1], box[2], box[3]
                    image.crop((x1, y1, x2, y2)).save(img_path)
                    num_plate = licence_dect(img_path)
                    dominant_color = ColorThief(img_path).get_color(quality=1)
                    final.append({
                        "type": label,
                        "info": {"plate": num_plate, "color": dominant_color},
                    })
    else:
        # No box at all: look for a plate on the full image instead.
        plate = licence_dect(img)
        dominant_color = ColorThief(img).get_color(quality=1)
        final = [
            {"type": "", "info": {"plate": plate, "color": dominant_color}}
        ]
    logger.info("%s", final)
    return final
def licence_dect(img: str) -> "str | None":
    """
    Locate licence plates in an image and OCR them.

    Every box returned by the plate detector is cropped, saved to a scratch
    file and read with read_text_img().

    Args:
        - img: Path to the image to analyse

    Returns:
        - txt: Text of the last plate processed, or None when no plate was
          detected or when cropping/OCR failed.
    """
    plate_path = os.path.join('license', 'carplate.jpg')
    with Image.open(img) as image:
        results = detector(img)
        # Stays None when the detector returns no boxes, instead of relying
        # on a swallowed NameError.
        txt = None
        try:
            os.makedirs(os.path.dirname(plate_path), exist_ok=True)
            for result in results:
                # .cpu() makes the conversion work when inference ran on GPU.
                for box in result.boxes.xyxy.cpu().numpy():
                    x1, y1, x2, y2 = box[0], box[1], box[2], box[3]
                    image.crop((x1, y1, x2, y2)).save(plate_path)
                    txt = read_text_img(plate_path)
        except Exception:
            # Best effort: callers get None on failure, but the cause is
            # logged rather than silently dropped.
            logger.exception("Licence plate extraction failed for %s", img)
            return None
    return txt
def lookup_user_metadata(index, encoder, serial):
    """
    Query the index for the two nearest vectors whose serial matches.

    Args:
        - index: Object exposing query(...) whose response has a .matches
          iterable of items with .id, .score and .metadata
        - encoder: Query vector, passed through as `vector`
        - serial: Value the "serial" metadata field must equal

    Returns:
        - dict with a "matches" list of {"id", "score", "metadata"} dicts
    """
    query_args = dict(
        namespace="ns1",
        vector=encoder,
        top_k=2,
        include_values=True,
        include_metadata=True,
        filter={"serial": {"$eq": serial}},
    )
    response = index.query(**query_args)

    # Flatten the response objects into plain dicts.
    hits = []
    for hit in response.matches:
        hits.append({
            "id": hit.id,
            "score": hit.score,
            "metadata": hit.metadata,
        })
    return {"matches": hits}
def lookup_user(index, encoding_list):
    """
    Query the index for the single nearest vector to an encoding.

    Args:
        - index: Object exposing query(...) whose response has a .matches
          iterable of items with .id, .score and .metadata
        - encoding_list: Query vector, passed through as `vector`

    Returns:
        - dict with a "matches" list of {"id", "score", "metadata"} dicts
    """
    response = index.query(
        namespace="ns1",
        vector=encoding_list,
        top_k=1,
        include_metadata=True,
    )

    # Convert the response objects into plain, serializable dicts.
    hits = []
    for hit in response.matches:
        entry = {"id": hit.id, "score": hit.score, "metadata": hit.metadata}
        hits.append(entry)
    return {"matches": hits}
def vehicle_dect(img: str) -> "list | None":
    """
    Detect objects with the vehicle model and describe each detected box.

    Every box is cropped, passed to licence_dect() for the plate text and to
    ColorThief for the dominant colour. Unlike detect_licensePlate(), no
    label filtering is applied to the output here.

    Args:
        - img: Path to the image to analyse

    Returns:
        - final: list of {"type": <label>, "car_data": {"color": ...,
          "plate": ...}} entries (empty when nothing is detected), or None
          when processing the detections failed.
    """
    with Image.open(img) as image:
        # NOTE(review): Ultralytics documents `classes` (integer ids) as the
        # predict-time class filter; `cls` is kept as written - confirm intent.
        results = vehicle(source=img, cls=['car', 'bus', 'truck', 'motorcycle'], conf=0.7)
        names = vehicle.names
        final = []
        try:
            # Scratch file reused for every crop.
            img_path = os.path.join('license', 'cars.jpg')
            os.makedirs(os.path.dirname(img_path), exist_ok=True)
            for result in results:
                logger.info("Classes %s", result.boxes.cls)
                # .cpu() makes the conversion work when inference ran on GPU.
                labels = [names[int(c)] for c in result.boxes.cls.cpu().numpy()]
                boxes = result.boxes.xyxy.cpu().numpy()
                for label, box in zip(labels, boxes):
                    x1, y1, x2, y2 = box[0], box[1], box[2], box[3]
                    image.crop((x1, y1, x2, y2)).save(img_path)
                    num_plate = licence_dect(img_path)
                    dominant_color = ColorThief(img_path).get_color(quality=1)
                    # One record per box, paired with its own label. The
                    # previous code overwrote a single dict and then indexed
                    # it by position, which always failed.
                    car_data = {"color": dominant_color, "plate": num_plate}
                    logger.info("[*]---- Color detection : %s", car_data)
                    final.append({"type": label, "car_data": car_data})
            logger.info("[*]---- Final detection : %s", final)
            return final
        except Exception:
            # Best effort: callers get None on failure, but the cause is
            # logged rather than silently dropped.
            logger.exception("Vehicle detection failed for %s", img)
            return None