project_charles / debug_app.py
sohojoe's picture
refactor ffmpeg_converter_actor.py to its own class
c4fe843
raw
history blame
2.72 kB
import logging
import traceback
from typing import List
import av
import numpy as np
import streamlit as st
from streamlit_webrtc import WebRtcMode, webrtc_streamer
import pydub
from dotenv import load_dotenv
from ffmpeg_converter_actor import FFMpegConverterActor
load_dotenv()
from sample_utils.turn import get_ice_servers
logger = logging.getLogger(__name__)
import os
import ray
from ray.util.queue import Queue
# Initialise Ray only when this process has not already done so.
if not ray.is_initialized():
    # Every connection uses the same namespace; an explicit address is added
    # only when RAY_ADDRESS is set to a non-empty value.
    ray_init_kwargs = {"namespace": "project_charles"}
    ray_address = os.getenv('RAY_ADDRESS')
    if ray_address:
        ray_init_kwargs["address"] = ray_address
    ray.init(**ray_init_kwargs)
def video_frame_callback(frame: av.VideoFrame) -> av.VideoFrame:
    """Return the incoming video frame unchanged.

    Registered with ``webrtc_streamer`` as its ``video_frame_callback``.
    """
    return frame
import pickle

# Load the recorded debug chunks from disk.
# NOTE(review): pickle.load can execute arbitrary code while deserialising;
# only use a chunks.pkl that comes from a trusted source.
with open("chunks.pkl", "rb") as chunks_file:
    debug_chunks = pickle.load(chunks_file)

# Queue handed to the converter actor; process_frame reads from it.
converter_queue = Queue(maxsize=100)
converter_actor = FFMpegConverterActor.remote(converter_queue)

# Wait for start_process to complete, then kick off run() without waiting.
ray.get(converter_actor.start_process.remote())
converter_actor.run.remote()

# Feed the chunks one at a time, waiting for each push to finish.
for chunk in debug_chunks:
    ray.get(converter_actor.push_chunk.remote(chunk))

# Empty array of type int16.
sample_buffer = np.zeros(0, dtype=np.int16)
def process_frame(old_frame):
    """Build the outgoing audio frame that replaces ``old_frame``.

    Takes the next buffer from ``converter_queue`` when one is available and
    reads it as int16 samples; otherwise substitutes zeros (silence). The
    samples are then duplicated into two interleaved channels and wrapped in
    a packed ``s16`` ``av.AudioFrame`` that carries over the sample rate and
    pts of ``old_frame``.

    Args:
        old_frame: Incoming audio frame. Only its ``samples``,
            ``sample_rate`` and ``pts`` attributes are read.

    Returns:
        The newly built ``av.AudioFrame``.

    Raises:
        Exception: Any error raised while building the frame is printed
            together with its traceback and then re-raised unchanged.
    """
    output_channels = 2
    try:
        required_samples = old_frame.samples
        if not converter_queue.empty():
            frame_as_bytes = converter_queue.get()
            samples = np.frombuffer(frame_as_bytes, dtype=np.int16)
        else:
            # Nothing queued: fall back to zeros.
            # NOTE(review): this is twice required_samples *before* the
            # channel duplication below -- confirm that length is intended.
            samples = np.zeros(required_samples * 2, dtype=np.int16)

        # Duplicate mono channel for stereo: [a, b] -> [a, a, b, b].
        if output_channels == 2:
            samples = np.repeat(samples, output_channels)

        # A single row holding the interleaved samples, as passed to
        # from_ndarray for the packed 's16' format.
        samples = samples.reshape(1, -1)

        layout = 'stereo' if output_channels == 2 else 'mono'
        new_frame = av.AudioFrame.from_ndarray(samples, format='s16', layout=layout)
        new_frame.sample_rate = old_frame.sample_rate
        new_frame.pts = old_frame.pts
        return new_frame
    except Exception as e:
        print(e)
        traceback.print_exc()
        # Bare raise re-raises the active exception with its original traceback.
        raise
def audio_frame_callback(old_frame: av.AudioFrame) -> av.AudioFrame:
    """Return the frame that ``process_frame`` builds for ``old_frame``.

    Registered with ``webrtc_streamer`` as its ``audio_frame_callback``.
    """
    return process_frame(old_frame)
# Start the WebRTC component in send/receive mode, routing video and audio
# frames through the two callbacks defined earlier in this file.
ice_servers = get_ice_servers()
webrtc_streamer(
    key="delay",
    mode=WebRtcMode.SENDRECV,
    rtc_configuration={"iceServers": ice_servers},
    video_frame_callback=video_frame_callback,
    audio_frame_callback=audio_frame_callback,
)