# NOTE(review): Hugging Face Spaces page header residue ("Spaces: Sleeping")
# removed; the module proper begins with the import block below.
| from __future__ import annotations | |
| import os | |
| from contextlib import asynccontextmanager | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any, Literal | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.responses import JSONResponse | |
| from slop_farmer.config import PrSearchRefreshOptions | |
| from slop_farmer.data.ghreplica_api import GhReplicaProbeUnavailableError, GhrProbeClient | |
| from slop_farmer.data.snapshot_materialize import materialize_hf_dataset_snapshot | |
| from slop_farmer.data.snapshot_paths import ( | |
| CURRENT_ANALYSIS_MANIFEST_PATH, | |
| default_hf_materialize_dir, | |
| ) | |
| from slop_farmer.reports.analysis_service import ( | |
| get_analysis_best, | |
| get_analysis_meta_bug, | |
| get_analysis_status, | |
| get_pr_analysis, | |
| list_analysis_duplicate_prs, | |
| list_analysis_meta_bugs, | |
| ) | |
| from slop_farmer.reports.pr_search_service import ( | |
| get_pr_search_cluster, | |
| get_pr_search_clusters, | |
| get_pr_search_contributor_pulls, | |
| get_pr_search_pull_contributor, | |
| get_pr_search_similar_lookup, | |
| get_pr_search_status, | |
| list_pr_search_clusters, | |
| run_pr_search_refresh, | |
| ) | |
| from slop_farmer.reports.read_views import ( | |
| check_issue_cluster_membership, | |
| get_contributor, | |
| get_contributor_risk, | |
| get_contributor_status, | |
| get_issue_best, | |
| get_issue_cluster, | |
| get_issue_cluster_status, | |
| get_issue_clusters_for_pr, | |
| get_snapshot_surfaces, | |
| list_contributors, | |
| list_issue_clusters, | |
| list_issue_duplicate_prs, | |
| ) | |
@dataclass
class PrSearchApiSettings:
    """Runtime configuration for the PR-search API.

    Values are normally resolved from environment variables via
    :meth:`from_env`; tests may construct instances directly.

    NOTE(review): the ``@dataclass`` and ``@classmethod`` decorators were
    absent in the extracted source even though ``dataclass`` is imported,
    the class body is pure annotated fields, and ``from_env`` takes ``cls``
    and calls ``cls(...)``. Restored here — confirm against the original.
    """

    # Core locations.
    default_repo: str | None       # only repo this deployment serves; None = unrestricted
    index_path: Path               # DuckDB PR-search index file
    output_dir: Path               # working directory for derived artifacts

    # Snapshot sources (local dir takes precedence over the HF dataset).
    snapshot_dir: Path | None = None
    hf_repo_id: str | None = None
    hf_revision: str | None = None
    hf_materialize_dir: Path | None = None

    # Optional live-probe backend.
    ghr_base_url: str | None = None
    http_timeout: int = 180
    http_max_retries: int = 5

    # Startup behavior.
    refresh_if_missing: bool = False
    rebuild_on_start: bool = False
    include_drafts: bool = False
    include_closed: bool = False

    # Per-endpoint limit defaults/ceilings.
    similar_limit_default: int = 10
    similar_limit_max: int = 50
    candidate_limit_default: int = 5
    candidate_limit_max: int = 20
    cluster_list_limit_default: int = 50
    cluster_list_limit_max: int = 200
    issue_list_limit_default: int = 50
    issue_list_limit_max: int = 200
    contributor_list_limit_default: int = 50
    contributor_list_limit_max: int = 200
    probe_limit_default: int = 10
    probe_limit_max: int = 25

    @classmethod
    def from_env(cls) -> PrSearchApiSettings:
        """Build settings from environment variables, with sensible defaults.

        ``INDEX_PATH`` defaults to ``<OUTPUT_DIR>/state/pr-search.duckdb``.
        """
        output_dir = Path(os.environ.get("OUTPUT_DIR", "data")).resolve()
        index_path = Path(
            os.environ.get("INDEX_PATH", str(output_dir / "state" / "pr-search.duckdb"))
        ).resolve()
        snapshot_dir = _env_path("SNAPSHOT_DIR")
        hf_materialize_dir = _env_path("HF_MATERIALIZE_DIR")
        return cls(
            default_repo=os.environ.get("DEFAULT_REPO"),
            index_path=index_path,
            output_dir=output_dir,
            snapshot_dir=snapshot_dir,
            hf_repo_id=os.environ.get("HF_REPO_ID"),
            hf_revision=os.environ.get("HF_REVISION"),
            hf_materialize_dir=hf_materialize_dir,
            ghr_base_url=os.environ.get("GHR_BASE_URL"),
            http_timeout=_env_int("HTTP_TIMEOUT", 180),
            http_max_retries=_env_int("HTTP_MAX_RETRIES", 5),
            refresh_if_missing=_env_bool("REFRESH_IF_MISSING", False),
            rebuild_on_start=_env_bool("REBUILD_ON_START", False),
            include_drafts=_env_bool("INCLUDE_DRAFTS", False),
            include_closed=_env_bool("INCLUDE_CLOSED", False),
            similar_limit_default=_env_int("SIMILAR_LIMIT_DEFAULT", 10),
            similar_limit_max=_env_int("SIMILAR_LIMIT_MAX", 50),
            candidate_limit_default=_env_int("CANDIDATE_LIMIT_DEFAULT", 5),
            candidate_limit_max=_env_int("CANDIDATE_LIMIT_MAX", 20),
            cluster_list_limit_default=_env_int("CLUSTER_LIST_LIMIT_DEFAULT", 50),
            cluster_list_limit_max=_env_int("CLUSTER_LIST_LIMIT_MAX", 200),
            issue_list_limit_default=_env_int("ISSUE_LIST_LIMIT_DEFAULT", 50),
            issue_list_limit_max=_env_int("ISSUE_LIST_LIMIT_MAX", 200),
            contributor_list_limit_default=_env_int("CONTRIBUTOR_LIST_LIMIT_DEFAULT", 50),
            contributor_list_limit_max=_env_int("CONTRIBUTOR_LIST_LIMIT_MAX", 200),
            probe_limit_default=_env_int("PROBE_LIMIT_DEFAULT", 10),
            probe_limit_max=_env_int("PROBE_LIMIT_MAX", 25),
        )
def create_app(settings: PrSearchApiSettings | None = None) -> FastAPI:
    """Build and configure the PR-search FastAPI application.

    Every endpoint re-reads ``request.app.state.settings`` at call time so
    startup (lifespan) owns the single source of truth.

    NOTE(review): the decorator lines (``@asynccontextmanager``, the
    ``@app.exception_handler(...)`` registrations, and the ``@app.get(...)``
    route registrations) were missing from the extracted source even though
    ``asynccontextmanager`` is imported and every handler below was defined
    but otherwise unreachable; they are restored here. The route *paths* are
    reconstructed from handler names and parameters — confirm them against
    the original file and API consumers before relying on them.
    """
    api_settings = settings or PrSearchApiSettings.from_env()

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Startup: stash settings and attempt bootstrap. Failures are recorded
        # on app.state (not raised) so /readyz can report them instead of the
        # server dying on boot.
        app.state.settings = api_settings
        app.state.ready = False
        app.state.startup_error = None
        try:
            _bootstrap_snapshot_assets(api_settings)
            _bootstrap_index(api_settings)
            app.state.ready = _is_ready(api_settings)
        except Exception as exc:
            app.state.startup_error = str(exc)
        yield

    app = FastAPI(title="slop PR search API", version="0.1.1", lifespan=lifespan)

    @app.exception_handler(ValueError)
    async def handle_value_error(_request: Request, exc: ValueError) -> JSONResponse:
        # Service-layer "missing entity" ValueErrors map to 404, others to 400.
        status_code = 404 if _looks_not_found(exc) else 400
        return JSONResponse({"detail": str(exc)}, status_code=status_code)

    @app.exception_handler(GhReplicaProbeUnavailableError)
    async def handle_probe_unavailable(
        _request: Request, exc: GhReplicaProbeUnavailableError
    ) -> JSONResponse:
        # The probe client carries its own HTTP status; pass it through.
        return JSONResponse({"detail": str(exc)}, status_code=exc.status_code)

    @app.get("/healthz")
    async def healthz() -> dict[str, bool]:
        """Liveness probe: always OK while the process is up."""
        return {"ok": True}

    @app.get("/readyz")
    async def readyz(request: Request) -> JSONResponse:
        """Readiness probe: OK only when startup succeeded and the index still reads."""
        settings = request.app.state.settings
        error = request.app.state.startup_error
        ready = request.app.state.ready and _is_ready(settings)
        if ready:
            return JSONResponse({"ok": True})
        detail = error or _readiness_detail(settings)
        return JSONResponse({"ok": False, "detail": detail}, status_code=503)

    @app.get("/repos/{owner}/{repo}/status")
    async def repo_status(owner: str, repo: str, request: Request) -> dict[str, Any]:
        """Index status plus per-surface snapshot availability."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        status = get_pr_search_status(settings.index_path, repo=repo_slug)
        issue_snapshot_dir = _surface_snapshot_dir(settings, repo_slug, surface="issues")
        contributor_snapshot_dir = _surface_snapshot_dir(
            settings, repo_slug, surface="contributors"
        )
        return {
            **status,
            "surfaces": {
                "issues": get_snapshot_surfaces(issue_snapshot_dir)["issues"],
                "contributors": get_snapshot_surfaces(contributor_snapshot_dir)["contributors"],
            },
        }

    @app.get("/repos/{owner}/{repo}/pulls/{number}/similar")
    async def pr_similar(
        owner: str,
        repo: str,
        number: int,
        request: Request,
        limit: int | None = None,
        mode: Literal["auto", "indexed", "live"] = "auto",
    ) -> dict[str, Any]:
        """PRs similar to the given PR (indexed and/or live probe)."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return get_pr_search_similar_lookup(
            settings.index_path,
            repo=repo_slug,
            pr_number=number,
            limit=_limit(
                limit, default=settings.similar_limit_default, maximum=settings.similar_limit_max
            ),
            mode=mode,
            client=_probe_client(settings),
        )

    @app.get("/repos/{owner}/{repo}/pulls/{number}/clusters")
    async def pr_clusters(
        owner: str,
        repo: str,
        number: int,
        request: Request,
        limit: int | None = None,
        mode: Literal["auto", "indexed", "live"] = "auto",
    ) -> dict[str, Any]:
        """Candidate clusters containing or near the given PR."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return get_pr_search_clusters(
            settings.index_path,
            repo=repo_slug,
            pr_number=number,
            limit=_limit(
                limit,
                default=settings.candidate_limit_default,
                maximum=settings.candidate_limit_max,
            ),
            mode=mode,
            client=_probe_client(settings),
        )

    @app.get("/repos/{owner}/{repo}/clusters/{cluster_id}")
    async def cluster_view(
        owner: str,
        repo: str,
        cluster_id: str,
        request: Request,
    ) -> dict[str, Any]:
        """Single PR-search cluster by id."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return get_pr_search_cluster(settings.index_path, repo=repo_slug, cluster_id=cluster_id)

    @app.get("/repos/{owner}/{repo}/clusters")
    async def cluster_list(
        owner: str,
        repo: str,
        request: Request,
        limit: int | None = None,
    ) -> dict[str, Any]:
        """Paged list of PR-search clusters."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return list_pr_search_clusters(
            settings.index_path,
            repo=repo_slug,
            limit=_limit(
                limit,
                default=settings.cluster_list_limit_default,
                maximum=settings.cluster_list_limit_max,
            ),
        )

    @app.get("/repos/{owner}/{repo}/contributors/{login}/pulls")
    async def contributor_pulls(
        owner: str,
        repo: str,
        login: str,
        request: Request,
        limit: int | None = None,
    ) -> dict[str, Any]:
        """Indexed pulls authored by the given contributor."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return get_pr_search_contributor_pulls(
            settings.index_path,
            repo=repo_slug,
            author_login=login,
            limit=_limit(
                limit, default=settings.similar_limit_default, maximum=settings.similar_limit_max
            ),
        )

    @app.get("/repos/{owner}/{repo}/pulls/{number}/contributor")
    async def pull_contributor(
        owner: str,
        repo: str,
        number: int,
        request: Request,
    ) -> dict[str, Any]:
        """Contributor record for the author of the given PR."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return get_pr_search_pull_contributor(settings.index_path, repo=repo_slug, pr_number=number)

    @app.get("/repos/{owner}/{repo}/analysis/status")
    async def analysis_status(
        owner: str,
        repo: str,
        request: Request,
        variant: Literal["auto", "hybrid", "deterministic"] = "auto",
        snapshot_id: str | None = None,
        analysis_id: str | None = None,
    ) -> dict[str, Any]:
        """Status of the analysis pipeline for the repo."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return get_analysis_status(
            settings.index_path,
            repo=repo_slug,
            variant=variant,
            snapshot_id=snapshot_id,
            analysis_id=analysis_id,
        )

    @app.get("/repos/{owner}/{repo}/analysis/pulls/{number}")
    async def pr_analysis(
        owner: str,
        repo: str,
        number: int,
        request: Request,
        variant: Literal["auto", "hybrid", "deterministic"] = "auto",
        snapshot_id: str | None = None,
        analysis_id: str | None = None,
    ) -> dict[str, Any]:
        """Analysis record for a single PR."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return get_pr_analysis(
            settings.index_path,
            repo=repo_slug,
            pr_number=number,
            variant=variant,
            snapshot_id=snapshot_id,
            analysis_id=analysis_id,
        )

    @app.get("/repos/{owner}/{repo}/analysis/meta-bugs")
    async def analysis_meta_bugs(
        owner: str,
        repo: str,
        request: Request,
        limit: int | None = None,
        variant: Literal["auto", "hybrid", "deterministic"] = "auto",
        snapshot_id: str | None = None,
        analysis_id: str | None = None,
    ) -> dict[str, Any]:
        """Paged list of meta-bug clusters from the analysis."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return list_analysis_meta_bugs(
            settings.index_path,
            repo=repo_slug,
            variant=variant,
            limit=_limit(
                limit,
                default=settings.cluster_list_limit_default,
                maximum=settings.cluster_list_limit_max,
            ),
            snapshot_id=snapshot_id,
            analysis_id=analysis_id,
        )

    @app.get("/repos/{owner}/{repo}/analysis/meta-bugs/{cluster_id}")
    async def analysis_meta_bug(
        owner: str,
        repo: str,
        cluster_id: str,
        request: Request,
        variant: Literal["auto", "hybrid", "deterministic"] = "auto",
        snapshot_id: str | None = None,
        analysis_id: str | None = None,
    ) -> dict[str, Any]:
        """Single meta-bug cluster by id."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return get_analysis_meta_bug(
            settings.index_path,
            repo=repo_slug,
            cluster_id=cluster_id,
            variant=variant,
            snapshot_id=snapshot_id,
            analysis_id=analysis_id,
        )

    @app.get("/repos/{owner}/{repo}/analysis/duplicate-prs")
    async def analysis_duplicate_prs(
        owner: str,
        repo: str,
        request: Request,
        limit: int | None = None,
        variant: Literal["auto", "hybrid", "deterministic"] = "auto",
        snapshot_id: str | None = None,
        analysis_id: str | None = None,
    ) -> dict[str, Any]:
        """Paged list of duplicate-PR groupings from the analysis."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return list_analysis_duplicate_prs(
            settings.index_path,
            repo=repo_slug,
            variant=variant,
            limit=_limit(
                limit,
                default=settings.cluster_list_limit_default,
                maximum=settings.cluster_list_limit_max,
            ),
            snapshot_id=snapshot_id,
            analysis_id=analysis_id,
        )

    @app.get("/repos/{owner}/{repo}/analysis/best")
    async def analysis_best(
        owner: str,
        repo: str,
        request: Request,
        variant: Literal["auto", "hybrid", "deterministic"] = "auto",
        snapshot_id: str | None = None,
        analysis_id: str | None = None,
    ) -> dict[str, Any]:
        """Best-pick analysis summary for the repo."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return get_analysis_best(
            settings.index_path,
            repo=repo_slug,
            variant=variant,
            snapshot_id=snapshot_id,
            analysis_id=analysis_id,
        )

    @app.get("/repos/{owner}/{repo}/issues/status")
    async def issue_status(
        owner: str,
        repo: str,
        request: Request,
        variant: Literal["auto", "hybrid", "deterministic"] = "auto",
    ) -> dict[str, Any]:
        """Status of the issue-cluster surface (snapshot-backed)."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return get_issue_cluster_status(
            _surface_snapshot_dir(settings, repo_slug, surface="issues"),
            variant=variant,
        )

    @app.get("/repos/{owner}/{repo}/issues/clusters")
    async def issue_clusters(
        owner: str,
        repo: str,
        request: Request,
        limit: int | None = None,
        variant: Literal["auto", "hybrid", "deterministic"] = "auto",
    ) -> dict[str, Any]:
        """Paged list of issue clusters."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return list_issue_clusters(
            _surface_snapshot_dir(settings, repo_slug, surface="issues"),
            limit=_limit(
                limit,
                default=settings.issue_list_limit_default,
                maximum=settings.issue_list_limit_max,
            ),
            variant=variant,
        )

    @app.get("/repos/{owner}/{repo}/issues/clusters/{cluster_id}")
    async def issue_cluster(
        owner: str,
        repo: str,
        cluster_id: str,
        request: Request,
        variant: Literal["auto", "hybrid", "deterministic"] = "auto",
    ) -> dict[str, Any]:
        """Single issue cluster by id."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return get_issue_cluster(
            _surface_snapshot_dir(settings, repo_slug, surface="issues"),
            cluster_id=cluster_id,
            variant=variant,
        )

    @app.get("/repos/{owner}/{repo}/issues/pulls/{number}/clusters")
    async def issue_clusters_for_pr(
        owner: str,
        repo: str,
        number: int,
        request: Request,
        variant: Literal["auto", "hybrid", "deterministic"] = "auto",
    ) -> dict[str, Any]:
        """Issue clusters that reference the given PR."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return get_issue_clusters_for_pr(
            _surface_snapshot_dir(settings, repo_slug, surface="issues"),
            pr_number=number,
            variant=variant,
        )

    @app.get("/repos/{owner}/{repo}/issues/pulls/{number}/membership")
    async def issue_membership_for_pr(
        owner: str,
        repo: str,
        number: int,
        request: Request,
        cluster_id: str | None = None,
        variant: Literal["auto", "hybrid", "deterministic"] = "auto",
    ) -> dict[str, Any]:
        """Whether the given PR belongs to an (optionally specific) issue cluster."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return check_issue_cluster_membership(
            _surface_snapshot_dir(settings, repo_slug, surface="issues"),
            pr_number=number,
            cluster_id=cluster_id,
            variant=variant,
        )

    @app.get("/repos/{owner}/{repo}/issues/duplicate-prs")
    async def issue_duplicate_prs(
        owner: str,
        repo: str,
        request: Request,
        limit: int | None = None,
        variant: Literal["auto", "hybrid", "deterministic"] = "auto",
    ) -> dict[str, Any]:
        """Paged list of duplicate PRs from the issue surface."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return list_issue_duplicate_prs(
            _surface_snapshot_dir(settings, repo_slug, surface="issues"),
            limit=_limit(
                limit,
                default=settings.issue_list_limit_default,
                maximum=settings.issue_list_limit_max,
            ),
            variant=variant,
        )

    @app.get("/repos/{owner}/{repo}/issues/best")
    async def issue_best(
        owner: str,
        repo: str,
        request: Request,
        variant: Literal["auto", "hybrid", "deterministic"] = "auto",
    ) -> dict[str, Any]:
        """Best-pick summary from the issue surface."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return get_issue_best(
            _surface_snapshot_dir(settings, repo_slug, surface="issues"),
            variant=variant,
        )

    # NOTE: the static "/contributors/status" route is registered before the
    # parameterized "/contributors/{login}" route below so it is not shadowed.
    @app.get("/repos/{owner}/{repo}/contributors/status")
    async def contributor_status(
        owner: str,
        repo: str,
        request: Request,
    ) -> dict[str, Any]:
        """Status of the contributor surface (snapshot-backed)."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return get_contributor_status(
            _surface_snapshot_dir(settings, repo_slug, surface="contributors")
        )

    @app.get("/repos/{owner}/{repo}/contributors")
    async def contributors(
        owner: str,
        repo: str,
        request: Request,
        limit: int | None = None,
    ) -> dict[str, Any]:
        """Paged list of contributors."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return list_contributors(
            _surface_snapshot_dir(settings, repo_slug, surface="contributors"),
            limit=_limit(
                limit,
                default=settings.contributor_list_limit_default,
                maximum=settings.contributor_list_limit_max,
            ),
        )

    @app.get("/repos/{owner}/{repo}/contributors/{login}")
    async def contributor(
        owner: str,
        repo: str,
        login: str,
        request: Request,
    ) -> dict[str, Any]:
        """Single contributor record by login."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return get_contributor(
            _surface_snapshot_dir(settings, repo_slug, surface="contributors"),
            author_login=login,
        )

    @app.get("/repos/{owner}/{repo}/contributors/{login}/risk")
    async def contributor_risk(
        owner: str,
        repo: str,
        login: str,
        request: Request,
    ) -> dict[str, Any]:
        """Risk assessment for a contributor by login."""
        settings = request.app.state.settings
        repo_slug = _repo_slug(settings, owner, repo)
        return get_contributor_risk(
            _surface_snapshot_dir(settings, repo_slug, surface="contributors"),
            author_login=login,
        )

    return app
def _bootstrap_index(settings: PrSearchApiSettings) -> None:
    """Create output directories and (re)build the PR-search index if required.

    Skips the rebuild when no refresh is needed, or when neither a local
    snapshot directory nor an HF dataset is configured as a source.
    """
    for directory in (settings.output_dir, settings.index_path.parent):
        directory.mkdir(parents=True, exist_ok=True)
    if not _needs_refresh(settings):
        return
    if settings.snapshot_dir is None and settings.hf_repo_id is None:
        return  # nothing to build from
    options = PrSearchRefreshOptions(
        snapshot_dir=settings.snapshot_dir,
        output_dir=settings.output_dir,
        db=settings.index_path,
        hf_repo_id=settings.hf_repo_id,
        hf_revision=settings.hf_revision,
        hf_materialize_dir=settings.hf_materialize_dir,
        include_drafts=settings.include_drafts,
        include_closed=settings.include_closed,
    )
    run_pr_search_refresh(options)
def _bootstrap_snapshot_assets(settings: PrSearchApiSettings) -> None:
    """Materialize the HF dataset snapshot locally when it is the only source.

    No-op when a local snapshot directory is configured (it takes precedence)
    or when no HF repo id is set.
    """
    if settings.hf_repo_id is None or settings.snapshot_dir is not None:
        return
    target_dir = settings.hf_materialize_dir
    if target_dir is None:
        target_dir = default_hf_materialize_dir(
            settings.output_dir,
            settings.hf_repo_id,
            settings.hf_revision,
        )
    materialize_hf_dataset_snapshot(
        repo_id=settings.hf_repo_id,
        local_dir=target_dir,
        revision=settings.hf_revision,
    )
def _needs_refresh(settings: PrSearchApiSettings) -> bool:
    """Decide whether the index should be (re)built during startup."""
    if settings.rebuild_on_start:
        return True  # operator forced a rebuild
    # Otherwise refresh only when permitted AND the index is missing/unreadable.
    return settings.refresh_if_missing and not _is_ready(settings)
def _is_ready(settings: PrSearchApiSettings) -> bool:
    """Return True when the index file exists and a status query succeeds."""
    if not settings.index_path.exists():
        return False
    try:
        get_pr_search_status(settings.index_path, repo=settings.default_repo)
        return True
    except Exception:
        # Any status failure (corrupt/partial index, schema drift, ...) counts
        # as "not ready" rather than crashing the readiness check.
        return False
def _readiness_detail(settings: PrSearchApiSettings) -> str:
    """Return a human-readable reason for the /readyz response."""
    if settings.index_path.exists():
        try:
            get_pr_search_status(settings.index_path, repo=settings.default_repo)
        except Exception as exc:
            return str(exc)  # surface the underlying status error
        return "ready"
    return f"index not found at {settings.index_path}"
def _repo_slug(settings: PrSearchApiSettings, owner: str, repo: str) -> str:
    """Join owner/repo and enforce the single-repo deployment restriction.

    Raises HTTPException(400) when a default repo is configured and the
    request targets a different one.
    """
    slug = f"{owner}/{repo}"
    configured = settings.default_repo
    if not configured or slug == configured:
        return slug
    raise HTTPException(
        status_code=400,
        detail=f"repo {settings.default_repo} is the only configured repo for this deployment",
    )
def _active_snapshot_dir(settings: PrSearchApiSettings, repo_slug: str) -> Path:
    """Resolve the snapshot directory recorded by the active PR-search run."""
    status = get_pr_search_status(settings.index_path, repo=repo_slug)
    return _status_snapshot_dir(status)
def _surface_snapshot_dir(
    settings: PrSearchApiSettings,
    repo_slug: str,
    *,
    surface: Literal["issues", "contributors"],
) -> Path:
    """Pick the snapshot directory that actually holds the requested surface.

    Prefers the active run's snapshot; falls back to the materialized HF
    snapshot when the surface only exists there; otherwise returns the active
    directory so downstream readers raise their own descriptive errors.
    """
    active = _active_snapshot_dir(settings, repo_slug)
    if _surface_available(active, surface=surface):
        return active
    fallback = _materialized_snapshot_dir(settings)
    if fallback is not None and _surface_available(fallback, surface=surface):
        return fallback
    return active
def _status_snapshot_dir(status: dict[str, Any]) -> Path:
    """Extract the snapshot directory from a status payload, or raise 503."""
    raw = status.get("snapshot_dir")
    if raw:
        return Path(str(raw))
    raise HTTPException(status_code=503, detail="active snapshot directory is unavailable")
def _materialized_snapshot_dir(settings: PrSearchApiSettings) -> Path | None:
    """Locate the local HF-materialized snapshot dir, or None without an HF repo."""
    if settings.hf_repo_id is None:
        return None
    if settings.hf_materialize_dir is not None:
        return settings.hf_materialize_dir
    # No explicit dir configured: derive the conventional location.
    return default_hf_materialize_dir(
        settings.output_dir,
        settings.hf_repo_id,
        settings.hf_revision,
    )
def _surface_available(snapshot_dir: Path, *, surface: Literal["issues", "contributors"]) -> bool:
    """Check whether *snapshot_dir* contains artifacts for the given surface."""
    if not snapshot_dir.exists():
        return False
    if surface == "contributors":
        return (snapshot_dir / "new-contributors-report.json").exists()
    # issues: either the current-analysis manifest or any analysis-report file.
    if (snapshot_dir / CURRENT_ANALYSIS_MANIFEST_PATH).exists():
        return True
    return any(snapshot_dir.glob("analysis-report*.json"))
def _limit(value: int | None, *, default: int, maximum: int) -> int:
    """Normalize a user-supplied limit: None -> *default*; enforce 1..maximum.

    Raises HTTPException(400) for out-of-range values.
    """
    effective = value if value is not None else default
    if effective < 1:
        raise HTTPException(status_code=400, detail="limit must be at least 1")
    if effective > maximum:
        raise HTTPException(status_code=400, detail=f"limit must be at most {maximum}")
    return effective
def _probe_client(settings: PrSearchApiSettings) -> Any:
    """Build a live GH-replica probe client, or None when no base URL is set."""
    base_url = settings.ghr_base_url
    if not base_url:
        return None
    return GhrProbeClient(
        base_url=base_url,
        timeout=settings.http_timeout,
        max_retries=settings.http_max_retries,
    )
| def _looks_not_found(exc: ValueError) -> bool: | |
| message = str(exc).lower() | |
| return ( | |
| "not found" in message | |
| or "analysis report was not found" in message | |
| or "no analysis report was found" in message | |
| or "published analysis" in message | |
| or "materialized snapshot" in message | |
| or "no active pr search run" in message | |
| or "was not found in the active indexed universe" in message | |
| ) | |
| def _env_bool(name: str, default: bool) -> bool: | |
| raw = os.environ.get(name) | |
| if raw is None: | |
| return default | |
| return raw.strip().lower() in {"1", "true", "yes", "on"} | |
| def _env_int(name: str, default: int) -> int: | |
| raw = os.environ.get(name) | |
| return default if raw is None else int(raw) | |
| def _env_path(name: str) -> Path | None: | |
| raw = os.environ.get(name) | |
| return None if raw is None else Path(raw).resolve() | |
# Module-level ASGI entry point: servers (uvicorn/gunicorn) import this module
# and look up `app`. Note that create_app() — and thus environment parsing via
# PrSearchApiSettings.from_env() — runs at import time.
app = create_app()