Spaces:
Sleeping
Sleeping
import logging
import os
import traceback
from typing import List

import av
import numpy as np
import pydub
import streamlit as st
from dotenv import load_dotenv
from streamlit_webrtc import WebRtcMode, webrtc_streamer

from ffmpeg_converter_actor import FFMpegConverterActor

# Populate os.environ from .env before the imports below, which may read it.
load_dotenv()

import ray  # noqa: E402
from ray.util.queue import Empty, Queue  # noqa: E402

from sample_utils.turn import get_ice_servers  # noqa: E402

logger = logging.getLogger(__name__)
# Initialise Ray only once per process; if it is already up (e.g. the script
# is executed again in the same process) this whole section is skipped.
if not ray.is_initialized():
    # Attach to the cluster named by RAY_ADDRESS when it is set to a non-empty
    # value; otherwise call ray.init without an explicit address.
    ray_address = os.getenv('RAY_ADDRESS')
    if not ray_address:
        ray.init(namespace="project_charles")
    else:
        ray.init(address=ray_address, namespace="project_charles")
def video_frame_callback(frame: av.VideoFrame) -> av.VideoFrame:
    """Return *frame* unchanged so the video track passes through untouched."""
    passthrough = frame
    return passthrough
import pickle

# Debug fixture: pre-recorded chunks that are replayed through the converter.
# NOTE(review): pickle.load can execute arbitrary code embedded in the file —
# only ever load a chunks.pkl you produced yourself.
with open("chunks.pkl", "rb") as f:
    debug_chunks = pickle.load(f)

# Bounded queue (at most 100 items) shared by the converter actor and
# process_frame, which reads converted audio from it.
converter_queue = Queue(maxsize=100)
converter_actor = FFMpegConverterActor.remote(converter_queue)

# Wait for start_process to finish, then launch `run` without waiting on it
# (its ObjectRef is intentionally discarded).
ray.get(converter_actor.start_process.remote())
converter_actor.run.remote()

# Push every recorded chunk in order, waiting for each push to complete.
# NOTE(review): if this script is re-executed in the same process, a new
# queue and actor are created each time — confirm that is intended.
for chunk in debug_chunks:
    ray.get(converter_actor.push_chunk.remote(chunk))
# Empty int16 array (shape (0,)).
# NOTE(review): appears unused — confirm before removing.
sample_buffer = np.array([], dtype=np.int16)
def process_frame(old_frame):
    """Build the outgoing audio frame that replaces *old_frame*.

    Takes the next chunk of converted audio from ``converter_queue`` when one
    is available, otherwise substitutes silence lasting as long as
    *old_frame*. The chunk is treated as mono signed 16-bit PCM and, since
    ``output_channels`` is 2, duplicated into interleaved stereo.

    Args:
        old_frame: The incoming ``av.AudioFrame``; only its ``samples``,
            ``sample_rate`` and ``pts`` attributes are read.

    Returns:
        A new packed ``s16`` ``av.AudioFrame`` carrying *old_frame*'s sample
        rate and presentation timestamp.

    Raises:
        Exception: any error is logged together with its traceback and then
            re-raised unchanged.
    """
    try:
        output_channels = 2
        required_samples = old_frame.samples

        try:
            # One non-blocking round trip. Replaces the empty()/get() pair,
            # where get() (blocking by default) could stall the callback if
            # the queue drained between the two calls.
            frame_as_bytes = converter_queue.get_nowait()
        except Empty:
            # np.zeros counts int16 *elements*, not bytes: one zero per
            # sample gives mono silence matching old_frame's duration.
            samples = np.zeros(required_samples, dtype=np.int16)
        else:
            # NOTE(review): assumes the converter emits mono s16 PCM at
            # old_frame.sample_rate; the chunk length is not forced to equal
            # required_samples — confirm against FFMpegConverterActor.
            samples = np.frombuffer(frame_as_bytes, dtype=np.int16)

        if output_channels == 2:
            # Duplicate mono into interleaved stereo: [a, b] -> [a, a, b, b].
            samples = np.repeat(samples, 2)
        # Packed s16 frames are passed as a single plane: shape (1, n).
        samples = samples.reshape(1, -1)

        layout = 'stereo' if output_channels == 2 else 'mono'
        new_frame = av.AudioFrame.from_ndarray(samples, format='s16', layout=layout)
        new_frame.sample_rate = old_frame.sample_rate
        new_frame.pts = old_frame.pts
        return new_frame
    except Exception:
        logger.exception("process_frame failed")
        raise
def audio_frame_callback(old_frame: av.AudioFrame) -> av.AudioFrame:
    """Replace each incoming audio frame with the one built by ``process_frame``."""
    return process_frame(old_frame)
# Start a send/receive WebRTC session: video is echoed back unchanged and
# audio is replaced frame-by-frame by audio_frame_callback.
webrtc_streamer(
    key="delay",
    mode=WebRtcMode.SENDRECV,
    rtc_configuration=dict(iceServers=get_ice_servers()),
    audio_frame_callback=audio_frame_callback,
    video_frame_callback=video_frame_callback,
)