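"""
Amis ASR transcription and correction app (Gradio).

Pipeline: pyannote speaker diarization -> chunked wav2vec2/CTC transcription ->
user-editable segment table -> ELAN (.eaf) / .txt / ZIP export, with optional
upload of the corrected EAF plus audio to Firebase (Storage + Firestore).
"""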
import gradio as gr
import torch
import librosa
import soundfile as sf
import numpy as np
from transformers import Wav2Vec2Processor, AutoModelForCTC
import zipfile
import os
import firebase_admin
from firebase_admin import credentials, firestore, storage, get_app
from datetime import datetime, timedelta
import json
import tempfile
import uuid
from typing import List, Dict, Any
import pandas as pd

from pyannote.audio import Pipeline
from pyannote.core import Annotation
import pympi


def init_firebase():
    """Initialise the Firebase app once, from env credentials or a local key file."""
    try:
        _ = get_app()
        return  # already initialised
    except ValueError:
        pass

    svc_json = os.getenv("firebase_creds")
    if svc_json:
        firebase_config = json.loads(svc_json)
        cred = credentials.Certificate(firebase_config)
        bucket_name = os.getenv(
            "FIREBASE_STORAGE_BUCKET",
            f"{firebase_config.get('project_id')}.firebasestorage.app",
        )
    else:
        cred = credentials.Certificate("serviceAccountKey.json")
        bucket_name = os.getenv(
            "FIREBASE_STORAGE_BUCKET",
            "amis-asr-corrections-dem-8cf3d.firebasestorage.app",
        )

    firebase_admin.initialize_app(cred, {"storageBucket": bucket_name})


init_firebase()
db = firestore.client()
bucket = storage.bucket()


# ASR model (Amis fine-tuned XLSR-53 with a CTC head).
MODEL_NAME = "eleferrand/xlsr53_Amis"
lang = "ami"

processor = Wav2Vec2Processor.from_pretrained(MODEL_NAME)
model = AutoModelForCTC.from_pretrained(MODEL_NAME)
model.eval()
asr_sr = 16000  # sample rate expected by the ASR model

# Speaker diarization (pyannote). A Hugging Face token is required.
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN")
if not HF_TOKEN:
    raise RuntimeError("Missing Hugging Face token. Set HF_TOKEN or HUGGINGFACE_TOKEN.")

DIARIZATION_PIPELINE_ID = "pyannote/speaker-diarization-3.1"
try:
    # Newer pyannote.audio releases accept `token`; older ones expect `use_auth_token`.
    diar_pipeline = Pipeline.from_pretrained(DIARIZATION_PIPELINE_ID, token=HF_TOKEN)
except TypeError:
    diar_pipeline = Pipeline.from_pretrained(DIARIZATION_PIPELINE_ID, use_auth_token=HF_TOKEN)

if torch.cuda.is_available():
    diar_pipeline.to(torch.device("cuda"))


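# Note: the pyannote diarization pipeline is gated on the Hugging Face Hub, so the
# account behind HF_TOKEN must have accepted its usage conditions; otherwise
# Pipeline.from_pretrained() will fail even with a valid token.
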
def to_annotation(diar_output) -> Annotation:
    """Normalise pyannote pipeline output to a pyannote.core.Annotation."""
    if isinstance(diar_output, Annotation):
        return diar_output
    for attr in ("speaker_diarization", "annotation", "diarization"):
        ann = getattr(diar_output, attr, None)
        if isinstance(ann, Annotation):
            return ann
    raise TypeError(f"Unexpected diarization output type: {type(diar_output)}")


def fmt_time(s: float) -> str:
    """Format seconds as mm:ss.mmm, or hh:mm:ss.mmm once the hour mark is passed."""
    ms = int(round(s * 1000))
    hh = ms // 3_600_000
    mm = (ms % 3_600_000) // 60_000
    ss = (ms % 60_000) / 1000.0
    if hh:
        return f"{hh:02d}:{mm:02d}:{ss:06.3f}"
    return f"{mm:02d}:{ss:06.3f}"


def parse_ts(ts_str: str) -> float:
    """
    Accepts mm:ss.mmm, hh:mm:ss.mmm, or plain seconds as a string; returns seconds (float).
    Unparseable input falls back to 0.0.
    """
    ts_str = str(ts_str).strip()
    if ts_str == "":
        return 0.0
    parts = ts_str.split(":")
    try:
        if len(parts) == 1:
            return float(parts[0])
        elif len(parts) == 2:
            m, s = parts
            return float(m) * 60 + float(s)
        elif len(parts) == 3:
            h, m, s = parts
            return float(h) * 3600 + float(m) * 60 + float(s)
    except ValueError:
        pass

    try:
        return float(ts_str)
    except ValueError:
        return 0.0


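# Worked examples for the two timestamp helpers above:
#   fmt_time(62.5)            -> "01:02.500"
#   fmt_time(3723.5)          -> "01:02:03.500"
#   parse_ts("01:02.500")     -> 62.5
#   parse_ts("01:02:03.500")  -> 3723.5
#   parse_ts("7.25")          -> 7.25
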
def load_wav_slice(path: str, start: float, end: float | None, target_sr: int = asr_sr) -> np.ndarray:
    """
    Load a slice [start, end) in seconds as mono audio at target_sr. If end is None, load to EOF.
    Returns an empty array when the path is missing or the slice has non-positive length.
    """
    if not path or not os.path.exists(path):
        return np.array([], dtype=np.float32)
    if end is None:
        duration = None
    else:
        duration = max(0.0, end - start)
        if duration <= 0:
            return np.array([], dtype=np.float32)
    y, _ = librosa.load(path, sr=target_sr, offset=max(0.0, start), duration=duration, mono=True)
    return y


def asr_transcribe_array(audio_1d: np.ndarray) -> str:
    """Greedy CTC decoding of a 1-D 16 kHz float array; strips [UNK] tokens."""
    if audio_1d.size == 0:
        return ""
    with torch.no_grad():
        inputs = processor(audio_1d, sampling_rate=asr_sr, return_tensors="pt")
        logits = model(inputs.input_values).logits
        pred_ids = torch.argmax(logits, dim=-1)
    text = processor.batch_decode(pred_ids)[0]
    return text.replace("[UNK]", "")


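# Illustrative use (the file name is a placeholder): transcribe the first five seconds.
#   y = load_wav_slice("example.wav", 0.0, 5.0)
#   print(asr_transcribe_array(y))
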
def ensure_segments(segments_like: Any) -> List[Dict]:
    """
    Accepts either:
      - a pandas.DataFrame coming from the Gradio Dataframe
      - a list of row lists/tuples: [Play?, Start, End, Speaker, Text]
      - a list of dicts: {"start", "end", "speaker", "text"}
    Returns a normalized list of dicts with float times.
    """
    out: List[Dict] = []

    def add_row(row) -> None:
        # Normalise one table row; drop the leading "Play" column if present.
        row = list(row)
        if len(row) >= 5:
            row = row[1:5]
        if len(row) < 4:
            row += [""] * (4 - len(row))
        start_s = parse_ts(row[0])
        end_s = parse_ts(row[1])
        spk = "SPEAKER" if row[2] is None or row[2] == "" else str(row[2])
        txt = "" if row[3] is None else str(row[3])
        out.append({"start": float(start_s), "end": float(end_s), "speaker": spk, "text": txt})

    if segments_like is None:
        return out

    if isinstance(segments_like, pd.DataFrame):
        for row in segments_like.values.tolist():
            add_row(row)
        return out

    if isinstance(segments_like, list):
        if len(segments_like) == 0:
            return out

        if isinstance(segments_like[0], (list, tuple)):
            for row in segments_like:
                add_row(row)
            return out

        if isinstance(segments_like[0], dict):
            for seg in segments_like:
                out.append({
                    "start": float(parse_ts(seg.get("start", 0))),
                    "end": float(parse_ts(seg.get("end", 0))),
                    "speaker": str(seg.get("speaker", "SPEAKER")),
                    "text": str(seg.get("text", "")),
                })
            return out

    return out


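# Example normalisation (illustrative row from the segments table):
#   ["▶", "00:01.000", "00:02.500", "SPEAKER_00", "some text"]
#   -> {"start": 1.0, "end": 2.5, "speaker": "SPEAKER_00", "text": "some text"}
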
def diarize_segments(audio_file: str) -> List[Dict]:
    """
    Returns a list of {start, end, speaker} dicts. The audio is decoded in memory
    (librosa) and handed to pyannote as a waveform dict, bypassing its TorchCodec/FFmpeg loader.
    """
    if not audio_file or not os.path.exists(audio_file):
        return []
    y, sr = librosa.load(audio_file, sr=16000, mono=True)
    waveform = torch.from_numpy(y).unsqueeze(0)  # shape (channel=1, samples)
    diar_raw = diar_pipeline({"waveform": waveform, "sample_rate": 16000})
    diar = to_annotation(diar_raw)

    segments = []
    for turn, _, speaker in diar.itertracks(yield_label=True):
        segments.append({"start": float(turn.start), "end": float(turn.end), "speaker": str(speaker)})
    segments.sort(key=lambda d: (d["start"], d["end"]))
    return segments


def split_into_chunks(segments: List[Dict], chunk_sec: float = 10.0, overlap_sec: float = 0.2) -> List[Dict]:
    """
    Split diarized regions into roughly fixed-size chunks with a small overlap.
    A minimum chunk length is enforced to avoid zero-length/std warnings upstream.
    """
    MIN_LEN = 0.6
    out = []
    for seg in segments:
        s, e, spk = seg["start"], seg["end"], seg["speaker"]
        if e - s < MIN_LEN:
            continue
        t = s
        while t < e:
            ce = min(e, t + max(chunk_sec, MIN_LEN))
            if ce - t >= MIN_LEN:
                out.append({"start": float(t), "end": float(ce), "speaker": spk})
            if ce >= e:
                break
            t = ce - min(overlap_sec, max(0.0, chunk_sec * 0.5))
    return out


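# Worked example: a single diarized region 0.0-25.0 s with chunk_sec=10.0 and
# overlap_sec=0.2 is split into (0.0, 10.0), (9.8, 19.8), (19.6, 25.0).
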
def transcribe_streaming(audio_file, chunk_len=10.0, overlap=0.2, max_chunks=None):
    """
    Generator: diarize, chunk, then transcribe chunk by chunk, yielding
    (running_text, table_rows, segments) after each one so the UI updates live.
    """
    if audio_file is None:
        yield "", [], []
        return

    diar_segs = diarize_segments(audio_file)
    chunks = split_into_chunks(diar_segs, chunk_sec=float(chunk_len), overlap_sec=float(overlap))
    if max_chunks is not None:
        chunks = chunks[:max_chunks]

    rows, running_text, realized = [], "", []
    for ch in chunks:
        y = load_wav_slice(audio_file, ch["start"], ch["end"], target_sr=asr_sr)
        if y.size == 0:
            continue
        text = asr_transcribe_array(y).strip()
        ch_out = {**ch, "text": text}
        realized.append(ch_out)

        running_text += f"[{fmt_time(ch_out['start'])}-{fmt_time(ch_out['end'])}] {ch_out['speaker']}: {ch_out['text']}\n"
        rows.append([
            "▶",
            fmt_time(ch_out["start"]),
            fmt_time(ch_out["end"]),
            ch_out["speaker"],
            ch_out["text"],
        ])

        yield running_text, rows, realized


def transcribe_both(audio_file):
    """Single-pass transcription of the whole file (no diarization). Not wired into the UI below."""
    transcription = asr_transcribe_array(load_wav_slice(audio_file, 0, None)) if audio_file else ""
    return transcription, transcription


def write_eaf(
    segments: List[Dict],
    path: str,
    audio_path: str | None = None,
    lang_code: str = "ami",
    trim_overlaps: bool = True,
):
    """
    Write an ELAN .eaf from the edited segments only.
    - One time-alignable tier per speaker, holding the *corrected* text.
    - Optionally link the primary media file.
    - No dependent/REF tiers, no full-span fallbacks.
    """
    eaf = pympi.Elan.Eaf()

    def ms(x):
        try:
            return int(round(float(x) * 1000))
        except Exception:
            return 0

    if audio_path and os.path.exists(audio_path):
        try:
            eaf.add_linked_file(audio_path)
        except Exception:
            eaf.add_linked_file(audio_path, mimetype="audio/wav")

    if "ASR" not in eaf.linguistic_types:
        eaf.add_linguistic_type("ASR", timealignable=True)

    # Group segments per speaker.
    by_spk: Dict[str, List[Dict]] = {}
    for seg in segments or []:
        by_spk.setdefault(seg.get("speaker", "SPEAKER"), []).append(seg)

    # Optionally clip segments so annotations on the same tier never overlap.
    if trim_overlaps:
        for spk, lst in by_spk.items():
            lst.sort(key=lambda d: (d["start"], d["end"]))
            last_end = -float("inf")
            for s in lst:
                if s["start"] < last_end:
                    s["start"] = last_end
                if s["end"] < s["start"]:
                    s["end"] = s["start"]
                last_end = s["end"]

    speakers = sorted(by_spk.keys())
    for spk in speakers:
        if spk not in eaf.tiers:
            eaf.add_tier(spk, ling="ASR", part=spk, language=lang_code)
        for seg in by_spk[spk]:
            eaf.add_annotation(spk, ms(seg["start"]), ms(seg["end"]), seg.get("text", ""))

    eaf.to_file(path)


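# Resulting EAF layout: one time-aligned tier per speaker label (linguistic type "ASR"),
# annotation times in milliseconds, plus an optional link to the source audio.
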
def segments_to_txt(segments: List[Dict]) -> str:
    """Build a human-readable .txt transcript from the corrected segments."""
    lines = []
    for seg in sorted(segments, key=lambda d: (d["start"], d["end"])):
        lines.append(f"[{fmt_time(seg['start'])}-{fmt_time(seg['end'])}] {seg['speaker']}: {seg.get('text', '')}")
    return "\n".join(lines) + ("\n" if lines else "")


def prepare_download_from_df(audio_file, df_rows, eaf_trim, eaf_link_audio):
    """
    Convert the Dataframe rows to segments and return the path of a ZIP containing
    audio.wav (if present), transcript.eaf, and transcript.txt.
    """
    segs = ensure_segments(df_rows)
    return prepare_download_core(audio_file, segs, eaf_trim, eaf_link_audio)


def prepare_download_core(audio_file, segments, eaf_trim, eaf_link_audio):
    """
    Return the path of a ZIP containing exactly:
      - audio.wav (if present)
      - transcript.eaf (built from the edited segments)
      - transcript.txt (built from the edited segments)
    """
    segs = ensure_segments(segments)
    if audio_file is None and not segs:
        return None

    # Write the EAF to a temp file.
    tmp_eaf = tempfile.NamedTemporaryFile(delete=False, suffix=".eaf")
    tmp_eaf.close()
    write_eaf(
        segments=segs,
        path=tmp_eaf.name,
        audio_path=(audio_file if eaf_link_audio else None),
        lang_code=lang,
        trim_overlaps=bool(eaf_trim),
    )

    # Write the plain-text transcript.
    tmp_txt = tempfile.NamedTemporaryFile(delete=False, suffix=".txt", mode="w", encoding="utf-8")
    txt_path = tmp_txt.name
    tmp_txt.write(segments_to_txt(segs))
    tmp_txt.close()

    # Bundle everything into a ZIP.
    tmp_zip = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
    tmp_zip.close()
    with zipfile.ZipFile(tmp_zip.name, "w") as zf:
        if audio_file and os.path.exists(audio_file):
            zf.write(audio_file, arcname="audio.wav")
        zf.write(tmp_eaf.name, arcname="transcript.eaf")
        zf.write(txt_path, arcname="transcript.txt")

    # Clean up the intermediate files; the ZIP itself is handed to Gradio.
    try:
        os.unlink(tmp_eaf.name)
    except Exception:
        pass
    try:
        os.unlink(txt_path)
    except Exception:
        pass

    return tmp_zip.name


def play_segment(evt: gr.SelectData, audio_file, df_rows):
    """
    Gradio Dataframe .select callback:
      - Only reacts when the 'Play' column (column 0) is clicked.
      - Returns a temp WAV file with the audio of that row's [start, end).
    """
    # evt.index is (row, col) for a Dataframe select event.
    idx_info = evt.index
    if isinstance(idx_info, (tuple, list)):
        row_idx, col_idx = idx_info
        if col_idx != 0:
            return None
    else:
        row_idx = idx_info

    if audio_file is None or not os.path.exists(audio_file):
        return None

    segs = ensure_segments(df_rows)
    if not segs or row_idx is None:
        return None

    try:
        row_idx = int(row_idx)
    except Exception:
        return None

    if row_idx < 0 or row_idx >= len(segs):
        return None

    seg = segs[row_idx]
    start = float(seg.get("start", 0.0))
    end = float(seg.get("end", 0.0)) or None

    y = load_wav_slice(audio_file, start, end, target_sr=asr_sr)
    if y.size == 0:
        return None

    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
    tmp.close()
    sf.write(tmp.name, y, asr_sr)
    return tmp.name


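# Note: each Play click writes a new temp WAV under the system temp directory;
# nothing here deletes those files, so long sessions accumulate them.
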
def store_correction_from_df(use_zh, original_transcription, audio_file, age, native_speaker, df_rows):
    """
    Persist ONLY audio + .eaf to Firebase Storage; Firestore stores minimal metadata.
    `df_rows` are the current (possibly corrected) rows from the Dataframe.
    """
    return store_correction_core(bool(use_zh), original_transcription, audio_file, age, native_speaker, ensure_segments(df_rows))


def store_correction_core(use_zh: bool, original_transcription, audio_file, age, native_speaker, segments):
    try:
        unique_id = str(uuid.uuid4())
        audio_ref = None

        # Upload the audio (if any) and collect basic metadata.
        audio_metadata, audio_url = {}, None
        if audio_file and os.path.exists(audio_file):
            audio, sr = librosa.load(audio_file, sr=44100, mono=True)
            audio_metadata = {
                "duration": float(librosa.get_duration(y=audio, sr=sr)),
                "file_size": os.path.getsize(audio_file),
            }
            audio_ref = f"audio/{lang}/{unique_id}.wav"
            blob_audio = bucket.blob(audio_ref)
            blob_audio.upload_from_filename(audio_file)
            audio_url = blob_audio.public_url

        # Build the corrected EAF and upload it.
        tmp_eaf = tempfile.NamedTemporaryFile(delete=False, suffix=".eaf")
        tmp_eaf.close()
        write_eaf(
            segments=segments or [],
            path=tmp_eaf.name,
            audio_path=audio_file if audio_file else None,
            lang_code=lang,
            trim_overlaps=True,
        )

        eaf_ref = f"elan/{lang}/{unique_id}.eaf"
        blob_eaf = bucket.blob(eaf_ref)
        blob_eaf.upload_from_filename(tmp_eaf.name)
        eaf_url = blob_eaf.public_url
        try:
            os.unlink(tmp_eaf.name)
        except Exception:
            pass

        # Minimal metadata document in Firestore.
        db.collection("amis_transcriptions").document(unique_id).set({
            "id": unique_id,
            "language": lang,
            "files": {
                "audio_path": audio_ref,
                "audio_url": audio_url,
                "eaf_path": eaf_ref,
                "eaf_url": eaf_url,
            },
            "audio_metadata": audio_metadata,
            "user_info": {
                "native_amis_speaker": native_speaker,
                "age": age,
            },
            "timestamp": datetime.now().isoformat(),
            "model_name": MODEL_NAME,
            "schema": "eaf+audio_only_v2_segments_source",
        })

        if use_zh:
            return "已儲存:EAF 與音訊已上傳至 Firebase。"
        else:
            return "Saved: uploaded EAF + audio to Firebase."
    except Exception as e:
        if use_zh:
            return f"儲存失敗:{e}"
        else:
            return f"Save failed: {e}"


with gr.Blocks() as demo:
    # UI strings for the two interface languages. The toggle label always shows
    # the language the user would switch *to*.
    EN = {
        "toggle_on": "切換到繁體中文 (Switch to Traditional Chinese)",
        "toggle_off": "切換到繁體中文 (Switch to Traditional Chinese)",
        "toggle_to_en": "切換到英文 (Switch to English)",
        "title": "Amis ASR Transcription & Correction System",
        "step1": "Step 1: Audio Upload & Transcription",
        "step2": "Step 2: Review & Edit Transcription",
        "step3": "Step 3: User Information",
        "step4": "Step 4: Save & Download",
        "audio_label": "Audio Input",
        "stream_btn": "Stream Diarized Transcription",
        "chunk_len": "Chunk length (sec)",
        "overlap": "Chunk overlap (sec)",
        "orig_text": "Original Transcription (streaming)",
        "segments": "Segments",
        "headers": ["Play", "Start", "End", "Speaker", "Text"],
        "age": "Age",
        "native": "Native Amis Speaker?",
        "eaf_trim": "Trim overlaps per speaker for EAF",
        "eaf_link": "Embed media link in EAF",
        "save_btn": "Save Correction",
        "save_status": "Save Status",
        "dl_btn": "Download ZIP File (audio + .txt + .eaf)",
        "dl_label": "Download File",
        "segment_player": "Segment Player",
    }
    ZH = {
        "toggle_on": "切換到英文 (Switch to English)",
        "toggle_off": "切換到繁體中文 (Switch to Traditional Chinese)",
        "toggle_to_en": "切換到英文 (Switch to English)",
        "title": "阿美語轉錄與修正系統",
        "step1": "步驟一:上傳音訊並進行轉錄",
        "step2": "步驟二:審閱與編輯段落轉錄",
        "step3": "步驟三:使用者資訊",
        "step4": "步驟四:儲存與下載",
        "audio_label": "音訊輸入",
        "stream_btn": "開始串流分段轉錄",
        "chunk_len": "分段長度(秒)",
        "overlap": "分段重疊(秒)",
        "orig_text": "原始轉錄(串流中)",
        "segments": "分段列表",
        "headers": ["播放", "開始", "結束", "說話者", "內容"],
        "age": "年齡",
        "native": "是否為阿美語母語者?",
        "eaf_trim": "為每位說話者裁切重疊區間(輸出 EAF)",
        "eaf_link": "於 EAF 中連結音訊媒體",
        "save_btn": "儲存修正",
        "save_status": "儲存狀態",
        "dl_btn": "下載 ZIP(音訊 + .txt + .eaf)",
        "dl_label": "下載檔案",
        "segment_player": "分段播放",
    }

    use_zh_state = gr.State(False)

    lang_switch = gr.Checkbox(label=EN["toggle_off"], value=False)

    title = gr.Markdown(EN["title"])
    step1 = gr.Markdown(EN["step1"])
    with gr.Row():
        audio_input = gr.Audio(sources=["upload", "microphone"], type="filepath", label=EN["audio_label"])

    step2 = gr.Markdown(EN["step2"])
    with gr.Row():
        transcribe_button = gr.Button(EN["stream_btn"])
        chunk_len = gr.Slider(4, 20, value=10, step=1, label=EN["chunk_len"])
        overlap = gr.Slider(0.0, 1.0, value=0.2, step=0.1, label=EN["overlap"])

    original_text = gr.Textbox(label=EN["orig_text"], interactive=False, lines=10)

    segments_table = gr.Dataframe(
        headers=EN["headers"],
        datatype=["str", "str", "str", "str", "str"],
        label=EN["segments"],
        wrap=True,
        interactive=True,
    )
    segments_state = gr.State([])

    segment_player = gr.Audio(label=EN["segment_player"], type="filepath", interactive=False)

    def df_to_segments(df_rows):
        return ensure_segments(df_rows)

    # Keep the normalized segments in sync with manual edits to the table.
    segments_table.change(df_to_segments, inputs=segments_table, outputs=segments_state)

    # Clicking the "Play" column of a row previews that row's audio slice.
    segments_table.select(
        fn=play_segment,
        inputs=[audio_input, segments_table],
        outputs=segment_player,
    )

    step3 = gr.Markdown(EN["step3"])
    with gr.Row():
        age_input = gr.Slider(minimum=0, maximum=100, step=1, label=EN["age"], value=25)
        native_speaker_input = gr.Checkbox(label=EN["native"], value=True)

    step4 = gr.Markdown(EN["step4"])
    with gr.Row():
        eaf_trim = gr.Checkbox(value=True, label=EN["eaf_trim"])
        eaf_link_audio = gr.Checkbox(value=True, label=EN["eaf_link"])

    with gr.Row():
        save_button = gr.Button(EN["save_btn"])
        save_status = gr.Textbox(label=EN["save_status"], interactive=False)

    with gr.Row():
        download_button = gr.Button(EN["dl_btn"])
        download_output = gr.File(label=EN["dl_label"])

    def toggle_language(switch):
        """Swap every label/heading between English and Traditional Chinese."""
        use_zh = bool(switch)
        D = ZH if use_zh else EN

        return (
            gr.update(value=D["title"]),
            gr.update(value=D["step1"]),
            gr.update(label=D["audio_label"]),
            gr.update(value=D["step2"]),
            gr.update(value=D["stream_btn"]),
            gr.update(label=D["chunk_len"]),
            gr.update(label=D["overlap"]),
            gr.update(label=D["orig_text"]),
            gr.update(label=D["segments"], headers=D["headers"]),
            gr.update(label=D["segment_player"]),
            gr.update(value=D["step3"]),
            gr.update(label=D["age"]),
            gr.update(label=D["native"]),
            gr.update(value=D["step4"]),
            gr.update(label=D["eaf_trim"]),
            gr.update(label=D["eaf_link"]),
            gr.update(value=D["save_btn"]),
            gr.update(label=D["save_status"]),
            gr.update(value=D["dl_btn"]),
            gr.update(label=D["dl_label"]),
            gr.update(label=(ZH["toggle_on"] if use_zh else EN["toggle_off"])),
            use_zh,
        )

    lang_switch.change(
        toggle_language,
        inputs=lang_switch,
        outputs=[
            title, step1, audio_input,
            step2, transcribe_button, chunk_len, overlap, original_text, segments_table, segment_player,
            step3, age_input, native_speaker_input,
            step4, eaf_trim, eaf_link_audio,
            save_button, save_status, download_button, download_output,
            lang_switch, use_zh_state,
        ],
    )

    # Streaming transcription: the generator yields after every chunk.
    transcribe_button.click(
        fn=transcribe_streaming,
        inputs=[audio_input, chunk_len, overlap],
        outputs=[original_text, segments_table, segments_state],
        queue=True,
    )

    save_button.click(
        store_correction_from_df,
        inputs=[use_zh_state, original_text, audio_input, age_input, native_speaker_input, segments_table],
        outputs=save_status,
    )

    download_button.click(
        prepare_download_from_df,
        inputs=[audio_input, segments_table, eaf_trim, eaf_link_audio],
        outputs=download_output,
    )


if __name__ == "__main__":
    demo.launch()