IoT-Flask-Webserver / device_core.py
Teapack1's picture
Update device_core.py
83de6ee
import cv2
from datetime import datetime, timedelta
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import threading
import os
import time
class Camera:
def __init__(self, id, name, index=0, fps=1, save_duration=0.1, rtsp=None):
self.name = name
self.id = id
self.fps = fps
self.save_duration = save_duration
self.index = rtsp if rtsp else index
# Initialize camera in a separate thread to avoid blocking
self.out = None
self.cap = None
self.frame_width = 1980
self.frame_height = 720
self.frame_count = 0
self.start_time = time.time()
self.initialization_thread = threading.Thread(target=self.initialize_camera)
self.initialization_thread.start()
def initialize_camera(self):
self.cap = cv2.VideoCapture(self.index)
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.frame_width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.frame_height)
self.cap.set(cv2.CAP_PROP_FPS, self.fps)
# Wait for camera to open or timeout after 10 seconds
start_time = time.time()
while not self.cap.isOpened():
time.sleep(0.1)
if time.time() - start_time > 10: # 10 seconds timeout
print(f"Timeout: Unable to open camera {self.id} - {self.name}")
break
def save_frame(self, frame):
filename = "video/frames/frame_{}.jpg".format(self.frame_count)
cv2.imwrite(filename, frame)
self.frame_count += 1
def create_video_from_frames(self):
filename = "video/outpy_{}.mp4".format(time.strftime("%Y%m%d-%H%M%S"))
self.out = cv2.VideoWriter(filename, cv2.VideoWriter_fourcc(*'mp4v'), self.fps, (self.frame_width, self.frame_height))
for i in range(self.frame_count):
frame = cv2.imread("video/frames/frame_{}.jpg".format(i))
self.out.write(frame)
os.remove("video/frames/frame_{}.jpg".format(i))
self.out.release()
self.frame_count = 0
def gen_frames(self):
i = 0
if not self.cap or not self.cap.isOpened():
yield b'' # Return empty bytes if camera is not available
return
while True:
success, frame = self.cap.read()
if not success:
break
else:
if i % 20 == 0:
self.save_frame(frame)
ret, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 100])
frame = buffer.tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
i += 1
if time.time() - self.start_time > self.save_duration * 3600:
self.create_video_from_frames()
self.start_time = time.time()
def close_camera(self):
try:
self.cap.release()
except: print("No camera to release")
try:
self.out.release()
except: print("No video to release")
class Sensor:
def __init__(self, id, name):
self.id = id
self.name = name
self.time = datetime.now()
self.measurement = ["Spotřeba","Napětí","Výkon","Teplota","Vlhkost","Vlhkost"]
self.values = [12,None,50,None,None,None] # store sensor values in a dictionary
self.units = ["A","V","D",None,None,None] # store sensor units in a dictionary
def update_values(self, data, units):
self.values = [round(float(d), 4) for d in data]
self.units = units
self.time = datetime.now()
print(self.values, self.units)
def find(devices, id):
for device in devices:
if device.id == id:
return device
return None # device not found
def plot(self, Db_class, title = "graph"):
# Get the current date and time
now = datetime.now()
# Calculate the date and time 10 days ago
ten_days_ago = now - timedelta(days=300)
# Query the database for records from the last 10 days
ele_1 = Db_class.query.filter(Db_class.sensor_id == self.id, Db_class.time >= ten_days_ago).all()
x = [record.time for record in ele_1]
y1 = [record.value4 for record in ele_1]
y2 = [record.value3 for record in ele_1] # Assuming value2 exists
fig = make_subplots(specs=[[{"secondary_y": True}]])
# Add traces
fig.add_trace(
go.Scatter(x=x, y=y1, name="Spotřeba['kWh'])"),
secondary_y=False,
)
fig.add_trace(
go.Scatter(x=x, y=y2, name="Aktuální výkon['W']"),
secondary_y=True,
)
fig.update_layout(
autosize=True,
title=title,
width=800,
height=512,
margin=dict(
l=40,
r=20,
b=20,
t=60,
pad=2
),
paper_bgcolor='#F5F5F5',
plot_bgcolor='#FFFFFF'
)
fig.update_xaxes(title_text='Datum a čas')
fig.update_yaxes(title_text="Spotřeba [kWh]", secondary_y=False)
fig.update_yaxes(title_text="Aktuální výkon [W]", secondary_y=True)
return fig