# Non-code page residue captured together with this file: "Spaces: Build error / Build error".
# import torch | |
# import torch.nn.functional as F | |
# from torchvision import transforms | |
from PIL import Image | |
import numpy as np | |
from numpy import dot | |
from numpy.linalg import norm | |
import onnx, os, time, onnxruntime | |
import pandas as pd | |
import threading | |
# import queue | |
import cv2 | |
import av | |
import streamlit as st | |
from streamlit_webrtc import ( | |
ClientSettings, | |
VideoProcessorBase, | |
WebRtcMode, | |
webrtc_streamer, | |
) | |
import args | |
# def to_numpy(tensor): | |
# return tensor.detach().cpu().numpy() if tensor.requires_grad else tensor.cpu().numpy() | |
def get_image(x):
    """Return the part of ``x`` that precedes the first ``', '`` separator.

    When ``x`` contains no ``', '`` the whole string is returned unchanged.
    """
    head, _separator, _rest = x.partition(', ')
    return head
# Transform image to ToTensor | |
def transform_image(image, IMG=True):
    """Convert an image into the float32 NCHW batch that is fed to the model.

    Parameters
    ----------
    image : path, file-like object or numpy.ndarray
        When ``IMG`` is true, anything ``PIL.Image.open`` accepts.  Otherwise
        an already decoded ``H x W x C`` array (e.g. a video frame).
    IMG : bool, default True
        Whether ``image`` still has to be opened and decoded with PIL.

    Returns
    -------
    numpy.ndarray
        ``float32`` array of shape ``(1, C, 224, 224)`` (``C`` is 3 for
        decoded files): resized, scaled by 1/255 and normalised with
        ``args.MEAN`` / ``args.STD``.
    """
    if IMG:
        # Uploaded files may be greyscale, palettised or carry an alpha
        # channel; force three channels so the transpose and the per-channel
        # normalisation below always receive an H x W x 3 array.
        image = np.asarray(Image.open(image).convert('RGB'))
    # NOTE(review): decoded files are RGB here, whereas arrays passed with
    # IMG=False keep the caller's channel order (the webcam path sends BGR)
    # -- confirm which order the model expects.
    image = cv2.resize(image, dsize=(224, 224))
    image = np.transpose(image, (2, 0, 1))  # HWC -> CHW
    # Reshape the statistics to (C, 1, 1) so they broadcast per channel
    # whether args.MEAN / args.STD are stored flat or already expanded.
    mean = np.asarray(args.MEAN).reshape(-1, 1, 1)
    std = np.asarray(args.STD).reshape(-1, 1, 1)
    image = (image / 255 - mean) / std
    return np.expand_dims(image.astype(np.float32), axis=0)
# predict multi-level classification | |
def get_classification(image_tensor, df_train, sub_test_list, embeddings,
                       ort_session, input_name, confidence
                       ):
    """Classify one image and look up the metadata of its nearest neighbour.

    The session is run once and is expected to return ``(pred, em)`` for a
    single-image batch: class scores and an embedding vector.  If the best
    class score is not above ``confidence`` the image is reported as not
    being a car part.  Otherwise the row of ``df_train`` whose embedding has
    the highest cosine similarity to ``em`` supplies the metadata.

    Parameters
    ----------
    image_tensor : numpy.ndarray
        Model input, passed to the session under ``input_name``.
    df_train : pandas.DataFrame
        Metadata table; columns 0-4 are read positionally as maker, model,
        vehicle, year and part.  Row ``i`` must correspond to
        ``embeddings[i]``.
    sub_test_list : unused
        Accepted for signature parity with the sibling functions.
    embeddings : numpy.ndarray
        Reference embeddings, one row per ``df_train`` row.
    ort_session : object with ``run(output_names, input_feed)``
    input_name : str
    confidence : float
        Minimum best-class score required to report a prediction.

    Returns
    -------
    dict
        Keys ``maker``, ``model``, ``vehicle``, ``year``, ``part``,
        ``predict_time`` and ``sim_score``.  When the score is too low,
        ``maker`` is ``'This is not car part !'`` and all other values are
        ``None``.
    """
    start = time.time()
    pred, em = ort_session.run(None, {input_name: image_tensor})

    # .item() enforces the one-image-per-call contract.
    top_score = pred.max(axis=1).item()
    if not top_score > confidence:
        return {'maker': 'This is not car part !', 'model': None, 'vehicle': None,
                'year': None, 'part': None, 'predict_time': None, 'sim_score': None}

    # Cosine similarity of the query against every reference row at once.
    query = np.asarray(em).reshape(-1)
    bank = np.asarray(embeddings)
    with np.errstate(divide='ignore', invalid='ignore'):
        sims = bank @ query / (norm(bank, axis=1) * norm(query))
    # A zero-norm row produces NaN, which sorting/argmax would treat as the
    # largest value; demote such rows so they can never win.
    sims = np.where(np.isfinite(sims), sims, -np.inf)
    knn = int(np.argmax(sims))

    def _field(label, column, missing_ok):
        """Format one metadata cell, optionally replacing a missing value."""
        value = str(df_train.iloc[knn, column])
        if missing_ok and value == 'nan':
            value = 'No information'
        return label + ': ' + value

    maker = _field('Maker', 0, False)
    model = _field('Model', 1, True)
    vehicle = _field('Vehicle', 2, True)
    year = _field('Year', 3, True)
    part = _field('Part', 4, False)
    predict_time = 'Predict time: ' + str(round(time.time() - start, 4)) + ' seconds'
    sim_score = 'Confidence: ' + str(round(top_score * 100, 2)) + '%'
    return {'maker': maker, 'model': model, 'vehicle': vehicle, 'year': year, 'part': part,
            'predict_time': predict_time, 'sim_score': sim_score}
def get_classification_frame(image_tensor, df_train, sub_test_list, embeddings,
                             ort_session, input_name
                             ):
    """Classify one video frame and return the nearest neighbour's part name.

    The session is run once and is expected to return ``(pred, em)`` for a
    single-frame batch.  If the best class score is not above
    ``args.VIDEO_CONFIDENCE`` no part is reported.  Otherwise the part name
    is read from column 4 of the ``df_train`` row whose embedding has the
    highest cosine similarity to ``em``.

    Parameters
    ----------
    image_tensor : numpy.ndarray
        Model input, passed to the session under ``input_name``.
    df_train : pandas.DataFrame
        Metadata table; row ``i`` must correspond to ``embeddings[i]``.
    sub_test_list : unused
        Accepted for signature parity with the sibling functions.
    embeddings : numpy.ndarray
        Reference embeddings, one row per ``df_train`` row.
    ort_session : object with ``run(output_names, input_feed)``
    input_name : str

    Returns
    -------
    dict
        ``{'part_name': str, 'sim_score': str}``; when nothing is detected
        these are ``'No part detected'`` and ``''``.
    """
    pred, em = ort_session.run(None, {input_name: image_tensor})

    # .item() enforces the one-frame-per-call contract.
    top_score = pred.max(axis=1).item()
    if not top_score > args.VIDEO_CONFIDENCE:
        return {'part_name': 'No part detected', 'sim_score': ''}

    # Cosine similarity against every reference row in one vectorised step;
    # this runs once per frame, so a per-row Python loop is avoided.
    query = np.asarray(em).reshape(-1)
    bank = np.asarray(embeddings)
    with np.errstate(divide='ignore', invalid='ignore'):
        sims = bank @ query / (norm(bank, axis=1) * norm(query))
    # A zero-norm row produces NaN, which sorting/argmax would treat as the
    # largest value; demote such rows so they can never win.
    sims = np.where(np.isfinite(sims), sims, -np.inf)
    knn = int(np.argmax(sims))

    part = str(df_train.iloc[knn, 4])
    sim_score = str(round(top_score * 100, 2)) + '%'
    return {'part_name': part, 'sim_score': sim_score}
# predict similarity | |
def get_similarity(image_tensor, df_train, sub_test_list, embeddings,
                   ort_session, input_name, images_path='Test_set', top_k=6
                   ):
    """Return the paths of the reference images most similar to the input.

    The session is run once and is expected to return ``(pred, em)`` for a
    single-image batch; only the embedding ``em`` is used.  Reference rows
    are ranked by cosine similarity to ``em``.

    Parameters
    ----------
    image_tensor : numpy.ndarray
        Model input, passed to the session under ``input_name``.
    df_train : unused
        Accepted for signature parity with the sibling functions.
    sub_test_list : sequence of str
        File names; entry ``i`` must correspond to ``embeddings[i]``.
    embeddings : numpy.ndarray
        Reference embeddings, one row per entry of ``sub_test_list``.
    ort_session : object with ``run(output_names, input_feed)``
    input_name : str
    images_path : str, default ``'Test_set'``
        Folder that is prefixed to every returned file name.
    top_k : int, default 6
        Number of matches to return (fewer if there are fewer references).

    Returns
    -------
    dict
        ``images``: list of paths ordered from most to least similar;
        ``predict_time``: human-readable elapsed time.
    """
    start = time.time()
    _pred, em = ort_session.run(None, {input_name: image_tensor})

    # Cosine similarity of the query against every reference row at once.
    query = np.asarray(em).reshape(-1)
    bank = np.asarray(embeddings)
    with np.errstate(divide='ignore', invalid='ignore'):
        sims = bank @ query / (norm(bank, axis=1) * norm(query))
    # A zero-norm row produces NaN, which an ascending sort places last
    # (i.e. among the "best"); demote such rows so they rank at the bottom.
    sims = np.where(np.isfinite(sims), sims, -np.inf)

    # Best match first, like a descending top-k query; the stable sort makes
    # ties resolve to the lowest row index.
    idx = np.argsort(-sims, kind='stable')[:top_k]

    predict_time = 'Predict time: ' + str(round(time.time() - start, 4)) + ' seconds'
    images = [os.path.join(images_path, sub_test_list[i]) for i in idx]
    return {'images': images, 'predict_time': predict_time}
# -------------------------------------------------------------------------------------------- | |
# IMAGE INPUT | |
# -------------------------------------------------------------------------------------------- | |
# Maps each example image's display name to its file path under args.IMAGES_PATH.
content_images_dict = dict(
    zip(
        args.CONTENT_IMAGES_NAME,
        (os.path.join(args.IMAGES_PATH, file_name)
         for file_name in args.CONTENT_IMAGES_FILE),
    )
)
def show_original():
    """ Let the user pick an input image and preview it before prediction

    The sidebar shows an 'Upload' checkbox. While it is ticked a file
    uploader is offered; otherwise a select box lists the example images.
    The page is split into two columns and the chosen image, if any, is
    previewed in the first one.

    Returns:
    -------
    content_file:
        value returned by the file uploader (falsy until a file is
        provided) or the path of the chosen example image
    col2:
        second page column, left untouched for the caller to fill
    """
    use_upload = st.sidebar.checkbox(
        'Upload', value=True, help='Select Upload to browse image from local machine'
    )
    if not use_upload:
        example_name = st.sidebar.selectbox("or Choose an example Image below", args.CONTENT_IMAGES_NAME)
        content_file = content_images_dict[example_name]
    else:
        content_file = st.sidebar.file_uploader("", type=["png", "jpg", "jpeg"])

    col1, col2 = st.columns(2)
    with col1:
        if content_file:
            # Empty write before the preview, as a spacer element.
            col1.write('')
            col1.image(content_file, channels='BGR', width=300, clamp=True, caption='Input image')
    return content_file, col2
def image_input(content_file, df_train, sub_test_list, embeddings, ort_session, input_name, col2):
    """Drive the still-image page: threshold slider, buttons and results.

    A confidence slider is always shown.  Without an input image a hint is
    displayed and the script run is stopped.  With an image, two buttons are
    offered: "PREDICT" prints the classification into ``col2``; "SEARCH
    SIMILAR" prints the classification and, if the image was recognised as a
    car part, the most similar reference images (otherwise a warning).

    Inference is only performed once a button has been clicked, so reruns
    triggered by other widgets (e.g. moving the slider) do not decode the
    image or run the model.

    Parameters
    ----------
    content_file :
        Selected image (uploaded file or path), or ``None`` if nothing was
        selected.
    df_train, sub_test_list, embeddings, ort_session, input_name :
        Forwarded unchanged to ``get_classification`` / ``get_similarity``.
    col2 :
        Streamlit column that receives the classification text.
    """
    # Set confidence level
    confidence_threshold = st.slider(
        "Confidence threshold", 0.0, 1.0, args.DEFAULT_CONFIDENCE_THRESHOLD, 0.05,
        help='Choose minimum confidence level. If prediction result below this threshold, no information is shown.'
    )

    if content_file is None:
        st.success("Upload an Image OR Untick the Upload Button from Options on the sidebar")
        st.info("Navigate input source from Navigation on the sidebar")
        st.stop()
        return

    container = st.container()
    col6, col7 = container.columns([1, 4])
    predict_clicked = col6.button("PREDICT")
    search_clicked = col7.button("SEARCH SIMILAR")
    if not (predict_clicked or search_clicked):
        # Nothing requested on this rerun: skip decoding and inference.
        return

    content = transform_image(content_file)
    pred_info = get_classification(
        content, df_train, sub_test_list,
        embeddings, ort_session, input_name, confidence_threshold
    )
    print_classification(col2, content_file, pred_info)

    if search_clicked:
        if pred_info['maker'] != 'This is not car part !':
            pred_images = get_similarity(
                content, df_train, sub_test_list,
                embeddings, ort_session, input_name
            )
            print_similar_img(pred_images)
        else:
            st.warning("No similar car part image ! Reduce confidence threshold OR Choose another image.")
# Client-side WebRTC settings: one STUN server for ICE and a video-only
# (no audio) media stream.
WEBRTC_CLIENT_SETTINGS = ClientSettings(
    rtc_configuration=dict(
        iceServers=[dict(urls=["stun:stun.l.google.com:19302"])],
    ),
    media_stream_constraints=dict(video=True, audio=False),
)
def webcam_input(df_train, sub_test_list, embeddings, ort_session, input_name):
    """Show the live webcam page and label every frame with the prediction.

    A WebRTC streamer is started in send/receive mode.  Each incoming frame
    is converted to a BGR array, classified with
    ``get_classification_frame`` and sent back with the predicted part name
    and score drawn in its top-left corner.

    All arguments are forwarded unchanged to ``get_classification_frame``.
    """
    st.header("Webcam Live Feed")

    class PartClassifierProcessor(VideoProcessorBase):
        """Per-frame processor that overlays the predicted part name."""

        def __init__(self) -> None:
            # Not acquired anywhere in this class.
            self._model_lock = threading.Lock()

        def _annotate_image(self, image, pred_info):
            """Draw "<part name> <score>" onto ``image`` in place and return it."""
            label = "{} {}".format(pred_info['part_name'], pred_info['sim_score'])
            cv2.putText(image, label, (2, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 223), 2)
            return image

        def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
            bgr_frame = frame.to_ndarray(format="bgr24")
            pred_info = get_classification_frame(
                transform_image(bgr_frame, IMG=False), df_train, sub_test_list,
                embeddings, ort_session, input_name
            )
            labelled = self._annotate_image(bgr_frame, pred_info)
            return av.VideoFrame.from_ndarray(labelled, format="bgr24")

    webrtc_streamer(
        key="live-cassification",
        mode=WebRtcMode.SENDRECV,
        client_settings=WEBRTC_CLIENT_SETTINGS,
        video_processor_factory=PartClassifierProcessor,
        async_processing=True,
    )
def print_classification(col2, content_file, pred_info):
    """ Write the classification result into ``col2``

    A heading is always written.  If ``pred_info['maker']`` is the
    "not a car part" message, only that message follows; otherwise one
    bullet is written per field of ``pred_info``.  ``content_file`` is not
    used.
    """
    bullet_keys = ('maker', 'model', 'vehicle', 'year', 'part', 'predict_time', 'sim_score')
    with col2:
        col2.markdown('##### Predicted information')
        col2.markdown('')
        if pred_info['maker'] == 'This is not car part !':
            col2.markdown('### {}'.format(pred_info['maker']))
            return
        for key in bullet_keys:
            col2.markdown('###### - {}'.format(pred_info[key]))
def print_similar_img(pred_images):
    """ Show the six similar images in a three-column grid

    Writes a heading and ``pred_images['predict_time']``, then places two
    entries of ``pred_images['images']`` in each of three columns.
    """
    st.markdown('### Most similar images')
    st.markdown('#### {}'.format(pred_images['predict_time']))

    # Indices into pred_images['images'] per column, top image first.
    layout = ((0, 1), (3, 4), (5, 2))
    for column, picks in zip(st.columns(3), layout):
        with column:
            for pick in picks:
                column.image(pred_images['images'][pick], channels='BGR', clamp=True, width=300)