import gradio as gr from ultralytics import YOLO import torch, torchvision import cv2 import os import pickle import numpy as np class jeysonHandler: def __init__(self, delta_threshold, pos_threshold, last_pos_num=5, wait_time=60, num_types=4): self.ids={} self.missing_ids=set() self.translator={} self.dt = delta_threshold self.pt = pos_threshold self.lpn = last_pos_num self.wt = wait_time self.num_types = num_types def reg_id(self, id): self.ids[id] ={ 'type_statistics':[0]*self.num_types, 'last_positions':[], 'last_delta':np.array((0,0)), 'events':{ '0':[], '1':[] } } def reg_data(self, timestamp, id, type, pos): pos = np.array(pos) self.missing_ids.discard(id) self.ids[id]['type_statistics'][type]+=1 self.ids[id]['last_positions'].append((timestamp, pos)) if len(self.ids[id]['last_positions']) > self.lpn: self.ids[id]['last_positions'] = self.ids[id]['last_positions'][-self.lpn:] deltaS = self.ids[id]['last_positions'][-1][1]-self.ids[id]['last_positions'][0][1] deltaT = self.ids[id]['last_positions'][-1][0]-self.ids[id]['last_positions'][0][0] delta = deltaS/deltaT self.ids[id]['last_delta'] = delta event = { 'start_time': timestamp, 'end_time': timestamp } if len(self.ids[id]['events']['0']) ==0: self.ids[id]['events']['0'].append(event) elif timestamp - self.ids[id]['events']['0'][-1]['end_time'] > self.wt: if self.ids[id]['events']['0'][-1]['end_time'] - self.ids[id]['events']['0'][-1]['start_time'] < self.wt/3: self.ids[id]['events']['0'].pop() self.ids[id]['events']['0'].append(event) else: self.ids[id]['events']['0'][-1]['end_time'] = timestamp if np.sqrt(np.sum(np.power(delta, 2)))> self.dt: if len(self.ids[id]['events']['1']) ==0: self.ids[id]['events']['1'].append(event) elif timestamp - self.ids[id]['events']['1'][-1]['end_time'] > self.wt: if self.ids[id]['events']['1'][-1]['end_time'] - self.ids[id]['events']['1'][-1]['start_time'] < self.wt/3: self.ids[id]['events']['1'].pop() self.ids[id]['events']['1'].append(event) else: self.ids[id]['events']['1'][-1]['end_time'] = timestamp def add_data(self, timestamp, data): lost = set(self.ids.keys()) lost = lost.difference(self.missing_ids) newids = [] for id, type, pos in data: if id in self.translator: id = self.translator[id] if id in self.ids: lost.discard(id) self.reg_data(timestamp, id, type, pos) else: newids.append((id, type, pos)) for id, type, pos in newids: distances = [] for lostid in lost: last_time = self.ids[lostid]['last_positions'][-1][0] if timestamp - last_time > self.wt: self.missing_ids.add(lostid) continue last_pos = self.ids[lostid]['last_positions'][-1][1] last_delta = self.ids[lostid]['last_delta'] predicted_pos = last_pos + last_delta*(timestamp - last_time) distance = np.sqrt(np.sum(np.power(pos-predicted_pos,2))) distances.append((distance, lostid)) if len(distances)>0: mindist = min(distances) if mindist[0]