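# NOTE(review): "Spaces: Sleeping" below the original header looks like page-status residue from the hosting site, not part of the program - confirm and drop.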
import argparse | |
import queue | |
import time | |
import threading | |
import logging | |
import os | |
from PIL import Image | |
from daily import EventHandler, CallClient, Daily | |
from datetime import datetime | |
from dotenv import load_dotenv | |
from auth import get_meeting_token, get_room_name | |
from pipeline import Pipeline | |
from device import device, torch_dtype | |
load_dotenv() | |
class DailyVision(EventHandler):
    """Daily call bot that feeds participant video through an image pipeline.

    Frames received from a remote participant are stored in a single-slot
    buffer, transformed by ``Pipeline.predict`` on a worker thread, and
    written to a virtual camera device that the bot publishes to the call.

    Args:
        room_url: URL of the Daily room; also embedded in the log format.
        room_name: Name of the room (stored, not otherwise used here).
        expiration: Absolute ``time.time()`` value after which the worker
            loop stops.
        idle: When truthy, the bot keeps running even if it is alone in
            the room.
        bot_name: User name the bot joins the call with.
    """

    # Seconds the worker sleeps when no frame has been buffered yet, so the
    # loop does not spin at full speed while waiting for the first frame.
    _IDLE_POLL_SECONDS = 0.01

    def __init__(
        self,
        room_url,
        room_name,
        expiration,
        idle,
        bot_name="Daily Bot"
    ):
        self.__client = CallClient(event_handler=self)
        self.__camera = None
        self.__thread = None  # created by run(); leave() must tolerate None
        self.__time = time.time()
        self.__app_quit = False
        self.__image_buffer = None

        self.__bot_name = bot_name
        self.__room_url = room_url
        self.__room_name = room_name
        self.__expiration = expiration
        self.__params = Pipeline.InputParams()
        self.__idle = idle

        # Minimum number of seconds between two accepted incoming frames.
        # Read once here instead of on every frame callback.
        self.__min_frame_interval = float(os.getenv("FPS_CAP", 0.0333))

        # Create the pipeline (this might take a moment)
        self.__pipeline = Pipeline(device, torch_dtype)

        # Configure logger. The room URL is spliced into a %-style format
        # string, so literal '%' characters in it must be escaped.
        safe_url = str(self.__room_url).replace("%", "%%")
        log_format = f"%(asctime)s {safe_url} %(message)s"
        logging.basicConfig(format=log_format)
        self.logger = logging.getLogger("bot-instance")
        self.logger.setLevel(logging.DEBUG)

        self.logger.info(f"Expiration timer set to: {self.__expiration}")

        # Setup camera
        self.setup_camera()

    def run(self, meeting_url, token):
        """Join ``meeting_url`` and block until the worker thread finishes.

        Args:
            meeting_url: URL of the room to join.
            token: Meeting token passed straight to ``CallClient.join``.
        """
        self.logger.info(f"Connecting to room {meeting_url} as {self.__bot_name}")
        self.__client.set_user_name(self.__bot_name)
        self.__client.join(meeting_url, token, completion=self.on_joined)

        # Start the frame-processing thread.
        self.__thread = threading.Thread(target=self.process_frames)
        self.__thread.start()

        # Keep-alive: block the caller until the worker exits.
        self.__thread.join()

    def leave(self):
        """Stop the worker thread (if it was started) and leave the call."""
        self.logger.info("Leaving...")
        self.__app_quit = True
        # run() may have failed before the thread was created.
        if self.__thread is not None:
            self.__thread.join()
        self.__client.leave()

    def on_joined(self, join_data, client_error):
        """Completion callback given to ``CallClient.join``; logs the outcome."""
        self.logger.info(f"call_joined: {join_data}, {client_error}")

    def on_participant_joined(self, participant):
        """Start rendering the participant's video and announce the prompt.

        Presumably invoked by the Daily client when a participant joins -
        confirm against the EventHandler documentation.
        """
        self.logger.info(f"Participant {participant['id']} joined, analyzing frames...")
        self.__client.set_video_renderer(
            participant["id"], self.on_video_frame, color_format="RGB"
        )

        # Say hello
        self.wave()

    def setup_camera(self):
        """Create the 640x480 RGB virtual camera once and enable it as input."""
        if not self.__camera:
            self.__camera = Daily.create_camera_device(
                "camera",
                width=640,
                height=480,
                color_format="RGB",
            )
            self.__client.update_inputs({
                "camera": {
                    "isEnabled": True,
                    "settings": {
                        "deviceId": "camera"
                    }
                }
            })

    def process_frames(self):
        """Worker loop: run the buffered frame through the pipeline.

        Exits when the quit flag is set, when the bot is alone in the room
        (unless idling is allowed), or when the expiration time has passed.
        """
        while not self.__app_quit:
            # Is anyone watching?
            if not self.__idle and len(self.__client.participants()) < 2:
                self.logger.info(f"No partcipants in channel. Exiting...")
                self.__app_quit = True
                break

            # Check expiry timer
            if time.time() > self.__expiration:
                self.logger.info(f"Expiration timer exceeded. Exiting...")
                self.__app_quit = True
                break

            video_frame = self.__image_buffer
            if video_frame is None:
                # Nothing received yet; yield instead of busy-spinning.
                time.sleep(self._IDLE_POLL_SECONDS)
                continue

            # Re-read the parameters on every iteration so that updates
            # applied by on_app_message actually reach the pipeline.
            params = self.__params

            # NOTE: the buffered frame is reused until a newer one arrives.
            image = Image.frombytes(
                "RGB", (video_frame.width, video_frame.height), video_frame.buffer
            )
            result_image = self.__pipeline.predict(params, image)
            self.__camera.write_frame(result_image.tobytes())

    def on_video_frame(self, participant_id, video_frame):
        """Video renderer callback: keep the frame if enough time has passed.

        A frame is accepted at most once per ``FPS_CAP`` seconds (default
        0.0333); frames arriving sooner are dropped.
        """
        now = time.time()
        if now - self.__time > self.__min_frame_interval:
            self.__time = now
            self.__image_buffer = video_frame

    def on_app_message(self, message, sender):
        """Replace the pipeline input parameters with those in ``message``.

        The payload comes from a remote sender, so a malformed message is
        logged and ignored rather than raised out of the callback.
        """
        self.logger.info(f"App message received: {message}")
        try:
            new_params = self.__pipeline.InputParams(**message)
        except (TypeError, ValueError):
            # TypeError: message is not a mapping / has unexpected keys.
            # ValueError: assumed to cover InputParams validation errors -
            # TODO confirm against the Pipeline implementation.
            self.logger.exception("Ignoring invalid app message")
            return
        self.__params = new_params
        self.logger.info(f"Pipeline params updated: {self.__params}")

    def wave(self):
        """Send the currently configured prompt to the call as an app message."""
        self.__client.send_app_message(
            {
                "prompt": self.__params.prompt,
            }
        )
def _str_to_bool(value):
    """Parse a command-line/environment boolean such as ``"false"`` or ``"1"``.

    ``type=bool`` in argparse treats every non-empty string (including
    ``"False"``) as ``True``; this parser interprets the text instead.

    Raises:
        argparse.ArgumentTypeError: if the text is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ("1", "true", "t", "yes", "y", "on"):
        return True
    if text in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def main():
    """Parse the command line, start the bot, and leave the call on exit."""
    parser = argparse.ArgumentParser(description="Daily Bot")
    # Required args
    parser.add_argument("-u", "--url", required=True, type=str, help="URL of the Daily room")
    parser.add_argument("-k", "--api_key", required=True, type=str, help="Daily API key")
    # Optional args
    parser.add_argument("-t", "--private", type=_str_to_bool, help="Is this room private?", default=True)
    parser.add_argument("-n", "--bot-name", type=str, help="Name of the bot", default="Daily Bot")
    # String defaults (from the environment) are converted by argparse
    # through the same ``type`` callable as command-line values.
    parser.add_argument("-e", "--expiration", type=int, help="Duration of bot", default=os.getenv("BOT_MAX_DURATION", 300))
    parser.add_argument("-i", "--idle", type=_str_to_bool, help="Wait for participants to join", default=os.getenv("BOT_WILL_IDLE", False))

    args = parser.parse_args()

    Daily.init()

    expiration = time.time() + args.expiration
    room_name = get_room_name(args.url)

    # Retrieve a meeting token only for private rooms; without this default
    # a public room would hit an unbound ``token`` below.
    # TODO: do a room lookup to check privacy.
    token = None
    if args.private:
        token = get_meeting_token(room_name, args.api_key, expiration)

    app = DailyVision(args.url, room_name, expiration, args.idle, args.bot_name)

    try:
        app.run(args.url, token)
    except KeyboardInterrupt:
        print("Ctrl-C detected. Exiting!")
    finally:
        app.leave()

    # Let leave finish
    time.sleep(2)
# Start the bot only when this file is executed directly as a script.
if __name__ == "__main__":
    main()