Spaces:
Runtime error
Runtime error
File size: 2,066 Bytes
2485dd8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# import json
import uuid
class Room:
    """A room that tracks its members and which of them listen or speak.

    All state lives in plain instance attributes, which lets ``to_json``
    start from ``vars(self)`` and override only the entries that need it.
    """

    def __init__(self, room_id) -> None:
        """Create an empty room identified by ``room_id``."""
        self.room_id = room_id
        # members is a dict from client_id to Member
        self.members = {}
        # listeners and speakers are lists of client_id's
        self.listeners = []
        self.speakers = []

    def __str__(self) -> str:
        """Return a short summary, e.g. ``Room abc (2 members)``."""
        member_count = len(self.members)
        # Only a count of exactly one takes the singular form
        # ("0 members", "1 member", "2 members").
        plural_suffix = "" if member_count == 1 else "s"
        return f"Room {self.room_id} ({member_count} member{plural_suffix})"

    def to_json(self) -> dict:
        """Return a new dict describing the room.

        The result holds every instance attribute, with ``members`` replaced
        by a dict of each member's own ``to_json()`` output and an extra
        ``activeTranscoders`` count.
        """
        # vars(self) is the instance's live __dict__, not a copy, so it must
        # not be written to. Unpacking it into a fresh dict literal lets us
        # override "members" without touching self.members.
        return {
            **vars(self),
            "members": {
                client_id: member.to_json()
                for client_id, member in self.members.items()
            },
            "activeTranscoders": self.get_active_transcoders(),
        }

    def get_active_connections(self) -> int:
        """Return how many members have ``connection_status == "connected"``."""
        return sum(
            1
            for member in self.members.values()
            if member.connection_status == "connected"
        )

    def get_active_transcoders(self) -> int:
        """Return how many members have a transcoder that is not ``None``."""
        return sum(
            1 for member in self.members.values() if member.transcoder is not None
        )

    def get_room_status_dict(self) -> dict:
        """Return the active connection and transcoder counts as a dict."""
        return {
            "activeConnections": self.get_active_connections(),
            "activeTranscoders": self.get_active_transcoders(),
        }
class Member:
    """One participant, identified by a client id and a session id."""

    def __init__(self, client_id, session_id, name) -> None:
        """Store the identity fields and set the initial connection state."""
        # Assignment order is kept stable because it determines the key order
        # of vars(self), which to_json() builds on.
        self.client_id, self.session_id, self.name = client_id, session_id, name
        # A new member starts out connected, with no transcoder attached.
        self.connection_status = "connected"
        self.transcoder = None
        self.requested_output_type = None
        self.transcoder_dynamic_config = None

    def __str__(self) -> str:
        """Return the name, the first four characters of the id, and the status."""
        short_id = self.client_id[:4]
        return f"{self.name} (id: {short_id}...) ({self.connection_status})"

    def to_json(self):
        """Return a new dict of the attributes, with ``transcoder`` as a bool.

        The ``transcoder`` entry only reports whether a transcoder is set;
        the transcoder object itself is not included.
        """
        payload = dict(vars(self))
        payload["transcoder"] = self.transcoder is not None
        return payload
|