Spaces:
Sleeping
Sleeping
| import time, threading | |
| from datetime import datetime | |
| from typing import Dict, Any | |
class CiTriggerEngine:
    """Time-based trigger queue that fires events once their timestamp is due.

    Events are plain dicts stored in ``self.events`` under their ``'id'`` key.
    Each event carries an ISO-8601 ``'timestamp'``; ``process_inputs`` polls
    the queue about once per second and hands every due event to ``trigger``.

    Timestamps without a UTC offset are interpreted as UTC (the queue has
    always been compared against the current UTC time); timestamps that carry
    an offset are compared as absolute instants.
    """

    def __init__(self) -> None:
        self.events: Dict[str, Any] = {}
        # Setting this to False makes ``process_inputs`` return.
        self.active = True
        # ``add_input`` is called from other threads while the polling loop
        # runs, so every access to ``self.events`` goes through this lock.
        self._lock = threading.Lock()

    def add_input(self, data: Dict[str, Any]) -> None:
        """Queue *data* under ``data['id']``, replacing any event with that id.

        Raises:
            KeyError: if *data* has no ``'id'`` key.
        """
        with self._lock:
            self.events[data['id']] = data
        print(f"[TRIGGER] Подію {data['id']} додано до черги.")

    def process_inputs(self) -> None:
        """Poll the queue until ``self.active`` becomes false.

        Blocks the calling thread; intended to be run in a background thread.
        """
        while self.active:
            self._fire_due_events()
            if not self.active:
                # Shut down during this pass: return without the extra sleep.
                break
            time.sleep(1)

    def trigger(self, event: Dict[str, Any]) -> None:
        """Announce that *event* fired; reads ``event['назва']``."""
        print(f"[CiTrigger] ⚡ Активовано вхід: {event['назва']}")

    def _fire_due_events(self) -> None:
        """Run one polling pass: fire and remove every event that is due."""
        from datetime import timezone  # module only imports `datetime`

        now = datetime.now(timezone.utc)
        with self._lock:
            pending = list(self.events.items())

        for event_id, event in pending:
            try:
                due_at = self._parse_timestamp(event['timestamp'])
            except (KeyError, TypeError, ValueError) as exc:
                # A malformed event can never become due; drop it instead of
                # letting the exception kill the polling thread.
                self._discard(event_id, event)
                print(f"[TRIGGER] Подію {event_id} пропущено: {exc!r}")
                continue

            if now < due_at:
                continue

            # Remove before firing so a failing handler cannot leave the event
            # queued forever, and so a handler may re-queue the same id.
            self._discard(event_id, event)
            try:
                self.trigger(event)
            except Exception as exc:
                # Worker-thread boundary: report and carry on with the rest.
                print(f"[CiTrigger] Помилка активації події {event_id}: {exc!r}")

    @staticmethod
    def _parse_timestamp(raw: str) -> datetime:
        """Parse an ISO-8601 string into an aware datetime (naive means UTC)."""
        from datetime import timezone  # module only imports `datetime`

        parsed = datetime.fromisoformat(raw)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def _discard(self, event_id: str, event: Any) -> None:
        """Remove *event* from the queue unless it was replaced meanwhile."""
        with self._lock:
            # Identity check: do not drop a newer event added under the same id.
            if self.events.get(event_id) is event:
                del self.events[event_id]
# Shared module-level engine instance; created as soon as the module is loaded.
trigger_engine = CiTriggerEngine()

# Poll the queue in the background. The thread is a daemon, so it does not
# keep the interpreter alive once the main thread exits.
_trigger_worker = threading.Thread(
    target=trigger_engine.process_inputs,
    daemon=True,
)
_trigger_worker.start()