|
import time |
|
import streamlit as st |
|
|
|
import os |
|
import torch |
|
import datetime |
|
import numpy as np |
|
import soundfile |
|
from wavmark.utils import file_reader |
|
from audioseal import AudioSeal |
|
import torchaudio |
|
from pydub import AudioSegment |
|
import io |
|
import librosa |
|
import ffmpeg |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_default_value():
    """Store a random bit string under ``st.session_state.def_value`` if unset.

    The string has ``32 - len_start_bit`` characters, each ``"0"`` or ``"1"``.
    ``len_start_bit`` is read from module scope (it is assigned in the
    script's ``__main__`` section). An existing ``def_value`` is left
    untouched.
    """
    # Guard clause: nothing to do once a value has been stored.
    if "def_value" in st.session_state:
        return

    random_bits = np.random.choice([0, 1], size=32 - len_start_bit)
    st.session_state.def_value = "".join(map(str, random_bits))
|
|
|
def download_sample_audio():
    """Download a sample LJ Speech clip to ``test.wav`` and load it.

    Returns:
        The ``(waveform, sample_rate)`` pair produced by
        ``torchaudio.load("test.wav")``.

    Raises:
        urllib.error.URLError: If the sample cannot be downloaded.
    """
    # Imported locally: the import block visible at the top of this file
    # does not bring ``urllib`` into scope, so the bare reference used to
    # fail with NameError.
    import urllib.request

    url = "https://keithito.com/LJ-Speech-Dataset/LJ037-0171.wav"

    # Fetch before opening the output file so a failed download does not
    # leave an empty ``test.wav`` behind; the context manager closes the
    # HTTP response, and the timeout keeps a stalled connection from
    # blocking forever.
    with urllib.request.urlopen(url, timeout=30) as resp:
        payload = resp.read()

    with open("test.wav", "wb") as f:
        f.write(payload)

    return torchaudio.load("test.wav")
|
|
|
|
|
_INTRO_MARKDOWN = """
# MDS07 Demo Presentation
[AudioSeal](https://github.com/jcha0155/audioseal) is the next-generation watermarking tool driven by AI.
You can upload an audio file and encode a custom 16-bit watermark or perform decoding from a watermarked audio.

This page is for demonstration usage and only process **the first minute** of the audio.
If you have longer files for processing, we recommend using [our python toolkit](https://github.com/jcha0155/audioseal).
"""


def _load_uploaded_audio(audio_file):
    """Save the upload to disk, show a player for it, and load it as a tensor.

    Args:
        audio_file: The object returned by ``st.file_uploader``; its ``name``
            and ``getbuffer()`` are used.

    Returns:
        ``(wav, original_extension)`` where ``wav`` has a leading batch
        dimension, a single channel, and is sampled at ``default_sr``; or
        ``None`` when the extension is neither ``.wav`` nor ``.mp3``.
    """
    # NOTE(review): the working files below use fixed relative names, so
    # concurrent sessions sharing a working directory would overwrite each
    # other's files - confirm whether that matters for this deployment.
    extension = os.path.splitext(audio_file.name)[1].lower()

    if extension == ".wav":
        with open("test.wav", "wb") as f:
            f.write(audio_file.getbuffer())
        st.audio("test.wav", format="audio/wav")
    elif extension == ".mp3":
        with open("test.mp3", "wb") as f:
            f.write(audio_file.getbuffer())
        st.audio("test.mp3", format="audio/mpeg")
        # Decode the MP3 to WAV with pydub so both formats load the same way.
        AudioSegment.from_mp3("test.mp3").export("test.wav", format="wav")
    else:
        st.error(f"Unsupported file type: {extension}")
        return None

    wav, sample_rate = torchaudio.load("test.wav")

    # NOTE(review): the AudioSeal models appear to expect single-channel
    # input - confirm; multi-channel uploads are averaged down to mono.
    if wav.size(0) > 1:
        wav = wav.mean(dim=0, keepdim=True)

    # The page text promises that only the first minute is processed;
    # ``max_second_encode`` holds that limit in seconds.
    wav = wav[:, : max_second_encode * sample_rate]

    # The model, the detector and the saved output all use ``default_sr``,
    # so bring the audio to that rate instead of mislabelling its native one.
    if sample_rate != default_sr:
        wav = torchaudio.functional.resample(
            wav, orig_freq=sample_rate, new_freq=default_sr
        )

    # Add the batch dimension.
    return wav.unsqueeze(0), extension


def _add_watermark(wav, original_extension):
    """Watermark ``wav``, write it in the upload's format, and offer it for download."""
    with st.spinner("Adding Watermark..."):
        # Inference only: no autograd graph is needed.
        with torch.no_grad():
            watermarked_audio = wav + model.get_watermark(wav, default_sr)

        # Drop the batch dimension so the tensor is (channels, samples).
        samples = watermarked_audio.squeeze(0)

        if original_extension == ".mp3":
            torchaudio.save("output.wav", samples, default_sr)
            AudioSegment.from_wav("output.wav").export("output.mp3", format="mp3")
            output_path, mime = "output.mp3", "audio/mpeg"
        else:
            torchaudio.save("output.wav", samples, default_sr, bits_per_sample=16)
            output_path, mime = "output.wav", "audio/wav"

    st.audio(output_path, format=mime)

    with open(output_path, "rb") as f:
        binary_data = f.read()

    st.download_button(
        label="Download watermarked audio",
        data=binary_data,
        file_name=output_path,
        mime=mime,
    )


def _detect_watermark(wav):
    """Run the detector on a freshly watermarked copy of ``wav`` and on ``wav`` itself."""
    with st.spinner("Detecting..."):
        with torch.no_grad():
            watermarked_audio = wav + model.get_watermark(wav, default_sr)
            # NOTE(review): the first element returned by ``detect_watermark``
            # is presumably a detection score - confirm against AudioSeal docs.
            result, _ = detector.detect_watermark(
                watermarked_audio, sample_rate=default_sr, message_threshold=0.5
            )
            result2, _ = detector.detect_watermark(
                wav, sample_rate=default_sr, message_threshold=0.5
            )

    st.markdown("This is likely a watermarked audio:")
    st.markdown(str(result))
    st.markdown("This is likely an unwatermarked audio:")
    st.markdown(str(result2))


def main():
    """Render the demo page: upload an audio file, then add or detect a watermark.

    Reads ``model``, ``detector``, ``default_sr`` and ``max_second_encode``
    from module scope (they are assigned in the script's ``__main__``
    section).
    """
    create_default_value()

    st.markdown(_INTRO_MARKDOWN)

    audio_file = st.file_uploader(
        "Upload Audio", type=["wav", "mp3"], accept_multiple_files=False
    )

    loaded = _load_uploaded_audio(audio_file) if audio_file else None

    action = st.selectbox("Select Action", ["Add Watermark", "Detect Watermark"])

    if action == "Add Watermark":
        clicked = st.button("Add Watermark", key="add_watermark_btn")
    elif action == "Detect Watermark":
        clicked = st.button("Detect Watermark", key="detect_watermark_btn")
    else:
        clicked = False

    # Nothing is computed until the user actually presses the button.
    if not clicked:
        return

    if loaded is None:
        # An unsupported extension has already been reported by the loader.
        if not audio_file:
            st.warning("Please upload an audio file first.")
        return

    wav, original_extension = loaded
    if action == "Add Watermark":
        _add_watermark(wav, original_extension)
    else:
        _detect_watermark(wav)
|
|
|
|
|
if __name__ == "__main__":
    # Script-level settings; functions in this file read them as globals.
    default_sr = 16000  # sample rate handed to the model, detector and saver
    max_second_encode = 60
    max_second_decode = 30
    len_start_bit = 16

    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda:0" if use_cuda else "cpu")

    # Load the AudioSeal generator and detector before building the page.
    model = AudioSeal.load_generator("audioseal_wm_16bits")
    detector = AudioSeal.load_detector("audioseal_detector_16bits")

    main()
|
|
|
|
|
|
|
|
|
|