Spaces:
Sleeping
Sleeping
| import os, io, csv, json, random | |
| from datetime import datetime | |
| import gradio as gr | |
| from huggingface_hub import HfApi, hf_hub_download | |
| import subprocess, pathlib, hashlib | |
| import secrets, time, string | |
| from functools import lru_cache | |
| TOP_DIRS = [ | |
| "Hunyuan_videos", | |
| "Opensora_768", | |
| "RunwayGen4", | |
| "Wan2.2", | |
| "wan21_videos", | |
| ] | |
| def _choose_prefix_from_filename(base: str) -> str: | |
| best = None | |
| for d in TOP_DIRS: | |
| if ( | |
| base.startswith(d + "_") | |
| or base.startswith(d + "/") | |
| or base.startswith(d + "-") | |
| or base.startswith(d) | |
| ): | |
| if best is None or len(d) > len(best): | |
| best = d | |
| if best is not None: | |
| return best | |
| if "_" in base: | |
| return base.split("_")[0] | |
| return os.path.splitext(base)[0] | |
| def _extract_action_from_filename(base: str, prefix: str): | |
| after_prefix = base[len(prefix):] | |
| after_prefix = after_prefix.lstrip("_-") | |
| if not after_prefix: | |
| return None | |
| action = after_prefix.split("_")[0] | |
| return action or None | |
| def _patterns_for(prefix: str, action: str | None, base: str, root_prefix: str): | |
| """ | |
| root_prefix == "" -> 경로가 'Wan2.2/ThrowDiscus/file.mp4' | |
| root_prefix == "video_examples" -> 경로가 'video_examples/Wan2.2/ThrowDiscus/file.mp4' | |
| 즉 root_prefix를 앞에 optional하게 붙여서 두 세트를 만든다. | |
| """ | |
| # alt_prefix_videos: 'Wan2.2_videos' 같은 경우까지 백업 시도 | |
| alt_prefix_videos = prefix if prefix.endswith("_videos") else (prefix + "_videos") | |
| def rp(*parts): | |
| # parts를 '/'로 join, 그리고 root_prefix가 비어있지 않으면 그 앞에 붙이기 | |
| path = "/".join([p for p in parts if p]) | |
| if root_prefix: | |
| return root_prefix + "/" + path | |
| else: | |
| return path | |
| cand = [ | |
| rp(prefix, prefix, base), # Wan2.2/Wan2.2/file | |
| rp(prefix, alt_prefix_videos, base), # Wan2.2/Wan2.2_videos/file | |
| rp(prefix, base), # Wan2.2/file | |
| rp(base), # file at root | |
| ] | |
| if action: | |
| cand += [ | |
| rp(prefix, action, base), # Wan2.2/ThrowDiscus/file | |
| rp(prefix, prefix, action, base), # Wan2.2/Wan2.2/ThrowDiscus/file | |
| rp(prefix, action, f"{prefix}_{action}", base), # Wan2.2/ThrowDiscus/Wan2.2_ThrowDiscus/file | |
| rp(prefix, f"{prefix}_{action}", base), # Wan2.2/Wan2.2_ThrowDiscus/file | |
| ] | |
| # uniq 제거 | |
| uniq = [] | |
| for c in cand: | |
| if c not in uniq: | |
| uniq.append(c) | |
| return uniq | |
def _candidate_relpaths(url_or_name: str):
    """Return every repo-relative path worth trying for a video URL or name.

    Only the basename of *url_or_name* is used. Candidates are generated
    first for the repo root and then for the ``video_examples/`` folder,
    with duplicates removed in first-seen order.
    """
    base = os.path.basename(url_or_name).strip()
    prefix = _choose_prefix_from_filename(base)
    action = _extract_action_from_filename(base, prefix)

    merged = []
    for root in ("", "video_examples"):
        merged.extend(_patterns_for(prefix, action, base, root_prefix=root))
    return list(dict.fromkeys(merged))
def get_local_video_path(hf_url_or_relpath: str) -> str:
    """Return a local path to a muted copy of the requested video.

    If *hf_url_or_relpath* already exists on disk it is passed straight to
    ``ensure_muted_copy``. Otherwise each path from ``_candidate_relpaths`` is
    tried against ``REPO_ID`` with ``hf_hub_download`` until one succeeds.

    Raises:
        RuntimeError: if ``HF_TOKEN`` is empty, or if no candidate path could
            be downloaded (the last download error is chained as the cause).
    """
    # Already local: just make sure it is muted.
    if os.path.exists(hf_url_or_relpath):
        return ensure_muted_copy(hf_url_or_relpath)
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN is not set or empty in this Space. Please add it as a secret.")

    # Compute the candidate list once; it is reused for the error message.
    candidates = _candidate_relpaths(hf_url_or_relpath)
    last_err = None
    for rel in candidates:
        try:
            local_path = hf_hub_download(
                repo_id=REPO_ID,
                filename=rel,
                repo_type=REPO_TYPE,
                token=HF_TOKEN,
                local_dir="/tmp",
                local_dir_use_symlinks=False,
            )
            return ensure_muted_copy(local_path)
        except Exception as e:
            # Any failure (missing file, auth, network) moves on to the next
            # candidate; only the last error is reported.
            last_err = e
            continue

    raise RuntimeError(
        "Could not locate video in repo "
        + REPO_ID
        + ". Tried: "
        + ", ".join(candidates)
        + f". Last error was {type(last_err).__name__}: {last_err}"
    ) from last_err
# Values the user has finished selecting on the sliders (initially None).
# NOTE(review): these two gr.State objects are created at module level, and the
# same names are assigned again inside the gr.Blocks UI later in this file —
# presumably these are leftovers; confirm before removing.
selected_action = gr.State(None)
selected_phys = gr.State(None)
def _recompute_save(pid_text: str, sel_a, sel_p):
    """Return a Gradio update that enables a button only when the form is ready.

    Ready means: *pid_text* is non-blank and both *sel_a* and *sel_p* are not
    None. The button is shown as "primary" when ready, "secondary" otherwise.
    """
    has_pid = bool(pid_text and pid_text.strip())
    ready = has_pid and sel_a is not None and sel_p is not None
    if ready:
        return gr.update(interactive=True, variant="primary")
    return gr.update(interactive=False, variant="secondary")
REWARD_FILE = "reward_codes.csv"  # log of issued reward codes (stored in the HF dataset)


def _read_codes_bytes():
    """Download ``REWARD_FILE`` from ``RESULTS_REPO_ID`` and return its bytes.

    Returns None on any failure.
    NOTE(review): a transient network/auth error is indistinguishable from
    "file does not exist yet", so a caller that appends and re-uploads will
    then overwrite the remote file with only the new row — consider
    separating the two cases.
    """
    try:
        p = hf_hub_download(
            repo_id=RESULTS_REPO_ID, filename=REWARD_FILE, repo_type="dataset",
            token=HF_TOKEN, local_dir="/tmp", local_dir_use_symlinks=False
        )
        # Context manager so the file handle is closed deterministically.
        with open(p, "rb") as fh:
            return fh.read()
    except Exception:
        return None
| def _append_code(old_bytes, row): | |
| s = io.StringIO() | |
| w = csv.writer(s) | |
| if not old_bytes: | |
| # 새 헤더 | |
| w.writerow(["ts_iso", "participant_id", "reward_code", "total_done"]) | |
| else: | |
| s.write(old_bytes.decode("utf-8", errors="ignore")) | |
| w.writerow(row) | |
| return s.getvalue().encode("utf-8") | |
def _persist_reward_code(pid: str, code: str, total_done: int):
    """Append one reward-code record to ``REWARD_FILE`` in the results dataset.

    The current file is downloaded, a row of (UTC timestamp, stripped
    participant id, code, total_done) is appended, and the whole file is
    uploaded again.
    """
    existing = _read_codes_bytes()
    record = [datetime.utcnow().isoformat(), pid.strip(), code, int(total_done)]
    payload = _append_code(existing, record)
    upload_args = {
        "path_or_fileobj": io.BytesIO(payload),
        "path_in_repo": REWARD_FILE,
        "repo_id": RESULTS_REPO_ID,
        "repo_type": "dataset",
        "token": HF_TOKEN,
        "commit_message": "append reward code",
    }
    api.upload_file(**upload_args)
| def _gen_reward_code(pid: str, length: int = 10, forbid_ambiguous: bool=True) -> str: | |
| """ | |
| 참가자에게 보여줄 랜덤 코드. 충돌 위험 매우 낮음. | |
| - PID + 시각 + 보안랜덤으로 시드 | |
| - 대문자/숫자 조합 | |
| """ | |
| alphabet = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789" if forbid_ambiguous else string.ascii_uppercase + string.digits | |
| # secrets 기반 난수 + pid/time 섞어서 해시 비슷한 효과 | |
| rnd = secrets.token_hex(8) + pid + str(time.time_ns()) | |
| # 섞고 뽑기 | |
| rng = secrets.SystemRandom() | |
| return "".join(rng.choice(alphabet) for _ in range(length)) | |
| MUTED_CACHE_DIR = "/tmp/hf_video_cache_muted" | |
| pathlib.Path(MUTED_CACHE_DIR).mkdir(parents=True, exist_ok=True) | |
| def _sha1_8(s: str) -> str: | |
| return hashlib.sha1(s.encode("utf-8")).hexdigest()[:8] | |
| def ensure_muted_copy(src_path: str) -> str: | |
| """ | |
| 주어진 mp4에서 오디오 트랙을 제거(-an)한 무음 복사본을 캐시에 생성. | |
| 이미 있으면 그 파일 경로 반환. | |
| 실패하면 원본 경로 반환. | |
| """ | |
| if not src_path or not os.path.exists(src_path): | |
| return src_path | |
| out = os.path.join(MUTED_CACHE_DIR, _sha1_8(src_path) + ".mp4") | |
| if os.path.exists(out): | |
| return out | |
| try: | |
| subprocess.run( | |
| ["ffmpeg", "-y", "-i", src_path, "-c:v", "copy", "-an", out], | |
| stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True | |
| ) | |
| return out if os.path.exists(out) else src_path | |
| except Exception: | |
| return src_path | |
FEEDBACK_FILE = "final_feedback.csv"


def _read_feedback_bytes():
    """Download ``FEEDBACK_FILE`` from ``RESULTS_REPO_ID`` and return its bytes.

    Returns None on any failure.
    NOTE(review): a transient network/auth error looks the same as "file does
    not exist yet", so an append-and-upload caller would then replace the
    remote file with only the new row — consider separating the two cases.
    """
    try:
        p = hf_hub_download(
            repo_id=RESULTS_REPO_ID, filename=FEEDBACK_FILE, repo_type="dataset",
            token=HF_TOKEN, local_dir="/tmp", local_dir_use_symlinks=False
        )
        # Context manager so the file handle is closed deterministically.
        with open(p, "rb") as fh:
            return fh.read()
    except Exception:
        return None
| def _append_feedback(old_bytes, row): | |
| s = io.StringIO() | |
| w = csv.writer(s) | |
| if not old_bytes: | |
| # 최종 코멘트 전용 CSV 헤더 | |
| w.writerow(["ts_iso", "participant_id", "final_comment"]) | |
| else: | |
| s.write(old_bytes.decode("utf-8", errors="ignore")) | |
| w.writerow(row) | |
| return s.getvalue().encode("utf-8") | |
def push_final_feedback(participant_id: str, comment: str):
    """Append the participant's free-text comment to ``FEEDBACK_FILE``.

    Returns a Gradio update for a status component: a warning when the
    participant ID or configuration is missing, an info note when the comment
    is blank (nothing is saved), a success message after upload, or an error
    message describing the exception if saving fails.
    """
    if not participant_id or not participant_id.strip():
        return gr.update(visible=True, value="❗ Missing participant ID.")
    if comment is None or not str(comment).strip():
        # Empty comment: do not save, just inform the user.
        return gr.update(visible=True, value="ℹ️ No comment entered — nothing to submit.")
    # Validate configuration before any network call, and check the repo the
    # upload actually targets.
    if not RESULTS_REPO_ID:
        return gr.update(visible=True, value="❗ RESULTS_REPO is not set.")
    if not HF_TOKEN:
        return gr.update(visible=True, value="❗ HF_TOKEN is missing. Set a write token for the dataset repo.")
    try:
        old = _read_feedback_bytes()
        row = [datetime.utcnow().isoformat(), participant_id.strip(), str(comment).strip()]
        newb = _append_feedback(old, row)
        api.upload_file(
            path_or_fileobj=io.BytesIO(newb),
            path_in_repo=FEEDBACK_FILE,
            repo_id=RESULTS_REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message="append final feedback"
        )
        return gr.update(visible=True, value="✅ Thanks! Your feedback was submitted.")
    except Exception as e:
        return gr.update(
            visible=True,
            value=f"❌ Feedback save failed: {type(e).__name__}: {e}"
        )
# -------------------- Config --------------------
# REPO_ID = os.getenv("RESULTS_REPO", "dghadiya/video_eval") # matches the uploaded repo
# Repo that videos are downloaded from (see get_local_video_path).
# NOTE(review): the env var is named RESULTS_REPO although results are written
# to RESULTS_REPO_ID below — confirm the naming is intentional.
REPO_ID = os.getenv("RESULTS_REPO", "dghadiya/video_examples")
RESULTS_REPO_ID = "dghadiya/video_eval" # <- final storage location for the result CSVs
REPO_TYPE = "dataset"
HF_TOKEN = os.getenv("HF_TOKEN")
RESULTS_FILE = "results.csv"
TOTAL_PER_PARTICIPANT = 30 # target number of ratings (per session)
# videos.json example: {"url": "...mp4", "id": "BodyWeightSquats__XXXX.mp4", "action": "BodyWeightSquats"}
with open("videos.json", "r", encoding="utf-8") as f:
    V = json.load(f)
api = HfApi()
| # 교수님 지침(그대로, 굵게 처리 포함) | |
| # INSTRUCTION_MD = """ | |
| # **Task:** You will watch a series of **AI-generated videos**. For each video, your job is to rate how well the person’s action in the AI-generated video matches the action specified as "**expected action**". Some things to keep in mind: | |
| # - The generated video should **capture** the expected action **throughout the video**. | |
| # - Try to **focus only** on the expected action and do **not** judge **video quality**, **attractiveness**, **background**, **camera motion**, or **objects**. | |
| # - You will be **paid** once **all the videos are viewed and rated**. | |
| # """ | |
| INSTRUCTION_MD = """ | |
| **Task:** You will watch a series of **AI-generated videos**. | |
| For each video, your job is to rate: | |
| 1. **Action Consistency** - how well the person’s action matches the action specified as the "**Expected action**". | |
| 2. **Temporal Coherence** - how natural and physically possible the motion looks. | |
| Some things to keep in mind: | |
| - Try to **focus only** on the expected action and motion quality, and do **not** judge **video quality**, **attractiveness**, **background**, **camera motion**, or **objects**. | |
| - For **physical plausibility**, look for **smooth and realistic** motion without impossible poses, missing limbs, or extreme stretching. | |
| - Action consistency and physical plausibility are **not mutually exclusive** with each other. | |
| - **Physically plausible motion does not imply correct depiction of action.** | |
| - **A video cannot portray action consistency if it has physically impossible movements.** | |
| - **0: indicates really poor depiction, 10: represents perfect, realistic depiction** | |
| - The **Save & Next** button will be enabled **only after you have clicked or adjusted both sliders at least once**. | |
| - You will be **paid** once **all the videos are viewed and rated**. | |
| """ | |
| # -------------------- Helper funcs -------------------- | |
def _load_eval_counts():
    """Return a dict mapping each known video id to its number of ratings.

    Every video in ``V`` starts at 0. Rows of the results CSV (as returned by
    ``_read_csv_bytes``) are then counted by their video id; ids that are not
    in ``V`` and rows too short to contain the id column are ignored.
    """
    tally = {_get_video_id(v): 0 for v in V}

    raw = _read_csv_bytes()
    if not raw:
        return tally

    rows = list(csv.reader(io.StringIO(raw.decode("utf-8", errors="ignore"))))
    if not rows:
        return tally

    # Work out whether the first row is a header and where the id column is.
    header = rows[0]
    has_header = bool(header) and ("video_id" in header or "overall" in header)
    body = rows[1:] if has_header else rows
    # Default layout: ts, pid, video_id, ... -> the id is the third column.
    id_col = header.index("video_id") if (header and "video_id" in header) else 2

    for row in body:
        if len(row) <= id_col:
            continue
        vid = row[id_col]
        if vid in tally:
            tally[vid] += 1
    return tally
| def _get_video_id(v: dict) -> str: | |
| if "id" in v and v["id"]: | |
| return v["id"] | |
| # id가 없으면 URL 파일명으로 대체 | |
| return os.path.basename(v.get("url", "")) | |
def _read_csv_bytes():
    """Download ``RESULTS_FILE`` from ``RESULTS_REPO_ID`` and return its bytes.

    Returns None on any failure.
    NOTE(review): a transient network/auth error looks the same as "file does
    not exist yet", so an append-and-upload caller would then replace the
    remote results with only the new row — consider separating the two cases.
    """
    try:
        p = hf_hub_download(
            repo_id=RESULTS_REPO_ID, filename=RESULTS_FILE, repo_type="dataset",
            token=HF_TOKEN, local_dir="/tmp", local_dir_use_symlinks=False
        )
        # Context manager so the file handle is closed deterministically.
        with open(p, "rb") as fh:
            return fh.read()
    except Exception:
        return None
| # def _append(old_bytes, row): | |
| # s = io.StringIO() | |
| # w = csv.writer(s) | |
| # if not old_bytes: | |
| # # ✅ 새 헤더 | |
| # w.writerow(["ts_iso", "participant_id", "video_id", "overall", "notes"]) | |
| # else: | |
| # s.write(old_bytes.decode("utf-8", errors="ignore")) | |
| # w.writerow(row) | |
| # return s.getvalue().encode("utf-8") | |
| def _append(old_bytes, row): | |
| s = io.StringIO() | |
| w = csv.writer(s) | |
| if not old_bytes: | |
| # ✅ new header with two scores | |
| w.writerow(["ts_iso", "participant_id", "video_id", | |
| "action_consistency", "physical_plausibility", "notes"]) | |
| else: | |
| s.write(old_bytes.decode("utf-8", errors="ignore")) | |
| w.writerow(row) | |
| return s.getvalue().encode("utf-8") | |
| # def push(participant_id, video_id, score, notes=""): | |
| # if not participant_id or not participant_id.strip(): | |
| # return gr.update(visible=True, value="❗ Please enter your Participant ID before proceeding.") | |
| # if not video_id or score is None: | |
| # return gr.update(visible=True, value="❗ Fill out all fields.") | |
| # try: | |
| # old = _read_csv_bytes() | |
| # row = [ | |
| # datetime.utcnow().isoformat(), | |
| # participant_id.strip(), | |
| # video_id, # ✅ action 대신 video_id 저장 | |
| # float(score), # overall | |
| # notes or "" | |
| # ] | |
| # newb = _append(old, row) | |
| # if not REPO_ID: | |
| # return gr.update(visible=True, value="❗ RESULTS_REPO is not set.") | |
| # if not HF_TOKEN: | |
| # return gr.update(visible=True, value="❗ HF_TOKEN is missing. Set a write token for the dataset repo.") | |
| # api.upload_file( | |
| # path_or_fileobj=io.BytesIO(newb), | |
| # path_in_repo=RESULTS_FILE, | |
| # repo_id=REPO_ID, | |
| # repo_type="dataset", | |
| # token=HF_TOKEN, | |
| # commit_message="append" | |
| # ) | |
| # return gr.update(visible=True, value=f"✅ Saved successfully!") | |
| # except Exception as e: | |
| # return gr.update( | |
| # visible=True, | |
| # value=f"❌ Save failed: {type(e).__name__}: {e}\n" | |
| # f"- Check HF_TOKEN permission\n- Check REPO_ID\n- Create dataset repo if missing" | |
| # ) | |
def push(participant_id, video_id, action_score, phys_score, notes=""):
    """Append one rating row to ``RESULTS_FILE`` in the results dataset.

    Returns a Gradio update for a status component: a warning when required
    input or configuration is missing, a success message after upload, or an
    error message with troubleshooting hints if saving fails.
    """
    if not participant_id or not participant_id.strip():
        return gr.update(visible=True, value="❗ Please enter your Participant ID before proceeding.")
    if not video_id or action_score is None or phys_score is None:
        return gr.update(visible=True, value="❗ Fill out all fields.")
    # Validate configuration before any network call, and check the repo the
    # upload actually targets.
    if not RESULTS_REPO_ID:
        return gr.update(visible=True, value="❗ RESULTS_REPO is not set.")
    if not HF_TOKEN:
        return gr.update(visible=True, value="❗ HF_TOKEN is missing. Set a write token for the dataset repo.")
    try:
        old = _read_csv_bytes()
        row = [
            datetime.utcnow().isoformat(),
            participant_id.strip(),
            video_id,
            float(action_score),
            float(phys_score),
            notes or ""
        ]
        newb = _append(old, row)
        api.upload_file(
            path_or_fileobj=io.BytesIO(newb),
            path_in_repo=RESULTS_FILE,
            repo_id=RESULTS_REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message="append"
        )
        return gr.update(visible=True, value="✅ Saved successfully!")
    except Exception as e:
        return gr.update(
            visible=True,
            value=f"❌ Save failed: {type(e).__name__}: {e}\n"
                  f"- Check HF_TOKEN permission\n- Check REPO_ID\n- Create dataset repo if missing"
        )
| def _extract_action(v): | |
| if "action" in v and v["action"]: | |
| return v["action"] | |
| raw = v.get("id", "") | |
| return raw.split("__")[0].split(".")[0] | |
def pick_one():
    """Pick a random entry of ``V`` and return ``(url, expected_action)``."""
    chosen = random.choice(V)
    url = chosen["url"]
    return url, _extract_action(chosen)
| def _progress_html(done, total): | |
| pct = int(100 * done / max(1, total)) | |
| return f""" | |
| <div style="border:1px solid #ddd; height:20px; border-radius:6px; overflow:hidden; margin-top:6px;"> | |
| <div style="height:100%; width:{pct}%; background:#3b82f6;"></div> | |
| </div> | |
| <div style="font-size:12px; margin-top:4px;">{done} / {total}</div> | |
| """ | |
| def _build_order_with_anchor(total:int, anchor_idx:int, repeats:int, pool_size:int, min_gap:int=1): | |
| """ | |
| total: TOTAL_PER_PARTICIPANT (e.g., 30) | |
| anchor_idx: index of the anchor video in V (0 for first item) | |
| repeats: how many times to show anchor (e.g., 5) | |
| pool_size: len(V) | |
| min_gap: 최소 간격(인접 금지 => 1) | |
| return: list of indices (length=total) | |
| """ | |
| assert repeats <= total, "repeats must be <= total" | |
| assert pool_size >= 1, "videos pool must be non-empty" | |
| # 1) 다른 비디오 25개(중복 없이) 뽑기 | |
| others_needed = total - repeats | |
| # anchor를 제외한 후보 인덱스 | |
| candidates = list(range(1, pool_size)) if anchor_idx == 0 else [i for i in range(pool_size) if i != anchor_idx] | |
| if len(candidates) < others_needed: | |
| raise ValueError("Not enough unique non-anchor videos to fill the schedule without duplication.") | |
| others = random.sample(candidates, k=others_needed) | |
| # 2) 기본 시퀀스(others)를 무작위로 섞기 | |
| random.shuffle(others) | |
| # 3) 앵커를 min_gap를 만족하도록 삽입할 위치 선정 | |
| # 30개를 5구간으로 나눠, 각 구간 내에서 충돌 덜 나게 배치 | |
| # (간단하고 안정적인 방식) | |
| seq = others[:] # 길이=25 | |
| anchor_positions = [] | |
| segment = total // repeats # 30//5 = 6 | |
| for k in range(repeats): | |
| # 각 구간 [k*segment, (k+1)*segment) 안에서 후보 위치를 고름 | |
| lo = k * segment | |
| hi = (k + 1) * segment if k < repeats - 1 else total # 마지막은 끝까지 | |
| # 경계 내 임의 오프셋 선택 (여유를 두고 충돌을 피함) | |
| candidate_pos = random.randrange(lo, hi) | |
| # 인접 금지 보정: 이미 배정된 anchor 위치들과의 거리가 min_gap 이상 되도록 조정 | |
| # 필요 시 좌우로 근접한 빈 슬롯 탐색 | |
| def ok(pos): | |
| return all(abs(pos - p) >= min_gap + 1 for p in anchor_positions) # 연속금지 => 거리 >= 2 | |
| # 근방 탐색 폭 | |
| found = None | |
| for delta in range(0, segment): # 구간 크기 내에서 탐색 | |
| # 좌/우 번갈아가며 후보 시도 | |
| for sign in (+1, -1): | |
| pos = candidate_pos + sign * delta | |
| if 0 <= pos < total and ok(pos): | |
| found = pos | |
| break | |
| if found is not None: | |
| break | |
| if found is None: | |
| # 최후: 0..total-1 범위에서 아무 데나 충돌 없는 곳 찾기 | |
| for pos in range(total): | |
| if ok(pos): | |
| found = pos | |
| break | |
| if found is None: | |
| raise RuntimeError("Failed to place anchor without adjacency. Try different strategy or loosen min_gap.") | |
| anchor_positions.append(found) | |
| # 4) others를 기반으로 길이 total의 빈 시퀀스를 만들고 anchor를 주입 | |
| # 우선 빈 리스트를 만들고 anchor 위치를 채운 후, 나머지를 others로 채움 | |
| result = [None] * total | |
| for pos in anchor_positions: | |
| result[pos] = anchor_idx | |
| # others 포인터 | |
| j = 0 | |
| for i in range(total): | |
| if result[i] is None: | |
| result[i] = others[j] | |
| j += 1 | |
| # 안전체크 | |
| assert len(result) == total | |
| # 인접 anchor 없는지 확인 | |
| for i in range(1, total): | |
| assert not (result[i] == anchor_idx and result[i-1] == anchor_idx), "Adjacent anchors found." | |
| # anchor 개수 확인 | |
| assert sum(1 for x in result if x == anchor_idx) == repeats, "Anchor count mismatch." | |
| return result | |
# -------------------- Example videos (download to local cache) --------------------
# Repo-relative paths of the "real" and "bad" example clip for each action.
# NOTE(review): the only reader of this dict visible in this file is
# commented-out code, and the intro page uses different literal paths
# ("..._good.mp4", "WallPushups_...") — confirm whether this is still needed.
EXAMPLES = {
    "BodyWeightSquats": {
        "real": "examples/BodyWeightSquats_real.mp4",
        "bad": "examples/BodyWeightSquats_bad.mp4",
    },
    "WallPushUps": {
        "real": "examples/WallPushUps_real.mp4",
        "bad": "examples/WallPushUps_bad.mp4",
    },
}
| # EX_CACHE = {} | |
| # for cls, files in EXAMPLES.items(): | |
| # EX_CACHE[cls] = {"real": None, "bad": None} | |
| # for kind, fname in files.items(): | |
| # try: | |
| # EX_CACHE[cls][kind] = hf_hub_download( | |
| # repo_id=REPO_ID, | |
| # filename=fname, | |
| # repo_type="dataset", | |
| # token=HF_TOKEN, | |
| # local_dir="/tmp", | |
| # local_dir_use_symlinks=False, | |
| # ) | |
| # except Exception as e: | |
| # print(f"[WARN] example missing: {cls} {kind} -> {fname}: {e}") | |
| # for cls in EX_CACHE: | |
| # for kind in EX_CACHE[cls]: | |
| # if EX_CACHE[cls][kind]: | |
| # EX_CACHE[cls][kind] = ensure_muted_copy(EX_CACHE[cls][kind]) | |
| CLEAN_BG_CSS = r""" | |
| /* 전체 페이지 배경을 흰색으로 (Spaces의 회색 배경 제거) */ | |
| html, body { background:#ffffff !important; } | |
| /* Intro/Eval 섹션의 래퍼 박스만 투명 처리 (입력창/슬라이더 등 컨트롤은 놔둠) */ | |
| #intro .gr-panel, #intro .gr-group, #intro .gr-box, #intro .gr-row, #intro .gr-column, | |
| #eval .gr-panel, #eval .gr-group, #eval .gr-box, #eval .gr-row, #eval .gr-column { | |
| background: transparent !important; | |
| box-shadow: none !important; | |
| border-color: transparent !important; | |
| } | |
| /* 비디오 카드 주변 툴바(작은 회색 박스)도 투명하게 */ | |
| #intro [data-testid="block-video"] .prose, | |
| #eval [data-testid="block-video"] .prose { | |
| background: transparent !important; | |
| box-shadow: none !important; | |
| border-color: transparent !important; | |
| } | |
| """ | |
| # -------------------- UI -------------------- | |
| with gr.Blocks(css=CLEAN_BG_CSS) as demo: | |
| # Blocks 안, 어디서든 한 번만 추가(권장 위치: Blocks 시작 직후) | |
| gr.HTML(""" | |
| <script> | |
| (function(){ | |
| // 모든 일반 DOM + Shadow DOM 안의 <video>까지 찾아 음소거 | |
| function eachNodeWithShadow(root, fn){ | |
| const walker = document.createTreeWalker(root, NodeFilter.SHOW_ELEMENT); | |
| fn(root); | |
| while (walker.nextNode()){ | |
| const el = walker.currentNode; | |
| fn(el); | |
| if (el.shadowRoot){ | |
| eachNodeWithShadow(el.shadowRoot, fn); | |
| } | |
| } | |
| } | |
| function muteVideoEl(v){ | |
| try{ | |
| // 재생 전에 반드시 muted 속성이 있어야 브라우저가 소리 차단 | |
| v.muted = true; | |
| v.volume = 0.0; | |
| v.setAttribute('muted',''); | |
| v.setAttribute('playsinline',''); | |
| v.setAttribute('preload','metadata'); | |
| // 사용자가 볼륨/음소거를 바꿔도 다시 0으로 | |
| if (!v._muteHooked){ | |
| v.addEventListener('volumechange', () => { | |
| if (!v.muted || v.volume > 0) { v.muted = true; v.volume = 0.0; } | |
| }); | |
| v.addEventListener('play', () => { | |
| v.muted = true; v.volume = 0.0; | |
| }); | |
| v._muteHooked = true; | |
| } | |
| }catch(e){} | |
| } | |
| function muteAll(root){ | |
| eachNodeWithShadow(root || document, (el)=>{ | |
| if (el.tagName && el.tagName.toLowerCase() === 'video'){ | |
| muteVideoEl(el); | |
| }else if (el.querySelectorAll){ | |
| el.querySelectorAll('video').forEach(muteVideoEl); | |
| } | |
| }); | |
| } | |
| // 초기 렌더에서 | |
| muteAll(document); | |
| // 이후 DOM 변화 감시(Shadow DOM 내부 변화도 포착) | |
| const obs = new MutationObserver(() => muteAll(document)); | |
| obs.observe(document, {subtree:true, childList:true, attributes:false}); | |
| })(); | |
| </script> | |
| """) | |
| order_state = gr.State(value=[]) # v4에서는 value= 권장 | |
| ptr_state = gr.State(value=0) | |
| cur_video_id = gr.State(value="") | |
| reward_code_state = gr.State(value="") # 완료 시 생성한 코드 저장(중복 생성 방지) | |
| selected_action = gr.State(value=None) # 아직 선택 없음 | |
| selected_phys = gr.State(value=None) # 아직 선택 없음 | |
| # ------------------ PAGE 1: Intro + Examples ------------------ | |
| # page_intro = gr.Group(visible=True) | |
| page_intro = gr.Group(visible=True, elem_id="intro") | |
| with page_intro: | |
| # gr.Markdown("## 🎯 Action Consistency Human Evaluation") | |
| gr.Markdown("## 🎯 Human Evaluation: Action Consistency & Temporal Coherence") | |
| gr.Markdown(INSTRUCTION_MD) | |
| understood = gr.Checkbox(label="I have read and understand the task.", value=False) | |
| start_btn = gr.Button("Yes, start", variant="secondary", interactive=False) | |
def _toggle_start(checked: bool):
    """Return an update that enables (and highlights) a button when *checked* is true."""
    style = "primary" if checked else "secondary"
    return gr.update(interactive=checked, variant=style)
| understood.change(_toggle_start, inputs=understood, outputs=start_btn) | |
| # # Examples: Squats | |
| # with gr.Group(): | |
| # gr.Markdown("### Examples: BodyWeightSquats") | |
| # with gr.Row(): | |
| # with gr.Column(): | |
| # gr.Markdown("**Expected depiction of action**") | |
| # gr.Video(value=EX_CACHE["BodyWeightSquats"]["real"], height=240, autoplay=False, elem_id="ex_squats_real",) | |
| # with gr.Column(): | |
| # gr.Markdown("**Poorly generated action**") | |
| # gr.Video(value=EX_CACHE["BodyWeightSquats"]["bad"], height=240, autoplay=False) | |
| # if not (EX_CACHE["BodyWeightSquats"]["real"] and EX_CACHE["BodyWeightSquats"]["bad"]): | |
| # gr.Markdown("> ⚠️ Upload `examples/BodyWeightSquats_real.mp4` and `_bad.mp4` to show both samples.") | |
| # # Examples: WallPushUps | |
| # with gr.Group(): | |
| # gr.Markdown("### Examples: WallPushUps") | |
| # with gr.Row(): | |
| # with gr.Column(): | |
| # gr.Markdown("**Expected depiction of action**") | |
| # gr.Video(value=EX_CACHE["WallPushUps"]["real"], height=240, autoplay=False, elem_id="ex_wallpushups_real",) | |
| # with gr.Column(): | |
| # gr.Markdown("**Poorly generated action**") | |
| # gr.Video(value=EX_CACHE["WallPushUps"]["bad"], height=240, autoplay=False) | |
| # if not (EX_CACHE["WallPushUps"]["real"] and EX_CACHE["WallPushUps"]["bad"]): | |
| # gr.Markdown("> ⚠️ Upload `examples/WallPushUps_real.mp4` and `_bad.mp4` to show both samples.") | |
| # Examples: BodyWeightSquats | |
| with gr.Group(): | |
| gr.Markdown("### Examples: BodyWeightSquats") | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("**Expected depiction of action**") | |
| gr.Video( | |
| value="examples/BodyWeightSquats_good.mp4", # 무음으로 만든 좋은 예시 | |
| height=240, | |
| autoplay=False, | |
| elem_id="ex_squats_real", | |
| ) | |
| with gr.Column(): | |
| gr.Markdown("**Poorly generated action**") | |
| gr.Video( | |
| value="examples/BodyWeightSquats_bad.mp4", # 나쁜 예시 | |
| height=240, | |
| autoplay=False, | |
| ) | |
| # Examples: WallPushUps | |
| with gr.Group(): | |
| gr.Markdown("### Examples: WallPushUps") | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("**Expected depiction of action**") | |
| gr.Video( | |
| value="examples/WallPushups_good.mp4", # 무음으로 다시 만든 good 버전 | |
| height=240, | |
| autoplay=False, | |
| elem_id="ex_wallpushups_real", | |
| ) | |
| with gr.Column(): | |
| gr.Markdown("**Poorly generated action**") | |
| gr.Video( | |
| value="examples/WallPushups_bad.mp4", # bad 버전 | |
| height=240, | |
| autoplay=False, | |
| ) | |
| gr.HTML(""" | |
| <script> | |
| (function(){ | |
| const ids = [ | |
| "ex_squats_real", | |
| "ex_wallpushups_real", | |
| ]; | |
| function muteById(id){ | |
| const host = document.getElementById(id); | |
| if(!host) return; | |
| // gr.Video는 Shadow DOM 내부에 <video>가 랜더링됨 | |
| const roots = [host, host.shadowRoot].filter(Boolean); | |
| roots.forEach(root=>{ | |
| const vids = root.querySelectorAll('video'); | |
| vids.forEach(v=>{ | |
| try{ | |
| v.muted = true; | |
| v.volume = 0.0; | |
| v.setAttribute('muted',''); | |
| v.setAttribute('playsinline',''); | |
| // 사용자가 바꿔도 즉시 원복 | |
| if(!v._exMuteHooked){ | |
| v.addEventListener('volumechange', ()=>{ | |
| if(!v.muted || v.volume > 0){ v.muted = true; v.volume = 0.0; } | |
| }); | |
| v.addEventListener('play', ()=>{ | |
| v.muted = true; v.volume = 0.0; | |
| }); | |
| v._exMuteHooked = true; | |
| } | |
| }catch(e){} | |
| }); | |
| }); | |
| } | |
| function apply(){ | |
| ids.forEach(muteById); | |
| } | |
| // 초기 + DOM변경 시 적용 | |
| apply(); | |
| new MutationObserver(apply).observe(document, {subtree:true, childList:true}); | |
| })(); | |
| </script> | |
| """) | |
| # ------------------ PAGE 2: Evaluation ------------------ | |
| page_eval = gr.Group(visible=False, elem_id="eval") | |
| with page_eval: | |
| # PID 입력 | |
| with gr.Row(): | |
| pid = gr.Textbox(label="Participant ID (required)", placeholder="e.g., Youngsun-2025/10/24") | |
| # 지침(원문) + 비디오 + 진행바 / 오른쪽에 슬라이더 + Save&Next | |
| with gr.Row(): #equal_height=True | |
| with gr.Column(scale=1): | |
| gr.Markdown(INSTRUCTION_MD) # 교수님 문구 그대로 | |
| video = gr.Video(label="Video", height=360) | |
| progress = gr.HTML(_progress_html(0, TOTAL_PER_PARTICIPANT)) | |
| with gr.Column(scale=1): | |
| action_tb = gr.Textbox(label="Expected action", interactive=False) | |
| # NEW: two separate sliders | |
| score_action = gr.Slider( | |
| minimum=0.0, maximum=10.0, step=0.1, value=5.0, | |
| label="Action Consistency (0.0 - 10.0)" | |
| ) | |
| score_phys = gr.Slider( | |
| minimum=0.0, maximum=10.0, step=0.1, value=5.0, | |
| label="Temporal Coherence (0.0 - 10.0)" | |
| ) | |
| save_next = gr.Button("💾 Save & Next ▶", variant="secondary", interactive=False) | |
| status = gr.Markdown(visible=False) | |
| done_state = gr.State(0) | |
| # PID 입력에 따라 Save&Next 토글 | |
def _toggle_by_pid(pid_text: str):
    """Return an update enabling a button only when *pid_text* is non-blank."""
    has_pid = bool(pid_text and pid_text.strip())
    if has_pid:
        return gr.update(interactive=True, variant="primary")
    return gr.update(interactive=False, variant="secondary")
| # pid.change(_toggle_by_pid, inputs=pid, outputs=save_next) | |
def _on_action_release(val: float, pid_text: str, sel_p):
    """Record the released action-slider value and recompute the Save button.

    Returns ``(val, button_update)``; releasing the slider once counts as
    having made a selection.
    """
    button_update = _recompute_save(pid_text, val, sel_p)
    return val, button_update
def _on_phys_release(val: float, pid_text: str, sel_a):
    """Record the released second-slider value and recompute the Save button.

    Returns ``(val, button_update)``.
    """
    button_update = _recompute_save(pid_text, sel_a, val)
    return val, button_update
| # release: 마우스를 놓을 때 1회 확정 | |
| score_action.release(_on_action_release, | |
| inputs=[score_action, pid, selected_phys], | |
| outputs=[selected_action, save_next]) | |
| score_phys.release(_on_phys_release, | |
| inputs=[score_phys, pid, selected_action], | |
| outputs=[selected_phys, save_next]) | |
| # PID가 바뀌면 현재 선택 상태로 Save 버튼 재계산 | |
| pid.change(_recompute_save, | |
| inputs=[pid, selected_action, selected_phys], | |
| outputs=save_next) | |
| page_thanks = gr.Group(visible=False) | |
| with page_thanks: | |
| reward_msg = gr.Markdown(visible=False) | |
| reward_code_box = gr.Textbox(label="Your reward code (copy & paste)", interactive=False, visible=False) | |
| reward_pid_box = gr.Textbox(label="Your participant ID", interactive=False, visible=False) | |
| gr.Markdown("### Any comments (optional)") | |
| feedback_tb = gr.Textbox( | |
| label="Any comments (optional)", | |
| placeholder="Leave any feedback for the study organizers (optional).", | |
| lines=4 | |
| ) | |
| feedback_submit = gr.Button("Submit feedback") | |
| feedback_status = gr.Markdown(visible=False) | |
| # -------- 페이지 전환 & 첫 로드 -------- | |
| ANCHOR_IDX = 0 # videos.json의 맨 첫 비디오 | |
| ANCHOR_REPEATS = 5 # 앵커 5회 | |
| MIN_GAP = 1 # 앵커 연속 금지(인접 금지) | |
def _build_order_least_first_with_anchor(total:int, anchor_idx:int, repeats:int, min_gap:int=1):
    """Build a presentation order favouring the least-rated videos.

    - Loads per-video rating counts via ``_load_eval_counts``.
    - Includes the anchor video *repeats* times, never closer together than
      *min_gap* other videos (1 means never adjacent).
    - Fills the remaining slots, without duplication, with the non-anchor
      videos that have the fewest ratings (ties broken randomly).

    Returns:
        A list of *total* indices into ``V``.

    Raises:
        ValueError: on invalid arguments or when there are not enough
            distinct non-anchor videos.
        RuntimeError: when the anchors cannot be placed with the requested gap.
    """
    if repeats < 0 or repeats > total:
        raise ValueError("repeats must be <= total")
    n_videos = len(V)
    if n_videos < 1:
        raise ValueError("videos pool must be non-empty")

    # 1) Current cumulative rating counts.
    counts = _load_eval_counts()

    # 2) Non-anchor candidates sorted by count ascending. Shuffling first
    #    and relying on the stable sort randomizes the order among ties.
    candidates = [i for i in range(n_videos) if i != anchor_idx]
    random.shuffle(candidates)
    candidates.sort(key=lambda i: counts.get(_get_video_id(V[i]), 0))
    others_needed = total - repeats
    if len(candidates) < others_needed:
        raise ValueError("Not enough unique non-anchor videos to fill the schedule without duplication.")
    others = candidates[:others_needed]

    # 3) Shuffle the chosen videos so they are not shown in count order.
    random.shuffle(others)

    # 4) Place one anchor per segment, nudging it away from earlier anchors.
    anchor_positions = []

    def ok(pos):
        return all(abs(pos - p) >= (min_gap + 1) for p in anchor_positions)

    segment = total // repeats if repeats > 0 else total
    for k in range(repeats):
        lo = k * segment
        hi = (k + 1) * segment if k < repeats - 1 else total
        cand = random.randrange(lo, hi)

        found = None
        for d in range(max(1, segment)):
            for pos in (cand + d, cand - d):
                if 0 <= pos < total and ok(pos):
                    found = pos
                    break
            if found is not None:
                break
        if found is None:
            # Last resort: scan the whole schedule.
            found = next((pos for pos in range(total) if ok(pos)), None)
        if found is None:
            raise RuntimeError("Failed to place anchor without adjacency.")
        anchor_positions.append(found)

    # 5) Anchors go in their slots; everything else is filled from `others`.
    anchor_slots = set(anchor_positions)
    filler = iter(others)
    seq = [anchor_idx if i in anchor_slots else next(filler) for i in range(total)]

    # 6) Internal sanity checks on the constructed schedule.
    assert sum(1 for x in seq if x == anchor_idx) == repeats
    for i in range(1, total):
        assert not (seq[i] == anchor_idx and seq[i-1] == anchor_idx), "Adjacent anchors found."
    return seq
| # def _start_and_load_first(): | |
| # total = TOTAL_PER_PARTICIPANT | |
| # order = _build_order_least_first_with_anchor( | |
| # total=total, | |
| # anchor_idx=ANCHOR_IDX, | |
| # repeats=ANCHOR_REPEATS, | |
| # min_gap=MIN_GAP | |
| # ) | |
| # first_idx = order[0] | |
| # v0 = V[first_idx] | |
| # return ( | |
| # gr.update(visible=False), | |
| # gr.update(visible=True), | |
| # gr.update(visible=False), | |
| # v0["url"], | |
| # _extract_action(v0), | |
| # 5.0, # score_action default | |
| # 5.0, # score_phys default ✅ NEW | |
| # gr.update(visible=False, value=""), | |
| # 0, | |
| # _progress_html(0, TOTAL_PER_PARTICIPANT), | |
| # order, | |
| # 1, | |
| # _get_video_id(v0), # cur_video_id | |
| # "", | |
| # None, # ⬅️ selected_action 초기화(= 아직 선택 안됨) | |
| # None, # ⬅️ selected_phys 초기화 | |
| # gr.update(interactive=False, variant="secondary"), # ⬅️ Save 버튼 잠금 | |
| # ) | |
def _start_and_load_first():
    """Start a session: build the playlist and show its first video.

    Returns a 17-tuple in the order of the ``outputs`` list wired to
    ``start_btn.click``: the three page visibilities, the video path and
    action text, both slider values, the status box, the bookkeeping states,
    the two "selected" states and the Save button update.
    """
    playlist = _build_order_least_first_with_anchor(
        total=TOTAL_PER_PARTICIPANT,
        anchor_idx=ANCHOR_IDX,
        repeats=ANCHOR_REPEATS,
        min_gap=MIN_GAP,
    )
    opening = V[playlist[0]]
    # The player is given a local file path rather than the remote URL.
    video_path = get_local_video_path(opening["url"])

    # page_intro hidden, page_eval shown, page_thanks hidden
    pages = (
        gr.update(visible=False),
        gr.update(visible=True),
        gr.update(visible=False),
    )
    # video, action_tb, score_action, score_phys, status
    player = (
        video_path,
        _extract_action(opening),
        5.0,
        5.0,
        gr.update(visible=False, value=""),
    )
    # done_state, progress, order_state, ptr_state, cur_video_id, reward_code_state
    bookkeeping = (
        0,
        _progress_html(0, TOTAL_PER_PARTICIPANT),
        playlist,
        1,
        _get_video_id(opening),
        "",
    )
    # selected_action, selected_phys, save_next (locked, secondary styling)
    selection = (
        None,
        None,
        gr.update(interactive=False, variant="secondary"),
    )
    return pages + player + bookkeeping + selection
# Start button: leave the intro page and load the first video.
# The order below must match the tuple returned by _start_and_load_first.
start_btn.click(
    fn=_start_and_load_first,
    inputs=[],
    outputs=[
        # page visibility
        page_intro,
        page_eval,
        page_thanks,
        # player and expected-action text
        video,
        action_tb,
        # the two rating sliders
        score_action,
        score_phys,
        status,
        # session bookkeeping
        done_state,
        progress,
        order_state,
        ptr_state,
        cur_video_id,
        reward_code_state,
        # slider "selected" states and the Save button
        selected_action,
        selected_phys,
        save_next,
    ],
)
| # def save_and_next(participant_id, cur_vid, action_score, phys_score, | |
| # done_cnt, order, ptr, reward_code): | |
| # try: | |
| # if not participant_id or not participant_id.strip(): | |
| # return ( | |
| # gr.update(visible=True), | |
| # gr.update(visible=False), | |
| # gr.update(visible=True, value="❗ Please enter your Participant ID."), | |
| # gr.update(), gr.update(), | |
| # done_cnt, | |
| # _progress_html(done_cnt, TOTAL_PER_PARTICIPANT), | |
| # 5.0, # reset action slider | |
| # 5.0, # reset phys slider ✅ | |
| # ptr, | |
| # cur_vid, | |
| # reward_code, | |
| # gr.update(visible=False), | |
| # gr.update(visible=False), | |
| # gr.update(visible=False), | |
| # None, # ⬅️ selected_action reset | |
| # None, # ⬅️ selected_phys reset | |
| # gr.update(interactive=False, variant="secondary"), # ⬅️ lock button | |
| # ) | |
| # # save both scores | |
| # status_msg = push(participant_id, cur_vid, action_score, phys_score, "") | |
| # new_done = int(done_cnt) + 1 | |
| # if new_done >= TOTAL_PER_PARTICIPANT or ptr >= len(order): | |
| # code = reward_code.strip() or _gen_reward_code(participant_id, length=10) | |
| # try: | |
| # _persist_reward_code(participant_id, code, new_done) | |
| # except Exception: | |
| # pass | |
| # thanks_text = ( | |
| # "## 🎉 Thank you so much!\n" | |
| # "Your responses have been recorded. You may now close this window.\n\n" | |
| # "**Below are your reward code and ID.** Please copy them and submit them to **AMT** to receive your payment." | |
| # ) | |
| # return ( | |
| # gr.update(visible=False), | |
| # gr.update(visible=True), | |
| # status_msg, | |
| # None, | |
| # "", | |
| # TOTAL_PER_PARTICIPANT, | |
| # _progress_html(TOTAL_PER_PARTICIPANT, TOTAL_PER_PARTICIPANT), | |
| # 5.0, # reset action | |
| # 5.0, # reset phys | |
| # len(order), | |
| # cur_vid, | |
| # code, | |
| # gr.update(visible=True, value=thanks_text), | |
| # gr.update(visible=True, value=code), | |
| # gr.update(visible=True, value=participant_id.strip()), | |
| # None, # ⬅️ selected_action reset | |
| # None, # ⬅️ selected_phys reset | |
| # gr.update(interactive=False, variant="secondary"), | |
| # ) | |
| # # next video | |
| # next_idx = order[ptr] | |
| # v = V[next_idx] | |
| # next_vid = _get_video_id(v) | |
| # local_vid_path = get_local_video_path(v["url"]) # 새 비디오 로컬 경로 | |
| # return ( | |
| # gr.update(visible=True), | |
| # gr.update(visible=False), | |
| # status_msg, | |
| # local_vid_path, | |
| # v["url"], | |
| # _extract_action(v), | |
| # new_done, | |
| # _progress_html(new_done, TOTAL_PER_PARTICIPANT), | |
| # 5.0, # reset action | |
| # 5.0, # reset phys | |
| # ptr + 1, | |
| # next_vid, | |
| # reward_code, | |
| # gr.update(visible=False), | |
| # gr.update(visible=False), | |
| # gr.update(visible=False), | |
| # None, # ⬅️ selected_action reset | |
| # None, # ⬅️ selected_phys reset | |
| # gr.update(interactive=False, variant="secondary"), | |
| # ) | |
| # except Exception as e: | |
| # return ( | |
| # gr.update(visible=True), | |
| # gr.update(visible=False), | |
| # gr.update(visible=True, value=f"❌ Error: {type(e).__name__}: {e}"), | |
| # gr.update(), gr.update(), | |
| # done_cnt, | |
| # _progress_html(done_cnt, TOTAL_PER_PARTICIPANT), | |
| # 5.0, | |
| # 5.0, | |
| # ptr, | |
| # cur_vid, | |
| # reward_code, | |
| # gr.update(visible=False), | |
| # gr.update(visible=False), | |
| # gr.update(visible=False), | |
| # None, # ⬅️ reset on error too | |
| # None, | |
| # gr.update(interactive=False, variant="secondary"), | |
| # ) | |
def save_and_next(participant_id, cur_vid, action_score, phys_score,
                  done_cnt, order, ptr, reward_code):
    """Save the current ratings, then load the next video or finish the session.

    Args:
        participant_id: Text of the participant ID box.
        cur_vid: ID of the video that was just rated.
        action_score: Value of the action slider.
        phys_score: Value of the physics slider.
        done_cnt: Number of videos rated so far (expected to be an int; a
            value that cannot be converted is treated as 0).
        order: Playlist of indices into ``V``.
        ptr: Position in ``order`` of the next video to show.
        reward_code: Reward code already stored for this session, if any.

    Returns:
        An 18-tuple in the order of the ``outputs`` of ``save_next.click``:
        page_eval, page_thanks, status, video, action_tb, done_state,
        progress, score_action, score_phys, ptr_state, cur_video_id,
        reward_code_state, reward_msg, reward_code_box, reward_pid_box,
        selected_action, selected_phys, save_next.
        Any exception is caught and reported through the status output
        instead of being raised.
    """

    def _done_so_far():
        # The state should hold an int, but tolerate anything else.
        try:
            return int(done_cnt)
        except (TypeError, ValueError, OverflowError):
            return 0

    def _stay_on_eval_page(message):
        # Shared result for "nothing was saved": keep the current video and
        # states, show ``message``, reset the sliders and lock Save again.
        return (
            gr.update(visible=True),                         # page_eval
            gr.update(visible=False),                        # page_thanks
            gr.update(visible=True, value=message),          # status
            gr.update(),                                     # video (unchanged)
            gr.update(),                                     # action_tb (unchanged)
            done_cnt,                                        # done_state (unchanged)
            _progress_html(_done_so_far(), TOTAL_PER_PARTICIPANT),  # progress
            5.0,                                             # score_action
            5.0,                                             # score_phys
            ptr,                                             # ptr_state (unchanged)
            cur_vid,                                         # cur_video_id (unchanged)
            reward_code,                                     # reward_code_state (unchanged)
            gr.update(visible=False),                        # reward_msg
            gr.update(visible=False),                        # reward_code_box
            gr.update(visible=False),                        # reward_pid_box
            None,                                            # selected_action
            None,                                            # selected_phys
            gr.update(interactive=False, variant="secondary"),  # save_next
        )

    try:
        # 0. No participant ID: report it and stay on the current video.
        if not participant_id or not participant_id.strip():
            return _stay_on_eval_page("❗ Please enter your Participant ID.")

        # 1. Store both scores for the current video.
        status_msg = push(participant_id, cur_vid, action_score, phys_score, "")
        new_done = _done_so_far() + 1

        # 2. Quota reached or playlist exhausted: show the thanks page.
        if new_done >= TOTAL_PER_PARTICIPANT or ptr >= len(order):
            existing_code = reward_code.strip() if isinstance(reward_code, str) else ""
            code = existing_code or _gen_reward_code(participant_id, length=10)
            try:
                _persist_reward_code(participant_id, code, new_done)
            except Exception as persist_err:
                # Best effort: the participant still receives the code on
                # screen, but leave a trace of the failure in the logs.
                print(
                    f"[save_and_next] reward code was not persisted: "
                    f"{type(persist_err).__name__}: {persist_err}",
                    flush=True,
                )
            thanks_text = (
                "## 🎉 Thank you so much!\n"
                "Your responses have been recorded. You may now close this window.\n\n"
                "**Below are your reward code and ID.** Please copy them and submit them to **AMT** to receive your payment."
            )
            return (
                gr.update(visible=False),                    # page_eval
                gr.update(visible=True),                     # page_thanks
                status_msg,                                  # status
                None,                                        # video (no next video)
                "",                                          # action_tb
                TOTAL_PER_PARTICIPANT,                       # done_state
                _progress_html(TOTAL_PER_PARTICIPANT, TOTAL_PER_PARTICIPANT),  # progress (full)
                5.0,                                         # score_action
                5.0,                                         # score_phys
                len(order),                                  # ptr_state (end of playlist)
                cur_vid,                                     # cur_video_id
                code,                                        # reward_code_state
                gr.update(visible=True, value=thanks_text),  # reward_msg
                gr.update(visible=True, value=code),         # reward_code_box
                gr.update(visible=True, value=participant_id.strip()),  # reward_pid_box
                None,                                        # selected_action
                None,                                        # selected_phys
                gr.update(interactive=False, variant="secondary"),  # save_next
            )

        # 3. Not finished yet: load the next video of the playlist.
        next_video = V[order[ptr]]
        next_vid_id = _get_video_id(next_video)
        local_vid_path = get_local_video_path(next_video["url"])
        return (
            gr.update(visible=True),                         # page_eval
            gr.update(visible=False),                        # page_thanks
            status_msg,                                      # status
            local_vid_path,                                  # video
            _extract_action(next_video),                     # action_tb
            new_done,                                        # done_state
            _progress_html(new_done, TOTAL_PER_PARTICIPANT), # progress
            5.0,                                             # score_action
            5.0,                                             # score_phys
            ptr + 1,                                         # ptr_state
            next_vid_id,                                     # cur_video_id
            reward_code,                                     # reward_code_state (unchanged)
            gr.update(visible=False),                        # reward_msg
            gr.update(visible=False),                        # reward_code_box
            gr.update(visible=False),                        # reward_pid_box
            None,                                            # selected_action
            None,                                            # selected_phys
            gr.update(interactive=False, variant="secondary"),  # save_next (locked until sliders move)
        )
    except Exception as e:
        # UI boundary: surface the failure in the status box instead of raising.
        return _stay_on_eval_page(f"❌ Error: {type(e).__name__}: {e}")
# Save button: store the ratings and advance.
# The outputs below must stay in the order of the tuple returned by save_and_next.
save_next.click(
    fn=save_and_next,
    inputs=[
        pid,
        cur_video_id,
        score_action,
        score_phys,
        done_state,
        order_state,
        ptr_state,
        reward_code_state,
    ],
    outputs=[
        page_eval,
        page_thanks,
        status,
        video,
        action_tb,
        done_state,
        progress,
        score_action,
        score_phys,
        ptr_state,
        cur_video_id,
        reward_code_state,
        reward_msg,
        reward_code_box,
        reward_pid_box,
        # slider "selected" states
        selected_action,
        selected_phys,
        # refresh the button's own state
        save_next,
    ],
)
# The participant ID is read from the box shown on page_thanks after
# completion, which already holds the value by then.
feedback_submit.click(
    fn=push_final_feedback,
    inputs=[reward_pid_box, feedback_tb],
    outputs=feedback_status,
)
# Script entry point: launch the app with ssr_mode switched off
# (a plain demo.launch() was used previously).
if __name__ == "__main__":
    demo.launch(ssr_mode=False)