# tkau's picture
# Upload 5 files (#2)
# d55dccd
from pathlib import Path
import PIL
# External packages
import streamlit as st
# Local Modules
import settings
import helper
import test_api.algorithm as algorithm
import multiprocessing
import time
import requests
import socketio
import base64
import io
from PIL import Image
# Browser-tab title/icon and overall page layout for the app.
page_options = {
    "page_title": "YOLOv8 目标检测",
    "page_icon": "🤖",
    "layout": "wide",
    "initial_sidebar_state": "expanded",  # or "collapsed"
}
st.set_page_config(**page_options)

# Main page heading
st.title("YOLOv8 目标检测")

# Sidebar
st.sidebar.header("模型配置")

# Model options: which task to run.
task_choices = ['检测', '分割', "越界检测", "行为检测"]
model_type = st.sidebar.selectbox("任务选择", task_choices)

# The slider works in whole percent (25-100, default 40); the value is
# divided by 100 to get the confidence threshold as a fraction.
confidence_percent = st.sidebar.slider("选择模型Confidence", 25, 100, 40)
confidence = float(confidence_percent) / 100
# Selecting Detection Or Segmentation: only the plain detection task has its
# own weights; the three remaining tasks all use the segmentation weights.
if model_type == '检测':
    model_path = Path(settings.DETECTION_MODEL)
elif model_type in ('分割', "越界检测", "行为检测"):
    model_path = Path(settings.SEGMENTATION_MODEL)

# Load Pre-trained ML Model
# NOTE: when loading fails the error is shown in the page and the script
# keeps running with `model` left undefined.
try:
    model = helper.load_model(model_path)
except Exception as load_error:
    st.error(f"Unable to load model. Check the specified path: {model_path}")
    st.error(load_error)
# Source selection: the sidebar radio decides whether the page works on a
# single uploaded image or on a live stream.
st.sidebar.header("图像/视频 配置")
source_radio = st.sidebar.radio(
    "选择来源", settings.SOURCES_LIST)

source_img = None
# Stays None when nothing was uploaded or the upload could not be opened, so
# the detection step below can tell whether it has an image to work on.
uploaded_image = None

# If image is selected
if source_radio == settings.IMAGE:
    source_img = st.sidebar.file_uploader(
        "选择一张图像...", type=("jpg", "jpeg", "png", 'bmp', 'webp'))

    col1, col2 = st.columns(2)

    # Left column: the input image (default sample or the user's upload).
    with col1:
        try:
            if source_img is None:
                default_image_path = str(settings.DEFAULT_IMAGE)
                # Opened so that an unreadable default image is reported by
                # the handler below; the path itself is what gets displayed.
                default_image = PIL.Image.open(default_image_path)
                st.image(default_image_path, caption="默认图像",
                         use_column_width=True)
            else:
                uploaded_image = PIL.Image.open(source_img)
                st.image(source_img, caption="Uploaded Image",
                         use_column_width=True)
        except Exception as ex:
            st.error("Error occurred while opening the image.")
            st.error(ex)

    # Right column: the detection result (default sample or model output).
    with col2:
        if source_img is None:
            default_detected_image_path = str(settings.DEFAULT_DETECT_IMAGE)
            default_detected_image = PIL.Image.open(
                default_detected_image_path)
            st.image(default_detected_image_path, caption='检测图像',
                     use_column_width=True)
        else:
            if st.sidebar.button('检测目标'):
                if uploaded_image is None:
                    # Opening the upload failed in the left column (already
                    # reported there); previously this path raised NameError.
                    st.warning("The uploaded image could not be read, "
                               "so detection was skipped.")
                else:
                    res = model.predict(uploaded_image,
                                        conf=confidence
                                        )
                    boxes = res[0].boxes
                    # Reverse the channel axis of the plotted result before
                    # handing it to st.image.
                    res_plotted = res[0].plot()[:, :, ::-1]
                    st.image(res_plotted, caption='Detected Image',
                             use_column_width=True)
                    try:
                        with st.expander("Detection Results"):
                            for box in boxes:
                                st.write(box.data)
                    except Exception as ex:
                        # st.write(ex)
                        st.write("No image is uploaded yet!")

elif source_radio == settings.RTSP:
    if model_type == '检测':
        # Detection is delegated to an external analyzer service: pressing
        # the start button posts the stream URL to it.
        src = {
            "video_url": "http://127.0.0.1:8999/live/test.live.flv"
        }
        start = st.sidebar.button('检测目标')
        stop = st.sidebar.button('停止')
        if start:
            try:
                # A timeout keeps an unreachable analyzer from hanging the
                # script run indefinitely.
                response = requests.post(
                    'http://192.168.110.232:7555/analyzerControlAdd',
                    json=src, timeout=10)
            except requests.RequestException as ex:
                st.error("Unable to reach the analyzer service.")
                st.error(ex)
            else:
                print(response.text)
    elif model_type == '分割':
        yolov8 = algorithm.YoloV8Detection()
        yolov8.play_rtsp_stream(confidence, model, display_video=True)
    elif model_type == "越界检测":
        yolov8_b = algorithm.BoundaryDetection()
        yolov8_b.play_rtsp_stream(confidence, model, display_video=True)
    # NOTE: "行为检测" has no stream handler here, so nothing runs for it.
    #helper.play_rtsp_stream(confidence, model)

else:
    st.error("Please select a valid source type!")
# Create the Socket.IO client instance.
sio = socketio.Client()
# Connect to the WebSocket server. A failed connection is reported in the
# page, the same way model-loading and image-opening errors are reported in
# this script, instead of aborting the run with an unhandled exception.
try:
    sio.connect('http://192.168.110.232:7555')
except socketio.exceptions.ConnectionError as ex:
    st.error("Unable to connect to the real-time detection server.")
    st.error(ex)
# Receive frame data and display it
@sio.on('frame_data')
def handle_frame_data(data):
    """Show one frame delivered by a 'frame_data' event.

    The value under ``data['image']`` is base64-decoded, opened as an image
    from the resulting bytes, and passed to ``st.image``.
    """
    encoded_frame = data['image']
    frame_buffer = io.BytesIO(base64.b64decode(encoded_frame))
    frame = Image.open(frame_buffer)
    st.image(frame, caption='Real-time Detection', channels='BGR')
# Wait for the connection to be closed. The notice is written first because
# the wait call below comes last and holds the script until then.
st.text('Waiting for real-time updates...')
sio.wait()