Spaces:
Sleeping
Sleeping
import io | |
import os | |
import subprocess | |
import av | |
import numpy as np | |
mpv_command = ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"] | |
mpv_process = subprocess.Popen( | |
mpv_command, | |
stdin=subprocess.PIPE, | |
stdout=subprocess.DEVNULL, | |
stderr=subprocess.DEVNULL, | |
) | |
load_chunks = False | |
load_chunks = os.path.exists("chunks.pkl") | |
audio_frames = [] | |
# try open chunks | |
with open("chunks.pkl", "rb") as f: | |
import pickle | |
chunks = pickle.load(f) | |
append = False | |
for chunk in chunks: | |
mpv_process.stdin.write(chunk) | |
mpv_process.stdin.flush() | |
# np_chunk = np.frombuffer(chunk, dtype=np.int16) | |
# aa = av.AudioFrame.from_ndarray(chunk) | |
try: | |
if append: | |
bytes_io.write(chunk) | |
append = False | |
bytes_io.seek(0) | |
else: | |
bytes_io = io.BytesIO(chunk) | |
container = av.open(bytes_io, 'r', format='mp3') | |
audio_stream = next(s for s in container.streams if s.type == 'audio') | |
for frame in container.decode(audio_stream): | |
# Convert the audio frame to a NumPy array | |
array = frame.to_ndarray() | |
# Now you can use av.AudioFrame.from_ndarray | |
audio_frame = av.AudioFrame.from_ndarray(array, format='flt', layout='mono') | |
audio_frame.sample_rate = 44100 | |
audio_frames.append(audio_frame) | |
except Exception as e: | |
print (e) | |
append = True | |
bytes_io.seek(0, io.SEEK_END) | |
continue | |
# with open("frames.pkl", "wb") as f: | |
# import pickle | |
# pickle.dump(audio_frames, f) | |
if mpv_process.stdin: | |
mpv_process.stdin.close() | |
mpv_process.wait() |