project_charles / debug_app.py
sohojoe's picture
first version streaming via webrtc but quality sucks
d91a673
raw
history blame
2.78 kB
import logging
import traceback
from typing import List
import av
import numpy as np
import streamlit as st
from streamlit_webrtc import WebRtcMode, webrtc_streamer
import pydub
from dotenv import load_dotenv
from ffmpeg_converter_actor import FFMpegConverterActor
load_dotenv()
from sample_utils.turn import get_ice_servers
logger = logging.getLogger(__name__)
import os
import ray
from ray.util.queue import Queue
# Initialise Ray once per process. When the RAY_ADDRESS environment variable
# is set, it is passed to ray.init() as the address to connect to; otherwise
# ray.init() is called without an address. Either way the "project_charles"
# namespace is used.
if not ray.is_initialized():
    ray_address = os.getenv('RAY_ADDRESS')
    address_args = (ray_address,) if ray_address else ()
    ray.init(*address_args, namespace="project_charles")
def video_frame_callback(frame: av.VideoFrame) -> av.VideoFrame:
    """Pass the incoming video frame through unchanged.

    Args:
        frame: Video frame handed to the callback.

    Returns:
        The very same ``frame`` object, unmodified.
    """
    return frame
import pickle

# Load the saved debug chunks from disk.
# NOTE: pickle.load can execute arbitrary code -- only use a trusted chunks.pkl.
with open("chunks.pkl", "rb") as chunks_file:
    debug_chunks = pickle.load(chunks_file)

# Create the bounded queue that is handed to the converter actor (and read by
# process_frame), then bring the actor up.
converter_queue = Queue(maxsize=100)
converter_actor = FFMpegConverterActor.remote(converter_queue)
ray.get(converter_actor.start_process.remote())  # block until start_process returns
converter_actor.run.remote()  # launched without waiting for its result

# Push the saved chunks one at a time, waiting for each push to complete
# before sending the next.
for chunk in debug_chunks:
    ray.get(converter_actor.push_chunk.remote(chunk))

# Empty array of type int16
sample_buffer = np.array([], dtype=np.int16)
def process_frame(old_frame):
    """Build the outgoing audio frame that replaces ``old_frame``.

    If ``converter_queue`` has an item, it is taken and interpreted as int16
    samples; otherwise silence is generated. The samples are then duplicated
    per output channel into interleaved form and wrapped in a packed ``s16``
    ``av.AudioFrame`` that copies ``sample_rate`` and ``pts`` from
    ``old_frame``.

    Args:
        old_frame: Incoming audio frame. Only its ``samples``, ``sample_rate``
            and ``pts`` attributes are read.

    Returns:
        A new ``av.AudioFrame`` in ``s16`` format with a stereo layout.

    Raises:
        Exception: Any error raised while building the frame is printed with
            its traceback and then re-raised unchanged.
    """
    output_channels = 2
    try:
        required_samples = old_frame.samples
        if not converter_queue.empty():
            frame_as_bytes = converter_queue.get()
            # NOTE(review): assumes queue items are raw mono int16 PCM bytes
            # -- confirm against FFMpegConverterActor.
            samples = np.frombuffer(frame_as_bytes, dtype=np.int16)
        else:
            # Silence: one int16 zero per sample of the incoming frame. The
            # count is in array elements, not bytes, so no "* 2" factor here;
            # otherwise the silent frame would be twice as long as old_frame
            # once duplicated for stereo below.
            samples = np.zeros(required_samples, dtype=np.int16)

        # Repeat every sample once per channel ([a, b] -> [a, a, b, b]) and
        # shape as (1, n) as required for a packed-format frame.
        samples = np.repeat(samples, output_channels).reshape(1, -1)

        layout = 'stereo' if output_channels == 2 else 'mono'
        new_frame = av.AudioFrame.from_ndarray(samples, format='s16', layout=layout)
        new_frame.sample_rate = old_frame.sample_rate
        new_frame.pts = old_frame.pts
        return new_frame
    except Exception:
        # Print the full traceback before propagating so the failure is
        # visible on the console.
        traceback.print_exc()
        raise
def audio_frame_callback(old_frame: av.AudioFrame) -> av.AudioFrame:
    """Replace the incoming audio frame with the one built by ``process_frame``.

    Args:
        old_frame: Audio frame handed to the callback.

    Returns:
        The frame returned by ``process_frame(old_frame)``.
    """
    # To bypass processing while debugging, return ``old_frame`` instead.
    return process_frame(old_frame)
# Mount the WebRTC component in send/receive mode, wiring in the video and
# audio frame callbacks defined above and the ICE servers from get_ice_servers().
rtc_config = {"iceServers": get_ice_servers()}
webrtc_streamer(
    key="delay",
    mode=WebRtcMode.SENDRECV,
    rtc_configuration=rtc_config,
    video_frame_callback=video_frame_callback,
    audio_frame_callback=audio_frame_callback,
)