# xsestech's picture
# Update app.py
# c1e37c4
import gradio as gr
from ultralytics import YOLO
import torch, torchvision
import cv2
import os
import pickle
import numpy as np
class jeysonHandler:
    """Accumulate per-object tracking observations and export them as JSON.

    Each known object id maps to a record in ``self.ids`` holding:

    * ``type_statistics`` - how often each type index was reported,
    * ``last_positions`` - the most recent ``(timestamp, position)`` pairs
      (at most ``last_pos_num`` of them),
    * ``last_delta`` - the latest velocity estimate (position change per
      unit of timestamp across that window),
    * ``events`` - two lists of ``{'start_time', 'end_time'}`` intervals:
      ``'0'`` is updated on every observation of the object, ``'1'`` only
      when the norm of the velocity estimate exceeds ``delta_threshold``.

    Ids that have not been seen before are compared with recently lost
    tracks; if one is predicted to be close enough, the new id is recorded
    in ``self.translator`` as an alias of that track.
    """

    def __init__(self, delta_threshold, pos_threshold, last_pos_num=5, wait_time=60, num_types=4):
        """Create an empty handler.

        Args:
            delta_threshold: Speed (norm of the velocity estimate) above
                which an observation contributes to the ``'1'`` events.
            pos_threshold: Maximum distance between a new detection and a
                lost track's predicted position for the two to be merged.
            last_pos_num: Number of recent positions kept per track.
            wait_time: Timestamp gap after which an event is closed and a
                lost track is no longer considered for re-matching.
            num_types: Number of distinct type indices that can be reported.
        """
        self.ids = {}
        self.missing_ids = set()
        self.translator = {}
        self.dt = delta_threshold
        self.pt = pos_threshold
        self.lpn = last_pos_num
        self.wt = wait_time
        self.num_types = num_types

    def reg_id(self, id):
        """Create (or reset) the record for track ``id``."""
        self.ids[id] = {
            'type_statistics': [0] * self.num_types,
            'last_positions': [],
            'last_delta': np.array((0, 0)),
            'events': {
                '0': [],
                '1': [],
            },
        }

    def _update_events(self, events, timestamp):
        """Start, replace or extend the latest interval in ``events``."""
        if not events:
            events.append({'start_time': timestamp, 'end_time': timestamp})
            return
        latest = events[-1]
        if timestamp - latest['end_time'] > self.wt:
            # The previous interval is over. Intervals shorter than a third
            # of the wait time are dropped before the new one is started.
            if latest['end_time'] - latest['start_time'] < self.wt / 3:
                events.pop()
            events.append({'start_time': timestamp, 'end_time': timestamp})
        else:
            latest['end_time'] = timestamp

    def reg_data(self, timestamp, id, type, pos):
        """Record one observation of the already registered track ``id``.

        Args:
            timestamp: Time of the observation.
            id: Track id; must already be present in ``self.ids``.
            type: Index into ``type_statistics`` of the reported type.
            pos: Position of the object (anything ``np.array`` accepts).

        Raises:
            KeyError: If ``id`` has not been registered with ``reg_id``.
        """
        pos = np.array(pos)
        self.missing_ids.discard(id)
        track = self.ids[id]
        track['type_statistics'][type] += 1

        track['last_positions'].append((timestamp, pos))
        if len(track['last_positions']) > self.lpn:
            track['last_positions'] = track['last_positions'][-self.lpn:]

        first_time, first_pos = track['last_positions'][0]
        last_time, last_pos = track['last_positions'][-1]
        elapsed = last_time - first_time
        if elapsed != 0:
            delta = (last_pos - first_pos) / elapsed
        else:
            # No time has passed across the window (e.g. first observation).
            # Dividing would produce NaN, which poisons every later distance
            # computed from this track, so fall back to zero velocity.
            delta = np.zeros_like(pos, dtype=float)
        track['last_delta'] = delta

        # Each list gets its own interval dicts; sharing one dict between
        # the two lists would make them extend each other.
        self._update_events(track['events']['0'], timestamp)
        if np.linalg.norm(delta) > self.dt:
            self._update_events(track['events']['1'], timestamp)

    def add_data(self, timestamp, data):
        """Process all detections reported for one timestamp.

        Args:
            timestamp: Time shared by every detection in ``data``.
            data: Iterable of ``(id, type, pos)`` tuples.
        """
        # Tracks that are not retired and have not been seen in this frame.
        lost = set(self.ids) - self.missing_ids
        unmatched = []
        for raw_id, kind, pos in data:
            track_id = self.translator.get(raw_id, raw_id)
            if track_id in self.ids:
                lost.discard(track_id)
                self.reg_data(timestamp, track_id, kind, pos)
            else:
                unmatched.append((track_id, kind, pos))

        for new_id, kind, pos in unmatched:
            pos = np.asarray(pos)
            candidates = []
            for lost_id in tuple(lost):
                last_time, last_pos = self.ids[lost_id]['last_positions'][-1]
                gap = timestamp - last_time
                if gap > self.wt:
                    # Unseen for too long: retire it from re-matching.
                    self.missing_ids.add(lost_id)
                    lost.discard(lost_id)
                    continue
                predicted_pos = last_pos + self.ids[lost_id]['last_delta'] * gap
                candidates.append((np.linalg.norm(pos - predicted_pos), lost_id))

            best = min(candidates, key=lambda item: item[0], default=None)
            if best is not None and best[0] < self.pt:
                # Alias the new id to the *closest* lost track.
                matched_id = best[1]
                self.translator[new_id] = matched_id
                # One track cannot absorb two detections from the same frame.
                lost.discard(matched_id)
                self.reg_data(timestamp, matched_id, kind, pos)
            else:
                self.reg_id(new_id)
                self.reg_data(timestamp, new_id, kind, pos)

    def get_json(self, type_names):
        """Return a JSON-ready summary of every track.

        Args:
            type_names: Sequence mapping a type index to its display name.

        Returns:
            A dict keyed by ``str(id)`` whose values hold the most frequently
            reported type name (``'type'``) and the track's ``'events'``.
        """
        return {
            str(track_id): {
                'type': type_names[int(np.argmax(track['type_statistics']))],
                'events': track['events'],
            }
            for track_id, track in self.ids.items()
        }
# Module-wide call counter backing timestamp(); it is never reset here.
i = 0


def timestamp():
    """Return the next value of a clock that advances by 1/5 per call.

    The first call returns 0.0, the second 0.2, and so on. Every call
    increments the module-level counter ``i``.
    """
    global i
    current = i / 5
    i = i + 1
    return current
def video_model(video):
    """Track objects in ``video`` with YOLO and summarise them as JSON.

    Args:
        video: Video source handed to ``YOLO.track`` as ``source``.

    Returns:
        The dict produced by ``jeysonHandler.get_json`` for the type names
        ``crane``, ``excavator``, ``tractor`` and ``truck``.
    """
    model = YOLO('last.pt')
    results = model.track(source=video, show=False, conf=.1)
    handler = jeysonHandler(3, 10)

    for frame_index, result in enumerate(results):
        # Timestamps are derived from the frame index of *this* video, so
        # every request starts at 0 instead of continuing a process-wide
        # counter. The divisor matches timestamp(); presumably it is the
        # effective frame rate - TODO confirm.
        frame_time = frame_index / 5

        boxes = result.boxes
        if boxes is None or boxes.id is None:
            # No tracked objects in this frame; there are no ids to register.
            continue

        # .cpu() is a no-op for CPU tensors and keeps .numpy() valid when
        # inference ran on another device.
        track_ids = boxes.id.int().cpu().numpy()
        classes = boxes.cls.int().cpu().numpy()
        xywh = boxes.xywh.cpu()
        # NOTE(review): if xywh is centre-based this is the bottom-right
        # corner rather than the centre - kept as in the original; confirm.
        positions = (xywh[..., :2] + xywh[..., 2:] / 2).numpy()

        handler.add_data(frame_time, list(zip(track_ids, classes, positions)))

    return handler.get_json(['crane', 'excavator', 'tractor', 'truck'])
# Gradio UI: a video goes in, the JSON summary from video_model comes out.
# queue() is applied to the interface before it is launched.
demo = gr.Interface(
    fn=video_model,
    inputs=gr.Video(),
    outputs=gr.JSON(),
    cache_examples=True,
).queue()

# Start the web server only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch(share=False)