# TheJackBright's picture
# Deploy GitHub root master to Space
# c296d62
#!/usr/bin/env python3
"""Acceptance checks for required files, artifacts, and submission readiness."""
from __future__ import annotations
import json
import os
from pathlib import Path
import re
import sys
# Repository root: two levels up from this file (the parent of the directory
# that contains this script).
ROOT = Path(__file__).resolve().parents[1]

# Register the repository root at the front of the import path, once.
# Presumably this lets project packages import when the script is run
# directly -- no project import is visible in this file to confirm.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
# Repository-relative paths that must exist for the gate to pass
# (existence only; run_checks does not require them to be non-empty).
REQUIRED_FILES = [
    # Top-level package / OpenEnv manifest
    "openenv.yaml",
    "__init__.py",
    "client.py",
    "models.py",
    # Server entry points
    "server/__init__.py",
    "server/app.py",
    # Application modules
    "app/env/env_core.py",
    "app/env/fastapi_app.py",
    "app/env/client.py",
    "app/agents/orchestrator.py",
    "app/training/grpo_trl.py",
    "app/hf_space/training_runner.py",
    # Scripts
    "scripts/deploy_training_space.py",
    "scripts/pull_training_artifacts.py",
    "scripts/generate_hf_training_report.py",
    "scripts/train_sft_trl.py",
    "scripts/train_grpo_trl.py",
    "scripts/evaluate_policy_ablations.py",
    "scripts/merge_adapters_safe.py",
    "scripts/test_inference_postsave.py",
    "scripts/deploy_space.sh",
    "scripts/bootstrap_openenv.sh",
    # Documentation
    "docs/training.md",
    "docs/deployment.md",
    "docs/evaluation.md",
    "docs/submission_checklist.md",
]
# Generated data/report artifacts. run_checks requires each one to exist and,
# when it is a regular file, to be non-empty.
REQUIRED_ARTIFACTS = [
    # Processed datasets and rule files
    "data/processed/normalized_drugs.parquet",
    "data/processed/drug_classes.parquet",
    "data/processed/interactions.parquet",
    "data/processed/burden_rules.yaml",
    "data/processed/taper_rules.yaml",
    "data/processed/substitution_rules.yaml",
    "data/processed/retrieval_corpus.jsonl",
    "data/processed/graph_edges.parquet",
    "data/processed/patients_synthetic.parquet",
    "data/processed/provenance_manifest.json",
    "data/processed/feature_dictionary.json",
    # Scenario sets by difficulty
    "data/scenarios/scenarios_easy.jsonl",
    "data/scenarios/scenarios_medium.jsonl",
    "data/scenarios/scenarios_hard.jsonl",
    # Reports
    "outputs/reports/benchmark_report.json",
    "outputs/reports/baselines.json",
]
# Environment variables consulted by _env_link_checks in strict mode when the
# README does not already carry real submission links; each value must start
# with "http".
REQUIRED_SUBMISSION_ENV_VARS = [
    "POLYGUARD_SUBMISSION_GITHUB_URL",
    "POLYGUARD_SUBMISSION_HF_SPACE_URL",
    "POLYGUARD_SUBMISSION_COLAB_URL",
    "POLYGUARD_SUBMISSION_VIDEO_OR_BLOG_URL",
]

# Substrings that must all appear somewhere in README.md (plain substring
# match, case-sensitive).
REQUIRED_README_MARKERS = [
    # Narrative sections
    "Problem Statement",
    "Environment",
    "Capabilities",
    "Tasks",
    "Reward Model / Evaluation Logic",
    "Post-Training Strategy",
    # Submission link labels
    "GitHub Repo URL",
    "HF Space URL",
    "Colab Notebook URL",
    "YouTube Video URL",
    "Hugging Face Blog URL",
]
# Substrings whose presence in the README is reported as
# "README placeholder links present".
PLACEHOLDER_PATTERNS = [
    # Template identifiers
    "your-username",
    "your-hf-username",
    "your-colab-id",
    "your-video-id",
    "your-polyguard-post",
    # Elided URLs
    "https://github.com/...",
    "https://huggingface.co/spaces/...",
    "https://colab.research.google.com/...",
]

# URL prefixes used to recognise real submission links in the README.
# _readme_has_real_submission_links requires github, hf_space and colab, plus
# at least one of youtube / hf_blog.
REAL_LINK_MARKERS = {
    "github": "https://github.com/",
    "hf_space": "https://huggingface.co/spaces/",
    "colab": "https://colab.research.google.com/",
    "youtube": "https://www.youtube.com/",
    "hf_blog": "https://huggingface.co/blog/",
}
# Backend identifiers accepted in the SFT / GRPO run reports; any other value
# is reported by the strict checks as a "fallback backend".
ACCEPTED_SFT_BACKENDS = {
    "trl_unsloth",
    "trl_transformers",
}
ACCEPTED_GRPO_BACKENDS = {
    "trl_unsloth",
    "trl_transformers",
    "trl_grpo",
    "unsloth_grpo",
}

# Result images that must exist and be non-empty in strict mode.
REQUIRED_TRACKED_RESULT_ASSETS = [
    "docs/results/avg_reward.png",
    "docs/results/policy_stack_avg_reward.png",
]

# Sweep charts that must exist and be non-empty in strict mode.
REQUIRED_HF_SWEEP_CHARTS = [
    "outputs/plots/sft_vs_grpo_reward.png",
    "outputs/plots/sft_loss_curves.png",
    "outputs/plots/qwen_model_sft_reward.png",
    "outputs/plots/qwen_model_sft_loss.png",
    "outputs/plots/sft_validity_reward.png",
    "outputs/plots/grpo_reward_curves.png",
    "outputs/plots/qwen_model_grpo_reward.png",
    "outputs/plots/reward_component_bars.png",
    "outputs/plots/anti_cheat_failure_rates.png",
    "outputs/plots/train_holdout_gap.png",
    "outputs/plots/inference_validity_reward.png",
    "outputs/plots/inference_latency_validity.png",
]

# Matches an http(s) URL up to the next whitespace character or ")".
URL_RE = re.compile(r"https?://[^\s)]+")
def _missing(root: Path, rel_paths: list[str], require_non_empty: bool = False) -> list[str]:
    """Return the entries of *rel_paths* that are absent under *root*.

    Args:
        root: Directory the relative paths are resolved against.
        rel_paths: Relative paths to check, in report order.
        require_non_empty: When true, a regular file of size 0 also counts as
            missing. Directories are never size-checked.

    Returns:
        The offending relative paths, in the same order as *rel_paths*.
    """

    def _counts_as_missing(candidate: Path) -> bool:
        if not candidate.exists():
            return True
        if not require_non_empty:
            return False
        # Only regular files are size-checked; an existing directory passes.
        return candidate.is_file() and candidate.stat().st_size == 0

    return [rel for rel in rel_paths if _counts_as_missing(root / rel)]
def _readme_checks(root: Path) -> dict[str, list[str]]:
    """Check README.md for the required section markers and a minimum URL count.

    Args:
        root: Directory expected to contain ``README.md``.

    Returns:
        A dict with two keys:

        * ``"missing_markers"``: entries of ``REQUIRED_README_MARKERS`` that do
          not occur in the README text (all of them when the file is absent).
        * ``"missing_links"``: ``["README.md missing"]`` when the file is
          absent, a single message when fewer than 4 URLs are found, otherwise
          an empty list.
    """
    readme = root / "README.md"
    if not readme.exists():
        # Return a copy: handing out the module-level list itself would let a
        # caller that mutates the result corrupt REQUIRED_README_MARKERS.
        return {
            "missing_markers": list(REQUIRED_README_MARKERS),
            "missing_links": ["README.md missing"],
        }

    text = readme.read_text(encoding="utf-8")
    missing_markers = [marker for marker in REQUIRED_README_MARKERS if marker not in text]

    missing_links: list[str] = []
    # Any http(s) URL counts toward the threshold; duplicates are not collapsed.
    if len(URL_RE.findall(text)) < 4:
        missing_links.append("fewer than 4 URLs found in README")

    return {
        "missing_markers": missing_markers,
        "missing_links": missing_links,
    }
def _read_json(root: Path, rel: str) -> dict:
    """Load a JSON object from ``root / rel``, returning ``{}`` on any failure.

    The gate treats an unreadable report the same as a missing one, so every
    failure mode collapses to an empty dict instead of raising:

    * the path does not exist or cannot be read (including a directory),
    * the bytes are not valid UTF-8,
    * the text is not valid JSON,
    * the top-level JSON value is not an object.

    Args:
        root: Base directory.
        rel: Path of the JSON file relative to *root*.

    Returns:
        The decoded object, or ``{}``.
    """
    path = root / rel
    try:
        raw = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # OSError covers a missing file, a directory and permission problems.
        return {}
    try:
        payload = json.loads(raw)
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass.
        return {}
    return payload if isinstance(payload, dict) else {}
def _readme_text(root: Path) -> str:
    """Return the UTF-8 text of ``root / "README.md"``, or ``""`` if it is absent."""
    readme = root / "README.md"
    return readme.read_text(encoding="utf-8") if readme.exists() else ""
def _readme_placeholder_failures(text: str) -> list[str]:
    """Report whether *text* still contains any template placeholder.

    Returns a one-element list with the failure message when at least one
    entry of ``PLACEHOLDER_PATTERNS`` occurs in *text*, otherwise ``[]``.
    """
    for placeholder in PLACEHOLDER_PATTERNS:
        if placeholder in text:
            return ["README placeholder links present"]
    return []
def _readme_has_real_submission_links(text: str) -> bool:
    """Tell whether *text* contains real-looking submission links.

    True only when the GitHub, HF Space and Colab URL prefixes from
    ``REAL_LINK_MARKERS`` all occur in *text* and at least one "story" link
    (YouTube or Hugging Face blog prefix) occurs as well.
    """
    markers = REAL_LINK_MARKERS
    has_video = "youtube" in markers and markers["youtube"] in text
    has_story = has_video or markers["hf_blog"] in text
    for key in ("github", "hf_space", "colab"):
        if markers[key] not in text:
            return False
    return has_story
def _env_link_checks(strict: bool, readme_text: str = "") -> list[str]:
    """List the submission env vars that are unset or not URL-like.

    The environment is only enforced when *strict* is true and the README does
    not already contain real submission links; in every other case the result
    is empty.

    Args:
        strict: Whether strict submission checking is enabled.
        readme_text: README contents used to decide if env vars are needed.

    Returns:
        Names from ``REQUIRED_SUBMISSION_ENV_VARS`` whose stripped value does
        not start with ``"http"`` (an empty or unset value never does).
    """
    links_in_readme = _readme_has_real_submission_links(readme_text)
    if not strict or links_in_readme:
        return []
    return [
        name
        for name in REQUIRED_SUBMISSION_ENV_VARS
        if not os.getenv(name, "").strip().startswith("http")
    ]
def _coerce_int(value: object) -> int:
    """Best-effort integer conversion for values read from JSON reports.

    Returns 0 for falsy values and for anything ``int()`` rejects (non-numeric
    strings, containers, NaN/Infinity), so the caller can record a malformed
    report as a gate failure instead of crashing.
    """
    try:
        return int(value or 0)  # type: ignore[call-overload]
    except (TypeError, ValueError, OverflowError):
        return 0


def _strict_training_checks(root: Path) -> list[str]:
    """Validate the training/evaluation reports required for a strict submission.

    Reads the SFT, GRPO, post-save inference, improvement, HF sweep and
    anti-hacking reports below ``outputs/reports`` and collects one message
    per violated expectation. A report that is missing or unreadable is seen
    as ``{}`` (see ``_read_json``).

    Malformed numeric fields (``examples_used``, ``completed_models``) count
    as 0 and a non-list ``models`` entry counts as empty, so a broken report
    produces a failure message rather than an exception.

    Args:
        root: Repository root containing the ``outputs/reports`` directory.

    Returns:
        Failure messages in check order; empty when everything passes.
    """
    failures: list[str] = []

    # --- SFT run report -----------------------------------------------------
    sft = _read_json(root, "outputs/reports/sft_trl_run.json")
    if str(sft.get("status", "")) != "ok":
        failures.append("SFT report status is not ok")
    if str(sft.get("backend", "")) not in ACCEPTED_SFT_BACKENDS:
        failures.append("SFT report uses fallback backend")
    if not str(sft.get("artifact_path", "") or ""):
        failures.append("SFT artifact path is empty or missing")
    if _coerce_int(sft.get("examples_used")) <= 0:
        failures.append("SFT report has no training examples")

    # --- GRPO run report ----------------------------------------------------
    grpo = _read_json(root, "outputs/reports/grpo_trl_run.json")
    if str(grpo.get("status", "")) != "ok":
        failures.append("GRPO report status is not ok")
    if str(grpo.get("backend", "")) not in ACCEPTED_GRPO_BACKENDS:
        failures.append("GRPO report uses fallback backend")
    if not str(grpo.get("artifact_path", "") or ""):
        failures.append("GRPO artifact path is empty or missing")

    # --- Post-save inference report ------------------------------------------
    # NOTE(review): a missing post-save report reads as {} and passes both
    # checks below -- confirm that this is intended.
    postsave = _read_json(root, "outputs/reports/postsave_inference.json")
    if str(postsave.get("model_source", "")) == "fallback_policy":
        failures.append("post-save inference uses fallback policy")
    if postsave.get("model_load_error"):
        failures.append("post-save inference has model load error")

    # --- Improvement report --------------------------------------------------
    improvement = _read_json(root, "outputs/reports/improvement_report.json")
    if improvement.get("improved") is not True:
        failures.append("improvement report is not positive")

    # --- HF training sweep ---------------------------------------------------
    sweep = _read_json(root, "outputs/reports/hf_sweep_summary.json")
    anti_hacking = _read_json(root, "outputs/reports/anti_hacking_overfit_report.json")
    if not sweep:
        failures.append("HF training sweep summary missing")
    elif _coerce_int(sweep.get("completed_models")) <= 0:
        failures.append("HF training sweep has no completed models")
    else:
        # In an SFT-only sweep a GRPO artifact is not required per model.
        sft_only_sweep = str(sweep.get("training_mode") or "full") == "sft-baseline"
        models = sweep.get("models")
        if not isinstance(models, list):
            models = []
        for row in models:
            # Only rows explicitly marked as completed are validated.
            if not isinstance(row, dict) or row.get("status") != "completed":
                continue
            label = str(row.get("label") or row.get("model_id") or "model")
            if row.get("fallback_detected"):
                failures.append(f"HF sweep model {label} used fallback backend")
            if not row.get("reward_range_ok", False):
                failures.append(f"HF sweep model {label} has reward range failures")
            artifact_paths = row.get("artifact_paths", {})
            if not isinstance(artifact_paths, dict):
                artifact_paths = {}
            if not artifact_paths.get("sft"):
                failures.append(f"HF sweep model {label} missing SFT artifact")
            if not sft_only_sweep and not artifact_paths.get("grpo"):
                failures.append(f"HF sweep model {label} missing GRPO artifact")

    # --- Anti-hacking / overfit report ---------------------------------------
    if anti_hacking.get("passed") is not True:
        failures.append("anti-hacking/overfit report is not passing")

    return failures
def _strict_asset_checks(root: Path) -> list[str]:
    """Check the tracked result images, HF verification record and sweep charts.

    Args:
        root: Repository root the asset paths are resolved against.

    Returns:
        One summary message per failed category, in the order: tracked result
        assets, HF deployment verification, HF sweep charts.
    """
    assets_absent = bool(_missing(root, REQUIRED_TRACKED_RESULT_ASSETS, require_non_empty=True))
    verification = _read_json(root, "docs/results/hf_space_verification.json")
    charts_absent = bool(_missing(root, REQUIRED_HF_SWEEP_CHARTS, require_non_empty=True))

    outcomes = (
        (assets_absent, "tracked result assets missing"),
        # Only a literal JSON `true` counts as verified.
        (verification.get("passed") is not True, "HF deployment verification missing"),
        (charts_absent, "HF sweep charts missing"),
    )
    return [message for failed, message in outcomes if failed]
def _strict_submission_checks(root: Path, readme_text: str) -> list[str]:
    """Run every strict-mode check and return the de-duplicated failures.

    Combines, in order, the README placeholder check, the README real-link
    check, the training report checks and the asset checks.

    Args:
        root: Repository root.
        readme_text: Contents of README.md (may be empty).

    Returns:
        Failure messages with duplicates removed, first occurrence kept.
    """
    collected = list(_readme_placeholder_failures(readme_text))
    if not _readme_has_real_submission_links(readme_text):
        collected.append("README real submission links missing")
    collected += _strict_training_checks(root)
    collected += _strict_asset_checks(root)

    # Order-preserving de-duplication.
    seen: set[str] = set()
    unique: list[str] = []
    for message in collected:
        if message not in seen:
            seen.add(message)
            unique.append(message)
    return unique
def run_checks(root: Path = ROOT, strict_submission_links: bool = False) -> dict[str, object]:
    """Run the acceptance gate and return a JSON-serialisable summary.

    Args:
        root: Repository root to inspect.
        strict_submission_links: When true, additionally enforce submission
            links (README or environment), training reports and result assets.

    Returns:
        A dict with the individual findings plus:

        * ``"status"``: ``"fail"`` if any finding is non-empty, else ``"ok"``.
        * ``"submission_ready"``: true only in strict mode with no findings.
    """
    absent_files = _missing(root, REQUIRED_FILES)
    absent_artifacts = _missing(root, REQUIRED_ARTIFACTS, require_non_empty=True)
    readme_report = _readme_checks(root)
    readme_body = _readme_text(root)
    env_gaps = _env_link_checks(strict=strict_submission_links, readme_text=readme_body)
    if strict_submission_links:
        strict_failures = _strict_submission_checks(root, readme_text=readme_body)
    else:
        strict_failures = []

    findings = (
        absent_files,
        absent_artifacts,
        readme_report["missing_markers"],
        readme_report["missing_links"],
        env_gaps,
        strict_failures,
    )
    has_failures = any(findings)

    return {
        "missing_files": absent_files,
        "missing_artifacts": absent_artifacts,
        "missing_readme_markers": readme_report["missing_markers"],
        "missing_readme_links": readme_report["missing_links"],
        "strict_submission_links": strict_submission_links,
        "missing_submission_env": env_gaps,
        "strict_submission_failures": strict_failures,
        # Readiness is only ever claimed in strict mode.
        "submission_ready": strict_submission_links and not has_failures,
        "status": "fail" if has_failures else "ok",
    }
def main() -> None:
    """Run the gate, write the report, and exit non-zero on failure.

    Strict mode is enabled when ``POLYGUARD_ENFORCE_SUBMISSION_LINKS`` is one
    of ``1``, ``true``, ``yes`` or ``on`` (case-insensitive). The summary is
    always written to ``outputs/reports/acceptance_gate.json`` under the
    repository root before the exit status is decided.

    Raises:
        SystemExit: With a message containing the summary when the gate fails.
    """
    repo_root = Path(__file__).resolve().parents[1]

    flag = os.getenv("POLYGUARD_ENFORCE_SUBMISSION_LINKS", "false").lower()
    enforce_links = flag in {"1", "true", "yes", "on"}

    summary = run_checks(root=repo_root, strict_submission_links=enforce_links)

    report_path = repo_root / "outputs" / "reports" / "acceptance_gate.json"
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_text = json.dumps(summary, ensure_ascii=True, indent=2)
    report_path.write_text(report_text, encoding="utf-8")

    if summary["status"] == "fail":
        raise SystemExit(f"acceptance_gate_failed: {summary}")
    print("acceptance_gate_ok")


if __name__ == "__main__":
    main()