import os
import time
import base64
from io import BytesIO

import cv2
import numpy as np
import psutil
import streamlit as st
import torch
import torch.nn.functional as F
from ollama import Client
from PIL import Image
from streamlit_webrtc import WebRtcMode, webrtc_streamer
from transformers import pipeline

# Skip the AVFoundation camera-permission prompt on macOS. This must be set
# as an environment variable (not a plain Python variable) before OpenCV
# opens a capture device.
os.environ["OPENCV_AVFOUNDATION_SKIP_AUTH"] = "1"
def main():
    st.title("SpatialSense")
    st.write("Github: https://github.com/kabir12345/SpatialSense")

    # Initialize the depth-estimation pipeline
    pipe = pipeline(task="depth-estimation", model="LiheYoung/depth-anything-small-hf")

    # Streamlit-WebRTC component for live camera input
    webrtc_ctx = webrtc_streamer(key="example", mode=WebRtcMode.SENDRECV)

    if webrtc_ctx.video_receiver:
        # Reuse a single placeholder so each frame replaces the previous one
        # instead of appending a new image element on every iteration.
        image_placeholder = st.empty()
        while True:
            frame = webrtc_ctx.video_receiver.get_frame(timeout=None)
            if frame is None:
                continue
            # The WebRTC frame arrives as BGR; convert to RGB before wrapping in PIL
            image = frame.to_ndarray(format="bgr24")
            pil_img = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

            # Perform depth estimation
            depth_mask = apply_depth_estimation(pipe, pil_img)

            # Convert PIL Image to NumPy array for display in Streamlit
            depth_mask_np = np.array(depth_mask)

            # The depth map is already RGB, so no channel swap is needed here
            image_placeholder.image(depth_mask_np, caption="Processed Depth Image")
def apply_depth_estimation(pipe, pil_img):
    # Resize the model's depth prediction back to the input resolution and
    # render it as a color image.
    original_width, original_height = pil_img.size
    depth = pipe(pil_img)["depth"]
    depth_tensor = torch.from_numpy(np.array(depth)).unsqueeze(0).unsqueeze(0).float()
    depth_resized = F.interpolate(
        depth_tensor,
        size=(original_height, original_width),
        mode="bilinear",
        align_corners=False,
    )[0, 0]
    # Normalize to the 0-255 range so the map can be stored as 8-bit pixels
    depth_normalized = (depth_resized - depth_resized.min()) / (depth_resized.max() - depth_resized.min()) * 255.0
    depth_normalized_np = depth_normalized.byte().cpu().numpy()
    # applyColorMap returns BGR; convert to RGB for PIL/Streamlit display
    colored_depth = cv2.applyColorMap(depth_normalized_np, cv2.COLORMAP_INFERNO)
    colored_depth_rgb = cv2.cvtColor(colored_depth, cv2.COLOR_BGR2RGB)
    colored_depth_image = Image.fromarray(colored_depth_rgb)
    return colored_depth_image
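
# A minimal sketch for exercising apply_depth_estimation without the WebRTC
# loop; "sample.jpg" and "sample_depth.png" are hypothetical local file names,
# not part of the app:
#
#   pipe = pipeline(task="depth-estimation", model="LiheYoung/depth-anything-small-hf")
#   test_img = Image.open("sample.jpg").convert("RGB")
#   apply_depth_estimation(pipe, test_img).save("sample_depth.png")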
def encode_image_to_base64(pil_img):
    # Serialize the image to an in-memory JPEG (use "PNG" for lossless output)
    # and return it as a base64 string.
    buffered = BytesIO()
    pil_img.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")
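
# Usage sketch (assumption: the encoded string is meant for embedding in an
# HTML <img> tag or for an API that accepts base64 images; the source never
# calls this helper):
#
#   encoded = encode_image_to_base64(pil_img)
#   data_uri = f"data:image/jpeg;base64,{encoded}"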
def handle_user_query(query, image_path, text_placeholder):
    if query:
        # Send the query plus the current frame to a locally running Ollama server
        client = Client(host="http://localhost:11434")
        response = client.chat(
            model="llava:7b-v1.5-q2_K",
            messages=[
                {
                    "role": "user",
                    "content": query,
                    "images": [image_path],  # Path to the temporary image file
                },
            ],
        )
        # Extract the assistant's reply and render it in the placeholder
        response_content = str(response["message"]["content"])
        text_placeholder.text(response_content)
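
# A minimal wiring sketch (assumption: this is how the handler is meant to be
# hooked into the Streamlit UI; the widget label and "frame.jpg" path are
# illustrative, not from the source):
#
#   query = st.text_input("Ask about the current scene")
#   answer_placeholder = st.empty()
#   if query:
#       handle_user_query(query, "frame.jpg", answer_placeholder)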
def update_cpu_usage():
    # Poll overall CPU usage and stash it in session state; this loop blocks,
    # so it is intended to run in a background thread.
    while True:
        cpu_usage = psutil.cpu_percent(interval=1)
        st.session_state.cpu_usage = f"CPU Usage: {cpu_usage}%"
        time.sleep(5)  # Update every 5 seconds; adjust as needed
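
# A sketch of launching the monitor (assumption: a thread that touches
# st.session_state needs the script-run context attached; in recent Streamlit
# releases add_script_run_ctx lives in streamlit.runtime.scriptrunner):
#
#   import threading
#   from streamlit.runtime.scriptrunner import add_script_run_ctx
#
#   monitor = threading.Thread(target=update_cpu_usage, daemon=True)
#   add_script_run_ctx(monitor)
#   monitor.start()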
if __name__ == "__main__":
    main()