Spaces:
Sleeping
Sleeping
import gradio as gr | |
from ultralytics import YOLO | |
import torch, torchvision | |
import cv2 | |
import os | |
import pickle | |
import numpy as np | |
class jeysonHandler:
    """Collect per-object tracking observations and summarise them as a dict.

    For every tracked object the handler keeps:

    * ``type_statistics`` - how many times each class index was observed;
    * ``last_positions`` - the most recent ``last_pos_num`` ``(timestamp, pos)``
      pairs, used to estimate velocity;
    * ``last_delta`` - the latest velocity estimate (position units per
      timestamp unit);
    * ``events`` - two lists of ``{'start_time', 'end_time'}`` spans:
      ``'0'`` is extended on every observation, ``'1'`` only while the
      velocity norm exceeds ``delta_threshold``.

    When a previously unseen id appears, it is compared with objects that
    were known but absent from the current frame; if the new detection lies
    within ``pos_threshold`` of where such an object is predicted to be, the
    new id is aliased to the old one through ``translator``.
    """

    def __init__(self, delta_threshold, pos_threshold, last_pos_num=5, wait_time=60, num_types=4):
        """Create an empty handler.

        Args:
            delta_threshold: velocity norm above which an object counts as
                moving (span recorded under ``events['1']``).
            pos_threshold: maximum distance between a new detection and a
                lost object's predicted position for them to be merged.
            last_pos_num: number of recent positions kept per object.
            wait_time: maximum gap, in timestamp units, before a span is
                closed and before a lost object stops being a merge candidate.
            num_types: number of distinct class indices.
        """
        self.ids = {}
        self.missing_ids = set()
        self.translator = {}
        self.dt = delta_threshold
        self.pt = pos_threshold
        self.lpn = last_pos_num
        self.wt = wait_time
        self.num_types = num_types

    def reg_id(self, id):
        """Create an empty record for a new object id."""
        self.ids[id] = {
            'type_statistics': [0] * self.num_types,
            'last_positions': [],
            'last_delta': np.array((0, 0)),
            'events': {
                '0': [],
                '1': [],
            },
        }

    def _extend_events(self, events, timestamp):
        """Extend the last span in ``events`` to ``timestamp`` or open a new one.

        A new span is opened when the list is empty or the gap since the last
        span exceeds ``wait_time``; in the latter case a previous span shorter
        than ``wait_time / 3`` is dropped first.
        """
        if events and timestamp - events[-1]['end_time'] <= self.wt:
            events[-1]['end_time'] = timestamp
            return
        if events and events[-1]['end_time'] - events[-1]['start_time'] < self.wt / 3:
            events.pop()
        # A fresh dict per list: sharing one dict between the '0' and '1'
        # lists would make later end_time updates leak from one to the other.
        events.append({'start_time': timestamp, 'end_time': timestamp})

    def reg_data(self, timestamp, id, type, pos):
        """Record one observation of an already registered object.

        Args:
            timestamp: time of the observation.
            id: key of the object in ``self.ids`` (must already exist).
            type: class index, used to index ``type_statistics``.
            pos: position of the object, convertible to a numpy array.
        """
        pos = np.array(pos)
        self.missing_ids.discard(id)
        record = self.ids[id]
        record['type_statistics'][type] += 1

        history = record['last_positions']
        history.append((timestamp, pos))
        if len(history) > self.lpn:
            history = history[-self.lpn:]
            record['last_positions'] = history

        # Velocity over the retained window. With a single sample (or equal
        # timestamps) the elapsed time is zero; report zero velocity instead
        # of dividing by zero and poisoning later predictions with NaN.
        oldest_time, oldest_pos = history[0]
        elapsed = timestamp - oldest_time
        if elapsed != 0:
            delta = (pos - oldest_pos) / elapsed
        else:
            delta = np.zeros_like(pos, dtype=float)
        record['last_delta'] = delta

        self._extend_events(record['events']['0'], timestamp)
        if np.linalg.norm(delta) > self.dt:
            self._extend_events(record['events']['1'], timestamp)

    def add_data(self, timestamp, data):
        """Feed all detections of one frame into the handler.

        Args:
            timestamp: time of the frame.
            data: iterable of ``(id, type, pos)`` tuples.
        """
        # Known objects that have not been written off and are not (yet)
        # seen in this frame.
        lost = set(self.ids) - self.missing_ids
        newids = []
        for id, type, pos in data:
            id = self.translator.get(id, id)
            if id in self.ids:
                lost.discard(id)
                self.reg_data(timestamp, id, type, pos)
            else:
                newids.append((id, type, pos))

        for id, type, pos in newids:
            pos = np.asarray(pos)
            distances = []
            for lostid in list(lost):
                last_time, last_pos = self.ids[lostid]['last_positions'][-1]
                gap = timestamp - last_time
                if gap > self.wt:
                    # Absent for too long: no longer a merge candidate.
                    self.missing_ids.add(lostid)
                    lost.discard(lostid)
                    continue
                predicted_pos = last_pos + self.ids[lostid]['last_delta'] * gap
                distances.append((float(np.linalg.norm(pos - predicted_pos)), lostid))

            matched_id = None
            if distances:
                best_distance, best_id = min(distances, key=lambda item: item[0])
                if best_distance < self.pt:
                    matched_id = best_id

            if matched_id is None:
                self.reg_id(id)
                self.reg_data(timestamp, id, type, pos)
            else:
                # Alias the new id to the closest lost object (not merely the
                # last one iterated) and make sure no other new detection in
                # this frame can claim the same object.
                self.translator[id] = matched_id
                lost.discard(matched_id)
                self.reg_data(timestamp, matched_id, type, pos)

    def get_json(self, type_names):
        """Return ``{str(id): {'type': name, 'events': spans}}`` for all objects.

        ``type`` is the entry of ``type_names`` whose class index was observed
        most often for the object.
        """
        return {
            str(obj_id): {
                'type': type_names[int(np.argmax(np.array(record['type_statistics'])))],
                'events': record['events'],
            }
            for obj_id, record in self.ids.items()
        }
# Module-level call counter backing timestamp(); never reset by this code.
i = 0


def timestamp():
    """Return the time of the next frame and advance the counter.

    Successive calls yield 0.0, 0.2, 0.4, ... (one fifth of a unit per call).
    """
    global i
    current = i
    i = current + 1
    return current / 5
def video_model(video):
    """Track objects in ``video`` and return per-object type and event spans.

    Runs the YOLO tracker loaded from ``last.pt`` over the video, feeds the
    detections of every frame into a ``jeysonHandler`` and returns its JSON
    summary, keyed by the class names crane / excavator / tractor / truck.

    Args:
        video: video source passed straight to ``model.track``.

    Returns:
        dict as produced by ``jeysonHandler.get_json``.
    """
    model = YOLO('last.pt')
    results = model.track(source=video, show=False, conf=.1)
    handler = jeysonHandler(3, 10)

    for frame_index, result in enumerate(results):
        # Frame time is derived from the frame index of THIS video, so repeated
        # requests always start at 0 rather than continuing a global counter.
        # NOTE(review): assumes 5 frames per time unit, as the original
        # timestamp() helper did - confirm against the real frame rate.
        frame_time = frame_index / 5

        boxes = result.boxes
        data = []
        # Frames with no tracked objects report ``boxes.id`` as None.
        if boxes is not None and boxes.id is not None:
            # .cpu() keeps the conversion working when inference ran on a GPU.
            track_ids = boxes.id.int().cpu().numpy()
            class_ids = boxes.cls.int().cpu().numpy()
            xywh = boxes.xywh.cpu()
            # NOTE(review): adds half of (w, h) to the first two xywh columns,
            # exactly as before. If xywh is already centre-based this is an
            # offset point rather than the centre - confirm the intent.
            positions = (xywh[..., :2] + xywh[..., 2:] / 2).numpy()
            data = [
                (int(track_id), int(class_id), position)
                for track_id, class_id, position in zip(track_ids, class_ids, positions)
            ]
        handler.add_data(frame_time, data)

    return handler.get_json(['crane', 'excavator', 'tractor', 'truck'])
# Gradio UI: a video goes in, the JSON summary from video_model comes out.
# Request queuing is enabled on the interface.
demo = gr.Interface(
    fn=video_model,
    inputs=gr.Video(),
    outputs=gr.JSON(),
    cache_examples=True,
).queue()

# Start the web server only when executed as a script.
if __name__ == "__main__":
    demo.launch(share=False)