Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import shutil | |
| import zipfile | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from uuid import uuid4 | |
| import gradio as gr | |
| from pipeline import run_pipeline | |
# Default pipeline configuration. User-supplied JSON (see _parse_config) is
# shallow-merged on top of this, so any top-level key can be overridden.
DEFAULT_CONFIG: Dict[str, Any] = {
    "model": os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
    "rewrite": False,
    "projects": [{"name": "STANDARD", "description": "Generic scoring"}],
    # OCR knobs
    "ocr_max_pages": 8,
    "ocr_dpi": 200,
    # Reporting knobs
    "top_n": 25,
    "bucket_thresholds": {
        "top": 8.0,
        "strong": 6.5,
        "maybe": 5.0
    },
}

# Filesystem layout: everything lives under /tmp so restarts start clean,
# except OUTPUT_ROOT, which is deliberately kept across runs.
TMP_ROOT = Path("/tmp/resume_evaluator").resolve()
UPLOAD_DIR = TMP_ROOT / "input_uploads"  # per-run staging area, wiped before each run
OUTPUT_ROOT = TMP_ROOT / "output_root"  # persistent across runs for dedupe manifest
RESULTS_ZIP = TMP_ROOT / "results.zip"  # fixed-path copy of the latest results archive
def _ensure_dirs() -> None:
    """Create every working directory, tolerating ones that already exist."""
    for directory in (TMP_ROOT, UPLOAD_DIR, OUTPUT_ROOT):
        directory.mkdir(parents=True, exist_ok=True)
def _clean_upload_staging_only() -> None:
    """Reset the upload staging directory to an empty state.

    Only UPLOAD_DIR is wiped; OUTPUT_ROOT (and the dedupe manifest it holds)
    is left untouched.
    """
    _ensure_dirs()
    # No existence check needed: ignore_errors=True makes rmtree a no-op on a
    # missing directory (and _ensure_dirs just created it regardless).
    shutil.rmtree(UPLOAD_DIR, ignore_errors=True)
    UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
def _parse_config(config_text: str) -> Dict[str, Any]:
    """Parse the user's JSON config and shallow-merge it over DEFAULT_CONFIG.

    Args:
        config_text: Raw JSON text from the UI; empty/None means "use defaults".

    Returns:
        A new dict: DEFAULT_CONFIG with the user's top-level keys overriding.

    Raises:
        ValueError: if the text is not valid JSON or decodes to a non-object.
    """
    raw = (config_text or "").strip()
    if not raw:
        return dict(DEFAULT_CONFIG)
    try:
        user_cfg = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Config JSON is invalid: {exc}") from exc
    if not isinstance(user_cfg, dict):
        raise ValueError("Config JSON must be an object (dict).")
    # Shallow merge: user keys win; nested dicts (e.g. bucket_thresholds)
    # are replaced wholesale, not deep-merged.
    return {**DEFAULT_CONFIG, **user_cfg}
def _stage_inputs(uploaded_files: Optional[List[str]]) -> List[str]:
    """Stage uploaded PDFs (and PDFs inside uploaded ZIPs) into UPLOAD_DIR.

    Args:
        uploaded_files: File paths handed over by Gradio; may be None.

    Returns:
        Ordered, de-duplicated list of absolute paths to staged PDF files.
    """
    _clean_upload_staging_only()
    staged: List[str] = []
    for name in uploaded_files or []:
        src = Path(name)
        if not src.exists():
            continue
        suffix = src.suffix.lower()
        if suffix == ".zip":
            with zipfile.ZipFile(src, "r") as z:
                z.extractall(UPLOAD_DIR)
            # Fix: match extracted PDFs case-insensitively (".pdf", ".PDF", ...)
            # — the old rglob("*.pdf") silently skipped uppercase extensions,
            # inconsistent with the suffix.lower() check used for direct uploads.
            staged.extend(
                str(p.resolve())
                for p in sorted(UPLOAD_DIR.rglob("*"))
                if p.is_file() and p.suffix.lower() == ".pdf"
            )
        elif suffix == ".pdf":
            dst = UPLOAD_DIR / src.name
            shutil.copy2(src, dst)
            staged.append(str(dst.resolve()))
    # De-duplicate while preserving first-seen order (re-globbing after each
    # ZIP can re-add earlier entries).
    return list(dict.fromkeys(staged))
| def _zip_dir(src_dir: Path, zip_path: Path) -> None: | |
| if zip_path.exists(): | |
| zip_path.unlink() | |
| with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z: | |
| for p in sorted(src_dir.rglob("*")): | |
| if p.is_file(): | |
| z.write(p, arcname=str(p.relative_to(src_dir))) | |
def process(uploaded_files: Optional[List[str]], config_text: str) -> Tuple[str, Optional[str]]:
    """Run the evaluation pipeline over the uploaded files.

    Args:
        uploaded_files: Paths from the Gradio File component (PDFs and/or ZIPs).
        config_text: JSON config text; merged over DEFAULT_CONFIG.

    Returns:
        (status message, path to a results ZIP or None when nothing ran/failed).
    """
    _ensure_dirs()
    cfg = _parse_config(config_text)
    pdf_paths = _stage_inputs(uploaded_files)
    if not pdf_paths:
        return ("No PDFs found. Upload PDFs or a ZIP containing PDFs.", None)
    job_id = uuid4().hex[:10]  # short stable id
    try:
        result = run_pipeline(
            input_files=pdf_paths,
            config=cfg,
            output_root=str(OUTPUT_ROOT),
            job_id=job_id,
        )
    except Exception as e:
        # Surface the failure in the status box instead of crashing the UI.
        return (f"Pipeline failed: {type(e).__name__}: {e}", None)
    # Fix: dropped the unused `job_dir = Path(result["job_dir"])` local — it was
    # never read, and would raise KeyError if the pipeline omitted the key.
    zip_path = Path(result["zip_path"])
    # (Optional) also drop a copy at a fixed path for Gradio download stability.
    try:
        if RESULTS_ZIP.exists():
            RESULTS_ZIP.unlink()
        shutil.copy2(zip_path, RESULTS_ZIP)
    except Exception:
        # not fatal — fall back to the pipeline's own ZIP below
        pass
    counts = result.get("counts", {})
    status = (
        f"job_id={job_id} | "
        f"total={counts.get('total', 0)} "
        f"success={counts.get('success', 0)} "
        f"skipped={counts.get('skipped', 0)} "
        f"failed={counts.get('failed', 0)}"
    )
    return (status, str(RESULTS_ZIP if RESULTS_ZIP.exists() else zip_path))
def build_ui() -> gr.Blocks:
    """Assemble and return the Gradio Blocks interface for the evaluator."""
    with gr.Blocks(title="Resume Evaluator") as demo:
        gr.Markdown("# Resume Evaluator")
        # Inputs: files to evaluate plus an editable JSON config pre-filled
        # with the defaults.
        file_input = gr.File(
            label="Upload PDF(s) or a ZIP",
            file_count="multiple",
            type="filepath",
        )
        config_editor = gr.Code(
            label="Config JSON (optional)",
            language="json",
            value=json.dumps(DEFAULT_CONFIG, indent=2),
        )
        run_button = gr.Button("Process", variant="primary")
        # Outputs: a status line and the downloadable results archive.
        status_box = gr.Textbox(label="Status", interactive=False)
        results_file = gr.File(label="Download Results ZIP", interactive=False)
        run_button.click(
            fn=process,
            inputs=[file_input, config_editor],
            outputs=[status_box, results_file],
        )
    return demo
# Build the app at import time so runners that look up a module-level `demo`
# (e.g. `gradio app.py`, HF Spaces) can find it without executing __main__.
demo = build_ui()
if __name__ == "__main__":
    # Keep SSR off; it's still noisy in HF
    demo.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)