# Last commit: 8e06a0f by Jon Taylor — "added rendering logic back"
# (Repository file-view residue: raw | history | blame, 6.41 kB)
import argparse
import queue
import time
import threading
import logging
import os
from PIL import Image
from daily import EventHandler, CallClient, Daily
from datetime import datetime
from dotenv import load_dotenv
from auth import get_meeting_token, get_room_name
from pipeline import Pipeline
from device import device, torch_dtype
# Populate os.environ from a local .env file (python-dotenv) at import time, so the
# os.getenv() lookups used later (FPS_CAP, BOT_MAX_DURATION, BOT_WILL_IDLE) can see it.
load_dotenv()
class DailyVision(EventHandler):
    """Daily call bot that runs participants' video through a ``Pipeline``.

    The bot joins a Daily room and subscribes to each participant's video as
    RGB frames. On a worker thread it feeds the most recently accepted frame
    through ``Pipeline.predict`` and writes the result to a virtual camera
    that the bot publishes into the call. Pipeline parameters can be replaced
    at runtime by sending an app message whose fields match
    ``Pipeline.InputParams``.
    """

    def __init__(
        self,
        room_url,
        room_name,
        expiration,
        idle,
        bot_name="Daily Bot"
    ):
        """Create the call client, pipeline, logger and virtual camera.

        Args:
            room_url: URL of the Daily room; also used as a log-line prefix.
            room_name: Name of the Daily room.
            expiration: Absolute ``time.time()`` timestamp after which the
                frame loop stops.
            idle: If falsy, the frame loop stops as soon as fewer than two
                participants are reported in the call.
            bot_name: Display name used when joining the call.
        """
        self.__client = CallClient(event_handler=self)
        self.__camera = None
        self.__thread = None  # worker thread; created by run(), checked by leave()
        self.__time = time.time()  # timestamp of the last accepted frame
        self.__app_quit = False  # set to ask the worker loop to stop
        self.__image_buffer = None  # most recently accepted video frame, or None
        self.__bot_name = bot_name
        self.__room_url = room_url
        self.__room_name = room_name
        self.__expiration = expiration
        self.__idle = idle
        # FPS_CAP is a minimum interval in *seconds* between accepted frames,
        # not a frame rate. Read it once rather than on every incoming frame.
        self.__frame_interval = float(os.getenv("FPS_CAP", 0.0333))
        self.__params = Pipeline.InputParams()
        # Create the pipeline (this might take a moment)
        self.__pipeline = Pipeline(device, torch_dtype)

        # Configure logger. '%' in the URL is escaped so it cannot be
        # mistaken for a %-style placeholder in the format string.
        safe_url = str(self.__room_url).replace("%", "%%")
        logging.basicConfig(format=f"%(asctime)s {safe_url} %(message)s")
        self.logger = logging.getLogger("bot-instance")
        self.logger.setLevel(logging.DEBUG)
        self.logger.info(f"Expiration timer set to: {self.__expiration}")

        # Setup camera
        self.setup_camera()

    def run(self, meeting_url, token):
        """Join the call and block until the frame-processing thread exits.

        Args:
            meeting_url: Daily room URL to join.
            token: Meeting token forwarded to ``CallClient.join``.
                NOTE(review): ``main`` passes None for public rooms; confirm
                daily-python accepts that.
        """
        self.logger.info(f"Connecting to room {meeting_url} as {self.__bot_name}")
        self.__client.set_user_name(self.__bot_name)
        # Join completion is reported through the on_joined callback.
        self.__client.join(meeting_url, token, completion=self.on_joined)
        # Process frames on a worker thread and keep this thread alive until
        # it exits (nobody watching, expiration reached, or leave() called).
        self.__thread = threading.Thread(target=self.process_frames)
        self.__thread.start()
        self.__thread.join()

    def leave(self):
        """Stop the frame loop, wait for the worker thread and leave the call.

        Safe to call even when run() never got as far as starting the worker
        thread (e.g. join raised), and from the worker thread itself.
        """
        self.logger.info("Leaving...")
        self.__app_quit = True
        worker = self.__thread
        if worker is not None and worker is not threading.current_thread():
            worker.join()
        self.__client.leave()

    def on_joined(self, join_data, client_error):
        """Completion callback for ``CallClient.join``; only logs the outcome."""
        self.logger.info(f"call_joined: {join_data}, {client_error}")

    def on_participant_joined(self, participant):
        """Subscribe to a new participant's video as RGB frames and greet them.

        Every participant's frames go to the same single-frame buffer, so the
        most recent frame from any participant is the one processed.
        """
        self.logger.info(f"Participant {participant['id']} joined, analyzing frames...")
        self.__client.set_video_renderer(participant["id"], self.on_video_frame, color_format="RGB")
        # Say hello
        self.wave()

    def setup_camera(self):
        """Create the 640x480 RGB virtual camera (once) and enable it as input."""
        if not self.__camera:
            self.__camera = Daily.create_camera_device("camera",
                                                       width=640,
                                                       height=480,
                                                       color_format="RGB")
        self.__client.update_inputs({
            "camera": {
                "isEnabled": True,
                "settings": {
                    "deviceId": "camera"
                }
            }
        })

    def process_frames(self):
        """Worker loop: run the latest frame through the pipeline until told to stop.

        The loop exits when leave() sets the quit flag, when fewer than two
        participants are present (unless ``idle`` is set), or when the
        expiration time has passed. The buffered frame is not cleared after
        processing, so the same frame is re-processed until a newer one
        arrives.
        """
        while not self.__app_quit:
            # Is anyone watching? The count appears to include the bot itself.
            # NOTE(review): join() completes asynchronously; if this runs
            # before the join finishes, the count may be 1 and the bot exits.
            if not self.__idle and len(self.__client.participants()) < 2:
                self.logger.info("No participants in channel. Exiting...")
                self.__app_quit = True
                break
            # Check expiry timer
            if time.time() > self.__expiration:
                self.logger.info("Expiration timer exceeded. Exiting...")
                self.__app_quit = True
                break

            video_frame = self.__image_buffer
            if video_frame is None:
                # No frame received yet: back off briefly instead of busy-spinning.
                time.sleep(0.01)
                continue

            # Read the params on every iteration so that updates made by
            # on_app_message take effect on the next frame.
            params = self.__params
            image = Image.frombytes("RGB", (video_frame.width, video_frame.height), video_frame.buffer)
            result_image = self.__pipeline.predict(params, image)
            # NOTE(review): assumes predict returns a 640x480 RGB image that
            # matches the camera created in setup_camera; confirm.
            self.__camera.write_frame(result_image.tobytes())

    def on_video_frame(self, participant_id, video_frame):
        """Renderer callback: keep the newest frame, rate-limited by FPS_CAP.

        A frame is accepted only if more than FPS_CAP seconds have passed
        since the last accepted one (the default of 0.0333 s drops roughly
        every other frame of a 30 fps stream).
        """
        now = time.time()
        if now - self.__time > self.__frame_interval:
            self.__time = now
            self.__image_buffer = video_frame

    def on_app_message(self, message, sender):
        """Replace the pipeline parameters with the fields of an app message.

        The message comes from another call participant and is therefore
        untrusted. A payload that cannot build an ``InputParams`` is logged
        and ignored, and the current parameters are kept.
        NOTE(review): assumes InputParams raises a ValueError subclass (as
        pydantic's ValidationError is) for invalid fields; confirm.
        """
        self.logger.info(f"App message from {sender}: {message}")
        try:
            new_params = self.__pipeline.InputParams(**message)
        except (TypeError, ValueError) as err:
            self.logger.warning(f"Ignoring invalid app message from {sender}: {err}")
            return
        self.__params = new_params
        self.logger.info(f"Pipeline params updated: {self.__params}")

    def wave(self):
        """Broadcast the current prompt to the call as an app message."""
        self.__client.send_app_message(
            {
                "prompt": self.__params.prompt,
            }
        )
def str_to_bool(value):
    """Convert a command-line or environment string such as "false" or "1" to a bool.

    argparse's ``type=bool`` treats every non-empty string, including
    "False", as True, so boolean flags need an explicit converter. Values
    that are already bools (e.g. a literal ``default=True``) pass through.

    Raises:
        argparse.ArgumentTypeError: if the string is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ("1", "true", "t", "yes", "y", "on"):
        return True
    if text in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def main():
    """Parse CLI arguments, join the Daily room as the bot and run until done.

    The bot runs until nobody is watching, the expiration time is reached,
    or Ctrl-C is pressed. It always leaves the call on the way out.
    """
    parser = argparse.ArgumentParser(description="Daily Bot")
    # Required args
    parser.add_argument("-u", "--url", required=True, type=str, help="URL of the Daily room")
    parser.add_argument("-k", "--api_key", required=True, type=str, help="Daily API key")
    # Optional args. Boolean flags use str_to_bool so that "False" means False.
    # String defaults (from env vars) are also passed through `type` by argparse.
    parser.add_argument("-t", "--private", type=str_to_bool, help="Is this room private?", default=True)
    parser.add_argument("-n", "--bot-name", type=str, help="Name of the bot", default="Daily Bot")
    parser.add_argument("-e", "--expiration", type=int, help="Duration of bot", default=os.getenv("BOT_MAX_DURATION", 300))
    parser.add_argument("-i", "--idle", type=str_to_bool, help="Wait for participants to join", default=os.getenv("BOT_WILL_IDLE", False))
    args = parser.parse_args()

    Daily.init()
    # --expiration is a duration in seconds; the bot works with an absolute deadline.
    expiration = time.time() + args.expiration
    room_name = get_room_name(args.url)

    # Retrieve a meeting token for private rooms; public rooms join without one.
    # @TODO do room lookup to check privacy
    token = None
    if args.private:
        token = get_meeting_token(room_name, args.api_key, expiration)

    app = DailyVision(args.url, room_name, expiration, args.idle, args.bot_name)
    try:
        app.run(args.url, token)
    except KeyboardInterrupt:
        print("Ctrl-C detected. Exiting!")
    finally:
        app.leave()
        # Let leave finish
        time.sleep(2)
# Start the bot only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()