# gradio_test_app/app.py
import matplotlib.pyplot as plt
import mediapipe as mp
import gradio as gr
import numpy as np
import torch
from torch import nn
import cv2 as cv
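
# Gradio demo: MediaPipe Holistic extracts hand landmarks from each frame of an
# uploaded video, and a sequence-to-sequence Transformer greedily decodes them
# into text over a Cyrillic character vocabulary.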
mp_drawing = mp.solutions.drawing_utils
mp_holistic = mp.solutions.holistic
frame_size = (350, 200)  # (width, height) each frame is resized to before landmark extraction
NUM_FRAMES = 30  # fixed length of the landmark sequence fed to the model
device = "cpu"
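# Output vocabulary: punctuation, digits, Latin and Cyrillic characters, plus special tokens.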
unique_symbols = [' ', '!', '(', ')', ',', '-', '0', '1', '2', '4', '5', '6', '7', ':', ';', '?', 'D', 'M', 'a', 'd',
'k', 'l', 'n', 'o', 's', 'no_event', 'Ё', 'А', 'Б', 'В', 'Г', 'Д', 'Е', 'Ж', 'З', 'И', 'Й', 'К', 'Л',
'М', 'Н', 'О', 'П', 'Р', 'С', 'Т', 'У', 'Ф', 'Х', 'Ц', 'Ч', 'Ш', 'Щ', 'Ъ', 'Ы', 'Ь', 'Э', 'Ю', 'Я',
'а', 'б', 'в', 'г', 'д', 'е', 'ж', 'з', 'и', 'й', 'к', 'л', 'м', 'н', 'о', 'п', 'р', 'с', 'т', 'у',
'ф', 'х', 'ц', 'ч', 'ш', 'щ', 'ъ', 'ы', 'ь', 'э', 'ю', 'я', 'ё', "#", "<", ">"]
label2idx = {unique_symbols[i]: i for i in range(len(unique_symbols))}
idx2label = {i: unique_symbols[i] for i in range(len(unique_symbols))}
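# Special decoding tokens (also present in unique_symbols).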
bos_token = "<"
eos_token = ">"
pad_token = "#"
class TokenEmbedding(nn.Module):
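    """Token embedding plus learned positional embedding for the decoder inputs."""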
def __init__(self, num_vocab=1000, maxlen=100, num_hid=64):
super().__init__()
self.emb = nn.Embedding(num_vocab, num_hid)
self.pos_emb = nn.Embedding(maxlen, num_hid)
def forward(self, x):
maxlen = x.size()[-1]
x = self.emb(x)
positions = torch.arange(start=0, end=maxlen).to(device)
positions = self.pos_emb(positions)
return x + positions
class LandmarkEmbedding(nn.Module):
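    """Embeds per-frame landmark features with three 1D convolutions over time (input and output are (batch, time, features))."""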
def __init__(self, in_ch, num_hid=64):
super().__init__()
self.emb = nn.Sequential(
nn.Conv1d(in_channels=in_ch, out_channels=num_hid, kernel_size=11, padding="same"),
nn.ReLU(),
nn.Conv1d(in_channels=num_hid, out_channels=num_hid, kernel_size=11, padding="same"),
nn.ReLU(),
nn.Conv1d(in_channels=num_hid, out_channels=num_hid, kernel_size=11, padding="same"),
nn.ReLU()
)
def forward(self, x):
x = x.permute(0, 2, 1)
x = self.emb(x)
x = x.permute(0, 2, 1)
return x
class TransformerEncoder(nn.Module):
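    """Post-norm Transformer encoder block: multi-head self-attention and a feed-forward network, each with dropout, a residual connection, and LayerNorm."""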
def __init__(self, embed_dim, num_heads, feed_forward_dim, rate=0.1):
super().__init__()
self.att = nn.MultiheadAttention(num_heads=num_heads, embed_dim=embed_dim, batch_first=True)
self.ffn = nn.Sequential(
nn.Linear(in_features=embed_dim, out_features=feed_forward_dim),
nn.ReLU(),
nn.Linear(in_features=feed_forward_dim, out_features=embed_dim)
)
self.layernorm1 = nn.LayerNorm(normalized_shape=embed_dim, eps=1e-6)
self.layernorm2 = nn.LayerNorm(normalized_shape=embed_dim, eps=1e-6)
self.dropout1 = nn.Dropout(rate)
self.dropout2 = nn.Dropout(rate)
def forward(self, inputs):
attn_output = self.att(inputs, inputs, inputs)[0]
attn_output = self.dropout1(attn_output)
out1 = self.layernorm1(inputs + attn_output)
ffn_output = self.ffn(out1)
ffn_output = self.dropout2(ffn_output)
return self.layernorm2(out1 + ffn_output)
class TransformerDecoder(nn.Module):
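    """Transformer decoder block: causal self-attention over the target, cross-attention over the encoder output, and a feed-forward network, each followed by a residual connection and LayerNorm."""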
def __init__(self, embed_dim, num_heads, feed_forward_dim, dropout_rate=0.1):
super().__init__()
self.num_heads = num_heads
self.layernorm1 = nn.LayerNorm(normalized_shape=embed_dim, eps=1e-6)
self.layernorm2 = nn.LayerNorm(normalized_shape=embed_dim, eps=1e-6)
self.layernorm3 = nn.LayerNorm(normalized_shape=embed_dim, eps=1e-6)
self.self_att = nn.MultiheadAttention(num_heads=num_heads, embed_dim=embed_dim, batch_first=True)
self.enc_att = nn.MultiheadAttention(num_heads=num_heads, embed_dim=embed_dim, batch_first=True)
self.self_dropout = nn.Dropout(0.5)
self.enc_dropout = nn.Dropout(0.1)
self.ffn_dropout = nn.Dropout(0.1)
self.ffn = nn.Sequential(
nn.Linear(in_features=embed_dim, out_features=feed_forward_dim),
nn.ReLU(),
nn.Linear(in_features=feed_forward_dim, out_features=embed_dim)
)
def causal_attention_mask(self, batch_size, n_dest, n_src, dtype):
"""Masks the upper half of the dot product matrix in self attention.
This prevents flow of information from future tokens to current token.
1's in the lower triangle, counting from the lower right corner.
"""
i = torch.arange(start=0, end=n_dest)[:, None]
j = torch.arange(start=0, end=n_src)
m = i >= j - n_src + n_dest
mask = m.type(dtype)
mask = torch.reshape(mask, [1, n_dest, n_src])
        # Tile the (1, n_dest, n_src) mask once per attention head in the batch
        mask = mask.repeat(batch_size * self.num_heads, 1, 1)
        return mask.to(device)
def forward(self, enc_out, target):
input_shape = target.size()
batch_size = input_shape[0]
seq_len = input_shape[1]
        # Apply the causal mask in self-attention. The helper returns a TF-style mask
        # (True = may attend), while PyTorch's attn_mask expects True = blocked, so it
        # is inverted here.
        causal_mask = self.causal_attention_mask(batch_size, seq_len, seq_len, torch.bool)
        target_att = self.self_att(target, target, target, attn_mask=~causal_mask, is_causal=True)[0]
self_dropout = self.self_dropout(target_att)
target_norm = self.layernorm1(target + self_dropout)
enc_out = self.enc_att(target_norm, enc_out, enc_out)[0]
enc_out_norm = self.layernorm2(self.enc_dropout(enc_out) + target_norm)
ffn_out = self.ffn(enc_out_norm)
ffn_out_norm = self.layernorm3(enc_out_norm + self.ffn_dropout(ffn_out))
return ffn_out_norm
class Transformer(nn.Module):
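    """Landmark-to-text sequence-to-sequence model: a convolutional LandmarkEmbedding
    feeds a stack of encoder blocks; the decoder stack attends to the encoder output
    and a linear classifier predicts the next character."""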
def __init__(
self,
num_hid=64,
num_head=2,
num_feed_forward=128,
target_maxlen=100,
num_layers_enc=4,
num_layers_dec=1,
num_classes=10,
in_ch=126
):
super().__init__()
self.num_layers_enc = num_layers_enc
self.num_layers_dec = num_layers_dec
self.target_maxlen = target_maxlen
self.num_classes = num_classes
self.enc_input = LandmarkEmbedding(in_ch=in_ch, num_hid=num_hid)
self.dec_input = TokenEmbedding(
num_vocab=num_classes, maxlen=target_maxlen, num_hid=num_hid
)
list_encoder = [self.enc_input] + [
TransformerEncoder(num_hid, num_head, num_feed_forward)
for _ in range(num_layers_enc)
]
self.encoder = nn.Sequential(*list_encoder)
for i in range(num_layers_dec):
setattr(
self,
f"dec_layer_{i}",
TransformerDecoder(num_hid, num_head, num_feed_forward),
)
self.classifier = nn.Linear(in_features=num_hid, out_features=num_classes)
def decode(self, enc_out, target):
y = self.dec_input(target)
for i in range(self.num_layers_dec):
y = getattr(self, f"dec_layer_{i}")(enc_out, y)
return y
def forward(self, source, target):
x = self.encoder(source)
y = self.decode(x, target)
y = self.classifier(y)
return y
def generate(self, source, target_start_token_idx):
"""Performs inference over one batch of inputs using greedy decoding."""
bs = source.size()[0]
enc = self.encoder(source)
dec_input = torch.ones((bs, 1), dtype=torch.int32) * target_start_token_idx
dec_input = dec_input.to(device)
dec_logits = []
for i in range(self.target_maxlen - 1):
dec_out = self.decode(enc, dec_input)
logits = self.classifier(dec_out)
logits = torch.argmax(logits, dim=-1).type(torch.int32)
            # Keep only the prediction for the last position, shape (batch, 1)
            last_logit = logits[:, -1].unsqueeze(-1)
dec_logits.append(last_logit)
dec_input = torch.concat([dec_input, last_logit], axis=-1)
dec_input = dec_input.squeeze(0).cpu()
return dec_input
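
# Load the trained model (a full pickled nn.Module) on the CPU and switch to eval mode.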
model = torch.load("weights.pt", map_location=torch.device('cpu'))
model.eval()
def predict(inp):
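    """Debug helper: greedily decodes a pre-extracted landmark array and prints the result (not wired into the Gradio interface)."""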
x = torch.from_numpy(inp).to(device)
enc_out = model.generate(x.unsqueeze(0), label2idx[bos_token]).numpy()
res1 = ""
for p in enc_out:
res1 += idx2label[p]
if p == label2idx[eos_token]:
break
print(f"prediction: {res1}\n")
def mediapipe_detection(image, model, show_landmarks):
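    """Converts a BGR frame to RGB, mirrors it, and runs the MediaPipe model on it.

    When show_landmarks is set, the frame is made writeable again and converted back
    to BGR so landmarks can be drawn onto it by the caller.
    """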
image = cv.cvtColor(image, cv.COLOR_BGR2RGB) # COLOR CONVERSION BGR 2 RGB
image = cv.flip(image, 1)
image.flags.writeable = False # Image is no longer writeable
results = model.process(image) # Make prediction
if show_landmarks:
image.flags.writeable = True # Image is now writeable
        image = cv.cvtColor(image, cv.COLOR_RGB2BGR)  # COLOR CONVERSION RGB 2 BGR
return image, results
def classify_image(inp):
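    """Gradio handler: video path in, recognized text out.

    Extracts 21 left-hand and 21 right-hand landmarks per frame with MediaPipe
    Holistic, normalizes the sequence to NUM_FRAMES frames, and greedily decodes
    it with the Transformer.
    """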
cap = cv.VideoCapture(inp)
landmark_list = []
frame_counter = 0
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
frame = cv.resize(frame, frame_size)
show_landmarks = False # FIX ME
image, results = mediapipe_detection(frame, holistic, show_landmarks)
# pose
            try:
                pose = results.pose_landmarks.landmark
                # Keep only shoulder/elbow/wrist landmarks (indices 11-16)
                pose_mat = [[landmark.x, landmark.y, landmark.z] for landmark in pose[11:17]]
                if show_landmarks:
                    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                                              mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=1, circle_radius=2),
                                              mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=1, circle_radius=1)
                                              )
            except AttributeError:  # no pose detected in this frame
                pose_mat = [[0, 0, 0]] * 6
# left hand
            try:
                left = results.left_hand_landmarks.landmark
                left_mat = [[landmark.x, landmark.y, landmark.z] for landmark in left]
                if show_landmarks:
                    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                                              mp_drawing.DrawingSpec(color=(121, 22, 76), thickness=1, circle_radius=2),
                                              mp_drawing.DrawingSpec(color=(121, 44, 250), thickness=1, circle_radius=1)
                                              )
            except AttributeError:  # no left hand detected in this frame
                left_mat = [[0, 0, 0]] * 21
# right hand
            try:
                right = results.right_hand_landmarks.landmark
                right_mat = [[landmark.x, landmark.y, landmark.z] for landmark in right]
                if show_landmarks:
                    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                                              mp_drawing.DrawingSpec(color=(76, 22, 121), thickness=1, circle_radius=2),
                                              mp_drawing.DrawingSpec(color=(44, 250, 44), thickness=1, circle_radius=1)
                                              )
            except AttributeError:  # no right hand detected in this frame
                right_mat = [[0, 0, 0]] * 21
iter_landmarks = left_mat + right_mat # + pose_mat
landmark_list.append(iter_landmarks)
if show_landmarks:
plt.imshow(image)
plt.show()
frame_counter += 1
cap.release()
    # Normalize the sequence length to NUM_FRAMES: pad short clips by repeating
    # the first frame, center-crop long ones.
    frames = len(landmark_list)
    if frames < NUM_FRAMES:
        landmark_list = [landmark_list[0]] * (NUM_FRAMES - frames) + landmark_list
    elif frames > NUM_FRAMES:
        start = (frames - NUM_FRAMES) // 2
        landmark_list = landmark_list[start:start + NUM_FRAMES]
    landmark_list = np.array([landmark_list], dtype=np.float32)
    # Expect (1, NUM_FRAMES, 42 landmarks, 3 coords); flatten to (1, 30, 126) for the model
    if landmark_list.shape == (1, 30, 42, 3):
        landmark_list = landmark_list.reshape(landmark_list.shape[0], landmark_list.shape[1], -1)
        inp = torch.from_numpy(landmark_list).to(device)
        # inp = torch.randn(size=[1, 30, 126], dtype=torch.float32)
        with torch.no_grad():
            out = model.generate(inp, label2idx[bos_token]).numpy()
        # Greedy decoding returns token indices; map them back to characters until EOS
        res1 = ""
        for p in out:
            res1 += idx2label[p]
            if p == label2idx[eos_token]:
                break
        return res1
    else:
        return f'Classification Error {landmark_list.shape}'
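
# Build and launch the Gradio interface: a video upload in, predicted text out.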
gr.Interface(fn=classify_image,
inputs=gr.Video(height=360, width=480),
outputs='text').launch(share=True)