import fileinput
import itertools
import os
import re
from copy import deepcopy
from operator import itemgetter
from pathlib import Path
from typing import Union

import cv2  # type: ignore
import gradio as gr  # type: ignore
import numpy as np
import torch
from deep_sort_realtime.deepsort_tracker import DeepSort  # type: ignore
from paddleocr import PaddleOCR  # type: ignore
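
# Download the detection weights and the example media on first run.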
if not os.path.isfile("weights.pt"):
    weights_url = "https://archive.org/download/anpr_weights/weights.pt"
    os.system(f"wget {weights_url}")

if not os.path.isdir("examples"):
    examples_url = "https://archive.org/download/anpr_examples_202208/examples.tar.gz"
    os.system(f"wget {examples_url}")
    os.system("tar -xvf examples.tar.gz")
    os.system("rm -rf examples.tar.gz")
def prepend_text(filename: Union[str, Path], text: str):
    with fileinput.input(filename, inplace=True) as file:
        for line in file:
            if file.isfirstline():
                print(text)
            print(line, end="")
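

# Clone YOLOv7 on first run and patch its modules so they can be imported here.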
if not os.path.isdir("yolov7"):
    yolov7_repo_url = "https://github.com/WongKinYiu/yolov7"
    os.system(f"git clone {yolov7_repo_url}")
    # Fix import errors
    for file in [
        "yolov7/models/common.py",
        "yolov7/models/experimental.py",
        "yolov7/models/yolo.py",
        "yolov7/utils/datasets.py",
    ]:
        prepend_text(file, "import sys\nsys.path.insert(0, './yolov7')")

from yolov7.models.experimental import attempt_load  # type: ignore
from yolov7.utils.datasets import letterbox  # type: ignore
from yolov7.utils.general import check_img_size  # type: ignore
from yolov7.utils.general import non_max_suppression  # type: ignore
from yolov7.utils.general import scale_coords  # type: ignore
from yolov7.utils.plots import plot_one_box  # type: ignore
from yolov7.utils.torch_utils import TracedModel, select_device  # type: ignore

weights = "weights.pt"
device_id = "cpu"
image_size = 640
trace = True

# Initialize
device = select_device(device_id)
half = device.type != "cpu"  # half precision only supported on CUDA

# Load model
model = attempt_load(weights, map_location=device)  # load FP32 model
stride = int(model.stride.max())  # model stride
imgsz = check_img_size(image_size, s=stride)  # check img_size
if trace:
    model = TracedModel(model, device, image_size)
if half:
    model.half()  # to FP16
if device.type != "cpu":
    model(
        torch.zeros(1, 3, imgsz, imgsz).to(device).type_as(next(model.parameters()))
    )  # run once
model.eval()

# Load OCR
paddle = PaddleOCR(lang="en")
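

# Run YOLOv7 on an input image and return plate bounding boxes in Pascal VOC
# (x1, y1, x2, y2) format together with their detection confidences.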
def detect_plate(source_image):
    # Padded resize
    img_size = 640
    stride = 32
    img = letterbox(source_image, img_size, stride=stride)[0]

    # Convert
    img = img[:, :, ::-1].transpose(2, 0, 1)  # BGR to RGB, HWC to CHW
    img = np.ascontiguousarray(img)
    img = torch.from_numpy(img).to(device)
    img = img.half() if half else img.float()  # uint8 to fp16/32
    img /= 255.0  # 0 - 255 to 0.0 - 1.0
    if img.ndimension() == 3:
        img = img.unsqueeze(0)

    # Inference
    with torch.no_grad():
        pred = model(img, augment=True)[0]

    # Apply NMS
    pred = non_max_suppression(pred, 0.25, 0.45, classes=0, agnostic=True)

    plate_detections = []
    det_confidences = []

    # Process detections
    for i, det in enumerate(pred):  # detections per image
        if len(det):
            # Rescale boxes from img_size to source image size
            det[:, :4] = scale_coords(
                img.shape[2:], det[:, :4], source_image.shape
            ).round()

            # Return results
            for *xyxy, conf, cls in reversed(det):
                coords = [
                    int(position)
                    for position in (torch.tensor(xyxy).view(1, 4)).tolist()[0]
                ]
                plate_detections.append(coords)
                det_confidences.append(conf.item())

    return plate_detections, det_confidences
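

# Unsharp masking: subtract a Gaussian-blurred copy from a weighted original to
# boost edge contrast; `amount` controls the strength of the sharpening.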
def unsharp_mask(image, kernel_size=(5, 5), sigma=1.0, amount=2.0, threshold=0):
    blurred = cv2.GaussianBlur(image, kernel_size, sigma)
    sharpened = float(amount + 1) * image - float(amount) * blurred
    sharpened = np.maximum(sharpened, np.zeros(sharpened.shape))
    sharpened = np.minimum(sharpened, 255 * np.ones(sharpened.shape))
    sharpened = sharpened.round().astype(np.uint8)
    if threshold > 0:
        # Cast to a signed type first: subtracting uint8 arrays would wrap around.
        low_contrast_mask = (
            np.absolute(image.astype(np.int16) - blurred.astype(np.int16)) < threshold
        )
        np.copyto(sharpened, image, where=low_contrast_mask)
    return sharpened
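

# Crop a region of interest given (x1, y1, x2, y2) coordinates.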
def crop(image, coord):
    cropped_image = image[int(coord[1]) : int(coord[3]), int(coord[0]) : int(coord[2])]
    return cropped_image
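

# Pre-process a plate crop (rescale, grayscale, morphological clean-up, sharpen)
# and read it with PaddleOCR; returns the recognized text and its confidence.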
def ocr_plate(plate_region):
    # Image pre-processing for more accurate OCR
    rescaled = cv2.resize(
        plate_region, None, fx=1.2, fy=1.2, interpolation=cv2.INTER_CUBIC
    )
    grayscale = cv2.cvtColor(rescaled, cv2.COLOR_BGR2GRAY)
    kernel = np.ones((1, 1), np.uint8)
    dilated = cv2.dilate(grayscale, kernel, iterations=1)
    eroded = cv2.erode(dilated, kernel, iterations=1)
    sharpened = unsharp_mask(eroded)

    # OCR the preprocessed image
    results = paddle.ocr(sharpened, det=False, cls=False)
    flattened = list(itertools.chain.from_iterable(results))
    plate_text, ocr_confidence = max(flattened, key=itemgetter(1), default=("", 0))

    # Filter out anything but uppercase letters, digits, hyphens and whitespace.
    plate_text = re.sub(r"[^-A-Z0-9 ]", r"", plate_text).strip()

    # PaddleOCR may report NaN confidence; treat it as 0 so callers skip the result.
    if np.isnan(ocr_confidence):
        ocr_confidence = 0
    return plate_text, ocr_confidence
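

# Detect plates in a single image, OCR each detection, and draw the labelled
# boxes on a copy of the input.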
def get_plates_from_image(input):
    if input is None:
        return None
    plate_detections, det_confidences = detect_plate(input)
    plate_texts = []
    ocr_confidences = []
    detected_image = deepcopy(input)
    for coords in plate_detections:
        plate_region = crop(input, coords)
        plate_text, ocr_confidence = ocr_plate(plate_region)
        if ocr_confidence == 0:  # If OCR confidence is 0, skip this detection
            continue
        plate_texts.append(plate_text)
        ocr_confidences.append(ocr_confidence)
        plot_one_box(
            coords,
            detected_image,
            label=plate_text,
            color=[0, 150, 255],
            line_thickness=2,
        )
    return detected_image
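

# Convert a Pascal VOC (x1, y1, x2, y2) box to COCO (x, y, width, height),
# the format expected by the DeepSort tracker.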
def pascal_voc_to_coco(x1y1x2y2):
    x1, y1, x2, y2 = x1y1x2y2
    return [x1, y1, x2 - x1, y2 - y1]
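

# For a known track id, keep whichever OCR reading has the higher confidence:
# either update the stored entry or fall back to the stored text and confidence.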
def get_best_ocr(preds, rec_conf, ocr_res, track_id):
    for info in preds:
        # Check if this is the current track id
        if info["track_id"] == track_id:
            # Keep whichever OCR confidence is higher
            if info["ocr_conf"] < rec_conf:
                info["ocr_conf"] = rec_conf
                info["ocr_txt"] = ocr_res
            else:
                rec_conf = info["ocr_conf"]
                ocr_res = info["ocr_txt"]
            break
    return preds, rec_conf, ocr_res
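

# Run ANPR over a video: detect plates per frame, track them with DeepSort so
# each plate keeps a stable id across frames, keep the best OCR reading per
# track, and export an annotated, web-compatible MP4.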
def get_plates_from_video(source):
    if source is None:
        return None

    # Create a VideoCapture object
    video = cv2.VideoCapture(source)

    # The frame resolution and FPS are obtained from the source video.
    # We convert the resolutions from float to integer.
    width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = video.get(cv2.CAP_PROP_FPS)

    # Define the codec and create a VideoWriter object.
    temp = f"{Path(source).stem}_temp{Path(source).suffix}"
    export = cv2.VideoWriter(
        temp, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height)
    )

    # Initializing the tracker
    tracker = DeepSort(embedder_gpu=False)

    # Initializing some helper variables.
    preds = []
    total_obj = 0
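
    # Process the video frame by frame: detect, track, OCR, annotate, write out.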
    while True:
        ret, frame = video.read()
        if not ret:
            break

        # Run the ANPR algorithm
        bboxes, scores = detect_plate(frame)
        # Convert Pascal VOC detections to COCO
        bboxes = list(map(pascal_voc_to_coco, bboxes))

        if len(bboxes) > 0:
            # Storing all the required info in a list.
            detections = [
                (bbox, score, "number_plate") for bbox, score in zip(bboxes, scores)
            ]

            # Applying the tracker.
            # The tracker flow: Kalman filter -> target association (Hungarian
            # algorithm) with an appearance descriptor.
            tracks = tracker.update_tracks(detections, frame=frame)

            for track in tracks:
                # Skip unconfirmed or stale tracks.
                if not track.is_confirmed() or track.time_since_update > 1:
                    continue

                # Changing the track bbox to top-left, bottom-right coordinates
                # and clamping negative values to the frame.
                bbox = [int(position) for position in list(track.to_tlbr())]
                for i in range(len(bbox)):
                    if bbox[i] < 0:
                        bbox[i] = 0

                # Cropping the license plate and applying the OCR.
                plate_region = crop(frame, bbox)
                plate_text, ocr_confidence = ocr_plate(plate_region)

                # Storing the OCR output for the corresponding track id.
                output_frame = {
                    "track_id": track.track_id,
                    "ocr_txt": plate_text,
                    "ocr_conf": ocr_confidence,
                }

                # Appending the track id to the list only if it is not there yet,
                # else updating the stored best-confidence reading for the track.
                if track.track_id not in set(pred["track_id"] for pred in preds):
                    total_obj += 1
                    preds.append(output_frame)
                else:
                    preds, ocr_confidence, plate_text = get_best_ocr(
                        preds, ocr_confidence, plate_text, track.track_id
                    )

                # Plotting the prediction.
                plot_one_box(
                    bbox,
                    frame,
                    label=f"{track.track_id}. {plate_text}",
                    color=[255, 150, 0],
                    line_thickness=3,
                )

        # Write the frame into the output file
        export.write(frame)
    # When everything is done, release the video capture and writer objects
    video.release()
    export.release()

    # Compressing the output video for smaller size and web compatibility
    # with a two-pass x264 encode (the first pass only gathers statistics).
    output = f"{Path(source).stem}_detected{Path(source).suffix}"
    os.system(
        f"ffmpeg -y -i {temp} -c:v libx264 -b:v 5000k -minrate 1000k -maxrate 8000k"
        " -pass 1 -c:a aac -f mp4 /dev/null &&"
        f" ffmpeg -i {temp} -c:v libx264 -b:v 5000k -minrate 1000k -maxrate 8000k"
        f" -pass 2 -c:a aac -movflags faststart {output}"
    )
    os.system(f"rm -rf {temp} ffmpeg2pass-0.log ffmpeg2pass-0.log.mbtree")
    return output
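

# Gradio UI: an image tab and a video tab, each wired to its ANPR pipeline,
# with cached examples.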
with gr.Blocks() as demo:
    gr.Markdown('<h3 align="center">Automatic Number Plate Recognition</h3>')
    gr.Markdown(
        "This AI was trained to detect and recognize number plates on vehicles."
    )
    with gr.Tabs():
        with gr.TabItem("Image"):
            with gr.Row():
                image_input = gr.Image()
                image_output = gr.Image()
            image_input.change(
                get_plates_from_image, inputs=image_input, outputs=image_output
            )
            gr.Examples(
                [
                    ["examples/test_image_1.jpg"],
                    ["examples/test_image_2.jpg"],
                    ["examples/test_image_3.png"],
                    ["examples/test_image_4.jpeg"],
                ],
                [image_input],
                image_output,
                get_plates_from_image,
                cache_examples=True,
            )
        with gr.TabItem("Video"):
            with gr.Row():
                video_input = gr.Video(format="mp4")
                video_output = gr.Video(format="mp4")
            video_input.change(
                get_plates_from_video, inputs=video_input, outputs=video_output
            )
            gr.Examples(
                [["examples/test_video_1.mp4"]],
                [video_input],
                video_output,
                get_plates_from_video,
                cache_examples=True,
            )
    gr.Markdown("[@itsyoboieltr](https://github.com/itsyoboieltr)")

demo.launch()
