Spaces:
Running
Running
from ultralytics import YOLO #行人识别,采用YoloV8模型 | |
import math #用于四舍五入取整 | |
import cv2 #opencv图像处理库 | |
import cvzone #在图像上绘画 | |
import numpy as np | |
import gradio as gr #GUI界面 | |
import tempfile #创建输出临时文件夹 | |
import os | |
from detectMotion import * #单独的运动检测 | |
import time | |
import deep_sort.deep_sort.deep_sort as ds | |
#导入YoloV8模型,模型名称存储在model_list.txt中 | |
with open("model_list.txt") as file: | |
lines = file.readlines() | |
model_list = [line.rstrip() for line in lines] #逐行导入 | |
#用于人流量统计 | |
number_of_people_in_one_frame_list = [] | |
#YoloV8官方模型标签数据,本次项目只使用了'person' | |
classNames=['person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat', 'traffic light', | |
'fire hydrant', | |
'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', | |
'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball', | |
'kite', | |
'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', | |
'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', | |
'donut', | |
'cake', 'chair', 'couch', 'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote', | |
'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', | |
'scissors', | |
'teddy bear', 'hair drier', 'toothbrush'] | |
#实时显示人流量统计图表 | |
def plot_number(): | |
global number_of_people_in_one_frame_list | |
plt.close() | |
fig = plt.figure() | |
plt.xlabel('Second') | |
plt.ylabel('Traffic') | |
plt.grid(linestyle='--', alpha=0.3, linewidth=2) | |
plt.plot(number_of_people_in_one_frame_list) | |
return fig | |
# 彩色图像进行自适应直方图均衡化 | |
def hisEqulColor(img): | |
fig_preprocessed = image_histogram(img) | |
## 将RGB图像转换到YCrCb空间中 | |
ycrcb = cv2.cvtColor(img, cv2.COLOR_BGR2YCR_CB) | |
# 将YCrCb图像通道分离 | |
channels = cv2.split(ycrcb) | |
# 以下代码详细注释见官网: | |
# https://docs.opencv.org/4.1.0/d5/daf/tutorial_py_histogram_equalization.html | |
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) | |
clahe.apply(channels[0], channels[0]) | |
cv2.merge(channels, ycrcb) | |
cv2.cvtColor(ycrcb, cv2.COLOR_YCR_CB2BGR, img) | |
fig_postprocessed = image_histogram(img) | |
return img, fig_preprocessed, fig_postprocessed | |
#添加高斯噪声 | |
def AddGaussNoise(img,sigma): | |
gauss=np.random.normal(0,sigma,img.shape) | |
img=np.uint8(img + gauss)#将高斯噪声与原始图像叠加 | |
#img=cv2.medianBlur(img,5) | |
return img | |
#图像超分 | |
def AddISR(img): | |
sr = cv2.dnn_superres.DnnSuperResImpl_create() | |
sr.readModel("EDSR_x4.pb") | |
sr.setModel("edsr", 4) # set the model by passing the value and the upsampling ratio | |
result = sr.upsample(img) # upscale the input image | |
return result | |
#图像处理 | |
def processImg(img,sigma,median_filter,ISR): | |
img=cv2.cvtColor(img,cv2.COLOR_RGB2BGR) | |
res1 = AddGaussNoise(img,sigma) | |
if median_filter == True: | |
#中值滤波降噪 | |
img=cv2.medianBlur(img,5) | |
res1, fig1, fig2 = hisEqulColor(res1) | |
if ISR == True: | |
res1 = AddISR(res1) | |
res1=cv2.cvtColor(res1,cv2.COLOR_BGR2RGB) | |
return res1, fig1, fig2 | |
#视频处理 | |
def processVideo(inputPath, codec, model): | |
global number_of_people_in_one_frame_list | |
tracker = ds.DeepSort('deep_sort/deep_sort/deep/checkpoint/ckpt.t7') | |
if inputPath == None: | |
raise gr.Error("请先上传视频") | |
model=YOLO(model) | |
number_of_people_in_one_frame = 0 | |
number_of_people_in_one_frame_list = [] | |
number_of_people = 0 | |
sum_of_frame = 0 | |
cap = cv2.VideoCapture(inputPath)#从inputPath读入视频 | |
fps = cap.get(cv2.CAP_PROP_FPS) #获取视频的帧率 | |
size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), | |
int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))#获取视频的大小 | |
output_viedo = cv2.VideoWriter()#初始化视频写入 | |
outputPath=tempfile.mkdtemp()#创建输出视频的临时文件夹的路径 | |
#输出格式 | |
if codec == "mp4": | |
#h264:avc1,此处为了兼容性换成mp4v | |
fourcc = cv2.VideoWriter_fourcc('m','p','4','v')#视频编码:mp4v,此编码不需要openh264-1.8.0-win64.dll | |
video_save_path = os.path.join(outputPath,"output.mp4")#创建输出视频路径 | |
elif codec == "avi": | |
fourcc = cv2.VideoWriter_fourcc('h','2','6','4')#视频编码:h264,只能保存为avi格式,本地运行时不能在浏览器播放 | |
video_save_path = os.path.join(outputPath,"output.avi")#创建输出视频路径 | |
elif codec == "mkv": | |
fourcc = cv2.VideoWriter_fourcc('X','V','I','D')#视频编码:XVID,此编码不需要openh264-1.8.0-win64.dll | |
video_save_path = os.path.join(outputPath,"output.mkv")#创建输出视频路径 | |
elif codec == "wmv": | |
fourcc = cv2.VideoWriter_fourcc('X','V','I','D')#视频编码:XVID,此编码不需要openh264-1.8.0-win64.dll | |
video_save_path = os.path.join(outputPath,"output.wmv")#创建输出视频路径 | |
elif codec == "mp4(h264)": | |
#h264:avc1, 在本地运行时可以在浏览器播放 | |
fourcc = cv2.VideoWriter_fourcc('a','v','c','1')#视频编码:h264,本地运行时可直接在浏览器播放 | |
video_save_path = os.path.join(outputPath,"output_h264.mp4")#创建输出视频路径 | |
output_viedo.open(video_save_path , fourcc, fps, size, True) | |
#对每一帧图片进行读取和处理 | |
while True: | |
ret, img = cap.read()#将每一帧图片读取到img当中 | |
results=model(img,stream=True)#使用YoloV8模型进行推理 | |
detections=np.empty((0, 4))#初始化运动检测 | |
confarray = [] | |
if not(ret):#当视频全部读完,ret返回false,终止循环,视频帧读取和写入结束 | |
break | |
img, __, __= hisEqulColor(img)#视频增强 | |
#读取推理的数据 | |
for r in results: | |
boxes=r.boxes | |
for box in boxes: | |
#读取每一帧识别出的边界信息,并显示 | |
x1,y1,x2,y2=box.xywh[0] | |
x1,y1,x2,y2=int(x1),int(y1),int(x2),int(y2)#将tensor类型转变为整型 | |
conf=math.ceil(box.conf[0]*100)/100#对conf取2位小数 | |
cls=int(box.cls[0])#获取物体类别标签 | |
#只检测人 | |
if cls==0: | |
number_of_people_in_one_frame += 1 | |
currentArray=np.array([x1,y1,x2,y2]) | |
confarray.append(conf) | |
detections=np.vstack((detections,currentArray))#按行堆叠数据 | |
sum_of_frame += 1 | |
if sum_of_frame % fps == 0: | |
number_of_people_in_one_frame_list.append(number_of_people_in_one_frame) | |
number_of_people_in_one_frame = 0 | |
#运动检测 | |
resultsTracker=tracker.update(detections, confarray, img) | |
for result in resultsTracker: | |
x1,y1,x2,y2,Id=result | |
number_of_people=max(str(int(Id)),str(number_of_people)) | |
x1,y1,x2,y2=int(x1),int(y1),int(x2),int(y2)#将浮点数转变为整型 | |
cv2.rectangle(img,(x1,y1),(x2,y2),(255,0,255),3) | |
cvzone.putTextRect(img,f'{int(Id)}',(max(-10,x1),max(40,y1)),scale=1.3,thickness=2) | |
#image_np = np.squeeze(img.render())#用np.squeeze将输出结果降维 | |
output_viedo.write(img)#将处理后的图像写入视频 | |
output_viedo.release()#释放 | |
cap.release()#释放 | |
return video_save_path,video_save_path,number_of_people | |
#人流量显示 | |
def change_total_visible(value): | |
if value == True: | |
return gr.update(visible = True) | |
if value == False: | |
return gr.update(visible = False) | |
#WebUi图形界面(block) | |
with gr.Blocks() as demo: | |
gr.Markdown(""" | |
# 运动检测与行人跟踪 | |
基于opencv + yoloV8 + deepsort | |
""") | |
with gr.Tab("视频识别"): | |
with gr.Row(): | |
with gr.Column(): | |
text_inputPath = gr.Video() | |
codec = gr.Radio(["mp4","avi","mkv","wmv",'mp4(h264)'], label="输出视频格式", | |
value="mp4") | |
model = gr.Dropdown(model_list, value="yolov8n.pt", label="模型", info="模型列表存储在model_list.txt中") | |
videoProcess_button = gr.Button("处理") | |
with gr.Column(): | |
text_output = gr.Video() | |
text_output_path = gr.Text(label="输出路径") | |
total_ID = gr.Text(label="总人数") | |
total_visible = gr.Checkbox(label="显示人流量") | |
with gr.Row(): | |
with gr.Column(): | |
figure_number_output = gr.Plot(label="人流量", visible=False) | |
with gr.Tab("图像增强"): | |
with gr.Row(): | |
with gr.Column(): | |
image_input = gr.Image() | |
image_sigma = gr.Slider(0,40,label="高斯噪声sigma") | |
median_filter = gr.Checkbox(label="中值滤波") | |
Add_ISR = gr.Checkbox(label="图像超分") | |
image_output = gr.Image() | |
with gr.Column(): | |
figure_pre_output = gr.Plot(label="处理前直方图") | |
figure_post_output = gr.Plot(label="处理后直方图") | |
image_button = gr.Button("处理") | |
with gr.Tab("运动检测"): | |
with gr.Column(): | |
with gr.Row(): | |
with gr.Column(): | |
motion_inputPath = gr.Video() | |
motionProcess_button = gr.Button("处理") | |
motion_output_frame = gr.Video() | |
motion_output_fmask = gr.Video() | |
with gr.Column(): | |
frame_output_path = gr.Text(label="frame输出路径") | |
fmask_output_path = gr.Text(label="mask输出路径") | |
with gr.Accordion("算法:"): | |
gr.Markdown("高斯混合模型(GMM)") | |
videoProcess_button.click(processVideo, inputs=[text_inputPath, codec, model], | |
outputs=[text_output,text_output_path, total_ID]) | |
image_button.click(processImg, inputs=[image_input,image_sigma,median_filter,Add_ISR], | |
outputs=[image_output,figure_pre_output,figure_post_output]) | |
motionProcess_button.click(motionDetection, inputs=[motion_inputPath], | |
outputs=[motion_output_frame,motion_output_fmask, | |
frame_output_path,fmask_output_path]) | |
videoProcess_button.click(plot_number,outputs=figure_number_output,every=2) | |
total_visible.change(change_total_visible,total_visible, figure_number_output ) | |
demo.queue()#当有多个请求时,排队 | |
demo.launch()#生成内网链接,如需要公网链接,括号内输入share=True | |
#server_port=6006 | |