Spaces:
Sleeping
Sleeping
import cv2 | |
from datetime import datetime, timedelta | |
from plotly.subplots import make_subplots | |
import plotly.graph_objects as go | |
import threading | |
import os | |
import time | |
class Camera: | |
def __init__(self, id, name, index=0, fps=1, save_duration=0.1, rtsp=None): | |
self.name = name | |
self.id = id | |
self.fps = fps | |
self.save_duration = save_duration | |
self.index = rtsp if rtsp else index | |
# Initialize camera in a separate thread to avoid blocking | |
self.out = None | |
self.cap = None | |
self.frame_width = 1980 | |
self.frame_height = 720 | |
self.frame_count = 0 | |
self.start_time = time.time() | |
self.initialization_thread = threading.Thread(target=self.initialize_camera) | |
self.initialization_thread.start() | |
def initialize_camera(self): | |
self.cap = cv2.VideoCapture(self.index) | |
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.frame_width) | |
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.frame_height) | |
self.cap.set(cv2.CAP_PROP_FPS, self.fps) | |
# Wait for camera to open or timeout after 10 seconds | |
start_time = time.time() | |
while not self.cap.isOpened(): | |
time.sleep(0.1) | |
if time.time() - start_time > 10: # 10 seconds timeout | |
print(f"Timeout: Unable to open camera {self.id} - {self.name}") | |
break | |
def save_frame(self, frame): | |
filename = "video/frames/frame_{}.jpg".format(self.frame_count) | |
cv2.imwrite(filename, frame) | |
self.frame_count += 1 | |
def create_video_from_frames(self): | |
filename = "video/outpy_{}.mp4".format(time.strftime("%Y%m%d-%H%M%S")) | |
self.out = cv2.VideoWriter(filename, cv2.VideoWriter_fourcc(*'mp4v'), self.fps, (self.frame_width, self.frame_height)) | |
for i in range(self.frame_count): | |
frame = cv2.imread("video/frames/frame_{}.jpg".format(i)) | |
self.out.write(frame) | |
os.remove("video/frames/frame_{}.jpg".format(i)) | |
self.out.release() | |
self.frame_count = 0 | |
def gen_frames(self): | |
i = 0 | |
if not self.cap or not self.cap.isOpened(): | |
yield b'' # Return empty bytes if camera is not available | |
return | |
while True: | |
success, frame = self.cap.read() | |
if not success: | |
break | |
else: | |
if i % 20 == 0: | |
self.save_frame(frame) | |
ret, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 100]) | |
frame = buffer.tobytes() | |
yield (b'--frame\r\n' | |
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') | |
i += 1 | |
if time.time() - self.start_time > self.save_duration * 3600: | |
self.create_video_from_frames() | |
self.start_time = time.time() | |
def close_camera(self): | |
try: | |
self.cap.release() | |
except: print("No camera to release") | |
try: | |
self.out.release() | |
except: print("No video to release") | |
class Sensor: | |
def __init__(self, id, name): | |
self.id = id | |
self.name = name | |
self.time = datetime.now() | |
self.measurement = ["Spotřeba","Napětí","Výkon","Teplota","Vlhkost","Vlhkost"] | |
self.values = [12,None,50,None,None,None] # store sensor values in a dictionary | |
self.units = ["A","V","D",None,None,None] # store sensor units in a dictionary | |
def update_values(self, data, units): | |
self.values = [round(float(d), 4) for d in data] | |
self.units = units | |
self.time = datetime.now() | |
print(self.values, self.units) | |
def find(devices, id): | |
for device in devices: | |
if device.id == id: | |
return device | |
return None # device not found | |
def plot(self, Db_class, title = "graph"): | |
# Get the current date and time | |
now = datetime.now() | |
# Calculate the date and time 10 days ago | |
ten_days_ago = now - timedelta(days=300) | |
# Query the database for records from the last 10 days | |
ele_1 = Db_class.query.filter(Db_class.sensor_id == self.id, Db_class.time >= ten_days_ago).all() | |
x = [record.time for record in ele_1] | |
y1 = [record.value4 for record in ele_1] | |
y2 = [record.value3 for record in ele_1] # Assuming value2 exists | |
fig = make_subplots(specs=[[{"secondary_y": True}]]) | |
# Add traces | |
fig.add_trace( | |
go.Scatter(x=x, y=y1, name="Spotřeba['kWh'])"), | |
secondary_y=False, | |
) | |
fig.add_trace( | |
go.Scatter(x=x, y=y2, name="Aktuální výkon['W']"), | |
secondary_y=True, | |
) | |
fig.update_layout( | |
autosize=True, | |
title=title, | |
width=800, | |
height=512, | |
margin=dict( | |
l=40, | |
r=20, | |
b=20, | |
t=60, | |
pad=2 | |
), | |
paper_bgcolor='#F5F5F5', | |
plot_bgcolor='#FFFFFF' | |
) | |
fig.update_xaxes(title_text='Datum a čas') | |
fig.update_yaxes(title_text="Spotřeba [kWh]", secondary_y=False) | |
fig.update_yaxes(title_text="Aktuální výkon [W]", secondary_y=True) | |
return fig |