|
import gradio as gr |
|
from ultralytics import YOLO |
|
import cv2 |
|
import os |
|
import pymysql |
|
import boto3 |
|
from io import BytesIO |
|
import io |
|
from PIL import Image |
|
from transformers import AutoTokenizer, AutoModel |
|
import torch |
|
import numpy as np |
|
|
|
|
|
# --- AWS / S3 configuration --------------------------------------------------
# SECURITY: credentials are read from the environment instead of being
# embedded in the source. Any key pair that was ever committed to this file
# must be treated as compromised and rotated in IAM.
# When the variables are unset the values are None, in which case boto3 falls
# back to its default credential chain (shared config files, instance role...).
aws_access_key = os.environ.get("AWS_ACCESS_KEY_ID")
aws_secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
aws_region = os.environ.get("AWS_DEFAULT_REGION", "eu-west-3")

# Module-wide S3 client used by upload_frame_to_s3().
s3 = boto3.client(
    's3',
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_secret_key,
    region_name=aws_region,
)

# Destination bucket and key prefix for uploaded frame crops.
S3_BUCKET_NAME = 'savingbuckett5'
S3_FOLDER = 'Video-Processing/'

# YOLO weights are loaded once at import time and shared by process_video().
model = YOLO("./YOLO_Model_v5.pt")
|
|
|
# --- MySQL (RDS) connection settings -----------------------------------------
# SECURITY: the database password is read from the DB_PASSWORD environment
# variable rather than being stored in the source; a password that was ever
# committed here should be changed. The remaining settings keep their previous
# values as defaults and can be overridden per deployment.
RDS_HOST = os.environ.get(
    "RDS_HOST", "database-2.cnqamusmkwon.eu-north-1.rds.amazonaws.com"
)
RDS_PORT = int(os.environ.get("RDS_PORT", "3306"))
DB_USER = os.environ.get("DB_USER", "root")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "")
DB_NAME = os.environ.get("DB_NAME", "traffic")
|
|
|
def get_connection():
    """Open and return a new pymysql connection built from the module settings.

    Cursors created from the connection use ``DictCursor``. The caller owns
    the returned connection and is responsible for closing it.
    """
    connect_kwargs = {
        "host": RDS_HOST,
        "port": RDS_PORT,
        "user": DB_USER,
        "password": DB_PASSWORD,
        "database": DB_NAME,
        "cursorclass": pymysql.cursors.DictCursor,
    }
    return pymysql.connect(**connect_kwargs)
|
|
|
def increment_road(id_value, increment_value, is_in=True):
    """Add ``increment_value`` to a road's in- or out-counter and update occupancy.

    Reads the ``traffic_counter_road`` row whose ``id`` equals ``id_value``,
    adds ``increment_value`` to ``road_in`` (``is_in=True``) or ``road_out``
    (``is_in=False``), stores ``road_in - road_out`` in ``road_current`` and
    commits. When no row matches, nothing is updated.

    ``pymysql.MySQLError`` is caught and printed rather than raised, so the
    caller is not interrupted by a database failure.

    NOTE(review): this is a read-modify-write without row locking; concurrent
    callers updating the same row could lose increments.
    """
    # Initialised up front so ``finally`` is safe even if connecting fails;
    # previously that path raised UnboundLocalError from the cleanup code.
    connection = None
    try:
        connection = get_connection()
        with connection.cursor() as cursor:
            select_sql = (
                "SELECT id, road_in, road_out, road_current "
                "FROM traffic_counter_road WHERE id = %s"
            )
            cursor.execute(select_sql, (id_value,))
            row = cursor.fetchone()

            if row:
                if is_in:
                    column = "road_in"
                    new_total = row['road_in'] + increment_value
                    new_road_current = new_total - row['road_out']
                else:
                    column = "road_out"
                    new_total = row['road_out'] + increment_value
                    new_road_current = row['road_in'] - new_total

                # ``column`` is one of two literals chosen above, never user
                # input, so interpolating it into the statement is safe.
                update_sql = f"""
                    UPDATE traffic_counter_road
                    SET {column} = %s, road_current = %s
                    WHERE id = %s
                """
                cursor.execute(update_sql, (new_total, new_road_current, id_value))

        connection.commit()
    except pymysql.MySQLError as e:
        print(f"Error: {e}")
    finally:
        if connection:
            connection.close()
|
|
|
def upload_frame_to_s3(frame, frame_number):
    """Encode ``frame`` as a JPEG and upload it to the configured S3 bucket.

    The array is converted from BGR to RGB, serialised to JPEG in memory and
    stored under ``{S3_FOLDER}frame_{frame_number}.jpg`` in ``S3_BUCKET_NAME``.
    """
    s3_key = f"{S3_FOLDER}frame_{frame_number}.jpg"

    rgb_pixels = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    jpeg_stream = BytesIO()
    Image.fromarray(rgb_pixels).save(jpeg_stream, format="JPEG")
    # Rewind so the upload reads the encoded bytes from the beginning.
    jpeg_stream.seek(0)

    s3.upload_fileobj(jpeg_stream, S3_BUCKET_NAME, s3_key)

    print(f"Uploaded frame {frame_number} to S3 at {S3_BUCKET_NAME}/{s3_key}")
|
|
|
def process_video(video_path, count_type):
    """Track objects through a video and count each one once inside a fixed region.

    Every frame is passed to ``model.track(..., persist=True)``. The first time
    a tracked object's bounding box lies entirely inside the hard-coded region
    of interest, the road counter with id 1 is incremented via
    ``increment_road`` ("in" or "out" according to ``count_type``; any other
    value skips the counter) and the object's crop is uploaded to S3.

    Args:
        video_path: Path of the video file to open with OpenCV.
        count_type: ``"in"`` or ``"out"``.

    Raises:
        ValueError: If the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise ValueError("Error opening video file.")

    # Region of interest as (x1, y1, x2, y2) in pixel coordinates.
    roi_x1, roi_y1, roi_x2, roi_y2 = (1650, 900, 2816, 1500)
    uploaded = 0
    seen_track_ids = set()

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            for result in model.track(frame, persist=True):
                for detection in result.boxes:
                    # Detections without a tracker id cannot be de-duplicated.
                    # Previously they all shared the sentinel -1, so exactly one
                    # of them was counted and every later one was ignored.
                    if detection.id is None:
                        continue
                    track_id = int(detection.id[0].cpu().numpy())
                    if track_id in seen_track_ids:
                        continue

                    x1, y1, x2, y2 = map(int, detection.xyxy[0].cpu().numpy())
                    fully_inside = (
                        x1 >= roi_x1 and y1 >= roi_y1
                        and x2 <= roi_x2 and y2 <= roi_y2
                    )
                    # A zero-area box would give an empty crop that cannot be
                    # encoded, so it is left to be picked up on a later frame.
                    if not fully_inside or x2 <= x1 or y2 <= y1:
                        continue

                    seen_track_ids.add(track_id)
                    if count_type == "in":
                        print("It's counting IN")
                        increment_road(1, 1, is_in=True)
                    elif count_type == "out":
                        print("It's counting OUT")
                        increment_road(1, 1, is_in=False)

                    # NOTE(review): the original indentation was lost; this
                    # assumes the upload happens once per newly counted object
                    # rather than on every in-region detection - confirm.
                    print("It's now uploading")
                    upload_frame_to_s3(frame[y1:y2, x1:x2], uploaded)
                    uploaded += 1
    finally:
        # Always free the capture handle, including when tracking, the
        # database call or the upload raises.
        cap.release()
|
|
|
|
|
def insert_data(license_value):
    """Insert ``license_value`` as a new row of the ``license_plates`` table.

    ``pymysql.MySQLError`` is caught and printed rather than raised.
    """
    # Initialised up front so ``finally`` is safe even if connecting fails;
    # previously that path raised UnboundLocalError from the cleanup code.
    connection = None
    try:
        connection = get_connection()
        with connection.cursor() as cursor:
            insert_sql = """
                INSERT INTO license_plates (license_plate)
                VALUES (%s)
            """
            # The trailing comma makes this a real one-element tuple;
            # ``(license_value)`` is just the bare value in parentheses.
            cursor.execute(insert_sql, (license_value,))

        connection.commit()
    except pymysql.MySQLError as e:
        print(f"Error: {e}")
    finally:
        if connection:
            connection.close()
|
|
|
|
|
def count_in(video):
    """Run ``process_video`` on ``video`` in "in" mode and return a status message."""
    status_message = "Processed vehicles counting 'in' successfully."
    process_video(video, count_type="in")
    return status_message
|
|
|
|
|
def count_out(video):
    """Run ``process_video`` on ``video`` in "out" mode and return a status message."""
    status_message = "Processed vehicles counting 'out' successfully."
    process_video(video, count_type="out")
    return status_message
|
|
|
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Lazily populated (tokenizer, model) pair so the OCR weights are loaded once
# per process instead of on every request.
_OCR_COMPONENTS = None


def _get_ocr_components():
    """Return the cached GOT-OCR2.0 ``(tokenizer, model)`` pair, loading it on first use."""
    global _OCR_COMPONENTS
    if _OCR_COMPONENTS is None:
        tokenizer = AutoTokenizer.from_pretrained(
            'ucaslcl/GOT-OCR2_0', trust_remote_code=True
        )
        ocr_model = AutoModel.from_pretrained(
            'ucaslcl/GOT-OCR2_0',
            trust_remote_code=True,
            low_cpu_mem_usage=True,
            use_safetensors=True,
            pad_token_id=tokenizer.eos_token_id,
        ).to(device)
        _OCR_COMPONENTS = (tokenizer, ocr_model)
    return _OCR_COMPONENTS


def ocr(image):
    """Extract text from ``image`` with the GOT-OCR2.0 model.

    Args:
        image: A ``numpy.ndarray`` or a ``PIL.Image.Image``.

    Returns:
        Whatever ``model.chat(..., ocr_type='ocr')`` returns for the image.

    Raises:
        ValueError: If ``image`` is neither a numpy array nor a PIL image.
    """
    import tempfile

    # Validate before touching the model so bad input fails fast.
    if isinstance(image, np.ndarray):
        image = Image.fromarray(image)
    elif not isinstance(image, Image.Image):
        raise ValueError("Input must be a numpy.ndarray or a PIL.Image.")

    # JPEG cannot store alpha or palette modes; normalise to RGB first.
    if image.mode != "RGB":
        image = image.convert("RGB")

    tokenizer, ocr_model = _get_ocr_components()

    # The model card documents ``chat`` as taking an image file path, so the
    # image is written to a temporary file instead of an in-memory buffer.
    fd, tmp_path = tempfile.mkstemp(suffix=".jpg")
    os.close(fd)
    try:
        image.save(tmp_path, format='JPEG')
        return ocr_model.chat(tokenizer, tmp_path, ocr_type='ocr')
    finally:
        os.remove(tmp_path)
|
|
|
|
|
|
|
# --- Gradio UI -----------------------------------------------------------------
# The counting tabs declare a text output so the status string returned by
# count_in / count_out is shown to the user; with ``outputs=None`` the
# returned value had no component to land in.
iface_in = gr.Interface(
    fn=count_in,
    inputs="video",
    outputs=gr.Textbox(label="Status"),
    api_name="count_in",
    title="YOLO Video Object Detection (Count In)",
    description="Upload a video to count vehicles 'in' and save frames to S3.",
)

iface_out = gr.Interface(
    fn=count_out,
    inputs="video",
    outputs=gr.Textbox(label="Status"),
    api_name="count_out",
    title="YOLO Video Object Detection (Count Out)",
    description="Upload a video to count vehicles 'out' and save frames to S3.",
)

iface_ocr = gr.Interface(
    fn=ocr,
    inputs=gr.Image(type="numpy", label="Upload Image"),
    outputs=gr.Textbox(label="Extracted Text"),
    api_name="ocr",
    title="OCR Image Text Extraction",
    description="Upload an image and extract text using the OCR model.",
)

# One tab per interface, in the same order as the tab titles.
iface = gr.TabbedInterface(
    [iface_in, iface_out, iface_ocr],
    ["Count In", "Count Out", "OCR"],
)

iface.launch()
|
|