Thanh-Lam's picture
Add HF_TOKEN env support and improve network error messages for HF Spaces
56dc9d8
from __future__ import annotations
import functools
import tempfile
from pathlib import Path
from typing import List, Any
import shutil
import csv
import subprocess
import zipfile
import gradio as gr
from src.models import DiarizationEngine, Segment
from src.utils import (
export_segments_json,
format_segments_table,
seconds_to_mmss,
download_audio_from_url,
)
DEFAULT_TOKEN_SENTINEL = "__FROM_FILE_OR_ENV__"
GENDER_MAP = {"nam": "0", "male": "0", "nữ": "1", "nu": "1", "female": "1"}
REGION_MAP = {"bắc": "0", "bac": "0", "north": "0", "trung": "1", "central": "1", "nam": "2", "south": "2"}
ALLOWED_GENDER = {"nam", "nữ", "nu", "male", "female"}
ALLOWED_REGION = {"bắc", "trung", "nam", "bac", "north", "central", "south"}
def diarize_file(
audio_path: str | Path,
hf_token: str | None = None,
device: str = "auto",
show_progress: bool = True,
) -> List[Segment]:
"""API đơn giản để dùng trực tiếp trong Python."""
engine = DiarizationEngine(token=hf_token, device=device)
return engine.run(audio_path, show_progress=show_progress)
def _token_key(raw_token: str | None) -> str:
cleaned = raw_token.strip() if raw_token else None
return cleaned if cleaned else DEFAULT_TOKEN_SENTINEL
@functools.lru_cache(maxsize=2)
def _get_engine(token_key: str, device: str) -> DiarizationEngine:
token_value = None if token_key == DEFAULT_TOKEN_SENTINEL else token_key
return DiarizationEngine(token=token_value, device=device)
def _diarize_action(
audio_path: str | None,
hf_token: str | None,
device: str,
url: str | None = None,
):
import sys
print(f"DEBUG START: audio_path={audio_path}, url={url}", file=sys.stderr)
if not audio_path and not url:
empty_state = ["", "", "", ""]
return "Vui lòng tải file âm thanh hoặc nhập URL.", None, None, [], [], empty_state, None
try:
downloaded_path = None
download_tmp = None
audio_input = audio_path
if url:
print(f"DEBUG: Downloading from URL: {url}", file=sys.stderr)
downloaded_path, download_tmp = download_audio_from_url(url)
audio_input = str(downloaded_path)
print(f"DEBUG: Downloaded to: {audio_input}, tmp={download_tmp}", file=sys.stderr)
print(f"DEBUG: Getting engine...", file=sys.stderr)
engine = _get_engine(_token_key(hf_token), device)
print(f"DEBUG: Running diarization on: {audio_input}", file=sys.stderr)
diarization, prepared_path, prep_tmpdir = engine.diarize(
audio_input, show_progress=False, keep_audio=True
)
print(f"DEBUG: Diarization done. prepared_path={prepared_path}, prep_tmpdir={prep_tmpdir}", file=sys.stderr)
segments = engine.to_segments(diarization)
dict_segments = [
{"start": float(seg.start), "end": float(seg.end), "speaker": seg.speaker}
for seg in segments
]
table = format_segments_table(dict_segments)
output_tmp = Path(tempfile.mkdtemp(prefix="diarization_out_"))
rttm_path = engine.save_rttm(diarization, output_tmp / "output.rttm")
json_path = export_segments_json(dict_segments, output_tmp / "segments.json")
df_rows = [
[
seconds_to_mmss(seg["start"]),
seconds_to_mmss(seg["end"]),
seg["speaker"],
"",
"",
"",
]
for seg in dict_segments
]
source_name = Path(audio_input).stem if audio_input else "unknown"
audio_state = [
str(prepared_path),
str(prep_tmpdir) if prep_tmpdir else "",
source_name,
str(download_tmp) if download_tmp else "",
]
# Đảm bảo prepared_path là file path hợp lệ, không phải None hoặc directory
audio_file_output = str(prepared_path) if prepared_path and Path(prepared_path).is_file() else None
# Debug logging
import sys
print(f"DEBUG: prepared_path={prepared_path}, type={type(prepared_path)}", file=sys.stderr)
print(f"DEBUG: rttm_path={rttm_path}, exists={Path(rttm_path).exists()}", file=sys.stderr)
print(f"DEBUG: json_path={json_path}, exists={Path(json_path).exists()}", file=sys.stderr)
print(f"DEBUG: audio_file_output={audio_file_output}", file=sys.stderr)
return (
table,
str(rttm_path),
str(json_path),
df_rows,
dict_segments,
audio_state,
audio_file_output,
)
except Exception as exc: # pragma: no cover - hiển thị lỗi cho người dùng giao diện
import traceback
print(f"DEBUG ERROR: {exc}", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
empty_state = ["", "", "", ""]
return f"Lỗi: {exc}", None, None, [], [], empty_state, None
def _normalize_label(value: Any) -> str:
return str(value).strip().lower() if value is not None else ""
def _table_to_rows(table_data: Any) -> list[list[Any]]:
"""Chuyển giá trị DataFrame/ndarray/list sang list of list để thao tác."""
if table_data is None:
return []
if hasattr(table_data, "values"): # pandas DataFrame hoặc ndarray
try:
return table_data.values.tolist()
except Exception:
pass
if isinstance(table_data, list):
return table_data
if isinstance(table_data, tuple):
return list(table_data)
return []
def _select_row_action(evt: gr.SelectData):
row_idx = evt.index[0] if evt and evt.index else -1
if row_idx is None or row_idx < 0:
return "Chưa chọn hàng", -1
return f"Đang chọn hàng {row_idx + 1}", row_idx
def _apply_dropdown_action(
table_rows: list[list[Any]] | None,
selected_idx: int,
gender_choice: str,
region_choice: str,
transcription_text: str,
):
rows = _table_to_rows(table_rows)
if selected_idx is None or selected_idx < 0 or selected_idx >= len(rows):
return rows, "Chọn một hàng trước."
gender_val = _normalize_label(gender_choice)
region_val = _normalize_label(region_choice)
if gender_val and gender_val not in ALLOWED_GENDER:
return rows, "Giới tính chỉ được chọn nam/nữ."
if region_val and region_val not in ALLOWED_REGION:
return rows, "Vùng miền chỉ được chọn bắc/trung/nam."
new_rows = [list(r) for r in rows]
# row order: start_mmss, end_mmss, speaker, gender, region, transcription
if len(new_rows[selected_idx]) < 6:
new_rows[selected_idx] = (new_rows[selected_idx] + [""] * 6)[:6]
new_rows[selected_idx][3] = gender_val
new_rows[selected_idx][4] = region_val
new_rows[selected_idx][5] = transcription_text
return new_rows, f"Đã áp dụng cho hàng {selected_idx + 1}."
def _import_archives_action(files: list[Any] | None, output_root: str = "outputs"):
if not files:
return "Chọn ít nhất một file ZIP.", None
merged_root = Path(tempfile.mkdtemp(prefix="merged_zip_"))
merged_data = merged_root / "merged"
merged_data.mkdir(parents=True, exist_ok=True)
meta_all = merged_data / "metadata_all.csv"
appended = 0
extracted = 0
with meta_all.open("w", newline="", encoding="utf-8") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(
[
"id",
"file_name",
"start_mmss",
"end_mmss",
"gender",
"region",
"transcription",
"speaker",
"duration_sec",
"source",
]
)
for f in files:
zip_path = Path(getattr(f, "name", f))
if not zip_path.exists() and isinstance(f, dict) and "name" in f:
zip_path = Path(f["name"])
if not zip_path.exists():
continue
extracted += 1
dest_dir = merged_data / zip_path.stem
dest_dir.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as zf:
zf.extractall(dest_dir)
meta_csv = dest_dir / "metadata.csv"
if meta_csv.exists():
with meta_all.open("a", newline="", encoding="utf-8") as out_csv:
writer = csv.writer(out_csv)
with meta_csv.open("r", encoding="utf-8") as src:
next(src, None) # skip header
for line in src:
row = line.strip().split(",")
if row and any(row):
writer.writerow(row + [zip_path.stem])
appended += 1
# Tạo file zip từ thư mục merged_data
zip_base_name = merged_root / "merged_output"
merged_zip = shutil.make_archive(str(zip_base_name), "zip", merged_data)
status = f"Đã gộp {extracted} ZIP, metadata_all.csv có thêm {appended} dòng. Tải merged.zip."
return status, merged_zip
def _split_segments_action(
table_rows: list[list[Any]] | None,
segments_state: list[dict],
audio_state: list[str],
):
if not shutil.which("ffmpeg"):
return "Cần cài ffmpeg để tách đoạn.", None
if not segments_state:
return "Chạy diarization trước.", None
if not audio_state or len(audio_state) < 1 or not audio_state[0]:
return "Thiếu thông tin file đã chuẩn hóa.", None
prepared_path = Path(audio_state[0])
tmp_root = Path(tempfile.mkdtemp(prefix="segments_"))
output_dir = tmp_root / "data"
output_dir.mkdir(parents=True, exist_ok=True)
metadata_path = output_dir / "metadata.csv"
rows = _table_to_rows(table_rows)
try:
with metadata_path.open("w", newline="", encoding="utf-8") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(
[
"id",
"file_name",
"start_mmss",
"end_mmss",
"gender",
"region",
"transcription",
"speaker",
"duration_sec",
]
)
for idx, seg in enumerate(segments_state):
row = rows[idx] if idx < len(rows) else []
# row order: start_mmss, end_mmss, speaker, gender, region, transcription
gender = _normalize_label(row[3] if len(row) > 3 else "")
region = _normalize_label(row[4] if len(row) > 4 else "")
transcription = row[5] if len(row) > 5 else ""
if gender and gender not in ALLOWED_GENDER:
return f"Lỗi: giới tính hàng {idx+1} phải là nam/nữ.", None
if region and region not in ALLOWED_REGION:
return f"Lỗi: vùng miền hàng {idx+1} phải là bắc/trung/nam.", None
gender_code = GENDER_MAP.get(gender, "")
region_code = REGION_MAP.get(region, "")
seg_id = f"id_{gender_code or 'x'}_{region_code or 'x'}_{idx:03d}"
gender_disp = "Nam" if gender_code == "0" else "Nữ" if gender_code == "1" else gender
region_disp = (
"Bắc"
if region_code == "0"
else "Trung"
if region_code == "1"
else "Nam"
if region_code == "2"
else region
)
start = float(seg["start"])
end = float(seg["end"])
duration = max(end - start, 0.0)
out_file = output_dir / f"{seg_id}.wav"
cmd = [
"ffmpeg",
"-y",
"-i",
str(prepared_path),
"-ss",
f"{start:.3f}",
"-to",
f"{end:.3f}",
"-ac",
"1",
"-ar",
"16000",
"-vn",
"-f",
"wav",
str(out_file),
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
stderr = result.stderr.strip()
raise RuntimeError(f"ffmpeg lỗi khi tách đoạn {idx}: {stderr}")
writer.writerow(
[
seg_id,
out_file.name,
seconds_to_mmss(start),
seconds_to_mmss(end),
gender_disp,
region_disp,
transcription,
seg.get("speaker", ""),
duration,
]
)
# Tạo file zip từ thư mục output_dir
zip_base_name = tmp_root / "segments_output"
archive = shutil.make_archive(str(zip_base_name), "zip", output_dir)
return f"Tách {len(segments_state)} đoạn thành công. Tải zip bên dưới.", archive
except Exception as exc: # pragma: no cover
return f"Lỗi khi tách: {exc}", None
def build_interface() -> gr.Blocks:
with gr.Blocks(title="Vietnamese Diarization", analytics_enabled=False) as demo:
gr.Markdown(
"""
### Diarization tiếng Việt với pyannote
- Tải file âm thanh, điền Hugging Face access token (hoặc để trống nếu đã đặt trong môi trường/file).
- Chọn thiết bị chạy, nhấn Chạy. Kết quả hiển thị dạng bảng và file RTTM/JSON tải về.
"""
)
segments_state = gr.State([])
audio_state = gr.State({})
with gr.Row():
with gr.Column():
audio_input = gr.Audio(label="Tải file audio (tùy chọn)", type="filepath")
playback = gr.Audio(
label="Audio đã chuyển đổi/đang dùng",
type="filepath",
interactive=False,
)
with gr.Column():
url_input = gr.Textbox(
label="URL YouTube/TikTok (tùy chọn)",
placeholder="Dán link video nếu không tải file",
info="⚠️ Lưu ý: URL download không hoạt động trên HF Spaces (free tier). Vui lòng tải file trực tiếp.",
)
token_input = gr.Textbox(
label="Hugging Face access token (tùy chọn)",
type="password",
placeholder="Để trống nếu đã cấu hình môi trường hoặc hugging_face_key.txt",
)
device_input = gr.Dropdown(
choices=["auto", "cpu", "cuda"],
value="auto",
label="Thiết bị",
)
run_btn = gr.Button("Chạy diarization")
gr.Markdown(
"""
#### Gán nhãn và tách đoạn
- Chọn các ô gender (nam/nữ) và region (bắc/trung/nam) bằng dropdown trong bảng, transcription nhập tay.
- Nhấn "Tách và tải" để tải zip gồm các đoạn WAV và metadata.csv (không lưu lại trên server).
"""
)
segment_df = gr.Dataframe(
value=[],
headers=["start_mmss", "end_mmss", "speaker", "gender", "region", "transcription"],
datatype=["str", "str", "str", "str", "str", "str"],
interactive=True,
type="array",
)
gender_dropdown = gr.Dropdown(choices=["", "nam", "nữ"], value="", label="Giới tính chọn nhanh")
region_dropdown = gr.Dropdown(choices=["", "bắc", "trung", "nam"], value="", label="Vùng miền chọn nhanh")
transcription_input = gr.Textbox(label="Transcription (áp dụng nhanh)", lines=1, placeholder="Nhập lời thoại")
selection_info = gr.Textbox(label="Hàng đang chọn", interactive=False, value="Chưa chọn hàng")
split_btn = gr.Button("Tách và tải")
split_status = gr.Textbox(label="Trạng thái tách", lines=2)
split_zip_file = gr.File(label="Tải ZIP các đoạn đã tách")
gr.Markdown(
"""
#### Nhập ZIP đã tách (gộp nhiều ZIP thành một)
- Tải lên nhiều file ZIP đã tải về trước đó, công cụ sẽ gộp lại và tạo một merged.zip kèm metadata_all.csv.
"""
)
import_files = gr.File(label="Chọn nhiều ZIP", file_count="multiple", file_types=[".zip"])
import_btn = gr.Button("Nhập ZIP vào thư mục chung")
import_status = gr.Textbox(label="Trạng thái nhập ZIP", lines=2)
merged_zip_file = gr.File(label="Tải ZIP đã gộp")
result_box = gr.Textbox(label="Bảng phân đoạn", lines=12)
rttm_file = gr.File(label="Tải RTTM")
json_file = gr.File(label="Tải JSON")
selected_row = gr.State(-1)
run_btn.click(
fn=_diarize_action,
inputs=[audio_input, token_input, device_input, url_input],
outputs=[result_box, rttm_file, json_file, segment_df, segments_state, audio_state, playback],
)
segment_df.select(
fn=_select_row_action,
inputs=None,
outputs=[selection_info, selected_row],
)
gender_dropdown.change(
fn=_apply_dropdown_action,
inputs=[segment_df, selected_row, gender_dropdown, region_dropdown, transcription_input],
outputs=[segment_df, selection_info],
)
region_dropdown.change(
fn=_apply_dropdown_action,
inputs=[segment_df, selected_row, gender_dropdown, region_dropdown, transcription_input],
outputs=[segment_df, selection_info],
)
transcription_input.change(
fn=_apply_dropdown_action,
inputs=[segment_df, selected_row, gender_dropdown, region_dropdown, transcription_input],
outputs=[segment_df, selection_info],
)
split_btn.click(
fn=_split_segments_action,
inputs=[segment_df, segments_state, audio_state],
outputs=[split_status, split_zip_file],
)
import_btn.click(
fn=_import_archives_action,
inputs=[import_files],
outputs=[import_status, merged_zip_file],
)
return demo
if __name__ == "__main__":
import sys
print("=" * 60, file=sys.stderr)
print("Khởi tạo Vietnamese Diarization App...", file=sys.stderr)
print("=" * 60, file=sys.stderr)
try:
demo = build_interface()
print("Interface đã được khởi tạo thành công!", file=sys.stderr)
demo.launch(
server_name="0.0.0.0",
server_port=7860,
)
except Exception as e:
print(f"LỖI khi khởi động app: {e}", file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)