import gradio as gr
import torch
import librosa
import soundfile as sf  # used in play_segment() to write temporary segment WAV clips
import numpy as np
from transformers import Wav2Vec2Processor, AutoModelForCTC
import zipfile
import os
import firebase_admin
from firebase_admin import credentials, firestore, storage, get_app
from datetime import datetime
import json
import tempfile
import uuid
from typing import List, Dict, Any
import pandas as pd
# =========================
# Diarization / ELAN
# =========================
from pyannote.audio import Pipeline # requires HF token & terms accepted
import pympi
from pyannote.core import Annotation
# =========================
# Firebase initialization
# =========================
def init_firebase():
# If already initialized (e.g., Gradio reload), skip.
try:
_ = get_app()
return
except ValueError:
pass
    # Deployed setting: the 'firebase_creds' env var holds the full service-account JSON (not a file path)
svc_json = os.getenv("firebase_creds")
if svc_json:
firebase_config = json.loads(svc_json)
cred = credentials.Certificate(firebase_config)
        bucket_name = os.getenv(
            "FIREBASE_STORAGE_BUCKET",
            f"{firebase_config.get('project_id')}.firebasestorage.app"  # default bucket on the firebasestorage.app domain
        )
else:
cred = credentials.Certificate("serviceAccountKey.json")
        bucket_name = os.getenv(
            "FIREBASE_STORAGE_BUCKET",
            "amis-asr-corrections-dem-8cf3d.firebasestorage.app"  # default bucket on the firebasestorage.app domain
        )
firebase_admin.initialize_app(cred, {"storageBucket": bucket_name})
# Call once
init_firebase()
db = firestore.client()
bucket = storage.bucket()
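# Illustrative note (assumption, for reference only): the 'firebase_creds' value is
# expected to be the raw service-account JSON, shaped roughly like
#   {"type": "service_account", "project_id": "my-project",
#    "private_key": "-----BEGIN PRIVATE KEY-----\n...", "client_email": "..."}
# credentials.Certificate() consumes the whole dict; init_firebase() additionally
# reads project_id to derive the default Storage bucket name.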
# =========================
# Models & pipelines
# =========================
MODEL_NAME = "eleferrand/xlsr53_Amis"
lang = "ami"
processor = Wav2Vec2Processor.from_pretrained(MODEL_NAME)
model = AutoModelForCTC.from_pretrained(MODEL_NAME)
model.eval()
asr_sr = 16000
# HF token (either variable name)
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN")
if not HF_TOKEN:
raise RuntimeError("Missing Hugging Face token. Set HF_TOKEN or HUGGINGFACE_TOKEN.")
DIARIZATION_PIPELINE_ID = "pyannote/speaker-diarization-3.1"
try:
diar_pipeline = Pipeline.from_pretrained(DIARIZATION_PIPELINE_ID, token=HF_TOKEN)
except TypeError:
diar_pipeline = Pipeline.from_pretrained(DIARIZATION_PIPELINE_ID, use_auth_token=HF_TOKEN)
# Optional: use GPU if available
if torch.cuda.is_available():
diar_pipeline.to(torch.device("cuda"))
# Helper: robustly get a pyannote.core.Annotation back
def to_annotation(diar_output) -> Annotation:
if isinstance(diar_output, Annotation):
return diar_output
for attr in ("speaker_diarization", "annotation", "diarization"):
ann = getattr(diar_output, attr, None)
if isinstance(ann, Annotation):
return ann
raise TypeError(f"Unexpected diarization output type: {type(diar_output)}")
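# Note: depending on the pyannote version, the pipeline may return an Annotation
# directly or wrap it in a result object (e.g. exposing .speaker_diarization);
# to_annotation() above covers both cases.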
# =========================
# Utility helpers
# =========================
def fmt_time(s: float) -> str:
ms = int(round(s * 1000))
hh = ms // 3_600_000
mm = (ms % 3_600_000) // 60_000
ss = (ms % 60_000) / 1000.0
if hh:
return f"{hh:02d}:{mm:02d}:{ss:06.3f}"
return f"{mm:02d}:{ss:06.3f}"
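# Illustrative examples (not executed; values follow from the logic above):
#   fmt_time(65.4)   -> "01:05.400"
#   fmt_time(3605.0) -> "01:00:05.000"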
def parse_ts(ts_str: str) -> float:
"""
Accepts mm:ss.mmm or hh:mm:ss.mmm or seconds as string, returns seconds (float).
"""
ts_str = str(ts_str).strip()
if ts_str == "":
return 0.0
parts = ts_str.split(":")
try:
if len(parts) == 1:
return float(parts[0])
elif len(parts) == 2:
m, s = parts
return float(m) * 60 + float(s)
elif len(parts) == 3:
h, m, s = parts
return float(h) * 3600 + float(m) * 60 + float(s)
except ValueError:
pass
# Fallback: try float directly
try:
return float(ts_str)
except ValueError:
return 0.0
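# Illustrative examples (not executed; values follow from the logic above):
#   parse_ts("01:05.400")  -> 65.4
#   parse_ts("1:00:05")    -> 3605.0
#   parse_ts("not-a-time") -> 0.0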
def load_wav_slice(path: str, start: float, end: float | None, target_sr: int = asr_sr) -> np.ndarray:
"""
Load a slice [start, end) in seconds. If end is None, load to EOF.
"""
if not path or not os.path.exists(path):
return np.array([], dtype=np.float32)
if end is None:
duration = None
else:
duration = max(0.0, end - start)
if duration <= 0:
return np.array([], dtype=np.float32)
y, _ = librosa.load(path, sr=target_sr, offset=max(0.0, start), duration=duration, mono=True)
return y
def asr_transcribe_array(audio_1d: np.ndarray) -> str:
if audio_1d.size == 0:
return ""
with torch.no_grad():
inputs = processor(audio_1d, sampling_rate=asr_sr, return_tensors="pt")
logits = model(inputs.input_values).logits
pred_ids = torch.argmax(logits, dim=-1)
text = processor.batch_decode(pred_ids)[0]
return text.replace("[UNK]", "")
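# Usage sketch (hypothetical file path, shown for illustration only):
#   y = load_wav_slice("example.wav", 0.0, 5.0)
#   print(asr_transcribe_array(y))  # decoded Amis text for the first 5 seconds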
# Convert a DF (rows) or list-of-dicts into normalized segments
def ensure_segments(segments_like: Any) -> List[Dict]:
"""
Accepts either:
- a pandas.DataFrame from Gradio Dataframe
- a list of row lists/tuples: [Play?, Start, End, Speaker, Text]
- a list of dicts: {"start","end","speaker","text"}
Returns a normalized list[dict].
"""
out: List[Dict] = []
# Case 0: None
if segments_like is None:
return out
# Case 1: pandas.DataFrame (from Gradio Dataframe)
    if isinstance(segments_like, pd.DataFrame):
if segments_like.empty:
return out
rows = segments_like.values.tolist()
for row in rows:
row = list(row)
# If we have a leading Play column, drop it: [Play, Start, End, Speaker, Text]
if len(row) >= 5:
row = row[1:5]
if len(row) < 4:
row += [""] * (4 - len(row))
start_s = parse_ts(row[0])
end_s = parse_ts(row[1])
spk = "SPEAKER" if row[2] is None or row[2] == "" else str(row[2])
txt = "" if row[3] is None else str(row[3])
out.append({"start": float(start_s), "end": float(end_s), "speaker": spk, "text": txt})
return out
# Case 2: list input
if isinstance(segments_like, list):
if len(segments_like) == 0:
return out
# 2a: DataFrame-like rows (list/tuple)
if isinstance(segments_like[0], (list, tuple)):
for row in segments_like:
row = list(row)
if len(row) >= 5:
row = row[1:5]
if len(row) < 4:
row += [""] * (4 - len(row))
start_s = parse_ts(row[0])
end_s = parse_ts(row[1])
spk = "SPEAKER" if row[2] is None or row[2] == "" else str(row[2])
txt = "" if row[3] is None else str(row[3])
out.append({"start": float(start_s), "end": float(end_s), "speaker": spk, "text": txt})
return out
# 2b: dicts
if isinstance(segments_like[0], dict):
for seg in segments_like:
start_s = parse_ts(seg.get("start", 0))
end_s = parse_ts(seg.get("end", 0))
spk = str(seg.get("speaker", "SPEAKER"))
txt = str(seg.get("text", ""))
out.append({"start": float(start_s), "end": float(end_s), "speaker": spk, "text": txt})
return out
# Unknown shapes => return empty to avoid corrupt export
return out
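# Illustrative example (values follow from the logic above):
#   ensure_segments([["▶", "00:01.000", "00:02.500", "SPK_00", "hello"]])
#   -> [{"start": 1.0, "end": 2.5, "speaker": "SPK_00", "text": "hello"}]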
# =========================
# VAD/diarization & chunking
# =========================
def diarize_segments(audio_file: str) -> List[Dict]:
"""
Returns list of {start, end, speaker} using in-memory audio
to bypass TorchCodec/FFmpeg loader.
"""
if not audio_file or not os.path.exists(audio_file):
return []
y, sr = librosa.load(audio_file, sr=16000, mono=True)
waveform = torch.from_numpy(y).unsqueeze(0) # (1, time)
diar_raw = diar_pipeline({"waveform": waveform, "sample_rate": 16000})
diar = to_annotation(diar_raw)
segments = []
for turn, _, speaker in diar.itertracks(yield_label=True):
segments.append({"start": float(turn.start), "end": float(turn.end), "speaker": str(speaker)})
segments.sort(key=lambda d: (d["start"], d["end"]))
return segments
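# Illustrative output shape (speaker labels and times are hypothetical):
#   [{"start": 0.03, "end": 2.41, "speaker": "SPEAKER_00"},
#    {"start": 2.41, "end": 5.10, "speaker": "SPEAKER_01"}, ...]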
def split_into_chunks(segments: List[Dict], chunk_sec: float = 10.0, overlap_sec: float = 0.2) -> List[Dict]:
"""
Split diarized regions into ~fixed-size chunks. Enforce a small minimum
to avoid zero-length/std warnings upstream.
"""
MIN_LEN = 0.6 # seconds
out = []
for seg in segments:
s, e, spk = seg["start"], seg["end"], seg["speaker"]
if e - s < MIN_LEN:
continue
t = s
while t < e:
ce = min(e, t + max(chunk_sec, MIN_LEN))
if ce - t >= MIN_LEN:
out.append({"start": float(t), "end": float(ce), "speaker": spk})
if ce >= e:
break
t = ce - min(overlap_sec, max(0.0, chunk_sec * 0.5)) # keep overlap < chunk
return out
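# Worked example (values follow from the logic above): a single 25 s region with
# chunk_sec=10.0 and overlap_sec=0.2 yields three chunks,
#   (0.0, 10.0), (9.8, 19.8), (19.6, 25.0),
# i.e. each chunk restarts 0.2 s before the previous one ended.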
# =========================
# Transcription (streaming)
# =========================
def transcribe_streaming(audio_file, chunk_len=10.0, overlap=0.2, max_chunks=None):
if audio_file is None:
yield "", [], []
return
diar_segs = diarize_segments(audio_file)
chunks = split_into_chunks(diar_segs, chunk_sec=float(chunk_len), overlap_sec=float(overlap))
if max_chunks is not None:
chunks = chunks[:max_chunks]
rows, running_text, realized = [], "", []
for ch in chunks:
y = load_wav_slice(audio_file, ch["start"], ch["end"], target_sr=asr_sr)
if y.size == 0:
continue
text = asr_transcribe_array(y).strip()
ch_out = {**ch, "text": text}
realized.append(ch_out)
running_text += f"[{fmt_time(ch_out['start'])}-{fmt_time(ch_out['end'])}] {ch_out['speaker']}: {ch_out['text']}\n"
rows.append([
"▶", # Play button cell
fmt_time(ch_out["start"]),
fmt_time(ch_out["end"]),
ch_out["speaker"],
ch_out["text"],
])
# 3 outputs: original_text (read-only), segments_table (editable), segments_state (hidden)
yield running_text, rows, realized
def transcribe_both(audio_file):
    # Non-streaming fallback (kept for compatibility; not wired into the UI below)
transcription = asr_transcribe_array(load_wav_slice(audio_file, 0, None)) if audio_file else ""
return transcription, transcription
# =========================
# ELAN (.eaf) writer per ELAN best practices
# =========================
def write_eaf(
segments: List[Dict],
path: str,
audio_path: str | None = None,
lang_code: str = "ami",
trim_overlaps: bool = True
):
"""
Write an ELAN .eaf from the edited segments only.
- One time-alignable tier per speaker with the *corrected* text.
- Optionally link the primary media.
- No dependent/REF tiers, no full-span fallbacks.
"""
eaf = pympi.Elan.Eaf()
def ms(x):
try:
return int(round(float(x) * 1000))
except Exception:
return 0
# Link primary media if provided
if audio_path and os.path.exists(audio_path):
try:
eaf.add_linked_file(audio_path)
except Exception:
eaf.add_linked_file(audio_path, mimetype="audio/wav")
# ELAN time-alignable tiers (top-level) per speaker
if "ASR" not in eaf.linguistic_types:
eaf.add_linguistic_type("ASR", timealignable=True)
# Normalize and (optionally) trim overlaps within each speaker
by_spk: Dict[str, List[Dict]] = {}
for seg in segments or []:
by_spk.setdefault(seg.get("speaker", "SPEAKER"), []).append(seg)
if trim_overlaps:
for spk, lst in by_spk.items():
lst.sort(key=lambda d: (d["start"], d["end"]))
last_end = -float("inf")
for s in lst:
if s["start"] < last_end:
s["start"] = last_end
if s["end"] < s["start"]:
s["end"] = s["start"]
last_end = s["end"]
# Create tiers & write corrected annotations
speakers = sorted(by_spk.keys())
for spk in speakers:
if spk not in eaf.tiers:
eaf.add_tier(spk, ling="ASR", part=spk, language=lang_code)
for seg in by_spk[spk]:
# IMPORTANT: write the corrected text
eaf.add_annotation(spk, ms(seg["start"]), ms(seg["end"]), seg.get("text", ""))
eaf.to_file(path)
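# Illustrative effect of trim_overlaps (follows from the logic above): for one
# speaker, segments (0.0-5.0) and (4.0-8.0) are written as (0.0-5.0) and
# (5.0-8.0), so same-speaker annotations never overlap in the .eaf.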
# =========================
# Download helpers
# =========================
def segments_to_txt(segments: List[Dict]) -> str:
"""
Build a human-readable txt transcript from corrected segments.
"""
lines = []
for seg in sorted(segments, key=lambda d: (d["start"], d["end"])):
lines.append(f"[{fmt_time(seg['start'])}-{fmt_time(seg['end'])}] {seg['speaker']}: {seg.get('text','')}")
return "\n".join(lines) + ("\n" if lines else "")
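# Illustrative output line (values hypothetical):
#   [00:01.000-00:02.500] SPEAKER_00: hello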
def prepare_download_from_df(audio_file, df_rows, eaf_trim, eaf_link_audio):
"""
Convert DF -> segments and return ZIP path containing:
- audio.wav (if present)
- transcript.eaf (from corrected segments)
- transcript.txt (from corrected segments)
"""
segs = ensure_segments(df_rows)
return prepare_download_core(audio_file, segs, eaf_trim, eaf_link_audio)
def prepare_download_core(audio_file, segments, eaf_trim, eaf_link_audio):
"""
Return a ZIP containing exactly:
- audio.wav (if present)
- transcript.eaf (built from edited segments)
- transcript.txt (built from edited segments)
"""
segs = ensure_segments(segments)
if audio_file is None and not segs:
return None
# Write EAF from edited segments
tmp_eaf = tempfile.NamedTemporaryFile(delete=False, suffix=".eaf")
tmp_eaf.close()
write_eaf(
segments=segs,
path=tmp_eaf.name,
audio_path=(audio_file if eaf_link_audio else None),
lang_code=lang,
trim_overlaps=bool(eaf_trim),
)
# Write TXT from edited segments
tmp_txt = tempfile.NamedTemporaryFile(delete=False, suffix=".txt", mode="w", encoding="utf-8")
txt_path = tmp_txt.name
tmp_txt.write(segments_to_txt(segs))
tmp_txt.close()
# Zip audio + eaf + txt
tmp_zip = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
tmp_zip.close()
with zipfile.ZipFile(tmp_zip.name, "w") as zf:
if audio_file and os.path.exists(audio_file):
zf.write(audio_file, arcname="audio.wav")
zf.write(tmp_eaf.name, arcname="transcript.eaf")
zf.write(txt_path, arcname="transcript.txt")
# cleanup temp eaf/txt
try:
os.unlink(tmp_eaf.name)
except Exception:
pass
try:
os.unlink(txt_path)
except Exception:
pass
return tmp_zip.name
def play_segment(evt: gr.SelectData, audio_file, df_rows):
"""
Gradio Dataframe .select callback:
- Only reacts when the 'Play' column (col 0) is clicked.
- Returns a temp WAV file with audio from that row's [start, end).
"""
# Event index can be (row, col) or just row
idx_info = evt.index
if isinstance(idx_info, (tuple, list)):
row_idx, col_idx = idx_info
# Only play when the Play column is clicked
if col_idx != 0:
return None
else:
row_idx = idx_info
if audio_file is None or not os.path.exists(audio_file):
return None
segs = ensure_segments(df_rows)
if not segs or row_idx is None:
return None
try:
row_idx = int(row_idx)
except Exception:
return None
if row_idx < 0 or row_idx >= len(segs):
return None
seg = segs[row_idx]
start = float(seg.get("start", 0.0))
end = float(seg.get("end", 0.0)) or None
y = load_wav_slice(audio_file, start, end, target_sr=asr_sr)
if y.size == 0:
return None
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
tmp.close()
sf.write(tmp.name, y, asr_sr)
return tmp.name
# =========================
# Save correction (segments from DF at click-time)
# =========================
def store_correction_from_df(use_zh, original_transcription, audio_file, age, native_speaker, df_rows):
"""
Persist ONLY audio + .eaf to Firebase Storage; Firestore stores minimal metadata.
`df_rows` are the current (possibly corrected) rows from the DataFrame.
"""
return store_correction_core(bool(use_zh), original_transcription, audio_file, age, native_speaker, ensure_segments(df_rows))
def store_correction_core(use_zh: bool, original_transcription, audio_file, age, native_speaker, segments):
try:
unique_id = str(uuid.uuid4())
audio_ref = None
# Upload audio (if provided) + collect simple metadata
audio_metadata, audio_url = {}, None
if audio_file and os.path.exists(audio_file):
audio, sr = librosa.load(audio_file, sr=44100, mono=True)
audio_metadata = {
"duration": float(librosa.get_duration(y=audio, sr=sr)),
"file_size": os.path.getsize(audio_file),
}
audio_ref = f"audio/{lang}/{unique_id}.wav"
blob_audio = bucket.blob(audio_ref)
blob_audio.upload_from_filename(audio_file)
audio_url = blob_audio.public_url # or signed url
# Build fresh .eaf from edited segments (CORRECTED TEXT ONLY)
tmp_eaf = tempfile.NamedTemporaryFile(delete=False, suffix=".eaf")
tmp_eaf.close()
write_eaf(
segments=segments or [],
path=tmp_eaf.name,
audio_path=audio_file if audio_file else None,
lang_code=lang,
trim_overlaps=True,
)
# Upload .eaf
eaf_ref = f"elan/{lang}/{unique_id}.eaf"
blob_eaf = bucket.blob(eaf_ref)
blob_eaf.upload_from_filename(tmp_eaf.name)
eaf_url = blob_eaf.public_url
try:
os.unlink(tmp_eaf.name)
except Exception:
pass
# Minimal Firestore doc (no transcripts saved)
db.collection("amis_transcriptions").document(unique_id).set({
"id": unique_id,
"language": lang,
"files": {
"audio_path": audio_ref,
"audio_url": audio_url,
"eaf_path": eaf_ref,
"eaf_url": eaf_url,
},
"audio_metadata": audio_metadata,
"user_info": {
"native_amis_speaker": native_speaker,
"age": age,
},
"timestamp": datetime.now().isoformat(),
"model_name": MODEL_NAME,
"schema": "eaf+audio_only_v2_segments_source",
})
if use_zh:
return "已儲存:EAF 與音訊已上傳至 Firebase。"
else:
return "Saved: uploaded EAF + audio to Firebase."
except Exception as e:
if use_zh:
return f"儲存失敗:{e}"
else:
return f"Save failed: {e}"
# =========================
# Gradio UI
# =========================
with gr.Blocks() as demo:
# UI text dictionaries
EN = {
"toggle_on": "切換到繁體中文 (Switch to Traditional Chinese)",
"toggle_off": "切換到繁體中文 (Switch to Traditional Chinese)", # initial label
"toggle_to_en": "切換到英文 (Switch to English)",
"title": "Amis ASR Transcription & Correction System",
"step1": "Step 1: Audio Upload & Transcription",
"step2": "Step 2: Review & Edit Transcription",
"step3": "Step 3: User Information",
"step4": "Step 4: Save & Download",
"audio_label": "Audio Input",
"stream_btn": "Stream Diarized Transcription",
"chunk_len": "Chunk length (sec)",
"overlap": "Chunk overlap (sec)",
"orig_text": "Original Transcription (streaming)",
"segments": "Segments",
"headers": ["Play", "Start", "End", "Speaker", "Text"],
"age": "Age",
"native": "Native Amis Speaker?",
"eaf_trim": "Trim overlaps per speaker for EAF",
"eaf_link": "Embed media link in EAF",
"save_btn": "Save Correction",
"save_status": "Save Status",
"dl_btn": "Download ZIP File (audio + .txt + .eaf)",
"dl_label": "Download File",
"segment_player": "Segment Player", # 🔴 added
}
ZH = {
"toggle_on": "切換到英文 (Switch to English)",
"toggle_off": "切換到繁體中文 (Switch to Traditional Chinese)",
"toggle_to_en": "切換到英文 (Switch to English)",
"title": "阿美語轉錄與修正系統",
"step1": "步驟一:上傳音訊並進行轉錄",
"step2": "步驟二:審閱與編輯段落轉錄",
"step3": "步驟三:使用者資訊",
"step4": "步驟四:儲存與下載",
"audio_label": "音訊輸入",
"stream_btn": "開始串流分段轉錄",
"chunk_len": "分段長度(秒)",
"overlap": "分段重疊(秒)",
"orig_text": "原始轉錄(串流中)",
"segments": "分段列表",
"headers": ["播放", "開始", "結束", "說話者", "內容"],
"age": "年齡",
"native": "是否為阿美語母語者?",
"eaf_trim": "為每位說話者裁切重疊區間(輸出 EAF)",
"eaf_link": "於 EAF 中連結音訊媒體",
"save_btn": "儲存修正",
"save_status": "儲存狀態",
"dl_btn": "下載 ZIP(音訊 + .txt + .eaf)",
"dl_label": "下載檔案",
"segment_player": "分段播放",
}
# Language state (False=EN, True=ZH)
use_zh_state = gr.State(False)
lang_switch = gr.Checkbox(label=EN["toggle_off"], value=False)
title = gr.Markdown(EN["title"])
step1 = gr.Markdown(EN["step1"])
with gr.Row():
audio_input = gr.Audio(sources=["upload", "microphone"], type="filepath", label=EN["audio_label"])
step2 = gr.Markdown(EN["step2"])
with gr.Row():
transcribe_button = gr.Button(EN["stream_btn"])
chunk_len = gr.Slider(4, 20, value=10, step=1, label=EN["chunk_len"])
overlap = gr.Slider(0.0, 1.0, value=0.2, step=0.1, label=EN["overlap"])
original_text = gr.Textbox(label=EN["orig_text"], interactive=False, lines=10)
segments_table = gr.Dataframe(
headers=EN["headers"],
datatype=["str", "str", "str", "str", "str"], # Play, Start, End, Speaker, Text
label=EN["segments"],
wrap=True,
interactive=True,
)
segments_state = gr.State([])
# New: player for individual segments
segment_player = gr.Audio(label=EN["segment_player"], type="filepath", interactive=False)
def df_to_segments(df_rows):
return ensure_segments(df_rows)
segments_table.change(df_to_segments, inputs=segments_table, outputs=segments_state)
# Click the Play cell in a row to hear that segment
segments_table.select(
fn=play_segment,
inputs=[audio_input, segments_table],
outputs=segment_player,
)
step3 = gr.Markdown(EN["step3"])
with gr.Row():
age_input = gr.Slider(minimum=0, maximum=100, step=1, label=EN["age"], value=25)
native_speaker_input = gr.Checkbox(label=EN["native"], value=True)
step4 = gr.Markdown(EN["step4"])
with gr.Row():
eaf_trim = gr.Checkbox(value=True, label=EN["eaf_trim"])
eaf_link_audio = gr.Checkbox(value=True, label=EN["eaf_link"])
with gr.Row():
save_button = gr.Button(EN["save_btn"])
save_status = gr.Textbox(label=EN["save_status"], interactive=False)
with gr.Row():
download_button = gr.Button(EN["dl_btn"])
download_output = gr.File(label=EN["dl_label"])
    # Language toggle: update every localizable label/text in the UI
def toggle_language(switch):
use_zh = bool(switch)
D = ZH if use_zh else EN
return (
gr.update(value=D["title"]), # title (Markdown)
gr.update(value=D["step1"]), # step1
gr.update(label=D["audio_label"]), # audio_input
gr.update(value=D["step2"]), # step2
gr.update(value=D["stream_btn"]), # transcribe_button (button text)
gr.update(label=D["chunk_len"]), # chunk_len
gr.update(label=D["overlap"]), # overlap
gr.update(label=D["orig_text"]), # original_text
gr.update(label=D["segments"], headers=D["headers"]), # segments_table
gr.update(label=D["segment_player"]), # segment_player
gr.update(value=D["step3"]), # step3
gr.update(label=D["age"]), # age_input
gr.update(label=D["native"]), # native_speaker_input
gr.update(value=D["step4"]), # step4
gr.update(label=D["eaf_trim"]), # eaf_trim
gr.update(label=D["eaf_link"]), # eaf_link_audio
gr.update(value=D["save_btn"]), # save_button
gr.update(label=D["save_status"]), # save_status
gr.update(value=D["dl_btn"]), # download_button
gr.update(label=D["dl_label"]), # download_output
gr.update(label=(ZH["toggle_on"] if use_zh else EN["toggle_off"])), # lang_switch label
use_zh # state
)
lang_switch.change(
toggle_language,
inputs=lang_switch,
outputs=[
title, step1, audio_input,
step2, transcribe_button, chunk_len, overlap, original_text, segments_table, segment_player,
step3, age_input, native_speaker_input,
step4, eaf_trim, eaf_link_audio,
save_button, save_status, download_button, download_output,
lang_switch, use_zh_state
]
)
# STREAMING transcription: generator yields updates
transcribe_button.click(
fn=transcribe_streaming,
inputs=[audio_input, chunk_len, overlap],
outputs=[original_text, segments_table, segments_state],
queue=True
)
# Save: includes language state for localized messages
save_button.click(
store_correction_from_df,
inputs=[use_zh_state, original_text, audio_input, age_input, native_speaker_input, segments_table],
outputs=save_status
)
# Download (label localized by toggle; function itself returns a file path)
download_button.click(
prepare_download_from_df,
inputs=[audio_input, segments_table, eaf_trim, eaf_link_audio],
outputs=download_output
)
if __name__ == "__main__":
# demo.launch(share=True)
demo.launch()