Spaces:
Sleeping
Sleeping
import base64 | |
import random | |
import shutil | |
import time | |
from openai import OpenAI | |
import glob | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from sklearn import svm | |
import zipfile | |
from PIL import Image | |
from sklearn.decomposition import PCA | |
from PIL import Image | |
import numpy as np | |
from sklearn.preprocessing import StandardScaler | |
from sklearn.svm import OneClassSVM | |
import numpy as np | |
import skimage | |
from skimage.feature import hog | |
from skimage.color import rgb2gray | |
from skimage import io | |
from sklearn.decomposition import PCA | |
from sklearn.svm import OneClassSVM | |
from sklearn.preprocessing import StandardScaler | |
import os | |
from tqdm import tqdm | |
import pickle | |
import joblib | |
import cv2 | |
import streamlit as st | |
from streamlit_image_select import image_select | |
def cut_video(video_path):
    """Split a video into JPEG frames and compute a timestamp for each frame.

    The output directory ``./data/video_frame`` is deleted and recreated on
    every call, so it only contains frames written by the current call.
    Timestamps are derived from the frame index and the frame rate reported
    by the capture, not from the container's own per-frame timestamps.

    :param video_path: path of the video file handed to ``cv2.VideoCapture``
    :return: tuple ``(frames_dir, frame_times)``; ``frames_dir`` is the
        directory holding the ``frame_NNNN.jpg`` files and ``frame_times`` is
        a list of ``"MM:SS:mmm"`` strings, one per written frame
    :raises ValueError: if a frame could be read but the reported frame rate
        is not positive, so no timestamp can be computed
    """
    frames_dir = "./data/video_frame"
    frame_times = []
    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)

        # Start from an empty directory so stale frames never leak into a run.
        if os.path.exists(frames_dir):
            shutil.rmtree(frames_dir)
        os.makedirs(frames_dir, exist_ok=True)

        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            # `not fps > 0` also rejects NaN; checked lazily so that an
            # unreadable video still yields an empty result as before.
            if not fps > 0:
                raise ValueError(
                    f"Cannot compute frame timestamps: reported FPS is {fps!r}"
                )
            timestamp_ms = (frame_count / fps) * 1000
            minutes = int(timestamp_ms // 60000)
            seconds = int((timestamp_ms % 60000) // 1000)
            milliseconds = int(timestamp_ms % 1000)
            frame_times.append(f"{minutes:02}:{seconds:02}:{milliseconds:03}")
            frame_file_path = os.path.join(frames_dir, f'frame_{frame_count:04d}.jpg')
            cv2.imwrite(frame_file_path, frame)
            frame_count += 1
    finally:
        # Release the capture even when reading or writing fails.
        cap.release()
    return frames_dir, frame_times
def extract_hog_features(image_path):
    """Extract HOG features from an image file.

    Colour images are reduced to their first three channels (dropping any
    alpha channel) and converted to grayscale. Images that are already
    two-dimensional are used directly after conversion to float.

    :param image_path: path of the image file
    :return: NumPy array of HOG features
    """
    # Local import keeps this block self-contained.
    from skimage.util import img_as_float

    img = io.imread(image_path)
    if img.ndim == 3:
        gray_img = rgb2gray(img[:, :, :3])
    else:
        # Already single-channel: slicing a third axis would raise IndexError.
        gray_img = img_as_float(img)

    # `visualize` is left off: the rendered HOG image was never used and is
    # costly to compute; the returned feature vector is unaffected.
    return hog(gray_img, block_norm='L2-Hys')
def prepare_features(image_paths):
    """Extract HOG features from several images and stack them into an array.

    A Streamlit progress bar and a status line are updated after each image
    and removed once all images have been processed.

    :param image_paths: list of image file paths
    :return: NumPy array built from the per-image feature vectors
    """
    bar = st.progress(0)
    status = st.empty()
    collected = []
    for position, image_path in enumerate(tqdm(image_paths), start=1):
        collected.append(extract_hog_features(image_path))
        total = len(image_paths)
        bar.progress(int(position / total * 100))
        status.text(f"Processing image {position}/{total}: {image_path}")
    bar.empty()
    status.text("Processing complete!")
    status.empty()
    return np.array(collected)
def run_pca(features, n_components=4):
    """Fit a PCA on ``features`` and return the model with the projected data.

    The previous implementation animated a progress bar to 100% with ten
    seconds of ``time.sleep`` before the real computation started. The bar
    now only brackets the actual ``fit_transform`` call.

    :param features: 2-D feature matrix (samples x features)
    :param n_components: requested number of principal components; an integer
        value is clamped to ``min(n_samples, n_features)`` so that very short
        inputs do not make the fit fail
    :return: tuple ``(pca, transformed_data)``
    """
    shape = np.shape(features)
    if isinstance(n_components, int) and len(shape) == 2 and min(shape) >= 1:
        n_components = min(n_components, *shape)

    pca = PCA(n_components=n_components)
    progress_bar = st.progress(0)
    status_text = st.empty()
    status_text.text("PCA Transformation Progress: 0%")
    try:
        transformed_data = pca.fit_transform(features)
        progress_bar.progress(100)
        status_text.text("PCA Transformation Complete!")
    finally:
        # Always remove the widgets, even if the fit raises.
        progress_bar.empty()
        status_text.empty()
    return pca, transformed_data
def run_standard_scale(features):
    """Standardise ``features`` with a freshly fitted ``StandardScaler``.

    The previous implementation animated a progress bar to 100% with ten
    seconds of ``time.sleep`` before the real computation started. The bar
    now only brackets the actual ``fit_transform`` call.

    :param features: feature matrix to scale
    :return: tuple ``(scaler, transformed_data)``
    """
    scaler = StandardScaler()
    progress_bar = st.progress(0)
    status_text = st.empty()
    status_text.text("StandardScaler Transformation Progress: 0%")
    try:
        transformed_data = scaler.fit_transform(features)
        progress_bar.progress(100)
        status_text.text("StandardScaler Transformation Complete!")
    finally:
        # Always remove the widgets, even if scaling raises.
        progress_bar.empty()
        status_text.empty()
    return scaler, transformed_data
def run_OneClassSVM(z_train, nu=0.2, gamma=0.001):
    """Fit an RBF-kernel One-Class SVM on ``z_train``.

    The previous implementation animated a progress bar to 100% with five
    seconds of ``time.sleep`` before fitting. The bar now only brackets the
    actual ``fit`` call.

    :param z_train: training matrix passed to ``fit``
    :param nu: ``nu`` parameter of ``OneClassSVM`` (default keeps the
        previously hard-coded value)
    :param gamma: RBF kernel coefficient (default keeps the previously
        hard-coded value)
    :return: the fitted ``OneClassSVM`` instance
    """
    progress_bar = st.progress(0)
    status_text = st.empty()
    clf = svm.OneClassSVM(nu=nu, kernel="rbf", gamma=gamma)
    status_text.text("Fitting model... 0% complete")
    try:
        clf.fit(z_train)
        progress_bar.progress(100)
        status_text.text("Fitting model... 100% complete")
    finally:
        # Always remove the widgets, even if fitting raises.
        progress_bar.empty()
        status_text.empty()
    return clf
def predict_with_progress(clf, features_array):
    """Predict one sample at a time while updating a Streamlit progress bar.

    :param clf: fitted estimator exposing ``predict``
    :param features_array: 2-D NumPy array, one sample per row
    :return: 1-D float NumPy array holding ``clf.predict`` for each row
        (other functions in this file treat the value ``-1`` as an anomaly)
    """
    progress_bar = st.progress(0)
    status_text = st.empty()
    n_samples = features_array.shape[0]
    predictions = np.zeros(n_samples)
    for i in range(n_samples):
        # `predict` returns a length-1 array; take the scalar explicitly,
        # because implicit array-to-scalar conversion is deprecated in NumPy.
        predictions[i] = clf.predict(features_array[i].reshape(1, -1))[0]
        progress = int((i + 1) / n_samples * 100)
        progress_bar.progress(progress)
        status_text.text(f"Predicting... {progress}% complete")
    progress_bar.empty()
    status_text.empty()
    return predictions
def prepare_all_displayed_anomalies(frames_dir, predictions):
    """Copy every frame predicted as anomalous into ``./data/anomaly``.

    Prediction ``i`` is matched with the ``i``-th entry of the
    lexicographically sorted listing of ``frames_dir``. Indices beyond the
    number of available frames are ignored.

    :param frames_dir: directory containing the extracted frame images
    :param predictions: iterable of predictions; the value ``-1`` marks an
        anomaly
    :return: list of paths of the copied frames, in frame order; empty when
        nothing was predicted as anomalous (previously an ``IndexError``)
    """
    # enumerate() yields ascending indices, so no extra sort is needed.
    anomaly_indices = [index for index, value in enumerate(predictions) if value == -1]
    frames = sorted(os.listdir(frames_dir))

    anomaly_folder = "./data/anomaly"
    os.makedirs(anomaly_folder, exist_ok=True)

    anomaly_paths = []
    for frame_number in anomaly_indices:
        if frame_number >= len(frames):
            # Indices are ascending, so all remaining ones are out of range.
            break
        frame_path = os.path.join(frames_dir, frames[frame_number])
        anomaly_frame_path = os.path.join(anomaly_folder, f'frame_{frame_number:04d}.jpg')
        shutil.copy(frame_path, anomaly_frame_path)
        anomaly_paths.append(anomaly_frame_path)
    return anomaly_paths
def prepare_3_displayed_anomalies(frames_dir, predictions, frame_times):
    """Copy up to three randomly chosen anomalous frames into ``./data/anomaly``.

    Prediction ``i`` is matched with the ``i``-th entry of the
    lexicographically sorted listing of ``frames_dir`` and with
    ``frame_times[i]``. Indices beyond the number of available frames are
    ignored.

    :param frames_dir: directory containing the extracted frame images
    :param predictions: iterable of predictions; the value ``-1`` marks an
        anomaly
    :param frame_times: sequence of timestamps indexed by frame number
    :return: list of ``[copied_frame_path, frame_time]`` pairs in frame
        order; fewer than three (possibly none) when fewer anomalies exist
        (previously a ``ValueError`` from ``random.sample``)
    """
    anomaly_frames = [index for index, value in enumerate(predictions) if value == -1]
    # Never request more samples than there are anomalies.
    sample_size = min(3, len(anomaly_frames))
    anomaly_frames = sorted(random.sample(anomaly_frames, sample_size))

    frames = sorted(os.listdir(frames_dir))
    anomaly_folder = "./data/anomaly"
    os.makedirs(anomaly_folder, exist_ok=True)

    anomaly_paths = []
    for frame_number in anomaly_frames:
        if frame_number >= len(frames):
            # Indices are ascending, so all remaining ones are out of range.
            break
        frame_path = os.path.join(frames_dir, frames[frame_number])
        anomaly_frame_path = os.path.join(anomaly_folder, f'frame_{frame_number:04d}.jpg')
        shutil.copy(frame_path, anomaly_frame_path)
        anomaly_paths.append([anomaly_frame_path, frame_times[frame_number]])
    return anomaly_paths
def OneClassSvm_anomaly_detection(image_paths):
    """Train a One-Class SVM on features extracted from the given images.

    The steps are: HOG feature extraction, standard scaling, PCA projection,
    then One-Class SVM fitting. The fitted scaler and PCA objects are not
    returned.

    :param image_paths: list of image file paths used as training data
    :return: tuple ``(clf, z_train)``: the fitted classifier and the
        PCA-projected training matrix it was fitted on
    """
    hog_matrix = prepare_features(image_paths)
    scaled_matrix = run_standard_scale(hog_matrix)[1]
    z_train = run_pca(scaled_matrix)[1]
    return run_OneClassSVM(z_train), z_train
def encode_image(image_path):
    """Return the contents of a file as a Base64-encoded text string.

    :param image_path: path of the file to read in binary mode
    :return: Base64 representation of the file bytes, decoded as UTF-8
    """
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
def get_response(model, client, image_path):
    """Ask a vision-capable chat model to describe the activity in an image.

    The image is embedded in the request as a Base64 data URL. Its MIME type
    is guessed from the file extension, so ``.jpg`` frames are no longer
    labelled ``image/png``. ``image/png`` remains the fallback when the
    extension is unknown or is not an image type.

    :param model: model name passed to ``client.chat.completions.create``
    :param client: client object exposing ``chat.completions.create``
    :param image_path: path of the image file to send
    :return: text content of the first choice in the response
    """
    # Local import keeps this block self-contained.
    import mimetypes

    mime_type = mimetypes.guess_type(image_path)[0]
    if mime_type is None or not mime_type.startswith("image/"):
        mime_type = "image/png"

    base64_image = encode_image(image_path)
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "You are a helpful assistant that responds in Markdown. Help me with task!"},
            {"role": "user", "content": [
                {"type": "text", "text": "この画像の中の人物が行っている活動を説明してください"},
                {"type": "image_url", "image_url": {
                    "url": f"data:{mime_type};base64,{base64_image}"}
                }
            ]}
        ],
        temperature=0.0,
    )
    return response.choices[0].message.content
def VLM_anomaly_detection(anomaly_paths):
    """Query the vision-language model for each anomalous frame.

    The API key is read from the ``MY_API_KEY`` environment variable.
    ``st.session_state.responses`` is reset and then filled with one response
    per frame, in the order of ``anomaly_paths``.

    :param anomaly_paths: sequence of entries whose first element is the
        image path
    :return: None; results are stored in ``st.session_state.responses``
    """
    model = "gpt-4o"
    client = OpenAI(api_key=os.getenv("MY_API_KEY"))

    bar = st.progress(0)
    status = st.empty()
    st.session_state.responses = []

    image_files = [entry[0] for entry in anomaly_paths]
    for position, image_file in enumerate(tqdm(image_files), start=1):
        # The widgets are updated before the request for this frame is sent.
        bar.progress(int(position / len(image_files) * 100))
        status.text(f"Running VLM {position}/{len(image_files)}")
        st.session_state.responses.append(get_response(model, client, image_file))

    bar.empty()
    status.text("Processing complete!")
    status.empty()
def _run_inference(video_file_path):
    """Run frame extraction, training, prediction and VLM description."""
    with st.spinner("データを学習中、少々お待ちください..."):
        frames_dir, frame_times = cut_video(video_file_path)
        # Sort the listing: prediction index i must refer to the i-th frame,
        # matching the sorted listing and frame_times used when the
        # anomalies are prepared. os.listdir() order is arbitrary.
        image_paths = [
            os.path.join(frames_dir, image_path)
            for image_path in sorted(os.listdir(frames_dir))
        ]
        if not image_paths:
            st.error("動画からフレームを抽出できませんでした。")
            return
        clf, z_train = OneClassSvm_anomaly_detection(image_paths)
    with st.spinner("学習が完了しました。異常検知を行っています..."):
        predictions = predict_with_progress(clf, z_train)
        st.session_state.display_anomalies = prepare_3_displayed_anomalies(
            frames_dir, predictions, frame_times
        )
        VLM_anomaly_detection(st.session_state.display_anomalies)


def main():
    """Streamlit entry point of the anomaly-detection app.

    Lets the user upload a video, runs the detection pipeline when the
    inference button is pressed, and shows the selected anomalous frames with
    the model's description of the chosen one.
    """
    # NOTE(review): the original indentation of this function was lost; the
    # placement of widgets inside the sidebar below is an assumption.
    # Confirm against the intended layout.
    if 'responses' not in st.session_state:
        st.session_state.responses = []
    if 'display_anomalies' not in st.session_state:
        st.session_state.display_anomalies = []

    video_file_path = "./data/uploaded_video.mp4"
    with st.sidebar:
        st.image("logo.png")
        uploaded_video = st.file_uploader("Upload video", type=["mp4", "mov", "avi"])
        os.makedirs("./data", exist_ok=True)
        if uploaded_video is not None:
            with open(video_file_path, "wb") as f:
                f.write(uploaded_video.read())
            st.video(uploaded_video, start_time=0)
        for _ in range(3):
            st.write(" ")
        st.write("サイドバーより動画としてアップロードし推論ボタンをクリック")
        if st.button("推論開始"):
            if os.path.exists(video_file_path):
                _run_inference(video_file_path)
            else:
                # Nothing has been uploaded yet, so there is nothing to process.
                st.warning("先に動画をアップロードしてください。")

    if st.session_state.display_anomalies:
        anomaly_paths = [path[0] for path in st.session_state.display_anomalies]
        anomaly_time = [str(path[1]) for path in st.session_state.display_anomalies]
        selected = image_select(
            label="「異常」である可能性があるフレーム",
            images=anomaly_paths,
            captions=anomaly_time,
            key="image_select",
        )
        selected_img = str(selected)
        # Only show a description when the selection maps to a stored response.
        if selected_img in anomaly_paths:
            idx = anomaly_paths.index(selected_img)
            if idx < len(st.session_state.responses):
                st.info(st.session_state.responses[idx])
# Run the app only when this file is executed as the main module.
if __name__ == "__main__":
    main()