"""Gradio app that detects Taiwan black bears ("kumay") with a YOLO model.

Three tabs are exposed:

* upload a video and annotate it frame by frame,
* run live detection on the local webcam,
* preview / download the detection log and the saved images.

Every frame containing a sufficiently confident target detection is saved as
a PNG under ``save_dir`` and recorded in ``detection_log.csv``.
"""

import os
import tempfile
import threading
import time
import zipfile
from datetime import datetime

import cv2
import gradio as gr
import pandas as pd
import torch  # noqa: F401  (imported by the original file; kept on purpose)
from ultralytics import YOLO

# === Model and settings ===
model = YOLO("best0709.pt")
TARGET_CLASS_NAME = "kumay"
CONF_THRESHOLD = 0.85  # minimum confidence for a box to count as a detection
DEFAULT_FPS = 30.0  # fallback when a video container reports no usable FPS
LATEST_STREAM_PATH = "latest_stream.png"
save_dir = "saved_bears"
log_path = os.path.join(save_dir, "detection_log.csv")
LOG_COLUMNS = (
    "frame_id",
    "timestamp",
    "timestamp_diff",
    "filename",
    "class",
    "confidence",
)
os.makedirs(save_dir, exist_ok=True)

# === Global state ===
latest_frame = None  # most recent raw webcam frame (BGR), guarded by `lock`
lock = threading.Lock()
streaming = False  # True while the live-detection loop should keep running
_stream_thread = None  # the thread currently running `streaming_loop`, if any
_stream_control_lock = threading.Lock()  # serialises `start_stream` calls

# Initialise the log file with its header row.
if not os.path.exists(log_path):
    with open(log_path, "w", encoding="utf-8") as f:
        f.write(",".join(LOG_COLUMNS) + "\n")

last_detection_time = None  # datetime of the previous logged detection
frame_counter = 0  # value written to the `frame_id` log column


# === Continuous webcam reading ===
def webcam_reader():
    """Continuously copy the newest frame of camera 0 into ``latest_frame``.

    Intended to run in a daemon thread. If the camera cannot be opened the
    function returns immediately, because reads on an unopened capture can
    never succeed.
    """
    global latest_frame
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Webcam 0 could not be opened; live detection is unavailable.")
        cap.release()
        return
    try:
        while True:
            ret, frame = cap.read()
            if ret:
                with lock:
                    latest_frame = frame.copy()
            time.sleep(0.03)
    finally:
        cap.release()


# === Detection and saving ===
def detect_and_save(frame):
    """Run the model on a BGR frame; save and log it if the target is found.

    Boxes of class ``TARGET_CLASS_NAME`` with confidence of at least
    ``CONF_THRESHOLD`` are drawn onto ``frame`` in place. When at least one
    such box exists, the annotated frame is written to ``save_dir`` and a row
    is appended to the CSV log.

    Args:
        frame: BGR image array; it is modified in place when boxes are drawn.

    Returns:
        The (possibly annotated) frame converted to RGB.

    Raises:
        OSError: if the annotated image could not be written to disk.
    """
    global last_detection_time, frame_counter

    results = model(frame)
    names = results[0].names

    # Single pass: keep only the boxes that qualify as a target detection.
    detections = []
    for box in results[0].boxes:
        cls_name = names[int(box.cls[0])]
        conf = float(box.conf[0])
        if cls_name == TARGET_CLASS_NAME and conf >= CONF_THRESHOLD:
            detections.append((box, conf))

    if detections:
        best_cls_name = TARGET_CLASS_NAME
        best_conf = max(conf for _, conf in detections)

        timestamp = datetime.now()
        # Trim microseconds down to milliseconds for the file name.
        timestamp_str = timestamp.strftime("%Y%m%d_%H%M%S_%f")[:-3]
        filename = os.path.join(save_dir, f"bear_{timestamp_str}.png")

        for box, conf in detections:
            # Plain Python ints keep OpenCV's argument parsing happy.
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int).tolist()
            cv2.putText(
                frame,
                f"{TARGET_CLASS_NAME}: {conf:.2f}",
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.6,
                (0, 255, 0),
                2,
            )
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

        # cv2.imwrite reports failure through its return value; an `assert`
        # would be stripped under `python -O`, so raise explicitly instead.
        if not cv2.imwrite(filename, frame):
            raise OSError(f"Failed to save detection image: {filename}")
        print(f"📸 偵測到 {best_cls_name},儲存:{filename}")

        if last_detection_time is not None:
            diff = (timestamp - last_detection_time).total_seconds()
        else:
            diff = 0.0

        with open(log_path, "a", encoding="utf-8") as f:
            f.write(
                f"{frame_counter},{timestamp},{diff:.3f},{filename},"
                f"{best_cls_name},{best_conf:.4f}\n"
            )

        last_detection_time = timestamp

    # NOTE(review): the flattened source does not show whether this increment
    # was inside the detection branch; counting every processed frame matches
    # the `frame_id` column name -- confirm.
    frame_counter += 1
    return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)


# === Webcam processing ===
def get_annotated_frame():
    """Return the latest webcam frame after detection, or None if none yet."""
    with lock:
        frame = latest_frame.copy() if latest_frame is not None else None
    if frame is None:
        return None
    return detect_and_save(frame)


def streaming_loop():
    """Publish annotated webcam frames to ``LATEST_STREAM_PATH`` while streaming.

    Each frame is written to a temporary file and then moved into place, so a
    reader polling the published path never sees a partially written image.
    The frame lock is not held during disk I/O, so the webcam reader thread is
    not stalled by slow writes.
    """
    # Keep a `.png` suffix: cv2.imwrite selects the encoder from the extension.
    tmp_path = "latest_stream.tmp.png"
    while streaming:
        frame = get_annotated_frame()
        if frame is not None:
            bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            try:
                if cv2.imwrite(tmp_path, bgr):
                    os.replace(tmp_path, LATEST_STREAM_PATH)
            except OSError as err:
                # E.g. the destination is momentarily open by a reader on
                # Windows; drop this frame and publish the next one.
                print(f"Could not publish stream frame: {err}")
        time.sleep(0.2)


def start_stream():
    """Start the live-detection loop unless one is already running."""
    global streaming, _stream_thread
    with _stream_control_lock:
        if _stream_thread is not None and _stream_thread.is_alive():
            if streaming:
                return  # already running; avoid a second inference loop
            # A stop was requested but the old loop has not exited yet; wait
            # for it so that two loops never run side by side.
            _stream_thread.join()
        streaming = True
        _stream_thread = threading.Thread(target=streaming_loop, daemon=True)
        _stream_thread.start()


def stop_stream():
    """Ask the live-detection loop to stop after its current iteration."""
    global streaming
    streaming = False


# === Video detection ===
def detect_video(video_path):
    """Annotate every frame of a video and return the path of the result.

    Args:
        video_path: path of the uploaded video, or None if nothing was
            uploaded.

    Returns:
        Path of a temporary ``.mp4`` file with the annotated frames, or None
        when no input was given or the input could not be opened.
    """
    if not video_path:
        return None

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        print(f"Could not open video: {video_path}")
        return None

    out = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:  # also true for NaN; some containers report no FPS
            fps = DEFAULT_FPS
        W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Close our handle before OpenCV opens the same path for writing.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
            output_path = tmp.name
        out = cv2.VideoWriter(
            output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (W, H)
        )

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            annotated = detect_and_save(frame)
            # detect_and_save returns RGB; the writer expects BGR.
            out.write(cv2.cvtColor(annotated, cv2.COLOR_RGB2BGR))
    finally:
        cap.release()
        if out is not None:
            out.release()

    print(f"✅ 影片處理完成:{output_path}")
    return output_path


# === ZIP feature ===
def create_zip():
    """Bundle everything in ``save_dir`` plus the CSV log into one ZIP file.

    The log lives inside ``save_dir``, so it is stored both under
    ``saved_bears/`` and at the archive root.

    Returns:
        Path of the created archive.
    """
    zip_path = "detection_package.zip"
    with zipfile.ZipFile(zip_path, "w") as zipf:
        for fname in os.listdir(save_dir):
            fpath = os.path.join(save_dir, fname)
            if os.path.isfile(fpath):
                zipf.write(fpath, arcname=os.path.join("saved_bears", fname))
        if os.path.exists(log_path):
            zipf.write(log_path, arcname="detection_log.csv")
    return zip_path


def read_csv():
    """Load the detection log as a DataFrame, newest ``frame_id`` first.

    Returns:
        The log contents, or an empty DataFrame with the log columns when the
        log file does not exist.
    """
    if not os.path.exists(log_path):
        return pd.DataFrame(columns=list(LOG_COLUMNS))
    df = pd.read_csv(log_path)
    if "frame_id" in df.columns:
        return df.sort_values(by="frame_id", ascending=False).reset_index(drop=True)
    return df


def get_latest_image():
    """Return the path of the latest published stream frame, or None."""
    return LATEST_STREAM_PATH if os.path.exists(LATEST_STREAM_PATH) else None


# === Start the webcam thread ===
threading.Thread(target=webcam_reader, daemon=True).start()

# === Gradio UI ===
with gr.Blocks() as demo:
    gr.Markdown("## 🐻 台灣黑熊偵測系統")

    with gr.Tab("📹 上傳影片辨識"):
        gr.Markdown("上傳影片,逐幀偵測台灣黑熊,並自動儲存出現畫面")
        video_input = gr.Video()
        video_output = gr.Video()
        video_button = gr.Button("上傳並分析影片")
        video_button.click(fn=detect_video, inputs=video_input, outputs=video_output)

    with gr.Tab("📷 即時攝影機偵測"):
        gr.Markdown("啟用 webcam 進行即時偵測,若出現台灣黑熊則自動儲存影像")
        webcam_output = gr.Image(
            label="即時辨識結果",
            interactive=False,
            type="filepath",
            value=get_latest_image,
            every=0.2,
        )
        with gr.Row():
            start_btn = gr.Button("▶️ 開始直播")
            stop_btn = gr.Button("⏹ 停止直播")
        start_btn.click(fn=start_stream, inputs=[], outputs=[])
        stop_btn.click(fn=stop_stream, inputs=[], outputs=[])

    with gr.Tab("📁 下載與預覽"):
        gr.Markdown("### 預覽與下載偵測圖片與紀錄檔")
        log_df = gr.Dataframe(label="detection_log.csv 預覽", interactive=False)
        load_log_btn = gr.Button("🔄 重新載入紀錄檔")
        load_log_btn.click(fn=read_csv, outputs=log_df)
        csv_file = gr.File(value=log_path, label="⬇️ 下載 CSV 檔")

        gr.Markdown("### 打包圖片與紀錄檔(.zip)")
        zip_btn = gr.Button("📦 產生 ZIP 檔")
        zip_file = gr.File(label="⬇️ 點我下載壓縮檔")
        zip_btn.click(fn=create_zip, outputs=zip_file)

# Launch
if __name__ == "__main__":
    demo.launch()