from __future__ import annotations

import json
from collections.abc import Iterable, Mapping, Sequence
from contextlib import suppress
from pathlib import Path
from typing import Any, Protocol
from uuid import uuid4

from slop_farmer.config import PrSearchRefreshOptions, RepoRef, resolve_github_token
from slop_farmer.data.github_api import GitHubClient
from slop_farmer.data.normalize import normalize_pr_file, normalize_pull_request
from slop_farmer.data.search_duckdb import (
    connect_pr_search_db,
    fetch_rows,
    get_candidate_cluster_rows,
    get_cluster,
    get_cluster_ids_for_prs,
    get_cluster_members,
    get_contributor,
    get_contributor_pulls,
    get_document,
    get_feature,
    get_pair_neighbor_row,
    get_run_counts,
    get_scope_run_artifact,
    get_shared_cluster_ids,
    get_similar_pr_rows,
    insert_rows,
    replace_active_run,
    resolve_active_run,
    update_run_status,
)
from slop_farmer.reports.pr_scope import PrScopeClusterOptions
from slop_farmer.reports.pr_search_scope import (
    build_pr_scope_search_artifacts,
    build_scope_feature_for_pull_request,
    build_scope_feature_idf_for_indexed_documents,
    iso_timestamp,
    load_pr_search_snapshot,
    rank_scope_cluster_candidates,
    rank_scope_feature_matches,
    resolve_pr_search_snapshot_dir,
    scope_feature_pair_explanation,
    scope_options_from_settings,
)


class ProbeClientLike(Protocol):
    """Structural interface for live PR probes; GitHubClient satisfies it."""

    def get_pull_request(self, owner: str, repo: str, number: int) -> dict[str, Any]: ...

    def iter_pull_files(self, owner: str, repo: str, number: int) -> Iterable[dict[str, Any]]: ...
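

# Any object with these two methods works as a probe client, which keeps the
# live-lookup helpers testable without network access. A minimal test double
# (illustrative, not part of the package):
#
#     class StubProbeClient:
#         def get_pull_request(self, owner, repo, number):
#             return {"number": number, "title": "stub", "changed_files": 0}
#
#         def iter_pull_files(self, owner, repo, number):
#             return iter(())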


def run_pr_search_refresh(options: PrSearchRefreshOptions) -> dict[str, Any]:
    """Build scope-search artifacts from a snapshot and persist them as a new run."""
    snapshot_dir = resolve_pr_search_snapshot_dir(options)
    snapshot = load_pr_search_snapshot(snapshot_dir)
    repo = str(snapshot["repo"])
    db_path = resolve_pr_search_db_path(options.db, output_dir=options.output_dir)
    started_at = iso_timestamp()
    scope_options = PrScopeClusterOptions(
        include_closed=options.include_closed,
        include_drafts=options.include_drafts,
    )
    artifacts = build_pr_scope_search_artifacts(
        snapshot["pull_requests"],
        snapshot["pr_files"],
        options=scope_options,
        suppression_rules=options.cluster_suppression_rules,
        limit_prs=options.limit_prs,
    )
    run_id = uuid4().hex
    source_type = "hf_dataset_repo" if options.hf_repo_id else "local_snapshot"
    connection = connect_pr_search_db(db_path)
    try:
        # Record the run before opening the transaction so a failed refresh
        # can still be marked "failed" after the rollback below.
        insert_rows(
            connection,
            "pr_search_runs",
            [
                {
                    "id": run_id,
                    "repo": repo,
                    "snapshot_id": snapshot["snapshot_id"],
                    "snapshot_dir": str(snapshot_dir),
                    "source_type": source_type,
                    "hf_repo_id": options.hf_repo_id,
                    "hf_revision": options.hf_revision,
                    "started_at": started_at,
                    "finished_at": None,
                    "status": "running",
                    "settings_json": artifacts["settings_json"],
                    "notes": None,
                }
            ],
        )
        connection.execute("BEGIN")
        created_at = iso_timestamp()
        insert_rows(
            connection,
            "pr_search_documents",
            _scoped_rows(artifacts["documents"], run_id=run_id, repo=repo),
        )
        insert_rows(
            connection,
            "pr_search_contributors",
            _contributor_rows(
                snapshot["contributors"],
                run_id=run_id,
                repo=repo,
                snapshot_id=str(snapshot["snapshot_id"]),
            ),
        )
        insert_rows(
            connection,
            "pr_scope_features",
            _scoped_rows(
                artifacts["features"],
                run_id=run_id,
                repo=repo,
                computed_at=created_at,
            ),
        )
        insert_rows(
            connection,
            "pr_scope_run_artifacts",
            _scoped_rows(
                [artifacts["run_artifact"]],
                run_id=run_id,
                repo=repo,
                computed_at=created_at,
            ),
        )
        insert_rows(
            connection,
            "pr_scope_neighbors",
            _scoped_rows(
                artifacts["neighbors"],
                run_id=run_id,
                repo=repo,
                created_at=created_at,
            ),
        )
        insert_rows(
            connection,
            "pr_scope_clusters",
            _scoped_rows(
                artifacts["clusters"],
                run_id=run_id,
                repo=repo,
                created_at=created_at,
            ),
        )
        insert_rows(
            connection,
            "pr_scope_cluster_members",
            _scoped_rows(artifacts["cluster_members"], run_id=run_id, repo=repo),
        )
        insert_rows(
            connection,
            "pr_scope_cluster_candidates",
            _scoped_rows(artifacts["cluster_candidates"], run_id=run_id, repo=repo),
        )
        finished_at = iso_timestamp()
        update_run_status(
            connection,
            run_id=run_id,
            status="succeeded",
            finished_at=finished_at,
        )
        if options.replace_active:
            replace_active_run(
                connection,
                repo=repo,
                run_id=run_id,
                activated_at=finished_at,
            )
        connection.execute("COMMIT")
        counts = get_run_counts(connection, run_id=run_id)
        return {
            "db_path": str(db_path),
            "run_id": run_id,
            "repo": repo,
            "snapshot_id": snapshot["snapshot_id"],
            "snapshot_dir": str(snapshot_dir),
            "source_type": source_type,
            "active_updated": bool(options.replace_active),
            "row_counts": counts,
        }
    except Exception as exc:
        with suppress(Exception):
            connection.execute("ROLLBACK")
        update_run_status(
            connection,
            run_id=run_id,
            status="failed",
            finished_at=iso_timestamp(),
            notes=str(exc),
        )
        raise
    finally:
        connection.close()
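

# A minimal sketch of a refresh invocation. The options object is assumed to
# be built by the CLI layer (see slop_farmer.config), and get_run_counts is
# assumed to yield a table -> count mapping:
#
#     summary = run_pr_search_refresh(options)
#     print(summary["run_id"], summary["active_updated"])
#     for table, count in summary["row_counts"].items():
#         print(table, count)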


def get_pr_search_status(db_path: Path, *, repo: str | None = None) -> dict[str, Any]:
    """Return the active run's metadata plus decoded settings and row counts."""
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        return {
            **_without_json_fields(active_run),
            "settings": _json_dict(active_run.get("settings_json")),
            "row_counts": get_run_counts(connection, run_id=str(active_run["id"])),
        }
    finally:
        connection.close()


def get_pr_search_similar(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    limit: int = 10,
) -> dict[str, Any]:
    """Return the indexed nearest-neighbor PRs for ``pr_number`` in the active run."""
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        document = _require_document(connection, run_id=run_id, pr_number=pr_number)
        similar_rows = get_similar_pr_rows(
            connection, run_id=run_id, pr_number=pr_number, limit=limit
        )
        cluster_ids_by_pr = get_cluster_ids_for_prs(
            connection,
            run_id=run_id,
            pr_numbers=[int(row["neighbor_pr_number"]) for row in similar_rows],
        )
        results = []
        for row in similar_rows:
            results.append(
                {
                    **_without_json_fields(row),
                    "neighbor_title": _require_document(
                        connection,
                        run_id=run_id,
                        pr_number=int(row["neighbor_pr_number"]),
                    )["title"],
                    "cluster_ids": cluster_ids_by_pr.get(int(row["neighbor_pr_number"]), []),
                    "shared_filenames": _json_list(row.get("shared_filenames_json")),
                    "shared_directories": _json_list(row.get("shared_directories_json")),
                }
            )
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "pr": document,
            "similar_prs": results,
            "similar_count": len(results),
        }
    finally:
        connection.close()
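

# A sketch of consuming the neighbor listing (db path and PR number are
# illustrative); the keys shown are the ones assembled above:
#
#     out = get_pr_search_similar(Path("pr-search.duckdb"), pr_number=42)
#     for row in out["similar_prs"]:
#         print(row["neighbor_pr_number"], row["neighbor_title"], row["cluster_ids"])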


def get_pr_search_candidate_clusters(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    limit: int = 5,
) -> dict[str, Any]:
    """Return ranked candidate clusters for ``pr_number`` from the active run."""
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        document = _require_document(connection, run_id=run_id, pr_number=pr_number)
        rows = get_candidate_cluster_rows(
            connection, run_id=run_id, pr_number=pr_number, limit=limit
        )
        candidates = []
        for row in rows:
            evidence = _json_dict(row.get("evidence_json"))
            candidates.append(
                {
                    **_without_json_fields(row),
                    "shared_filenames": _json_list(row.get("shared_filenames_json")),
                    "shared_directories": _json_list(row.get("shared_directories_json")),
                    "evidence": evidence,
                    "matched_member_pr_numbers": evidence.get("matched_member_pr_numbers") or [],
                    "reason": evidence.get("reason") or "",
                }
            )
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "pr": document,
            "candidate_clusters": candidates,
            "candidate_cluster_count": len(candidates),
        }
    finally:
        connection.close()


def get_pr_search_contributor(
    db_path: Path,
    *,
    author_login: str,
    repo: str | None = None,
) -> dict[str, Any]:
    """Return a contributor profile with up to 20 of their indexed pulls."""
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        contributor = _require_contributor(connection, run_id=run_id, author_login=author_login)
        pulls = _document_rows(
            get_contributor_pulls(connection, run_id=run_id, author_login=author_login, limit=20)
        )
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "contributor": contributor,
            "pulls": pulls,
            "pull_count": len(pulls),
        }
    finally:
        connection.close()


def get_pr_search_contributor_pulls(
    db_path: Path,
    *,
    author_login: str,
    repo: str | None = None,
    limit: int = 20,
) -> dict[str, Any]:
    """Like get_pr_search_contributor, but with a caller-controlled pull limit."""
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        contributor = _require_contributor(connection, run_id=run_id, author_login=author_login)
        pulls = _document_rows(
            get_contributor_pulls(connection, run_id=run_id, author_login=author_login, limit=limit)
        )
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "contributor": contributor,
            "pulls": pulls,
            "pull_count": len(pulls),
        }
    finally:
        connection.close()


def get_pr_search_pull_contributor(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
) -> dict[str, Any]:
    """Resolve the contributor profile for the author of an indexed PR."""
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        document = _require_document(connection, run_id=run_id, pr_number=pr_number)
        author_login = str(document.get("author_login") or "").strip()
        if not author_login:
            raise ValueError(f"PR #{pr_number} does not have an indexed author_login.")
        contributor = _require_contributor(connection, run_id=run_id, author_login=author_login)
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "pr": _without_json_fields(document),
            "contributor": contributor,
        }
    finally:
        connection.close()


def get_pr_search_similar_lookup(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    limit: int = 10,
    mode: str = "auto",
    client: ProbeClientLike | None = None,
) -> dict[str, Any]:
    """Look up similar PRs, falling back from the index to a live probe in auto mode."""
    resolved_mode = _normalize_lookup_mode(mode)
    if resolved_mode != "live":
        try:
            result = get_pr_search_similar(db_path, pr_number=pr_number, repo=repo, limit=limit)
        except ValueError as exc:
            if resolved_mode == "indexed" or not _is_index_miss(exc):
                raise
        else:
            result["query"] = {
                "pr_number": pr_number,
                "mode_requested": resolved_mode,
                "mode_used": "indexed",
                "source": "active_index",
            }
            return result
    live_result = probe_pr_search_live(
        db_path,
        pr_number=pr_number,
        repo=repo,
        limit=limit,
        client=client,
    )
    return {
        "repo": live_result["repo"],
        "snapshot_id": live_result["snapshot_id"],
        "run_id": live_result["run_id"],
        "query": {
            "pr_number": pr_number,
            "mode_requested": resolved_mode,
            "mode_used": "live",
            "source": live_result["probe_source"]["provider"],
        },
        "pr": live_result["probe_pr"],
        "probe_source": live_result["probe_source"],
        "similar_prs": live_result["similar_prs"],
        "similar_count": len(live_result["similar_prs"]),
    }
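

# Mode semantics, as implemented above and in get_pr_search_clusters below:
# "indexed" reads only the active index and raises on a miss, "live" always
# probes the forge, and "auto" tries the index first and falls back to a live
# probe only when the PR is absent from the indexed universe. A sketch (the
# PR number is illustrative):
#
#     out = get_pr_search_similar_lookup(db_path, pr_number=42, mode="auto")
#     out["query"]["mode_used"]  # "indexed" or "live"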


def get_pr_search_clusters(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    limit: int = 5,
    mode: str = "auto",
    client: ProbeClientLike | None = None,
) -> dict[str, Any]:
    """Look up assigned and candidate clusters, with the same mode fallback as above."""
    resolved_mode = _normalize_lookup_mode(mode)
    if resolved_mode != "live":
        try:
            result = _get_pr_search_clusters_indexed(
                db_path,
                pr_number=pr_number,
                repo=repo,
                limit=limit,
            )
        except ValueError as exc:
            if resolved_mode == "indexed" or not _is_index_miss(exc):
                raise
        else:
            result["query"] = {
                "pr_number": pr_number,
                "mode_requested": resolved_mode,
                "mode_used": "indexed",
                "source": "active_index",
            }
            return result
    live_result = probe_pr_search_live(
        db_path,
        pr_number=pr_number,
        repo=repo,
        limit=limit,
        client=client,
    )
    return {
        "repo": live_result["repo"],
        "snapshot_id": live_result["snapshot_id"],
        "run_id": live_result["run_id"],
        "query": {
            "pr_number": pr_number,
            "mode_requested": resolved_mode,
            "mode_used": "live",
            "source": live_result["probe_source"]["provider"],
        },
        "pr": live_result["probe_pr"],
        "probe_source": live_result["probe_source"],
        # The live path does not resolve index-assigned clusters; only
        # candidate clusters are computed from the probe.
        "assigned_clusters": [],
        "candidate_clusters": live_result["candidate_clusters"],
        "assigned_cluster_count": 0,
        "candidate_cluster_count": len(live_result["candidate_clusters"]),
    }


def get_pr_search_cluster(
    db_path: Path,
    *,
    cluster_id: str,
    repo: str | None = None,
) -> dict[str, Any]:
    """Return one cluster's summary and its member rows from the active run."""
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        cluster = get_cluster(connection, run_id=run_id, cluster_id=cluster_id)
        if cluster is None:
            raise ValueError(f"Cluster {cluster_id!r} was not found in the active run.")
        members = get_cluster_members(connection, run_id=run_id, cluster_id=cluster_id)
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "cluster": _cluster_summary(cluster),
            "members": members,
            "member_count": len(members),
        }
    finally:
        connection.close()


def list_pr_search_clusters(
    db_path: Path,
    *,
    repo: str | None = None,
    limit: int = 50,
) -> dict[str, Any]:
    """List the active run's clusters, largest and most cohesive first."""
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        rows = fetch_rows(
            connection,
            """
            SELECT
                cl.*,
                d.title AS representative_title,
                d.html_url AS representative_html_url,
                d.state AS representative_state,
                d.draft AS representative_draft
            FROM pr_scope_clusters AS cl
            LEFT JOIN pr_search_documents AS d
                ON d.run_id = cl.run_id AND d.pr_number = cl.representative_pr_number
            WHERE cl.run_id = ?
            ORDER BY cl.cluster_size DESC, cl.average_similarity DESC, cl.cluster_id
            LIMIT ?
            """,
            [run_id, limit],
        )
        clusters = []
        for index, row in enumerate(rows, start=1):
            clusters.append({"rank": index, **_cluster_summary(row)})
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "clusters": clusters,
            "cluster_count": len(clusters),
        }
    finally:
        connection.close()


def explain_pr_search_pair(
    db_path: Path,
    *,
    left_pr_number: int,
    right_pr_number: int,
    repo: str | None = None,
) -> dict[str, Any]:
    """Explain the similarity between two indexed PRs.

    Prefers the materialized neighbor row; when the pair was not materialized,
    recomputes the explanation from the stored scope features.
    """
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        left_document = _require_document(connection, run_id=run_id, pr_number=left_pr_number)
        right_document = _require_document(connection, run_id=run_id, pr_number=right_pr_number)
        neighbor_row = get_pair_neighbor_row(
            connection,
            run_id=run_id,
            left_pr_number=left_pr_number,
            right_pr_number=right_pr_number,
        )
        shared_cluster_ids = get_shared_cluster_ids(
            connection,
            run_id=run_id,
            left_pr_number=left_pr_number,
            right_pr_number=right_pr_number,
        )
        if neighbor_row is not None:
            shared_filenames = _json_list(neighbor_row.get("shared_filenames_json"))
            shared_directories = _json_list(neighbor_row.get("shared_directories_json"))
            return {
                "repo": active_run["repo"],
                "snapshot_id": active_run["snapshot_id"],
                "run_id": run_id,
                "materialized": True,
                "left_pr": left_document,
                "right_pr": right_document,
                "pair": {
                    "similarity": neighbor_row["similarity"],
                    "content_similarity": neighbor_row["content_similarity"],
                    "size_similarity": neighbor_row["size_similarity"],
                    "breadth_similarity": neighbor_row["breadth_similarity"],
                    "concentration_similarity": neighbor_row["concentration_similarity"],
                    "shared_filenames": shared_filenames,
                    "shared_directories": shared_directories,
                },
                "shared_cluster_ids": shared_cluster_ids,
            }
        left_feature = _require_feature(connection, run_id=run_id, pr_number=left_pr_number)
        right_feature = _require_feature(connection, run_id=run_id, pr_number=right_pr_number)
        pair = scope_feature_pair_explanation(
            left_feature,
            right_feature,
            options=scope_options_from_settings(_json_dict(active_run.get("settings_json"))),
        )
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "materialized": False,
            "left_pr": left_document,
            "right_pr": right_document,
            "pair": pair,
            "shared_cluster_ids": shared_cluster_ids,
        }
    finally:
        connection.close()
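

# The "materialized" flag tells callers which path produced the pair payload:
# True means the similarity components came straight from a stored
# pr_scope_neighbors row; False means they were recomputed on the fly from the
# two PRs' pr_scope_features rows using the run's persisted settings.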


def probe_pr_search_live(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    limit: int = 10,
    client: ProbeClientLike | None = None,
) -> dict[str, Any]:
    """Fetch a PR live from the forge and rank it against the active index."""
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        repo_slug = repo or str(active_run["repo"])
        repo_ref = RepoRef.parse(repo_slug)
        settings = scope_options_from_settings(_json_dict(active_run.get("settings_json")))
        indexed_documents = fetch_rows(
            connection,
            """
            SELECT *
            FROM pr_search_documents
            WHERE run_id = ?
            ORDER BY pr_number
            """,
            [run_id],
        )
        indexed_features = fetch_rows(
            connection,
            """
            SELECT *
            FROM pr_scope_features
            WHERE run_id = ?
            ORDER BY pr_number
            """,
            [run_id],
        )
        run_artifact = get_scope_run_artifact(connection, run_id=run_id)
        cluster_rows = fetch_rows(
            connection,
            """
            SELECT *
            FROM pr_scope_clusters
            WHERE run_id = ?
            ORDER BY cluster_id
            """,
            [run_id],
        )
        cluster_member_rows = fetch_rows(
            connection,
            """
            SELECT cluster_id, pr_number
            FROM pr_scope_cluster_members
            WHERE run_id = ?
            ORDER BY cluster_id, pr_number
            """,
            [run_id],
        )
        cluster_members: dict[str, list[int]] = {}
        for row in cluster_member_rows:
            cluster_members.setdefault(str(row["cluster_id"]), []).append(int(row["pr_number"]))
        probe_client = client or GitHubClient(token=resolve_github_token())
        extracted_at = iso_timestamp()
        pr_detail = probe_client.get_pull_request(repo_ref.owner, repo_ref.name, pr_number)
        pr_row = normalize_pull_request(
            repo_ref.slug,
            pr_detail,
            pr_detail,
            str(active_run["snapshot_id"]),
            extracted_at,
        )
        pr_files = [
            normalize_pr_file(
                repo_ref.slug,
                pr_number,
                item,
                str(active_run["snapshot_id"]),
                extracted_at,
            )
            for item in probe_client.iter_pull_files(repo_ref.owner, repo_ref.name, pr_number)
        ]
        # Prefer the IDF table persisted with the run; rebuild it from the
        # snapshot only when the run artifact does not carry one.
        feature_idf = (
            _json_float_dict(run_artifact.get("idf_json")) if run_artifact is not None else {}
        )
        if not feature_idf:
            snapshot = load_pr_search_snapshot(Path(str(active_run["snapshot_dir"])))
            feature_idf = build_scope_feature_idf_for_indexed_documents(
                indexed_documents,
                snapshot["pr_files"],
                options=settings,
            )
        query_feature = build_scope_feature_for_pull_request(
            pr_row,
            pr_files,
            feature_idf=feature_idf,
            options=settings,
        )
        similarity_rows = rank_scope_feature_matches(
            query_feature,
            indexed_features,
            options=settings,
            limit=limit,
        )
        cluster_ids_by_pr = get_cluster_ids_for_prs(
            connection,
            run_id=run_id,
            pr_numbers=[int(row["right_pr_number"]) for row in similarity_rows],
        )
        live_similar_prs = []
        for row in similarity_rows:
            indexed_document = _require_document(
                connection,
                run_id=run_id,
                pr_number=int(row["right_pr_number"]),
            )
            live_similar_prs.append(
                {
                    **row,
                    "neighbor_pr_number": int(row["right_pr_number"]),
                    "neighbor_title": indexed_document["title"],
                    "cluster_ids": cluster_ids_by_pr.get(int(row["right_pr_number"]), []),
                }
            )
        assigned_cluster_ids = set(
            get_cluster_ids_for_prs(connection, run_id=run_id, pr_numbers=[pr_number]).get(
                pr_number, []
            )
        )
        candidate_clusters = rank_scope_cluster_candidates(
            similarity_rows=similarity_rows,
            clusters=cluster_rows,
            cluster_members=cluster_members,
            assigned_cluster_ids=assigned_cluster_ids,
            limit=min(5, max(limit, 1)),
        )
        cluster_by_id = {str(row["cluster_id"]): row for row in cluster_rows}
        for row in candidate_clusters:
            cluster = cluster_by_id[row["cluster_id"]]
            row.update(
                {
                    "representative_pr_number": cluster["representative_pr_number"],
                    "cluster_size": cluster["cluster_size"],
                    "average_similarity": cluster["average_similarity"],
                    "summary": cluster["summary"],
                    "shared_filenames": _json_list(cluster.get("shared_filenames_json")),
                    "shared_directories": _json_list(cluster.get("shared_directories_json")),
                    "matched_member_pr_numbers": row["evidence"].get("matched_member_pr_numbers")
                    or [],
                    "reason": row["evidence"].get("reason") or "",
                }
            )
        return {
            "repo": repo_slug,
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "probe_pr": {
                "pr_number": pr_number,
                "title": pr_row.get("title") or "",
                "html_url": pr_row.get("html_url"),
                "base_ref": pr_row.get("base_ref"),
                "changed_files": int(pr_row.get("changed_files") or 0),
            },
            "probe_source": _probe_source_metadata(
                probe_client,
                owner=repo_ref.owner,
                repo=repo_ref.name,
                number=pr_number,
            ),
            "similar_prs": live_similar_prs,
            "candidate_clusters": candidate_clusters,
        }
    finally:
        connection.close()
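

# The live probe never writes to the database: it loads the active run's
# documents, features, clusters, and IDF table read-only, fetches the query PR
# from the forge through the injected client, and scores it in memory with the
# same settings the run was built with.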


def probe_pr_search_github(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    limit: int = 10,
    client: ProbeClientLike | None = None,
) -> dict[str, Any]:
    """Delegate to probe_pr_search_live."""
    return probe_pr_search_live(
        db_path,
        pr_number=pr_number,
        repo=repo,
        limit=limit,
        client=client,
    )


def resolve_pr_search_db_path(db_path: Path | None, *, output_dir: Path) -> Path:
    return (db_path or output_dir / "state" / "pr-search.duckdb").resolve()


def _scoped_rows(rows: list[dict[str, Any]], **extra: Any) -> list[dict[str, Any]]:
    return [{**extra, **row} for row in rows]
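

# Merge-order note for _scoped_rows: per-row keys win over the injected
# extras, so a row that already carries e.g. "run_id" keeps its own value.
#
#     _scoped_rows([{"a": 1}], run_id="r1")  # -> [{"run_id": "r1", "a": 1}]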


def _get_pr_search_clusters_indexed(
    db_path: Path,
    *,
    pr_number: int,
    repo: str | None = None,
    limit: int = 5,
) -> dict[str, Any]:
    """Indexed half of get_pr_search_clusters: assigned plus candidate clusters."""
    connection = connect_pr_search_db(db_path, read_only=True)
    try:
        active_run = resolve_active_run(connection, repo=repo)
        run_id = str(active_run["id"])
        document = _require_document(connection, run_id=run_id, pr_number=pr_number)
        candidate_rows = get_candidate_cluster_rows(
            connection,
            run_id=run_id,
            pr_number=pr_number,
            limit=limit,
        )
        assigned_cluster_ids = get_cluster_ids_for_prs(
            connection,
            run_id=run_id,
            pr_numbers=[pr_number],
        ).get(pr_number, [])
        assigned_clusters = []
        for cluster_id in assigned_cluster_ids:
            cluster = get_cluster(connection, run_id=run_id, cluster_id=cluster_id)
            if cluster is None:
                continue
            assigned_clusters.append(_cluster_summary(cluster))
        candidates = []
        for row in candidate_rows:
            evidence = _json_dict(row.get("evidence_json"))
            candidates.append(
                {
                    **_without_json_fields(row),
                    "shared_filenames": _json_list(row.get("shared_filenames_json")),
                    "shared_directories": _json_list(row.get("shared_directories_json")),
                    "evidence": evidence,
                    "matched_member_pr_numbers": evidence.get("matched_member_pr_numbers") or [],
                    "reason": evidence.get("reason") or "",
                }
            )
        return {
            "repo": active_run["repo"],
            "snapshot_id": active_run["snapshot_id"],
            "run_id": run_id,
            "pr": document,
            "assigned_clusters": assigned_clusters,
            "candidate_clusters": candidates,
            "assigned_cluster_count": len(assigned_clusters),
            "candidate_cluster_count": len(candidates),
        }
    finally:
        connection.close()


def _require_document(connection: Any, *, run_id: str, pr_number: int) -> dict[str, Any]:
    document = get_document(connection, run_id=run_id, pr_number=pr_number)
    if document is None:
        raise ValueError(f"PR #{pr_number} was not found in the active indexed universe.")
    return document


def _require_feature(connection: Any, *, run_id: str, pr_number: int) -> dict[str, Any]:
    feature = get_feature(connection, run_id=run_id, pr_number=pr_number)
    if feature is None:
        raise ValueError(f"No scope feature row was found for PR #{pr_number}.")
    return feature


def _require_contributor(connection: Any, *, run_id: str, author_login: str) -> dict[str, Any]:
    contributor = get_contributor(connection, run_id=run_id, author_login=author_login)
    if contributor is None:
        raise ValueError(
            f"Contributor {author_login!r} was not found in the active indexed universe."
        )
    return _contributor_row(contributor)


def _json_list(raw: Any) -> list[str]:
    if isinstance(raw, list):
        return [str(item) for item in raw]
    if isinstance(raw, str) and raw:
        payload = json.loads(raw)
        if isinstance(payload, list):
            return [str(item) for item in payload]
    return []


def _json_dict(raw: Any) -> dict[str, Any]:
    if isinstance(raw, dict):
        return dict(raw)
    if isinstance(raw, str) and raw:
        payload = json.loads(raw)
        if isinstance(payload, dict):
            return payload
    return {}
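

# These decoders accept either already-decoded values or JSON strings, so
# callers never need to know which form the storage layer handed back:
#
#     _json_list('["a", "b"]')  # -> ["a", "b"]
#     _json_list(["a", 2])      # -> ["a", "2"]
#     _json_dict(None)          # -> {}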


def _json_float_dict(raw: Any) -> dict[str, float]:
    payload = _json_dict(raw)
    return {str(key): float(value) for key, value in payload.items()}


def _cluster_summary(cluster: dict[str, Any]) -> dict[str, Any]:
    return {
        **_without_json_fields(cluster),
        "shared_filenames": _json_list(cluster.get("shared_filenames_json")),
        "shared_directories": _json_list(cluster.get("shared_directories_json")),
    }


def _without_json_fields(row: Mapping[str, Any]) -> dict[str, Any]:
    return {str(key): value for key, value in row.items() if not str(key).endswith("_json")}


def _document_rows(rows: Sequence[Mapping[str, Any]]) -> list[dict[str, Any]]:
    return [_without_json_fields(row) for row in rows]


def _contributor_rows(
    rows: list[Mapping[str, Any]],
    *,
    run_id: str,
    repo: str,
    snapshot_id: str,
) -> list[dict[str, Any]]:
    return [
        {
            "run_id": run_id,
            "repo": repo,
            "snapshot_id": snapshot_id,
            "report_generated_at": row.get("report_generated_at"),
            "window_days": row.get("window_days"),
            "author_login": row.get("author_login"),
            "name": row.get("name"),
            "profile_url": row.get("profile_url"),
            "repo_pull_requests_url": row.get("repo_pull_requests_url"),
            "repo_issues_url": row.get("repo_issues_url"),
            "repo_first_seen_at": row.get("repo_first_seen_at"),
            "repo_last_seen_at": row.get("repo_last_seen_at"),
            "repo_primary_artifact_count": row.get("repo_primary_artifact_count"),
            "repo_artifact_count": row.get("repo_artifact_count"),
            "snapshot_issue_count": row.get("snapshot_issue_count"),
            "snapshot_pr_count": row.get("snapshot_pr_count"),
            "snapshot_comment_count": row.get("snapshot_comment_count"),
            "snapshot_review_count": row.get("snapshot_review_count"),
            "snapshot_review_comment_count": row.get("snapshot_review_comment_count"),
            "repo_association": row.get("repo_association"),
            "new_to_repo": row.get("new_to_repo"),
            "first_seen_in_snapshot": row.get("first_seen_in_snapshot"),
            "report_reason": row.get("report_reason"),
            "account_age_days": row.get("account_age_days"),
            "young_account": row.get("young_account"),
            "follow_through_score": row.get("follow_through_score"),
            "breadth_score": row.get("breadth_score"),
            "automation_risk_signal": row.get("automation_risk_signal"),
            "heuristic_note": row.get("heuristic_note"),
            "public_orgs_json": row.get("public_orgs"),
            "visible_authored_pr_count": row.get("visible_authored_pr_count"),
            "merged_pr_count": row.get("merged_pr_count"),
            "closed_unmerged_pr_count": row.get("closed_unmerged_pr_count"),
            "open_pr_count": row.get("open_pr_count"),
            "merged_pr_rate": row.get("merged_pr_rate"),
            "closed_unmerged_pr_rate": row.get("closed_unmerged_pr_rate"),
            "still_open_pr_rate": row.get("still_open_pr_rate"),
            "distinct_repos_with_authored_prs": row.get("distinct_repos_with_authored_prs"),
            "distinct_repos_with_open_prs": row.get("distinct_repos_with_open_prs"),
            "fetch_error": row.get("fetch_error"),
        }
        for row in rows
    ]


def _contributor_row(row: Mapping[str, Any]) -> dict[str, Any]:
    return {
        **_without_json_fields(row),
        "public_orgs": _json_list(row.get("public_orgs_json")),
    }
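

# Round-trip note: _contributor_rows stores the snapshot's "public_orgs" value
# in the "public_orgs_json" column, and _contributor_row decodes it back to a
# plain "public_orgs" list on the way out.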


def _normalize_lookup_mode(mode: str) -> str:
    normalized = mode.strip().lower()
    if normalized not in {"auto", "indexed", "live"}:
        raise ValueError(f"Unsupported mode {mode!r}; expected auto, indexed, or live.")
    return normalized


def _is_index_miss(exc: ValueError) -> bool:
    return "active indexed universe" in str(exc)


def _probe_source_metadata(
    client: Any,
    *,
    owner: str,
    repo: str,
    number: int,
) -> dict[str, Any]:
    metadata: dict[str, Any] = {"provider": _probe_provider(client)}
    base_url = getattr(client, "base_url", None)
    if isinstance(base_url, str) and base_url:
        metadata["base_url"] = base_url
    # get_pull_request_status is an optional hook; clients without it simply
    # report provider and base_url metadata only.
    status_method = getattr(client, "get_pull_request_status", None)
    if not callable(status_method):
        return metadata
    try:
        status = status_method(owner, repo, number)
    except Exception as exc:
        metadata["status_error"] = str(exc)
        return metadata
    metadata.update(_normalize_probe_status(status))
    return metadata


def _normalize_probe_status(raw: Any) -> dict[str, Any]:
    if not isinstance(raw, dict):
        return {}
    indexed = raw.get("indexed")
    if indexed is None:
        indexed = raw.get("is_indexed")
    freshness = raw.get("index_freshness")
    if freshness is None:
        freshness = raw.get("freshness")
    last_indexed_at = raw.get("last_indexed_at")
    if last_indexed_at is None:
        last_indexed_at = raw.get("indexed_at")
    normalized: dict[str, Any] = {}
    if indexed is not None:
        normalized["indexed"] = bool(indexed)
    if freshness is not None:
        normalized["index_freshness"] = str(freshness)
    if last_indexed_at is not None:
        normalized["last_indexed_at"] = str(last_indexed_at)
    return normalized


def _probe_provider(client: Any) -> str:
    provider = getattr(client, "provider", None)
    if isinstance(provider, str) and provider:
        return provider
    if isinstance(client, GitHubClient):
        return "github"
    return "live"