diff --git "a/src/slop_farmer/reports/analysis.py" "b/src/slop_farmer/reports/analysis.py" new file mode 100644--- /dev/null +++ "b/src/slop_farmer/reports/analysis.py" @@ -0,0 +1,3506 @@ +from __future__ import annotations + +import asyncio +import copy +import json +import os +import re +import shutil +import sys +from collections import Counter, defaultdict +from dataclasses import dataclass +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +from pydantic import BaseModel, Field +from rank_bm25 import BM25Okapi + +from slop_farmer.config import AnalysisOptions, MarkdownReportOptions +from slop_farmer.data.links import build_text_link_rows +from slop_farmer.data.parquet_io import read_json, read_parquet_rows, write_text +from slop_farmer.data.snapshot_source import resolve_snapshot_source_dir +from slop_farmer.reports.analysis_cache import ( + HYBRID_REVIEW_CACHE_SCHEMA_VERSION, + PREPARED_REVIEW_UNIT_SCHEMA_VERSION, + HybridReviewCacheEntry, + HybridReviewCacheKey, + HybridReviewCacheManifest, + HybridReviewCacheStore, + HybridReviewSettingsFingerprint, + build_hybrid_review_cache_key, + hybrid_review_cache_dir, +) +from slop_farmer.reports.pr_heuristics import ( + build_template_cleanup_settings, + compile_cluster_suppression_rules, + strip_pull_request_template, + suppressed_pull_request_reasons, +) + +LINK_KEY_FIELDS = ( + "repo", + "source_type", + "source_number", + "source_github_id", + "target_owner", + "target_repo", + "target_number", + "link_type", + "link_origin", +) +STOPWORDS = { + "a", + "an", + "and", + "are", + "as", + "at", + "be", + "by", + "for", + "from", + "how", + "if", + "in", + "into", + "is", + "it", + "of", + "on", + "or", + "that", + "the", + "this", + "to", + "was", + "were", + "with", +} +TOKEN_PATTERN = re.compile(r"[a-z0-9_]+") +HUNK_HEADER_PATTERN = re.compile(r"^@@ -\d+(?:,\d+)? \+(?P\d+)(?:,(?P\d+))? @@") +LLM_PROVIDER_ENV_VARS = ( + "OPENAI_API_KEY", + "ANTHROPIC_API_KEY", + "GOOGLE_API_KEY", + "DEEPSEEK_API_KEY", +) +LLM_PACKET_CHARS_PER_TOKEN = 4 +LLM_MAX_INPUT_TOKENS = 60_000 +LLM_MAX_NODES_PER_PACKET = 48 +LLM_MAX_SOFT_PAIRS_PER_PACKET = 72 +LLM_MAX_DIFF_CHARS_PER_ITEM = 1_200 +LLM_MAX_FILENAMES_PER_ITEM = 16 +LLM_SKIP_EVALUATOR_ABOVE_TOKENS = 60_000 +LLM_OVERFLOW_POLICY = "truncate_then_skip" +LLM_SHARED_TARGET_MAX_NEIGHBORS_PER_PR = 3 +LLM_SHARED_TARGET_MAX_EXTRA_PAIRS_PER_TARGET = 18 +LLM_SHARED_TARGET_MIN_TEXT_JACCARD = 0.1 +CLUSTER_ANALYST_PROMPT_VERSION = "1.0" +CLUSTER_EVALUATOR_PROMPT_VERSION = "1.0" +CLUSTER_ANALYST_INSTRUCTION = ( + "You analyze clustered GitHub issues and pull requests for duplicate triage. " + "Return a short summary, confidence between 0 and 1, concise reasons for canonical issue/PR choices, " + "concise reasons for global best issue/PR suitability, and accept/reject verdicts for each soft edge candidate. " + "Only accept a soft edge when the two artifacts look like the same underlying bug or change. " + "Use titles, descriptions, explicit issue targets, changed filenames, and diff previews when available. " + "For pull requests, be strict: accept only when the PRs appear to fix the same concrete code-path problem and could plausibly be merged into one PR. " + "Do not merge PRs just because they mention the same tracking issue, touch the same broad subsystem, or both change documentation/tests." +) +CLUSTER_EVALUATOR_INSTRUCTION = ( + "You review the analyst output for precision. 
Accept only when the summary is grounded in the packet " + "and every soft-edge verdict is conservative. Reject if the analyst overstates evidence. " + "For pull-request pairs, reject if the two changes do not look mergeable into a single PR for the same bugfix." +) + + +class SoftEdgeVerdict(BaseModel): + left: str + right: str + accept: bool + reason: str + + +class ClusterAnalystResponse(BaseModel): + summary: str + confidence: float + canonical_issue_reason: str | None = None + canonical_pr_reason: str | None = None + best_issue_reason: str | None = None + best_pr_reason: str | None = None + soft_edge_verdicts: list[SoftEdgeVerdict] = Field(default_factory=list) + + +class ClusterEvaluatorResponse(BaseModel): + accept: bool + feedback: str = "" + + +class PrFileAreaEntry(BaseModel): + filename: str + left_ranges: list[list[int]] + right_ranges: list[list[int]] + + +class PrComparisonEntry(BaseModel): + left_pr_number: int + right_pr_number: int + code_similarity: float + size_similarity: float + file_overlap: float + area_overlap: float + patch_similarity: float + shared_filenames: list[str] + shared_file_areas: list[PrFileAreaEntry] + + +class MetaBugEntry(BaseModel): + cluster_id: str + summary: str + status: str + confidence: float + canonical_issue_number: int | None + canonical_pr_number: int | None + issue_numbers: list[int] + pr_numbers: list[int] + evidence_types: list[str] + pr_comparisons: list[PrComparisonEntry] = Field(default_factory=list) + + +class DuplicateIssuesEntry(BaseModel): + cluster_id: str + canonical_issue_number: int + duplicate_issue_numbers: list[int] + reason: str + + +class DuplicatePrsEntry(BaseModel): + cluster_id: str + canonical_pr_number: int + duplicate_pr_numbers: list[int] + target_issue_number: int | None + reason: str + + +class BestIssueEntry(BaseModel): + cluster_id: str + issue_number: int + reason: str + score: float + + +class BestPrEntry(BaseModel): + cluster_id: str + pr_number: int + reason: str + score: float + + +class AnalysisReport(BaseModel): + schema_version: str + repo: str + snapshot_id: str + generated_at: str + evidence_quality: str + llm_enrichment: bool + meta_bugs: list[MetaBugEntry] + duplicate_issues: list[DuplicateIssuesEntry] + duplicate_prs: list[DuplicatePrsEntry] + best_issue: BestIssueEntry | None + best_pr: BestPrEntry | None + + +@dataclass(slots=True) +class SnapshotData: + repo: str + snapshot_id: str + snapshot_dir: Path + manifest: dict[str, Any] + issues: list[dict[str, Any]] + pull_requests: list[dict[str, Any]] + comments: list[dict[str, Any]] + reviews: list[dict[str, Any]] + review_comments: list[dict[str, Any]] + pr_files: list[dict[str, Any]] + pr_diffs: list[dict[str, Any]] + links: list[dict[str, Any]] + events: list[dict[str, Any]] + evidence_quality: str + + +@dataclass(slots=True) +class ArtifactFeature: + node_id: str + kind: str + number: int + row: dict[str, Any] + tokens: list[str] + title_tokens: set[str] + title_length: int + body_length: int + discussion_activity: int + review_activity: int + inbound_references: int + explicit_issue_links: int + explicit_issue_targets: list[int] + diff_size: int + filenames: list[str] + diff_preview: str | None + file_ranges_by_name: dict[str, list[tuple[int, int]]] + patch_tokens: list[str] + + +@dataclass(slots=True) +class ClusterRecord: + cluster_id: str + nodes: list[str] + issue_numbers: list[int] + pr_numbers: list[int] + evidence_types: list[str] + canonical_issue_number: int | None + canonical_pr_number: int | None + target_issue_number: int | None + 
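+# Illustrative sketch (not in the original module): analyst replies are
+# validated into ClusterAnalystResponse with pydantic, so a malformed
+# provider payload raises instead of leaking into clustering:
+#
+#     reply = ClusterAnalystResponse.model_validate({
+#         "summary": "Both issues report the same startup crash",
+#         "confidence": 0.8,
+#         "soft_edge_verdicts": [
+#             {"left": "issue:1", "right": "issue:2",
+#              "accept": True, "reason": "same stack trace"},
+#         ],
+#     })
+#     assert reply.soft_edge_verdicts[0].accept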
summary: str + status: str + confidence: float + canonical_issue_reason: str | None + canonical_pr_reason: str | None + best_issue_reason: str | None + best_pr_reason: str | None + cluster_score: float + best_issue_score: float | None + best_pr_score: float | None + + +@dataclass(frozen=True, slots=True) +class PacketBudget: + node_count: int + item_count: int + soft_pair_count: int + serialized_chars: int + estimated_input_tokens: int + estimated_eval_tokens: int + + +@dataclass(frozen=True, slots=True) +class PreparedLlmPacket: + packet: dict[str, Any] + budget: PacketBudget + original_budget: PacketBudget + trimmed: bool + aggressively_trimmed: bool + split: bool + + +@dataclass(frozen=True, slots=True) +class ClusterAnalysisCallResult: + analyst_result: ClusterAnalystResponse | None + evaluator_result: ClusterEvaluatorResponse | None + error_kind: str | None + error_message: str | None + evaluator_used: bool + retried: bool + + +@dataclass(frozen=True, slots=True) +class AnalysisBuildResult: + report: AnalysisReport + llm_reviews: list[dict[str, Any]] + + +@dataclass(frozen=True, slots=True) +class SoftPairReviewUnitMeta: + label: str + component_index: int + component_count: int + review_unit_index: int + review_unit_count: int + cluster_id: str + prefix: str + nodes: tuple[str, ...] + soft_pairs: tuple[str, ...] + component_budget: PacketBudget + budget: PacketBudget + prepared_review_unit_hash: str | None + trimmed: bool + aggressively_trimmed: bool + split: bool + + +@dataclass(frozen=True, slots=True) +class PendingSoftPairReview: + meta: SoftPairReviewUnitMeta + prepared: PreparedLlmPacket + cache_key: HybridReviewCacheKey + + +@dataclass(frozen=True, slots=True) +class CompletedSoftPairReview: + meta: SoftPairReviewUnitMeta + result: ClusterAnalysisCallResult | None + status: str + reason: str | None + source: str | None + cache_hit: bool + + +def _hybrid_review_cache_manifest() -> HybridReviewCacheManifest: + return HybridReviewCacheManifest( + cache_schema_version=HYBRID_REVIEW_CACHE_SCHEMA_VERSION, + prepared_review_unit_schema_version=PREPARED_REVIEW_UNIT_SCHEMA_VERSION, + analyst_prompt_version=CLUSTER_ANALYST_PROMPT_VERSION, + evaluator_prompt_version=CLUSTER_EVALUATOR_PROMPT_VERSION, + hybrid_review_settings=HybridReviewSettingsFingerprint( + llm_max_input_tokens=LLM_MAX_INPUT_TOKENS, + llm_max_nodes_per_packet=LLM_MAX_NODES_PER_PACKET, + llm_max_soft_pairs_per_packet=LLM_MAX_SOFT_PAIRS_PER_PACKET, + llm_max_diff_chars_per_item=LLM_MAX_DIFF_CHARS_PER_ITEM, + llm_max_filenames_per_item=LLM_MAX_FILENAMES_PER_ITEM, + llm_skip_evaluator_above_tokens=LLM_SKIP_EVALUATOR_ABOVE_TOKENS, + llm_overflow_policy=LLM_OVERFLOW_POLICY, + ), + ) + + +def _prepared_review_unit_payload(prepared: PreparedLlmPacket) -> dict[str, Any]: + return { + "packet": copy.deepcopy(prepared.packet), + "budget": _packet_budget_json(prepared.budget), + "original_budget": _packet_budget_json(prepared.original_budget), + "trimmed": prepared.trimmed, + "aggressively_trimmed": prepared.aggressively_trimmed, + "split": prepared.split, + } + + +def _cluster_analysis_call_result_payload(result: ClusterAnalysisCallResult) -> dict[str, Any]: + return { + "analyst_result": ( + None if result.analyst_result is None else result.analyst_result.model_dump(mode="json") + ), + "evaluator_result": ( + None + if result.evaluator_result is None + else result.evaluator_result.model_dump(mode="json") + ), + "error_kind": result.error_kind, + "error_message": result.error_message, + "evaluator_used": result.evaluator_used, + 
"retried": result.retried, + } + + +def _cluster_analysis_call_result_from_payload( + payload: dict[str, Any], +) -> ClusterAnalysisCallResult: + return ClusterAnalysisCallResult( + analyst_result=( + None + if payload.get("analyst_result") is None + else ClusterAnalystResponse.model_validate(payload["analyst_result"]) + ), + evaluator_result=( + None + if payload.get("evaluator_result") is None + else ClusterEvaluatorResponse.model_validate(payload["evaluator_result"]) + ), + error_kind=payload.get("error_kind"), + error_message=payload.get("error_message"), + evaluator_used=bool(payload.get("evaluator_used", False)), + retried=bool(payload.get("retried", False)), + ) + + +def _cacheable_cluster_analysis_result(result: ClusterAnalysisCallResult) -> bool: + return result.analyst_result is not None and result.error_kind is None + + +def run_analysis(options: AnalysisOptions) -> Path: + if options.snapshot_dir is not None and options.hf_repo_id: + raise ValueError("--snapshot-dir and --hf-repo-id are mutually exclusive") + warning = _llm_fallback_warning(options) + if warning: + _analysis_log(warning) + snapshot_dir = _resolve_snapshot_dir(options) + snapshot = _load_snapshot(snapshot_dir) + _maybe_carry_forward_hybrid_review_cache(snapshot, enabled=options.cached_analysis) + build = asyncio.run(_build_report(snapshot, options)) + output_path = options.output or (snapshot_dir / "analysis-report.json") + output_path.parent.mkdir(parents=True, exist_ok=True) + write_text(json.dumps(build.report.model_dump(mode="json"), indent=2) + "\n", output_path) + llm_reviews_path = _llm_reviews_output_path(output_path) + if build.llm_reviews: + write_text( + json.dumps( + { + "schema_version": "1.0", + "repo": build.report.repo, + "snapshot_id": build.report.snapshot_id, + "generated_at": build.report.generated_at, + "model": options.model, + "reviews": build.llm_reviews, + }, + indent=2, + ) + + "\n", + llm_reviews_path, + ) + elif llm_reviews_path.exists(): + llm_reviews_path.unlink() + _log_hybrid_review_cache_summary(build.llm_reviews, enabled=options.cached_analysis) + return output_path + + +def _analysis_log(message: str) -> None: + stamp = datetime.now(tz=UTC).strftime("%H:%M:%SZ") + print(f"[{stamp}] {message}", file=sys.stderr, flush=True) + + +def _llm_reviews_output_path(output_path: Path) -> Path: + return output_path.with_name(f"{output_path.stem}.llm-reviews.json") + + +def _llm_fallback_warning(options: AnalysisOptions) -> str | None: + if options.ranking_backend != "hybrid": + return None + if _can_use_fast_agent(): + return None + return ( + "Analyze requested ranking-backend=hybrid but fast-agent LLM enrichment is unavailable; " + "reusing cached hybrid review results when available and falling back to deterministic-only clustering " + "for cache misses. " + "Install the llm extra and set one of " + f"{', '.join(LLM_PROVIDER_ENV_VARS)}." 
+ ) + + +def _maybe_carry_forward_hybrid_review_cache(snapshot: SnapshotData, *, enabled: bool) -> None: + if not enabled: + return + current_cache_dir = hybrid_review_cache_dir(snapshot.snapshot_dir) + if current_cache_dir.exists(): + _analysis_log( + f"Cached analysis enabled: using existing analysis-state in {current_cache_dir}" + ) + return + watermark = snapshot.manifest.get("watermark") + if not isinstance(watermark, dict): + _analysis_log("Cached analysis enabled: no previous snapshot recorded; starting fresh") + return + previous_snapshot_dir = watermark.get("previous_snapshot_dir") + if not isinstance(previous_snapshot_dir, str) or not previous_snapshot_dir: + _analysis_log("Cached analysis enabled: no previous snapshot recorded; starting fresh") + return + previous_cache_dir = hybrid_review_cache_dir(Path(previous_snapshot_dir)) + if not previous_cache_dir.exists(): + _analysis_log( + "Cached analysis enabled: previous snapshot has no analysis-state; starting fresh" + ) + return + shutil.copytree(previous_cache_dir, current_cache_dir) + _analysis_log( + f"Cached analysis enabled: copied analysis-state from {previous_cache_dir} to {current_cache_dir}" + ) + + +def _log_hybrid_review_cache_summary(llm_reviews: list[dict[str, Any]], *, enabled: bool) -> None: + if not enabled: + return + if not llm_reviews: + _analysis_log("Hybrid review cache summary: no LLM review units were produced") + return + reviewed = [review for review in llm_reviews if review.get("status") == "reviewed"] + cache_hits = [review for review in reviewed if review.get("cache_hit")] + cache_sourced = [review for review in reviewed if review.get("source") == "cache"] + llm_sourced = [review for review in reviewed if review.get("source") == "llm"] + skipped = [review for review in llm_reviews if review.get("status") != "reviewed"] + hit_rate = 100.0 * len(cache_hits) / len(reviewed) if reviewed else 0.0 + _analysis_log( + "Hybrid review cache summary: " + f"{len(cache_hits)}/{len(reviewed)} reviewed units reused from cache " + f"({hit_rate:.1f}%); " + f"source_cache={len(cache_sourced)}, source_llm={len(llm_sourced)}, skipped={len(skipped)}" + ) + if skipped: + reasons = Counter(str(review.get("reason")) for review in skipped if review.get("reason")) + if reasons: + formatted = ", ".join(f"{reason}={count}" for reason, count in reasons.most_common(5)) + _analysis_log(f"Hybrid review cache skipped reasons: {formatted}") + + +def render_markdown_report(options: MarkdownReportOptions) -> Path: + input_path = options.input.resolve() + report = AnalysisReport.model_validate(read_json(input_path)) + snapshot_dir = _resolve_markdown_snapshot_dir(input_path, options.snapshot_dir) + issue_map, pr_map = _report_artifact_maps(snapshot_dir) + output_path = (options.output or input_path.with_suffix(".md")).resolve() + markdown = _markdown_report_text( + report=report, + issue_map=issue_map, + pr_map=pr_map, + ) + write_text(markdown, output_path) + return output_path + + +def _resolve_markdown_snapshot_dir(input_path: Path, snapshot_dir: Path | None) -> Path | None: + if snapshot_dir is not None: + return snapshot_dir.resolve() + candidate = input_path.parent.resolve() + if (candidate / "issues.parquet").exists() or (candidate / "pull_requests.parquet").exists(): + return candidate + return None + + +def _report_artifact_maps( + snapshot_dir: Path | None, +) -> tuple[dict[int, dict[str, Any]], dict[int, dict[str, Any]]]: + if snapshot_dir is None: + return {}, {} + issues = { + int(row["number"]): row + for row in 
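+# Carry-forward rules above, summarized (descriptive only): an existing
+# analysis-state dir is kept untouched; otherwise state is copied from
+# manifest["watermark"]["previous_snapshot_dir"] when that snapshot actually
+# has one; any missing link in that chain starts the cache fresh rather than
+# guessing.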
read_parquet_rows(snapshot_dir / "issues.parquet") + if row.get("number") is not None + } + pull_requests = { + int(row["number"]): row + for row in read_parquet_rows(snapshot_dir / "pull_requests.parquet") + if row.get("number") is not None + } + return issues, pull_requests + + +def _markdown_report_text( + *, + report: AnalysisReport, + issue_map: dict[int, dict[str, Any]], + pr_map: dict[int, dict[str, Any]], +) -> str: + lines = [ + f"# Analysis Report: {report.repo}", + "", + f"- Snapshot: `{report.snapshot_id}`", + f"- Generated: `{report.generated_at}`", + f"- Evidence quality: `{report.evidence_quality}`", + f"- LLM enrichment: `{str(report.llm_enrichment).lower()}`", + f"- Meta bugs: `{len(report.meta_bugs)}`", + ] + if report.best_issue is not None: + lines.append( + f"- Best issue: {_artifact_markdown_link(report.repo, 'issue', report.best_issue.issue_number, issue_map.get(report.best_issue.issue_number))}" + ) + if report.best_pr is not None: + lines.append( + f"- Best PR: {_artifact_markdown_link(report.repo, 'pull_request', report.best_pr.pr_number, pr_map.get(report.best_pr.pr_number))}" + ) + lines.append("") + + ordered_meta_bugs = sorted( + report.meta_bugs, + key=lambda entry: _meta_bug_sort_key(entry, issue_map, pr_map), + ) + if not ordered_meta_bugs: + lines.append("No meta bugs found.") + lines.append("") + return "\n".join(lines) + + for meta_bug in ordered_meta_bugs: + lines.extend(_meta_bug_markdown_lines(report.repo, meta_bug, issue_map, pr_map)) + return "\n".join(lines).rstrip() + "\n" + + +def _meta_bug_markdown_lines( + repo: str, + meta_bug: MetaBugEntry, + issue_map: dict[int, dict[str, Any]], + pr_map: dict[int, dict[str, Any]], +) -> list[str]: + artifact_count = len(meta_bug.issue_numbers) + len(meta_bug.pr_numbers) + latest_activity = _meta_bug_latest_activity(meta_bug, issue_map, pr_map) + issue_numbers_to_render = [ + number for number in meta_bug.issue_numbers if number != meta_bug.canonical_issue_number + ] + lines = [ + f"## {meta_bug.summary}", + "", + f"- Cluster: `{meta_bug.cluster_id}`", + f"- Status: `{meta_bug.status}`", + f"- Confidence: `{meta_bug.confidence:.3f}`", + f"- Artifacts: `{artifact_count}`", + f"- Latest activity: `{latest_activity}`", + ] + if meta_bug.canonical_issue_number is not None: + lines.append( + f"- Canonical issue: {_artifact_markdown_link(repo, 'issue', meta_bug.canonical_issue_number, issue_map.get(meta_bug.canonical_issue_number))}" + ) + if meta_bug.canonical_pr_number is not None: + lines.append( + f"- Canonical PR: {_artifact_markdown_link(repo, 'pull_request', meta_bug.canonical_pr_number, pr_map.get(meta_bug.canonical_pr_number))}" + ) + if meta_bug.evidence_types: + lines.append(f"- Evidence: `{', '.join(meta_bug.evidence_types)}`") + lines.append("") + + if issue_numbers_to_render: + lines.append("### Issues") + lines.append("") + for number in _sorted_artifact_numbers(issue_numbers_to_render, issue_map): + lines.append( + f"- {_artifact_markdown_link(repo, 'issue', number, issue_map.get(number))}{_artifact_suffix(issue_map.get(number), 'issue')}" + ) + lines.append("") + + if meta_bug.pr_numbers: + lines.append("### PRs") + lines.append("") + for number in _sorted_artifact_numbers(meta_bug.pr_numbers, pr_map): + lines.append( + f"- {_artifact_markdown_link(repo, 'pull_request', number, pr_map.get(number))}{_artifact_suffix(pr_map.get(number), 'pull_request')}" + ) + lines.append("") + + if meta_bug.pr_comparisons: + lines.append("### PR comparison") + lines.append("") + for comparison in 
meta_bug.pr_comparisons: + shared_files = ", ".join(f"`{name}`" for name in comparison.shared_filenames) or "none" + lines.append( + f"- PR #{comparison.left_pr_number} vs PR #{comparison.right_pr_number}: " + f"code `{comparison.code_similarity:.3f}`, " + f"size `{comparison.size_similarity:.3f}`, " + f"files `{comparison.file_overlap:.3f}`, " + f"areas `{comparison.area_overlap:.3f}`, " + f"patch `{comparison.patch_similarity:.3f}`; " + f"shared files: {shared_files}" + ) + lines.append("") + + return lines + + +def _meta_bug_sort_key( + meta_bug: MetaBugEntry, + issue_map: dict[int, dict[str, Any]], + pr_map: dict[int, dict[str, Any]], +) -> tuple[int, float, int, str]: + artifact_count = len(meta_bug.issue_numbers) + len(meta_bug.pr_numbers) + latest_activity = _meta_bug_latest_activity_dt(meta_bug, issue_map, pr_map).timestamp() + largest_number = max([*meta_bug.issue_numbers, *meta_bug.pr_numbers], default=0) + return (-artifact_count, -latest_activity, -largest_number, meta_bug.cluster_id) + + +def _meta_bug_latest_activity( + meta_bug: MetaBugEntry, issue_map: dict[int, dict[str, Any]], pr_map: dict[int, dict[str, Any]] +) -> str: + latest_row = _meta_bug_latest_row(meta_bug, issue_map, pr_map) + if latest_row is None: + return "unknown" + return str( + latest_row.get("updated_at") + or latest_row.get("created_at") + or latest_row.get("closed_at") + or "unknown" + ) + + +def _meta_bug_latest_activity_dt( + meta_bug: MetaBugEntry, + issue_map: dict[int, dict[str, Any]], + pr_map: dict[int, dict[str, Any]], +) -> datetime: + latest_row = _meta_bug_latest_row(meta_bug, issue_map, pr_map) + if latest_row is None: + return datetime(1970, 1, 1, tzinfo=UTC) + return _row_activity_dt(latest_row) + + +def _meta_bug_latest_row( + meta_bug: MetaBugEntry, + issue_map: dict[int, dict[str, Any]], + pr_map: dict[int, dict[str, Any]], +) -> dict[str, Any] | None: + rows = [issue_map[number] for number in meta_bug.issue_numbers if number in issue_map] + rows.extend(pr_map[number] for number in meta_bug.pr_numbers if number in pr_map) + if not rows: + return None + return max(rows, key=_row_activity_dt) + + +def _sorted_artifact_numbers(numbers: list[int], row_map: dict[int, dict[str, Any]]) -> list[int]: + return sorted( + numbers, + key=lambda number: ( + -_row_activity_dt(row_map.get(number)).timestamp(), + -number, + ), + ) + + +def _row_activity_dt(row: dict[str, Any] | None) -> datetime: + if not row: + return datetime(1970, 1, 1, tzinfo=UTC) + for field in ("updated_at", "created_at", "closed_at", "merged_at"): + value = row.get(field) + if not value: + continue + try: + return _parse_dt(str(value)) + except ValueError: + continue + return datetime(1970, 1, 1, tzinfo=UTC) + + +def _artifact_markdown_link(repo: str, kind: str, number: int, row: dict[str, Any] | None) -> str: + title = _artifact_title(kind, number, row) + url = _artifact_url(repo, kind, number, row) + return f"[{title}]({url})" + + +def _artifact_title(kind: str, number: int, row: dict[str, Any] | None) -> str: + prefix = "PR" if kind == "pull_request" else "Issue" + title = str((row or {}).get("title") or "").strip() + if not title and kind == "pull_request": + body = str((row or {}).get("body") or "").strip() + if body: + title = body.splitlines()[0].strip()[:120] + if title: + return f"{prefix} #{number}: {title}" + return f"{prefix} #{number}" + + +def _artifact_url(repo: str, kind: str, number: int, row: dict[str, Any] | None) -> str: + html_url = str((row or {}).get("html_url") or "").strip() + if html_url: + return 
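+# Ordering sketch (illustrative): _meta_bug_sort_key compares negated values,
+# so bigger clusters win first, then fresher activity, then larger artifact
+# numbers, with cluster_id as the final deterministic tie-break:
+#
+#     keys = [(-3, -1.7e9, -42, "c1"), (-3, -1.6e9, -7, "c2"), (-2, -1.9e9, -99, "c3")]
+#     assert sorted(keys) == keys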
html_url + if repo: + path = "pull" if kind == "pull_request" else "issues" + return f"https://github.com/{repo}/{path}/{number}" + return "#" + + +def _artifact_suffix(row: dict[str, Any] | None, kind: str) -> str: + if not row: + return "" + details: list[str] = [] + state = str(row.get("state") or "").strip() + if state: + details.append(state) + if kind == "pull_request": + if bool(row.get("merged")): + details.append("merged") + if bool(row.get("draft")): + details.append("draft") + timestamp = row.get("updated_at") or row.get("created_at") + if timestamp: + details.append(str(timestamp)) + if not details: + return "" + return f" ({', '.join(details)})" + + +def _resolve_snapshot_dir(options: AnalysisOptions) -> Path: + return resolve_snapshot_source_dir( + snapshot_dir=options.snapshot_dir, + local_snapshots_root=options.output_dir.resolve() / "snapshots", + hf_repo_id=options.hf_repo_id, + hf_revision=options.hf_revision, + hf_materialize_dir=options.hf_materialize_dir, + hf_output_dir=options.output_dir, + ) + + +def _load_snapshot(snapshot_dir: Path) -> SnapshotData: + manifest_path = snapshot_dir / "manifest.json" + manifest = read_json(manifest_path) if manifest_path.exists() else {} + + issues = read_parquet_rows(snapshot_dir / "issues.parquet") + pull_requests = read_parquet_rows(snapshot_dir / "pull_requests.parquet") + comments = read_parquet_rows(snapshot_dir / "comments.parquet") + reviews = read_parquet_rows(snapshot_dir / "reviews.parquet") + review_comments = read_parquet_rows(snapshot_dir / "review_comments.parquet") + pr_files = read_parquet_rows(snapshot_dir / "pr_files.parquet") + pr_diffs = read_parquet_rows(snapshot_dir / "pr_diffs.parquet") + links = read_parquet_rows(snapshot_dir / "links.parquet") + events_path = snapshot_dir / "events.parquet" + events = read_parquet_rows(events_path) + if not any( + [ + issues, + pull_requests, + comments, + reviews, + review_comments, + pr_files, + pr_diffs, + links, + events, + ] + ): + parquet_files = sorted(str(path.name) for path in snapshot_dir.glob("*.parquet")) + raise FileNotFoundError( + f"No analysis tables found in {snapshot_dir}. " + f"Expected local files like issues.parquet/pull_requests.parquet. " + f"Found parquet files: {parquet_files or 'none'}. " + "Use --hf-repo-id for Hugging Face datasets or point --snapshot-dir at a local slop-farmer snapshot." 
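+# Fallback chain in _artifact_url (illustrative; "octo/demo" is made up):
+# prefer the row's html_url, else synthesize a URL from the repo slug, else
+# fall back to a dead "#" anchor:
+#
+#     _artifact_url("octo/demo", "pull_request", 7, {"html_url": ""})
+#     # -> "https://github.com/octo/demo/pull/7"
+#     _artifact_url("", "issue", 7, None)
+#     # -> "#"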
+ ) + + repo = ( + manifest.get("repo") + or (issues[0]["repo"] if issues else None) + or (pull_requests[0]["repo"] if pull_requests else None) + or (comments[0]["repo"] if comments else None) + or "" + ) + snapshot_id = manifest.get("snapshot_id") or snapshot_dir.name + evidence_quality = "full" if events_path.exists() and events else "partial" + return SnapshotData( + repo=repo, + snapshot_id=snapshot_id, + snapshot_dir=snapshot_dir, + manifest=manifest, + issues=issues, + pull_requests=pull_requests, + comments=comments, + reviews=reviews, + review_comments=review_comments, + pr_files=pr_files, + pr_diffs=pr_diffs, + links=links, + events=events, + evidence_quality=evidence_quality, + ) + + +async def _build_report(snapshot: SnapshotData, options: AnalysisOptions) -> AnalysisBuildResult: + combined_links = _combined_links(snapshot) + llm_available = _can_use_fast_agent() + hybrid_review_cache = HybridReviewCacheStore( + hybrid_review_cache_dir(snapshot.snapshot_dir), + _hybrid_review_cache_manifest(), + enabled=options.ranking_backend == "hybrid", + ) + if hybrid_review_cache.invalidation_reason is not None: + _analysis_log( + "Hybrid review cache invalidated; ignoring cached entries " + f"({hybrid_review_cache.invalidation_reason})" + ) + issue_map = {int(row["number"]): row for row in snapshot.issues} + pr_map = {int(row["number"]): row for row in snapshot.pull_requests} + suppressed_pr_reasons = suppressed_pull_request_reasons( + snapshot.pull_requests, + snapshot.pr_files, + compile_cluster_suppression_rules(options.cluster_suppression_rules), + ) + if suppressed_pr_reasons: + original_pr_count = len(pr_map) + pr_map = { + number: row for number, row in pr_map.items() if number not in suppressed_pr_reasons + } + _analysis_log( + f"Suppressing {len(suppressed_pr_reasons)} routine PRs from clustering: " + f"{len(pr_map)}/{original_pr_count} PRs kept" + ) + if options.open_prs_only: + original_pr_count = len(pr_map) + pr_map = { + number: row + for number, row in pr_map.items() + if str(row.get("state") or "").lower() == "open" + } + _analysis_log( + f"Restricting PR analysis to open PRs only: {len(pr_map)}/{original_pr_count} PRs kept " + "(draft PRs remain eligible)" + ) + comment_map = { + int(row["github_id"]): row for row in snapshot.comments if row.get("github_id") is not None + } + review_map = { + int(row["github_id"]): row for row in snapshot.reviews if row.get("github_id") is not None + } + review_comment_map = { + int(row["github_id"]): row + for row in snapshot.review_comments + if row.get("github_id") is not None + } + + inbound_references, _ = _reference_counts( + snapshot.repo, + combined_links, + issue_map, + pr_map, + comment_map, + review_map, + review_comment_map, + ) + explicit_issue_link_targets = _explicit_pr_issue_targets( + repo=snapshot.repo, + combined_links=combined_links, + issue_map=issue_map, + pr_map=pr_map, + ) + features = _artifact_features( + snapshot, + options=options, + issue_map=issue_map, + pr_map=pr_map, + inbound_references=inbound_references, + explicit_issue_link_targets=explicit_issue_link_targets, + ) + issue_hard_pairs = _issue_hard_pairs( + repo=snapshot.repo, + combined_links=combined_links, + issue_map=issue_map, + pr_map=pr_map, + comment_map=comment_map, + review_map=review_map, + review_comment_map=review_comment_map, + ) + issue_soft_candidates = _issue_soft_candidates(issue_map, features, issue_hard_pairs) + pr_soft_candidates, pr_pair_target_issues = _pr_duplicate_candidates( + options=options, + snapshot=snapshot, + 
issue_map=issue_map, + pr_map=pr_map, + features=features, + ) + review_semaphore = asyncio.Semaphore(options.hybrid_llm_concurrency) + ( + (accepted_issue_pairs, issue_llm_enabled, issue_llm_reviews), + (accepted_pr_pairs, pr_llm_enabled, pr_llm_reviews), + ) = await asyncio.gather( + _accepted_soft_pairs( + options=options, + snapshot=snapshot, + features=features, + hard_pairs=issue_hard_pairs, + soft_candidates=issue_soft_candidates, + label="issue", + hybrid_review_cache=hybrid_review_cache, + llm_available=llm_available, + review_semaphore=review_semaphore, + ), + _accepted_soft_pairs( + options=options, + snapshot=snapshot, + features=features, + hard_pairs={}, + soft_candidates=pr_soft_candidates, + label="pull_request", + hybrid_review_cache=hybrid_review_cache, + llm_available=llm_available, + review_semaphore=review_semaphore, + ), + ) + issue_pairs = dict(issue_hard_pairs) + for pair, detail in accepted_issue_pairs.items(): + issue_pairs.setdefault(pair, set()).update( + detail.get("evidence_types") or {"soft_similarity"} + ) + pr_pairs: dict[tuple[str, str], set[str]] = {} + for pair, detail in accepted_pr_pairs.items(): + pr_pairs.setdefault(pair, set()).update(detail.get("evidence_types") or {"soft_similarity"}) + + issue_clusters = _clusters( + snapshot=snapshot, + features=features, + final_pairs=issue_pairs, + pair_target_issues=defaultdict(set), + llm_cluster_payloads={}, + ) + pr_clusters = _clusters( + snapshot=snapshot, + features=features, + final_pairs=pr_pairs, + pair_target_issues=pr_pair_target_issues, + llm_cluster_payloads={}, + ) + clusters = _meta_bug_clusters( + features=features, + issue_clusters=issue_clusters, + pr_clusters=pr_clusters, + explicit_issue_link_targets=explicit_issue_link_targets, + issue_map=issue_map, + pr_map=pr_map, + ) + + meta_clusters = sorted( + clusters, key=lambda cluster: (-cluster.cluster_score, cluster.cluster_id) + )[: options.max_clusters] + duplicate_issues = [ + DuplicateIssuesEntry( + cluster_id=cluster.cluster_id, + canonical_issue_number=cluster.canonical_issue_number, + duplicate_issue_numbers=[ + number + for number in cluster.issue_numbers + if number != cluster.canonical_issue_number + ], + reason=_duplicate_issue_reason(cluster), + ) + for cluster in clusters + if cluster.canonical_issue_number is not None and len(cluster.issue_numbers) >= 2 + ] + duplicate_prs = [ + DuplicatePrsEntry( + cluster_id=cluster.cluster_id, + canonical_pr_number=cluster.canonical_pr_number, + duplicate_pr_numbers=[ + number for number in cluster.pr_numbers if number != cluster.canonical_pr_number + ], + target_issue_number=cluster.target_issue_number, + reason=_duplicate_pr_reason(cluster), + ) + for cluster in clusters + if cluster.canonical_pr_number is not None and len(cluster.pr_numbers) >= 2 + ] + best_issue = _best_issue(meta_clusters, features) + best_pr = _best_pr(meta_clusters, features) + return AnalysisBuildResult( + report=AnalysisReport( + schema_version="1.0", + repo=snapshot.repo, + snapshot_id=snapshot.snapshot_id, + generated_at=_iso_now(), + evidence_quality=snapshot.evidence_quality, + llm_enrichment=issue_llm_enabled or pr_llm_enabled, + meta_bugs=[ + MetaBugEntry( + cluster_id=cluster.cluster_id, + summary=cluster.summary, + status=cluster.status, + confidence=round(cluster.confidence, 3), + canonical_issue_number=cluster.canonical_issue_number, + canonical_pr_number=cluster.canonical_pr_number, + issue_numbers=cluster.issue_numbers, + pr_numbers=cluster.pr_numbers, + evidence_types=cluster.evidence_types, + 
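+# Evidence folding sketch (illustrative): hard edges keep their labels and an
+# LLM-accepted soft edge can only add to them, never remove them:
+#
+#     pairs = {("issue:1", "issue:2"): {"duplicate_reference"}}
+#     pairs.setdefault(("issue:1", "issue:2"), set()).update({"soft_similarity"})
+#     assert pairs[("issue:1", "issue:2")] == {"duplicate_reference", "soft_similarity"}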
pr_comparisons=_cluster_pr_comparisons(cluster, features), + ) + for cluster in meta_clusters + ], + duplicate_issues=duplicate_issues, + duplicate_prs=duplicate_prs, + best_issue=best_issue, + best_pr=best_pr, + ), + llm_reviews=issue_llm_reviews + pr_llm_reviews, + ) + + +def _iso_now() -> str: + return datetime.now(tz=UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z") + + +def _combined_links(snapshot: SnapshotData) -> list[dict[str, Any]]: + owner, repo_name = snapshot.repo.split("/", 1) + extracted_at = snapshot.manifest.get("extracted_at") or _iso_now() + rows = list(snapshot.links) + for issue in snapshot.issues: + rows.extend( + build_text_link_rows( + repo=snapshot.repo, + owner=owner, + repo_name=repo_name, + source_type="issue", + source_number=int(issue["number"]), + source_id=issue.get("github_id"), + body=issue.get("body"), + snapshot_id=snapshot.snapshot_id, + extracted_at=extracted_at, + ) + ) + for pr in snapshot.pull_requests: + rows.extend( + build_text_link_rows( + repo=snapshot.repo, + owner=owner, + repo_name=repo_name, + source_type="pull_request", + source_number=int(pr["number"]), + source_id=pr.get("github_id"), + body=pr.get("body"), + snapshot_id=snapshot.snapshot_id, + extracted_at=extracted_at, + ) + ) + for comment in snapshot.comments: + if comment.get("parent_number") is None: + continue + rows.extend( + build_text_link_rows( + repo=snapshot.repo, + owner=owner, + repo_name=repo_name, + source_type="comment", + source_number=int(comment["parent_number"]), + source_id=comment.get("github_id"), + body=comment.get("body"), + snapshot_id=snapshot.snapshot_id, + extracted_at=extracted_at, + ) + ) + for review in snapshot.reviews: + rows.extend( + build_text_link_rows( + repo=snapshot.repo, + owner=owner, + repo_name=repo_name, + source_type="review", + source_number=int(review["pull_request_number"]), + source_id=review.get("github_id"), + body=review.get("body"), + snapshot_id=snapshot.snapshot_id, + extracted_at=extracted_at, + ) + ) + for review_comment in snapshot.review_comments: + rows.extend( + build_text_link_rows( + repo=snapshot.repo, + owner=owner, + repo_name=repo_name, + source_type="review_comment", + source_number=int(review_comment["pull_request_number"]), + source_id=review_comment.get("github_id"), + body=review_comment.get("body"), + snapshot_id=snapshot.snapshot_id, + extracted_at=extracted_at, + ) + ) + deduped: dict[tuple[Any, ...], dict[str, Any]] = {} + for row in rows: + key = tuple(row.get(field) for field in LINK_KEY_FIELDS) + deduped[key] = row + return list(deduped.values()) + + +def _reference_counts( + repo: str, + links: list[dict[str, Any]], + issue_map: dict[int, dict[str, Any]], + pr_map: dict[int, dict[str, Any]], + comment_map: dict[int, dict[str, Any]], + review_map: dict[int, dict[str, Any]], + review_comment_map: dict[int, dict[str, Any]], +) -> tuple[Counter[str], defaultdict[int, set[int]]]: + inbound_references: Counter[str] = Counter() + explicit_issue_link_targets: defaultdict[int, set[int]] = defaultdict(set) + for row in links: + source_node = _resolve_source_node( + row, issue_map, pr_map, comment_map, review_map, review_comment_map + ) + target_node = _resolve_target_node(repo, row, issue_map, pr_map) + if source_node is not None and target_node is not None: + inbound_references[target_node] += 1 + if ( + source_node + and target_node + and source_node.startswith("pull_request:") + and target_node.startswith("issue:") + ): + explicit_issue_link_targets[int(source_node.split(":", 1)[1])].add( + 
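+# Dedup semantics in _combined_links (illustrative toy key): rows are keyed
+# on LINK_KEY_FIELDS and the last row for a key wins, so a text-extracted
+# link restating an API-provided one collapses into a single row:
+#
+#     deduped = {}
+#     for row in ({"k": 1, "origin": "api"}, {"k": 1, "origin": "text"}):
+#         deduped[(row["k"],)] = row
+#     assert list(deduped.values()) == [{"k": 1, "origin": "text"}]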
int(target_node.split(":", 1)[1]) + ) + return inbound_references, explicit_issue_link_targets + + +def _artifact_features( + snapshot: SnapshotData, + *, + options: AnalysisOptions, + issue_map: dict[int, dict[str, Any]], + pr_map: dict[int, dict[str, Any]], + inbound_references: Counter[str], + explicit_issue_link_targets: defaultdict[int, set[int]], +) -> dict[str, ArtifactFeature]: + template_cleanup = build_template_cleanup_settings( + mode=options.pr_template_cleanup_mode, + strip_html_comments=options.pr_template_strip_html_comments, + trim_closing_reference_prefix=options.pr_template_trim_closing_reference_prefix, + section_patterns=options.pr_template_section_patterns, + line_patterns=options.pr_template_line_patterns, + ) + comments_by_parent: defaultdict[int, int] = defaultdict(int) + reviews_by_pr: defaultdict[int, int] = defaultdict(int) + review_comments_by_pr: defaultdict[int, int] = defaultdict(int) + filenames_by_pr: defaultdict[int, set[str]] = defaultdict(set) + file_ranges_by_pr: defaultdict[int, dict[str, list[tuple[int, int]]]] = defaultdict( + lambda: defaultdict(list) + ) + patch_tokens_by_pr: defaultdict[int, list[str]] = defaultdict(list) + diff_preview_by_pr: dict[int, str] = {} + for comment in snapshot.comments: + parent_number = comment.get("parent_number") + if parent_number is not None: + comments_by_parent[int(parent_number)] += 1 + for review in snapshot.reviews: + reviews_by_pr[int(review["pull_request_number"])] += 1 + for review_comment in snapshot.review_comments: + review_comments_by_pr[int(review_comment["pull_request_number"])] += 1 + for pr_file in snapshot.pr_files: + pr_number = pr_file.get("pull_request_number") + filename = pr_file.get("filename") + if pr_number is None or not filename: + continue + filenames_by_pr[int(pr_number)].add(str(filename)) + patch = pr_file.get("patch") + if patch: + file_ranges_by_pr[int(pr_number)][str(filename)].extend(_patch_ranges(str(patch))) + patch_tokens_by_pr[int(pr_number)].extend(_patch_content_tokens(str(patch))) + for pr_diff in snapshot.pr_diffs: + pr_number = pr_diff.get("pull_request_number") + diff = pr_diff.get("diff") + if pr_number is None or not diff: + continue + diff_preview_by_pr[int(pr_number)] = str(diff)[:1200] + + features: dict[str, ArtifactFeature] = {} + for number, issue in issue_map.items(): + title = issue.get("title") or "" + body = issue.get("body") or "" + node_id = f"issue:{number}" + title_tokens = set(_tokenize(title, remove_stopwords=True)) + features[node_id] = ArtifactFeature( + node_id=node_id, + kind="issue", + number=number, + row=issue, + tokens=_tokenize(f"{title} {body}", remove_stopwords=True), + title_tokens=title_tokens, + title_length=len(title), + body_length=len(body), + discussion_activity=max( + int(issue.get("comments_count") or 0), comments_by_parent[number] + ), + review_activity=0, + inbound_references=inbound_references[node_id], + explicit_issue_links=0, + explicit_issue_targets=[], + diff_size=0, + filenames=[], + diff_preview=None, + file_ranges_by_name={}, + patch_tokens=[], + ) + for number, pr in pr_map.items(): + title = pr.get("title") or "" + body = pr.get("body") or "" + similarity_body = _strip_pull_request_template(body, settings=template_cleanup) + node_id = f"pull_request:{number}" + discussion_activity = max(int(pr.get("comments_count") or 0), comments_by_parent[number]) + review_activity = reviews_by_pr[number] + max( + int(pr.get("review_comments_count") or 0), review_comments_by_pr[number] + ) + diff_size = ( + int(pr.get("additions") or 
0) + + int(pr.get("deletions") or 0) + + int(pr.get("changed_files") or 0) * 10 + ) + features[node_id] = ArtifactFeature( + node_id=node_id, + kind="pull_request", + number=number, + row=pr, + tokens=_tokenize(f"{title} {similarity_body}", remove_stopwords=True), + title_tokens=set(_tokenize(title, remove_stopwords=True)), + title_length=len(title), + body_length=len(body), + discussion_activity=discussion_activity, + review_activity=review_activity, + inbound_references=inbound_references[node_id], + explicit_issue_links=len(explicit_issue_link_targets[number]), + explicit_issue_targets=sorted(explicit_issue_link_targets[number]), + diff_size=diff_size, + filenames=sorted(filenames_by_pr[number]), + diff_preview=diff_preview_by_pr.get(number), + file_ranges_by_name={ + filename: sorted(ranges) for filename, ranges in file_ranges_by_pr[number].items() + }, + patch_tokens=patch_tokens_by_pr[number], + ) + return features + + +def _explicit_pr_issue_targets( + *, + repo: str, + combined_links: list[dict[str, Any]], + issue_map: dict[int, dict[str, Any]], + pr_map: dict[int, dict[str, Any]], +) -> defaultdict[int, set[int]]: + targets: defaultdict[int, set[int]] = defaultdict(set) + owner, repo_name = repo.split("/", 1) + for row in combined_links: + if row.get("source_type") != "pull_request": + continue + if row.get("link_type") != "closing_reference": + continue + if row.get("target_owner") != owner or row.get("target_repo") != repo_name: + continue + source_number = row.get("source_number") + target_number = row.get("target_number") + if source_number is None or target_number is None: + continue + pr_number = int(source_number) + issue_number = int(target_number) + if pr_number not in pr_map or issue_number not in issue_map: + continue + targets[pr_number].add(issue_number) + return targets + + +def _issue_hard_pairs( + *, + repo: str, + combined_links: list[dict[str, Any]], + issue_map: dict[int, dict[str, Any]], + pr_map: dict[int, dict[str, Any]], + comment_map: dict[int, dict[str, Any]], + review_map: dict[int, dict[str, Any]], + review_comment_map: dict[int, dict[str, Any]], +) -> dict[tuple[str, str], set[str]]: + hard_pairs: dict[tuple[str, str], set[str]] = defaultdict(set) + for row in combined_links: + source_node = _resolve_source_node( + row, issue_map, pr_map, comment_map, review_map, review_comment_map + ) + target_node = _resolve_target_node(repo, row, issue_map, pr_map) + if source_node is None or target_node is None or source_node == target_node: + continue + if ( + row["link_type"] == "duplicate_reference" + and source_node.startswith("issue:") + and target_node.startswith("issue:") + ): + hard_pairs[_pair_key(source_node, target_node)].add("duplicate_reference") + return hard_pairs + + +def _issue_soft_candidates( + issue_map: dict[int, dict[str, Any]], + features: dict[str, ArtifactFeature], + hard_pairs: dict[tuple[str, str], set[str]], +) -> dict[tuple[str, str], dict[str, Any]]: + hard_neighbors: defaultdict[str, set[str]] = defaultdict(set) + for left, right in hard_pairs: + hard_neighbors[left].add(right) + hard_neighbors[right].add(left) + candidates = _bm25_candidates( + numbers=sorted(issue_map), + kind="issue", + features=features, + hard_neighbors=hard_neighbors, + max_candidates=5, + extra_filter=_issue_soft_filter, + ) + for detail in candidates.values(): + detail["evidence_types"] = {"soft_similarity"} + detail["deterministic_accept"] = False + return candidates + + +def _pr_duplicate_candidates( + *, + options: AnalysisOptions, + snapshot: SnapshotData | 
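+# diff_size heuristic above (worked example): additions + deletions +
+# 10 * changed_files, so breadth across files is weighted over raw churn; a
+# PR with 40 additions, 10 deletions and 3 changed files scores
+# 40 + 10 + 30 = 80.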
None, + issue_map: dict[int, dict[str, Any]], + pr_map: dict[int, dict[str, Any]], + features: dict[str, ArtifactFeature], +) -> tuple[dict[tuple[str, str], dict[str, Any]], dict[tuple[str, str], set[int]]]: + del options, snapshot + candidates: dict[tuple[str, str], dict[str, Any]] = {} + pair_target_issues: dict[tuple[str, str], set[int]] = defaultdict(set) + explicit_targets = { + number: set(features[f"pull_request:{number}"].explicit_issue_targets) + for number in pr_map + if features[f"pull_request:{number}"].explicit_issue_targets + } + + for pair, detail in _bm25_candidates( + numbers=sorted(pr_map), + kind="pull_request", + features=features, + hard_neighbors=defaultdict(set), + max_candidates=5, + extra_filter=_pr_soft_filter, + ).items(): + left = features[pair[0]] + right = features[pair[1]] + shared_files = _shared_filenames(left, right) + _merge_candidate_detail( + candidates, + pair, + { + **detail, + "evidence_types": {"soft_similarity"}, + "shared_filenames": shared_files, + "shared_targets": sorted( + explicit_targets.get(left.number, set()) + & explicit_targets.get(right.number, set()) + ), + "deterministic_accept": detail["jaccard"] >= 0.35, + }, + ) + + prs_by_target: defaultdict[int, set[str]] = defaultdict(set) + for pr_number, targets in explicit_targets.items(): + for target in targets: + prs_by_target[target].add(f"pull_request:{pr_number}") + for target_issue, pr_nodes in prs_by_target.items(): + pr_nodes_list = sorted(pr_nodes) + if len(pr_nodes_list) < 2: + continue + scored_pairs: list[tuple[tuple[Any, ...], tuple[str, str], dict[str, Any]]] = [] + for index, left_node in enumerate(pr_nodes_list): + for right_node in pr_nodes_list[index + 1 :]: + pair = _pair_key(left_node, right_node) + left = features[left_node] + right = features[right_node] + if not _pr_soft_filter(left, right): + continue + shared_files = _shared_filenames(left, right) + text_jaccard = _jaccard_sets(set(left.tokens), set(right.tokens)) + if text_jaccard < LLM_SHARED_TARGET_MIN_TEXT_JACCARD and not shared_files: + continue + score = max(5.0, text_jaccard * 10.0 + len(shared_files)) + scored_pairs.append( + ( + ( + -len(shared_files), + -text_jaccard, + -score, + pair[0], + pair[1], + ), + pair, + { + "left": pair[0], + "right": pair[1], + "kind": "pull_request", + "score": score, + "jaccard": text_jaccard, + "evidence_types": {"shared_issue_target"}, + "shared_targets": [target_issue], + "shared_filenames": shared_files, + "deterministic_accept": text_jaccard >= 0.2, + }, + ) + ) + for pair, detail in _bounded_shared_target_pairs(scored_pairs): + _merge_candidate_detail(candidates, pair, detail) + pair_target_issues[pair].add(target_issue) + return candidates, pair_target_issues + + +def _bounded_shared_target_pairs( + scored_pairs: list[tuple[tuple[Any, ...], tuple[str, str], dict[str, Any]]], +) -> list[tuple[tuple[str, str], dict[str, Any]]]: + ordered = sorted(scored_pairs, key=lambda item: item[0]) + if not ordered: + return [] + + parent = {node: node for _, pair, _ in ordered for node in pair} + + def find(node: str) -> str: + root = node + while parent[root] != root: + root = parent[root] + while parent[node] != node: + next_node = parent[node] + parent[node] = root + node = next_node + return root + + def union(left: str, right: str) -> None: + left_root = find(left) + right_root = find(right) + if left_root != right_root: + parent[right_root] = left_root + + neighbor_counts: Counter[str] = Counter() + selected_pairs: set[tuple[str, str]] = set() + selected: list[tuple[tuple[str, 
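+# Shared-target scoring above (worked example): score = max(5.0,
+# text_jaccard * 10.0 + len(shared_files)), so jaccard 0.30 with two shared
+# files scores max(5.0, 3.0 + 2) = 5.0 while jaccard 0.55 with the same
+# overlap scores 7.5; the 5.0 floor gives every surviving shared-target pair
+# a minimum score before candidate details are merged.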
str], dict[str, Any]]] = [] + + def record(pair: tuple[str, str], detail: dict[str, Any]) -> None: + selected_pairs.add(pair) + selected.append((pair, detail)) + neighbor_counts[pair[0]] += 1 + neighbor_counts[pair[1]] += 1 + + extra_pairs_added = 0 + + def can_add_redundant_pair(pair: tuple[str, str]) -> bool: + return ( + extra_pairs_added < LLM_SHARED_TARGET_MAX_EXTRA_PAIRS_PER_TARGET + and neighbor_counts[pair[0]] < LLM_SHARED_TARGET_MAX_NEIGHBORS_PER_PR + and neighbor_counts[pair[1]] < LLM_SHARED_TARGET_MAX_NEIGHBORS_PER_PR + ) + + for _, pair, detail in ordered: + if pair in selected_pairs: + continue + if find(pair[0]) == find(pair[1]): + continue + record(pair, detail) + union(pair[0], pair[1]) + + for _, pair, detail in ordered: + if pair in selected_pairs or not can_add_redundant_pair(pair): + continue + record(pair, detail) + extra_pairs_added += 1 + + return selected + + +def _merge_candidate_detail( + candidates: dict[tuple[str, str], dict[str, Any]], + pair: tuple[str, str], + detail: dict[str, Any], +) -> None: + current = candidates.get(pair) + if current is None: + copied = dict(detail) + copied["evidence_types"] = set(detail.get("evidence_types") or []) + copied["shared_targets"] = list(detail.get("shared_targets") or []) + copied["shared_filenames"] = list(detail.get("shared_filenames") or []) + copied["deterministic_accept"] = bool(detail.get("deterministic_accept")) + candidates[pair] = copied + return + current["score"] = max(float(current.get("score") or 0.0), float(detail.get("score") or 0.0)) + current["jaccard"] = max( + float(current.get("jaccard") or 0.0), float(detail.get("jaccard") or 0.0) + ) + current["evidence_types"] = set(current.get("evidence_types") or []) | set( + detail.get("evidence_types") or [] + ) + current["shared_targets"] = sorted( + set(current.get("shared_targets") or []) | set(detail.get("shared_targets") or []) + ) + current["shared_filenames"] = sorted( + set(current.get("shared_filenames") or []) | set(detail.get("shared_filenames") or []) + )[:10] + current["deterministic_accept"] = bool(current.get("deterministic_accept")) or bool( + detail.get("deterministic_accept") + ) + + +def _shared_filenames(left: ArtifactFeature, right: ArtifactFeature) -> list[str]: + return sorted(set(left.filenames) & set(right.filenames))[:10] + + +def _hard_pairs( + snapshot: SnapshotData, + *, + combined_links: list[dict[str, Any]], + issue_map: dict[int, dict[str, Any]], + pr_map: dict[int, dict[str, Any]], + comment_map: dict[int, dict[str, Any]], + review_map: dict[int, dict[str, Any]], + review_comment_map: dict[int, dict[str, Any]], +) -> tuple[dict[tuple[str, str], set[str]], dict[tuple[str, str], set[int]]]: + hard_pairs: dict[tuple[str, str], set[str]] = defaultdict(set) + pair_target_issues: dict[tuple[str, str], set[int]] = defaultdict(set) + + for row in combined_links: + source_node = _resolve_source_node( + row, issue_map, pr_map, comment_map, review_map, review_comment_map + ) + target_node = _resolve_target_node(snapshot.repo, row, issue_map, pr_map) + if source_node is None or target_node is None or source_node == target_node: + continue + if ( + row["link_type"] == "duplicate_reference" + and source_node.startswith("issue:") + and target_node.startswith("issue:") + ): + hard_pairs[_pair_key(source_node, target_node)].add("duplicate_reference") + if ( + row["link_type"] == "closing_reference" + and source_node.startswith("pull_request:") + and target_node.startswith("issue:") + ): + hard_pairs[_pair_key(source_node, 
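+# _bounded_shared_target_pairs, summarized (descriptive only): pass one keeps
+# a spanning forest over the candidate PRs (an edge is taken only when it
+# joins two components), then pass two re-admits a few redundant edges,
+# capped at LLM_SHARED_TARGET_MAX_NEIGHBORS_PER_PR per PR and
+# LLM_SHARED_TARGET_MAX_EXTRA_PAIRS_PER_TARGET per target issue, so one
+# popular tracking issue cannot flood the LLM review queue with pairs.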
target_node)].add("closing_reference") + + prs_by_target: defaultdict[int, set[str]] = defaultdict(set) + for row in combined_links: + source_node = _resolve_source_node( + row, issue_map, pr_map, comment_map, review_map, review_comment_map + ) + target_node = _resolve_target_node(snapshot.repo, row, issue_map, pr_map) + if ( + source_node + and target_node + and source_node.startswith("pull_request:") + and target_node.startswith("issue:") + ): + prs_by_target[int(target_node.split(":", 1)[1])].add(source_node) + for target_issue, pr_nodes in prs_by_target.items(): + pr_nodes_list = sorted(pr_nodes) + if len(pr_nodes_list) < 2: + continue + for index, left in enumerate(pr_nodes_list): + for right in pr_nodes_list[index + 1 :]: + pair = _pair_key(left, right) + hard_pairs[pair].add("shared_issue_target") + pair_target_issues[pair].add(target_issue) + + for event in snapshot.events: + source_number = event.get("source_issue_number") + if event.get("event") != "cross-referenced" or source_number is None: + continue + parent_kind = event.get("parent_kind") + parent_number = event.get("parent_number") + if parent_kind not in {"issue", "pull_request"} or parent_number is None: + continue + parent_node = f"{parent_kind}:{int(parent_number)}" + if parent_node not in features_from_maps(issue_map, pr_map): + continue + target_node = _node_from_number(int(source_number), issue_map, pr_map) + if target_node is None or target_node == parent_node: + continue + hard_pairs[_pair_key(parent_node, target_node)].add("timeline:cross-referenced") + + return hard_pairs, pair_target_issues + + +def features_from_maps( + issue_map: dict[int, dict[str, Any]], pr_map: dict[int, dict[str, Any]] +) -> set[str]: + return {f"issue:{number}" for number in issue_map} | { + f"pull_request:{number}" for number in pr_map + } + + +def _soft_candidates( + issue_map: dict[int, dict[str, Any]], + pr_map: dict[int, dict[str, Any]], + features: dict[str, ArtifactFeature], + hard_pairs: dict[tuple[str, str], set[str]], +) -> dict[tuple[str, str], dict[str, Any]]: + candidates: dict[tuple[str, str], dict[str, Any]] = {} + hard_neighbors: defaultdict[str, set[str]] = defaultdict(set) + for left, right in hard_pairs: + if left.split(":", 1)[0] == right.split(":", 1)[0]: + hard_neighbors[left].add(right) + hard_neighbors[right].add(left) + issue_candidates = _bm25_candidates( + numbers=sorted(issue_map), + kind="issue", + features=features, + hard_neighbors=hard_neighbors, + max_candidates=5, + extra_filter=_issue_soft_filter, + ) + pr_candidates = _bm25_candidates( + numbers=sorted(pr_map), + kind="pull_request", + features=features, + hard_neighbors=hard_neighbors, + max_candidates=5, + extra_filter=_pr_soft_filter, + ) + candidates.update(issue_candidates) + candidates.update(pr_candidates) + return candidates + + +def _bm25_candidates( + *, + numbers: list[int], + kind: str, + features: dict[str, ArtifactFeature], + hard_neighbors: defaultdict[str, set[str]], + max_candidates: int, + extra_filter: Any, +) -> dict[tuple[str, str], dict[str, Any]]: + if not numbers: + return {} + nodes = [f"{kind}:{number}" for number in numbers] + token_sets = [set(features[node].tokens) for node in nodes] + if len(nodes) > 4000: + return _sparse_token_candidates( + nodes=nodes, + kind=kind, + features=features, + token_sets=token_sets, + hard_neighbors=hard_neighbors, + max_candidates=max_candidates, + extra_filter=extra_filter, + ) + corpus = [features[node].tokens or ["empty"] for node in nodes] + bm25 = BM25Okapi(corpus) + candidates: 
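+# rank_bm25 usage sketch (standalone, illustrative): the index is built once
+# over all token lists and each artifact replays its own tokens as the query,
+# exactly as the loop below does:
+#
+#     from rank_bm25 import BM25Okapi
+#     corpus = [["crash", "on", "startup"], ["docs", "typo"], ["startup", "crash", "loop"]]
+#     scores = BM25Okapi(corpus).get_scores(["crash", "startup"])
+#     assert scores[0] > scores[1]  # the crash reports outrank the docs fix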
dict[tuple[str, str], dict[str, Any]] = {} + for index, node in enumerate(nodes): + feature = features[node] + if not feature.tokens: + continue + scores = bm25.get_scores(feature.tokens) + ranked = sorted(range(len(nodes)), key=lambda position: scores[position], reverse=True) + accepted = 0 + for candidate_index in ranked: + if candidate_index == index: + continue + candidate_node = nodes[candidate_index] + if candidate_node in hard_neighbors[node]: + continue + score = float(scores[candidate_index]) + if score <= 0: + continue + candidate_feature = features[candidate_node] + jaccard = _jaccard_sets(token_sets[index], token_sets[candidate_index]) + if jaccard < 0.2: + continue + if not extra_filter(feature, candidate_feature): + continue + pair = _pair_key(node, candidate_node) + current = candidates.get(pair) + if current is None or score > current["score"]: + candidates[pair] = { + "left": pair[0], + "right": pair[1], + "kind": kind, + "score": score, + "jaccard": jaccard, + } + accepted += 1 + if accepted >= max_candidates: + break + return candidates + + +def _sparse_token_candidates( + *, + nodes: list[str], + kind: str, + features: dict[str, ArtifactFeature], + token_sets: list[set[str]], + hard_neighbors: defaultdict[str, set[str]], + max_candidates: int, + extra_filter: Any, +) -> dict[tuple[str, str], dict[str, Any]]: + anchor_tokens: list[set[str]] = [] + inverted: defaultdict[str, list[int]] = defaultdict(list) + for index, node in enumerate(nodes): + tokens = features[node].title_tokens or token_sets[index] + anchor_tokens.append(tokens) + for token in tokens: + inverted[token].append(index) + + candidates: dict[tuple[str, str], dict[str, Any]] = {} + for index, node in enumerate(nodes): + feature = features[node] + if not token_sets[index]: + continue + probe_tokens = sorted(anchor_tokens[index], key=lambda token: len(inverted[token]))[:8] + overlap_scores: Counter[int] = Counter() + for token in probe_tokens: + for candidate_index in inverted[token]: + if candidate_index != index: + overlap_scores[candidate_index] += 1 + accepted = 0 + for candidate_index, overlap in overlap_scores.most_common(): + candidate_node = nodes[candidate_index] + if candidate_node in hard_neighbors[node]: + continue + candidate_feature = features[candidate_node] + jaccard = _jaccard_sets(token_sets[index], token_sets[candidate_index]) + if jaccard < 0.2: + continue + if not extra_filter(feature, candidate_feature): + continue + pair = _pair_key(node, candidate_node) + score = float(overlap) + current = candidates.get(pair) + if current is None or score > current["score"]: + candidates[pair] = { + "left": pair[0], + "right": pair[1], + "kind": kind, + "score": score, + "jaccard": jaccard, + } + accepted += 1 + if accepted >= max_candidates: + break + return candidates + + +def _issue_soft_filter(left: ArtifactFeature, right: ArtifactFeature) -> bool: + if _days_between(left.row.get("created_at"), right.row.get("created_at")) <= 365: + return True + return len(left.title_tokens & right.title_tokens) >= 3 + + +def _pr_soft_filter(left: ArtifactFeature, right: ArtifactFeature) -> bool: + if not left.row.get("base_ref") or left.row.get("base_ref") != right.row.get("base_ref"): + return False + return _days_between(left.row.get("created_at"), right.row.get("created_at")) <= 180 + + +def _estimate_packet_size(packet: dict[str, Any], model: str) -> PacketBudget: + del model + serialized = json.dumps(packet, indent=2, sort_keys=True) + estimated_input_tokens = max( + 1, (len(serialized) + 
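+# Sparse fallback above (descriptive only): past 4000 nodes, full BM25 is
+# skipped; each artifact probes an inverted index with its 8 rarest anchor
+# tokens (fewest postings first) and ranks neighbors by raw overlap count,
+# keeping candidate generation proportional to the postings touched rather
+# than quadratic in the number of artifacts.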
LLM_PACKET_CHARS_PER_TOKEN - 1) // LLM_PACKET_CHARS_PER_TOKEN + ) + return PacketBudget( + node_count=len(packet["nodes"]), + item_count=len(packet["items"]), + soft_pair_count=len(packet["soft_pairs"]), + serialized_chars=len(serialized), + estimated_input_tokens=estimated_input_tokens, + estimated_eval_tokens=estimated_input_tokens * 2 + 256, + ) + + +def _packet_budget_json(budget: PacketBudget) -> dict[str, int]: + return { + "node_count": budget.node_count, + "item_count": budget.item_count, + "soft_pair_count": budget.soft_pair_count, + "serialized_chars": budget.serialized_chars, + "estimated_input_tokens": budget.estimated_input_tokens, + "estimated_eval_tokens": budget.estimated_eval_tokens, + } + + +def _packet_over_budget(budget: PacketBudget) -> bool: + return ( + budget.node_count > LLM_MAX_NODES_PER_PACKET + or budget.soft_pair_count > LLM_MAX_SOFT_PAIRS_PER_PACKET + or budget.estimated_input_tokens > LLM_MAX_INPUT_TOKENS + ) + + +def _soft_pair_review_sort_key(pair: dict[str, Any]) -> tuple[Any, ...]: + return ( + 0 if not bool(pair.get("deterministic_accept", True)) else 1, + -len(pair.get("shared_targets") or []), + -float(pair.get("score") or 0.0), + -float(pair.get("jaccard") or 0.0), + str(pair["left"]), + str(pair["right"]), + ) + + +def _review_subpacket(packet: dict[str, Any], soft_pairs: list[dict[str, Any]]) -> dict[str, Any]: + node_ids = { + node_id for pair in soft_pairs for node_id in (str(pair["left"]), str(pair["right"])) + } + soft_pair_keys = {_pair_key(str(pair["left"]), str(pair["right"])) for pair in soft_pairs} + items_by_node = {str(item["node_id"]): item for item in packet["items"]} + pair_evidence: dict[str, list[str]] = {} + for key, evidence in packet["pair_evidence"].items(): + left, right = key.split("|", 1) + if left not in node_ids or right not in node_ids: + continue + filtered = sorted(value for value in evidence if value != "soft_similarity") + if key in soft_pair_keys: + pair_evidence[key] = sorted(evidence) + elif filtered: + pair_evidence[key] = filtered + nodes = sorted(node_ids) + return { + "nodes": nodes, + "items": [dict(items_by_node[node]) for node in nodes], + "pair_evidence": pair_evidence, + "soft_pairs": [dict(pair) for pair in soft_pairs], + } + + +def _split_packet_for_review(packet: dict[str, Any], model: str) -> list[dict[str, Any]]: + if not packet["soft_pairs"]: + return [packet] + if not _packet_over_budget(_estimate_packet_size(packet, model)): + return [packet] + batches: list[list[dict[str, Any]]] = [] + current_batch: list[dict[str, Any]] = [] + for soft_pair in sorted(packet["soft_pairs"], key=_soft_pair_review_sort_key): + candidate_batch = [*current_batch, soft_pair] + candidate_packet = _review_subpacket(packet, candidate_batch) + if current_batch and _packet_over_budget(_estimate_packet_size(candidate_packet, model)): + batches.append(current_batch) + current_batch = [soft_pair] + continue + current_batch = candidate_batch + if current_batch: + batches.append(current_batch) + return [_review_subpacket(packet, batch) for batch in batches] + + +def _trim_packet_for_llm( + packet: dict[str, Any], *, max_diff_chars: int, max_filenames: int +) -> dict[str, Any]: + return { + "nodes": list(packet["nodes"]), + "items": [ + { + **item, + "filenames": list(item.get("filenames") or [])[:max_filenames], + "diff_preview": ( + None + if item.get("diff_preview") is None + else str(item["diff_preview"])[:max_diff_chars] + ), + } + for item in packet["items"] + ], + "pair_evidence": {key: list(values) for key, values in 
packet["pair_evidence"].items()}, + "soft_pairs": [dict(pair) for pair in packet["soft_pairs"]], + } + + +def _prepare_packet_for_llm( + packet: dict[str, Any], model: str, *, split: bool +) -> PreparedLlmPacket | None: + original_budget = _estimate_packet_size(packet, model) + if not _packet_over_budget(original_budget): + return PreparedLlmPacket( + packet=packet, + budget=original_budget, + original_budget=original_budget, + trimmed=False, + aggressively_trimmed=False, + split=split, + ) + trim_levels = ( + (LLM_MAX_DIFF_CHARS_PER_ITEM, LLM_MAX_FILENAMES_PER_ITEM, False), + ( + max(120, LLM_MAX_DIFF_CHARS_PER_ITEM // 2), + max(2, LLM_MAX_FILENAMES_PER_ITEM // 2), + True, + ), + ) + for max_diff_chars, max_filenames, aggressively_trimmed in trim_levels: + trimmed_packet = _trim_packet_for_llm( + packet, + max_diff_chars=max_diff_chars, + max_filenames=max_filenames, + ) + budget = _estimate_packet_size(trimmed_packet, model) + if not _packet_over_budget(budget): + return PreparedLlmPacket( + packet=trimmed_packet, + budget=budget, + original_budget=original_budget, + trimmed=True, + aggressively_trimmed=aggressively_trimmed, + split=split, + ) + return None + + +def _accepted_nontrivial_soft_edge( + packet: dict[str, Any], analyst_result: ClusterAnalystResponse +) -> bool: + accepted = { + _pair_key(verdict.left, verdict.right) + for verdict in analyst_result.soft_edge_verdicts + if verdict.accept + } + return any( + not bool(pair.get("deterministic_accept", True)) + and _pair_key(str(pair["left"]), str(pair["right"])) in accepted + for pair in packet["soft_pairs"] + ) + + +def _should_run_evaluator( + packet: dict[str, Any], + budget: PacketBudget, + *, + split: bool, + aggressively_trimmed: bool, + analyst_result: ClusterAnalystResponse, +) -> bool: + del split + if aggressively_trimmed: + return False + if budget.estimated_eval_tokens > LLM_SKIP_EVALUATOR_ABOVE_TOKENS: + return False + return _accepted_nontrivial_soft_edge(packet, analyst_result) + + +def _classify_llm_error(exc: Exception) -> str: + message = f"{type(exc).__name__}: {exc}".lower() + type_name = type(exc).__name__.lower() + if ( + "context window" in message + or "maximum context length" in message + or "exceeds the context" in message + ): + return "context_window_exceeded" + if "timeout" in message or "timed out" in message: + return "provider_timeout" + if any(term in message for term in ("auth", "api key", "unauthorized", "forbidden")): + return "provider_auth_error" + if any(term in type_name for term in ("validation", "decode")) or "parse" in message: + return "structured_parse_error" + return "unknown_provider_error" + + +def _summarize_llm_error(exc: Exception) -> str: + return re.sub(r"\s+", " ", str(exc)).strip()[:300] + + +def _packet_soft_pair_ids(packet: dict[str, Any]) -> list[str]: + return [ + "|".join(_pair_key(str(pair["left"]), str(pair["right"]))) for pair in packet["soft_pairs"] + ] + + +def _soft_pair_review_meta( + *, + label: str, + component_index: int, + component_count: int, + review_unit_index: int, + review_unit_count: int, + cluster_id: str, + component_budget: PacketBudget, + budget: PacketBudget, + prepared_review_unit_hash: str | None, + trimmed: bool, + aggressively_trimmed: bool, + split: bool, + packet: dict[str, Any], +) -> SoftPairReviewUnitMeta: + prefix = ( + f"LLM {label} soft-edge review {component_index}/{component_count}" + f" unit {review_unit_index}/{review_unit_count}" + ) + return SoftPairReviewUnitMeta( + label=label, + component_index=component_index, + 
component_count=component_count, + review_unit_index=review_unit_index, + review_unit_count=review_unit_count, + cluster_id=cluster_id, + prefix=prefix, + nodes=tuple(str(node) for node in packet["nodes"]), + soft_pairs=tuple(_packet_soft_pair_ids(packet)), + component_budget=component_budget, + budget=budget, + prepared_review_unit_hash=prepared_review_unit_hash, + trimmed=trimmed, + aggressively_trimmed=aggressively_trimmed, + split=split, + ) + + +def _completed_soft_pair_review_sort_key(review: CompletedSoftPairReview) -> tuple[int, int]: + return ( + review.meta.component_index, + review.meta.review_unit_index, + ) + + +def _soft_pair_review_record( + *, + review: CompletedSoftPairReview, + model: str, + accepted_nontrivial_soft_edge: bool, +) -> dict[str, Any]: + result = review.result + return { + "label": review.meta.label, + "component_index": review.meta.component_index, + "component_count": review.meta.component_count, + "review_unit_index": review.meta.review_unit_index, + "review_unit_count": review.meta.review_unit_count, + "status": review.status, + "reason": review.reason, + "source": review.source, + "cache_hit": review.cache_hit, + "model": model, + "cluster_id": review.meta.cluster_id, + "nodes": list(review.meta.nodes), + "soft_pairs": list(review.meta.soft_pairs), + "prepared_review_unit_hash": review.meta.prepared_review_unit_hash, + "component_budget": _packet_budget_json(review.meta.component_budget), + "budget": _packet_budget_json(review.meta.budget), + "overflow_policy": LLM_OVERFLOW_POLICY, + "trimmed": review.meta.trimmed, + "aggressively_trimmed": review.meta.aggressively_trimmed, + "split": review.meta.split, + "analyst_result": ( + None + if result is None or result.analyst_result is None + else result.analyst_result.model_dump(mode="json") + ), + "evaluator_result": ( + None + if result is None or result.evaluator_result is None + else result.evaluator_result.model_dump(mode="json") + ), + "evaluator_used": False if result is None else result.evaluator_used, + "retried": False if result is None else result.retried, + "accepted_nontrivial_soft_edge": accepted_nontrivial_soft_edge, + "error_kind": None if result is None else result.error_kind, + "error_message": None if result is None else result.error_message, + } + + +def _completed_soft_pair_review_from_result( + pending: PendingSoftPairReview, + result: ClusterAnalysisCallResult, +) -> CompletedSoftPairReview: + return CompletedSoftPairReview( + meta=pending.meta, + result=result, + status="reviewed" if result.analyst_result is not None else "error", + reason=None, + source="llm", + cache_hit=False, + ) + + +async def _run_pending_soft_pair_review( + pending: PendingSoftPairReview, + *, + model: str, + review_semaphore: asyncio.Semaphore, +) -> CompletedSoftPairReview: + async with review_semaphore: + try: + result = await _fast_agent_cluster_analysis(pending.prepared, model) + except Exception as exc: + result = ClusterAnalysisCallResult( + analyst_result=None, + evaluator_result=None, + error_kind=_classify_llm_error(exc), + error_message=_summarize_llm_error(exc), + evaluator_used=False, + retried=False, + ) + return _completed_soft_pair_review_from_result(pending, result) + + +async def _run_pending_soft_pair_reviews( + pending_reviews: list[PendingSoftPairReview], + *, + concurrency: int, + model: str, + review_semaphore: asyncio.Semaphore, +) -> list[CompletedSoftPairReview]: + if not pending_reviews: + return [] + if concurrency <= 1: + completed: list[CompletedSoftPairReview] = [] + for pending in 
pending_reviews: + completed.append( + await _run_pending_soft_pair_review( + pending, + model=model, + review_semaphore=review_semaphore, + ) + ) + return completed + tasks = [ + asyncio.create_task( + _run_pending_soft_pair_review( + pending, + model=model, + review_semaphore=review_semaphore, + ) + ) + for pending in pending_reviews + ] + return await asyncio.gather(*tasks) + + +async def _accepted_soft_pairs( + *, + options: AnalysisOptions, + snapshot: SnapshotData, + features: dict[str, ArtifactFeature], + hard_pairs: dict[tuple[str, str], set[str]], + soft_candidates: dict[tuple[str, str], dict[str, Any]], + label: str, + hybrid_review_cache: HybridReviewCacheStore, + llm_available: bool, + review_semaphore: asyncio.Semaphore, +) -> tuple[dict[tuple[str, str], dict[str, Any]], bool, list[dict[str, Any]]]: + del snapshot + if not soft_candidates: + return {}, False, [] + deterministic_accepts = { + pair: detail + for pair, detail in soft_candidates.items() + if bool(detail.get("deterministic_accept", True)) + } + if options.ranking_backend != "hybrid": + return deterministic_accepts, False, [] + if not llm_available and not hybrid_review_cache.has_entries: + return deterministic_accepts, False, [] + + candidate_graph = dict(hard_pairs) + for pair in soft_candidates: + candidate_graph.setdefault(pair, set()).add("soft_similarity") + component_payloads = _component_packets(features, candidate_graph, soft_candidates) + pending_reviews: list[PendingSoftPairReview] = [] + completed_reviews: list[CompletedSoftPairReview] = [] + accepted: dict[tuple[str, str], dict[str, Any]] = dict(deterministic_accepts) + llm_used = False + review_records: list[dict[str, Any]] = [] + total_components = len(component_payloads) + for index, payload in enumerate(component_payloads, start=1): + component_budget = _estimate_packet_size(payload, options.model) + cluster_id = _cluster_id_from_nodes(payload["nodes"]) + review_units = _split_packet_for_review(payload, options.model) + if len(review_units) > 1: + _analysis_log( + f"LLM {label} soft-edge review {index}/{total_components}: " + f"split oversized component into {len(review_units)} review units " + f"(nodes={component_budget.node_count}, soft_pairs={component_budget.soft_pair_count}, " + f"est_tokens={component_budget.estimated_input_tokens})" + ) + for unit_index, review_unit in enumerate(review_units, start=1): + prepared = _prepare_packet_for_llm( + review_unit, + options.model, + split=len(review_units) > 1, + ) + if prepared is None: + unit_budget = _estimate_packet_size(review_unit, options.model) + completed_reviews.append( + CompletedSoftPairReview( + meta=_soft_pair_review_meta( + label=label, + component_index=index, + component_count=total_components, + review_unit_index=unit_index, + review_unit_count=len(review_units), + cluster_id=cluster_id, + component_budget=component_budget, + budget=unit_budget, + prepared_review_unit_hash=None, + trimmed=True, + aggressively_trimmed=True, + split=len(review_units) > 1, + packet=review_unit, + ), + result=None, + status="skipped", + reason="over_budget_after_truncate", + source=None, + cache_hit=False, + ) + ) + continue + prepared_review_unit = _prepared_review_unit_payload(prepared) + cache_key = build_hybrid_review_cache_key( + manifest=hybrid_review_cache.manifest, + model=options.model, + prepared_review_unit=prepared_review_unit, + ) + meta = _soft_pair_review_meta( + label=label, + component_index=index, + component_count=total_components, + review_unit_index=unit_index, + 
review_unit_count=len(review_units), + cluster_id=cluster_id, + component_budget=component_budget, + budget=prepared.budget, + prepared_review_unit_hash=cache_key.prepared_review_unit_hash, + trimmed=prepared.trimmed, + aggressively_trimmed=prepared.aggressively_trimmed, + split=prepared.split, + packet=prepared.packet, + ) + cached_entry = hybrid_review_cache.get(cache_key) + if cached_entry is not None: + completed_reviews.append( + CompletedSoftPairReview( + meta=meta, + result=_cluster_analysis_call_result_from_payload(cached_entry.result), + status=( + "reviewed" + if cached_entry.result.get("analyst_result") is not None + else "error" + ), + reason=None, + source="cache", + cache_hit=True, + ) + ) + continue + if not llm_available: + completed_reviews.append( + CompletedSoftPairReview( + meta=meta, + result=None, + status="skipped", + reason="llm_unavailable_cache_miss", + source=None, + cache_hit=False, + ) + ) + continue + pending_reviews.append( + PendingSoftPairReview( + meta=meta, + prepared=prepared, + cache_key=cache_key, + ) + ) + reviewed_from_cache = sum(1 for review in completed_reviews if review.cache_hit) + skipped_reviews = sum(1 for review in completed_reviews if review.status == "skipped") + _analysis_log( + f"LLM {label} soft-edge review scheduling: " + f"units={len(pending_reviews) + len(completed_reviews)}, " + f"cache_hits={reviewed_from_cache}, " + f"cache_misses={len(pending_reviews)}, " + f"skipped={skipped_reviews}, " + f"concurrency={options.hybrid_llm_concurrency}" + ) + completed_reviews.extend( + await _run_pending_soft_pair_reviews( + pending_reviews, + concurrency=options.hybrid_llm_concurrency, + model=options.model, + review_semaphore=review_semaphore, + ) + ) + pending_by_position = { + (pending.meta.component_index, pending.meta.review_unit_index): pending + for pending in pending_reviews + } + for review in sorted(completed_reviews, key=_completed_soft_pair_review_sort_key): + accepted_nontrivial = False + pending = pending_by_position.get( + (review.meta.component_index, review.meta.review_unit_index) + ) + result = review.result + if review.reason == "over_budget_after_truncate": + _analysis_log( + f"{review.meta.prefix}: skipped over-budget packet " + f"(nodes={review.meta.budget.node_count}, soft_pairs={review.meta.budget.soft_pair_count}, " + f"est_tokens={review.meta.budget.estimated_input_tokens}, overflow_policy={LLM_OVERFLOW_POLICY})" + ) + elif review.reason == "llm_unavailable_cache_miss": + _analysis_log( + f"{review.meta.prefix}: cache miss with fast-agent unavailable; " + "keeping deterministic-only soft edges" + ) + else: + if review.cache_hit: + _analysis_log( + f"{review.meta.prefix}: cache hit " + f"(nodes={review.meta.budget.node_count}, soft_pairs={review.meta.budget.soft_pair_count}, " + f"est_tokens={review.meta.budget.estimated_input_tokens}, model={options.model})" + ) + if result is None or result.analyst_result is None: + if result is not None and result.error_kind is not None: + _analysis_log( + f"{review.meta.prefix}: {result.error_kind}" + f" (nodes={review.meta.budget.node_count}, soft_pairs={review.meta.budget.soft_pair_count}, " + f"est_tokens={review.meta.budget.estimated_input_tokens}, " + f"overflow_policy={LLM_OVERFLOW_POLICY})" + ) + else: + _analysis_log(f"{review.meta.prefix}: no result") + else: + llm_used = True + verdicts = { + _pair_key(verdict.left, verdict.right): verdict + for verdict in result.analyst_result.soft_edge_verdicts + } + accepted_count = sum(1 for verdict in verdicts.values() if 
verdict.accept) + rejected_count = sum(1 for verdict in verdicts.values() if not verdict.accept) + accepted_nontrivial = any( + verdicts.get(_pair_key(*pair_id.split("|", 1))) is not None + and verdicts[_pair_key(*pair_id.split("|", 1))].accept + and not bool( + soft_candidates[_pair_key(*pair_id.split("|", 1))].get( + "deterministic_accept", + True, + ) + ) + for pair_id in review.meta.soft_pairs + ) + evaluator_status = "used" if result.evaluator_used else "skipped" + _analysis_log( + f"{review.meta.prefix}: {accepted_count} accepted, {rejected_count} rejected, " + f"evaluator={evaluator_status}, source={review.source}" + ) + if result.error_kind is not None: + _analysis_log( + f"{review.meta.prefix}: {result.error_kind}; keeping analyst result" + ) + for pair_id in review.meta.soft_pairs: + normalized_pair = _pair_key(*pair_id.split("|", 1)) + verdict = verdicts.get(normalized_pair) + if verdict is None: + continue + if verdict.accept: + accepted[normalized_pair] = soft_candidates[normalized_pair] + else: + accepted.pop(normalized_pair, None) + if ( + pending is not None + and review.source == "llm" + and _cacheable_cluster_analysis_result(result) + ): + hybrid_review_cache.put( + HybridReviewCacheEntry( + key=pending.cache_key, + result=_cluster_analysis_call_result_payload(result), + cached_at=_iso_now(), + nodes=tuple(pending.prepared.packet["nodes"]), + soft_pairs=tuple(_packet_soft_pair_ids(pending.prepared.packet)), + budget=_packet_budget_json(pending.prepared.budget), + split=pending.prepared.split, + trimmed=pending.prepared.trimmed, + aggressively_trimmed=pending.prepared.aggressively_trimmed, + ) + ) + review_records.append( + _soft_pair_review_record( + review=review, + model=options.model, + accepted_nontrivial_soft_edge=accepted_nontrivial, + ) + ) + return accepted, llm_used, review_records + + +def _component_packets( + features: dict[str, ArtifactFeature], + pairs: dict[tuple[str, str], set[str]], + soft_candidates: dict[tuple[str, str], dict[str, Any]], +) -> list[dict[str, Any]]: + components = _connected_components(features, pairs) + packets: list[dict[str, Any]] = [] + for nodes in components: + pair_members = { + pair: evidence + for pair, evidence in pairs.items() + if pair[0] in nodes and pair[1] in nodes + } + soft_pairs = [ + { + "left": pair[0], + "right": pair[1], + "score": detail["score"], + "jaccard": detail["jaccard"], + "evidence_types": sorted(detail.get("evidence_types") or []), + "shared_targets": detail.get("shared_targets") or [], + "shared_filenames": detail.get("shared_filenames") or [], + "deterministic_accept": bool(detail.get("deterministic_accept", True)), + } + for pair, detail in soft_candidates.items() + if pair[0] in nodes and pair[1] in nodes + ] + packets.append( + { + "nodes": nodes, + "items": [_cluster_item(features[node]) for node in nodes], + "pair_evidence": { + f"{left}|{right}": sorted(evidence) + for (left, right), evidence in pair_members.items() + }, + "soft_pairs": soft_pairs, + } + ) + return packets + + +def _clusters( + *, + snapshot: SnapshotData, + features: dict[str, ArtifactFeature], + final_pairs: dict[tuple[str, str], set[str]], + pair_target_issues: dict[tuple[str, str], set[int]], + llm_cluster_payloads: dict[str, ClusterAnalystResponse], +) -> list[ClusterRecord]: + clusters: list[ClusterRecord] = [] + for nodes in _connected_components(features, final_pairs): + issue_numbers = sorted( + int(node.split(":", 1)[1]) for node in nodes if node.startswith("issue:") + ) + pr_numbers = sorted( + int(node.split(":", 
1)[1]) for node in nodes if node.startswith("pull_request:") + ) + evidence_types = sorted( + { + evidence + for (left, right), evidences in final_pairs.items() + if left in nodes and right in nodes + for evidence in evidences + } + ) + target_counter: Counter[int] = Counter() + for pair, targets in pair_target_issues.items(): + if pair[0] in nodes and pair[1] in nodes: + target_counter.update(targets) + target_issue_number = target_counter.most_common(1)[0][0] if target_counter else None + llm_payload = llm_cluster_payloads.get(_cluster_id_from_nodes(nodes)) + clusters.append( + _cluster_record_from_members( + features=features, + issue_numbers=issue_numbers, + pr_numbers=pr_numbers, + evidence_types=evidence_types, + target_issue_number=target_issue_number, + llm_payload=llm_payload, + ) + ) + return clusters + + +def _cluster_record_from_members( + *, + features: dict[str, ArtifactFeature], + issue_numbers: list[int], + pr_numbers: list[int], + evidence_types: list[str], + target_issue_number: int | None, + llm_payload: ClusterAnalystResponse | None = None, +) -> ClusterRecord: + nodes = sorted( + [f"issue:{number}" for number in issue_numbers] + + [f"pull_request:{number}" for number in pr_numbers] + ) + cluster_id = _cluster_id_from_nodes(nodes) + canonical_issue_number = _canonical_issue(issue_numbers, features) + canonical_pr_number = _canonical_pr(pr_numbers, features) + status = _cluster_status(issue_numbers, pr_numbers, features) + confidence = _cluster_confidence(evidence_types) + summary = _cluster_summary(issue_numbers, pr_numbers, target_issue_number, evidence_types) + canonical_issue_reason = ( + _canonical_issue_reason(canonical_issue_number, features, issue_numbers) + if canonical_issue_number is not None + else None + ) + canonical_pr_reason = ( + _canonical_pr_reason(canonical_pr_number, features, pr_numbers) + if canonical_pr_number is not None + else None + ) + best_issue_reason = ( + _best_issue_reason(canonical_issue_number, features, len(nodes)) + if canonical_issue_number is not None + else None + ) + best_pr_reason = ( + _best_pr_reason(canonical_pr_number, features, len(nodes)) + if canonical_pr_number is not None + else None + ) + if llm_payload is not None: + summary = llm_payload.summary or summary + confidence = max(0.0, min(1.0, llm_payload.confidence)) + canonical_issue_reason = llm_payload.canonical_issue_reason or canonical_issue_reason + canonical_pr_reason = llm_payload.canonical_pr_reason or canonical_pr_reason + best_issue_reason = llm_payload.best_issue_reason or best_issue_reason + best_pr_reason = llm_payload.best_pr_reason or best_pr_reason + cluster_score = _cluster_score(issue_numbers, pr_numbers, features, status) + best_issue_score = ( + _issue_score(canonical_issue_number, features, len(nodes)) + if canonical_issue_number is not None + else None + ) + best_pr_score = ( + _pr_score(canonical_pr_number, features, len(nodes)) + if canonical_pr_number is not None + else None + ) + return ClusterRecord( + cluster_id=cluster_id, + nodes=nodes, + issue_numbers=issue_numbers, + pr_numbers=pr_numbers, + evidence_types=evidence_types, + canonical_issue_number=canonical_issue_number, + canonical_pr_number=canonical_pr_number, + target_issue_number=target_issue_number, + summary=summary, + status=status, + confidence=confidence, + canonical_issue_reason=canonical_issue_reason, + canonical_pr_reason=canonical_pr_reason, + best_issue_reason=best_issue_reason, + best_pr_reason=best_pr_reason, + cluster_score=cluster_score, + 
best_issue_score=best_issue_score, + best_pr_score=best_pr_score, + ) + + +def _meta_bug_clusters( + *, + features: dict[str, ArtifactFeature], + issue_clusters: list[ClusterRecord], + pr_clusters: list[ClusterRecord], + explicit_issue_link_targets: defaultdict[int, set[int]], + issue_map: dict[int, dict[str, Any]], + pr_map: dict[int, dict[str, Any]], +) -> list[ClusterRecord]: + issue_cluster_by_issue: dict[int, ClusterRecord] = {} + for cluster in issue_clusters: + for issue_number in cluster.issue_numbers: + issue_cluster_by_issue[issue_number] = cluster + + pr_cluster_by_pr: dict[int, ClusterRecord] = {} + for cluster in pr_clusters: + for pr_number in cluster.pr_numbers: + pr_cluster_by_pr[pr_number] = cluster + + prs_by_target_issue: defaultdict[int, set[int]] = defaultdict(set) + for pr_number, targets in explicit_issue_link_targets.items(): + for target in targets: + if target in issue_map: + prs_by_target_issue[target].add(pr_number) + + issue_anchors: list[ClusterRecord] = list(issue_clusters) + targeted_issue_numbers = sorted(prs_by_target_issue) + for issue_number in targeted_issue_numbers: + if issue_number in issue_cluster_by_issue: + continue + singleton = _cluster_record_from_members( + features=features, + issue_numbers=[issue_number], + pr_numbers=[], + evidence_types=["closing_reference"], + target_issue_number=issue_number, + ) + issue_anchors.append(singleton) + issue_cluster_by_issue[issue_number] = singleton + + pr_groups: list[ClusterRecord] = list(pr_clusters) + for pr_number, targets in explicit_issue_link_targets.items(): + if pr_number in pr_cluster_by_pr or pr_number not in pr_map: + continue + singleton = _cluster_record_from_members( + features=features, + issue_numbers=[], + pr_numbers=[pr_number], + evidence_types=["closing_reference"], + target_issue_number=min(targets) if targets else None, + ) + pr_groups.append(singleton) + pr_cluster_by_pr[pr_number] = singleton + + anchor_buckets: dict[str, dict[str, Any]] = {} + issue_anchor_for_issue: dict[int, str] = {} + for cluster in issue_anchors: + has_attached_prs = any( + prs_by_target_issue.get(issue_number) for issue_number in cluster.issue_numbers + ) + if len(cluster.issue_numbers) < 2 and not has_attached_prs: + continue + anchor_buckets[cluster.cluster_id] = { + "issue_numbers": set(cluster.issue_numbers), + "pr_numbers": set(), + "evidence_types": set(cluster.evidence_types), + "target_issue_number": cluster.canonical_issue_number + or (cluster.issue_numbers[0] if cluster.issue_numbers else None), + } + for issue_number in cluster.issue_numbers: + issue_anchor_for_issue[issue_number] = cluster.cluster_id + + attached_pr_clusters: set[str] = set() + for cluster in pr_groups: + anchor_id = _select_issue_anchor_for_pr_cluster( + cluster=cluster, + explicit_issue_link_targets=explicit_issue_link_targets, + issue_map=issue_map, + issue_anchor_for_issue=issue_anchor_for_issue, + anchor_buckets=anchor_buckets, + ) + if anchor_id is not None: + bucket = anchor_buckets[anchor_id] + bucket["pr_numbers"].update(cluster.pr_numbers) + bucket["evidence_types"].update(cluster.evidence_types) + bucket["evidence_types"].add("closing_reference") + attached_pr_clusters.add(cluster.cluster_id) + + meta_clusters: list[ClusterRecord] = [] + for bucket in anchor_buckets.values(): + if len(bucket["pr_numbers"]) < 2: + continue + meta_clusters.append( + _cluster_record_from_members( + features=features, + issue_numbers=sorted(bucket["issue_numbers"]), + pr_numbers=sorted(bucket["pr_numbers"]), + 
evidence_types=sorted(bucket["evidence_types"]),
+ target_issue_number=bucket["target_issue_number"],
+ )
+ )
+
+ for cluster in pr_groups:
+ if cluster.cluster_id in attached_pr_clusters:
+ continue
+ if len(cluster.pr_numbers) < 2:
+ continue
+ meta_clusters.append(
+ _cluster_record_from_members(
+ features=features,
+ issue_numbers=[],
+ pr_numbers=cluster.pr_numbers,
+ evidence_types=cluster.evidence_types,
+ target_issue_number=cluster.target_issue_number,
+ )
+ )
+
+ return sorted(
+ {cluster.cluster_id: cluster for cluster in meta_clusters}.values(),
+ key=lambda cluster: cluster.cluster_id,
+ )
+
+
+def _select_issue_anchor_for_pr_cluster(
+ *,
+ cluster: ClusterRecord,
+ explicit_issue_link_targets: defaultdict[int, set[int]],
+ issue_map: dict[int, dict[str, Any]],
+ issue_anchor_for_issue: dict[int, str],
+ anchor_buckets: dict[str, dict[str, Any]],
+) -> str | None:
+ anchor_counts: Counter[str] = Counter()
+ targeted_pr_count = 0
+ for pr_number in cluster.pr_numbers:
+ anchor_ids = {
+ issue_anchor_for_issue[target]
+ for target in explicit_issue_link_targets.get(pr_number, set())
+ if target in issue_map and target in issue_anchor_for_issue
+ }
+ if not anchor_ids:
+ continue
+ targeted_pr_count += 1
+ anchor_counts.update(anchor_ids)
+ if not anchor_counts:
+ # No PR in this cluster targets a known issue anchor.
+ return None
+ ranked = sorted(
+ anchor_counts.items(),
+ key=lambda item: (
+ -item[1],
+ min(anchor_buckets[item[0]]["issue_numbers"]),
+ ),
+ )
+ winner_id, winner_count = ranked[0]
+ runner_up_count = ranked[1][1] if len(ranked) > 1 else 0
+ # Require a strict plurality over the runner-up anchor.
+ if winner_count <= runner_up_count:
+ return None
+ # The winner must also cover at least half of the issue-targeting PRs.
+ if winner_count * 2 < targeted_pr_count:
+ return None
+ return winner_id
+
+
+def _best_issue(
+ clusters: list[ClusterRecord], features: dict[str, ArtifactFeature]
+) -> BestIssueEntry | None:
+ candidates = [cluster for cluster in clusters if cluster.canonical_issue_number is not None]
+ if not candidates:
+ return None
+ winner = min(
+ candidates,
+ key=lambda cluster: (
+ 0
+ if features[f"issue:{cluster.canonical_issue_number}"].row.get("state") == "open"
+ else 1,
+ -len(cluster.nodes),
+ -features[f"issue:{cluster.canonical_issue_number}"].discussion_activity,
+ -features[f"issue:{cluster.canonical_issue_number}"].inbound_references,
+ _sort_timestamp(
+ features[f"issue:{cluster.canonical_issue_number}"].row.get("created_at")
+ ),
+ cluster.canonical_issue_number,
+ ),
+ )
+ issue_number = winner.canonical_issue_number
+ assert issue_number is not None
+ issue_reason = winner.best_issue_reason
+ if issue_reason is None:
+ issue_reason = _best_issue_reason(issue_number, features, len(winner.nodes))
+ assert issue_reason is not None
+ return BestIssueEntry(
+ cluster_id=winner.cluster_id,
+ issue_number=issue_number,
+ reason=issue_reason,
+ score=round(float(winner.best_issue_score or 0.0), 3),
+ )
+
+
+def _best_pr(
+ clusters: list[ClusterRecord], features: dict[str, ArtifactFeature]
+) -> BestPrEntry | None:
+ candidates = [cluster for cluster in clusters if cluster.canonical_pr_number is not None]
+ if not candidates:
+ return None
+ open_candidates = [
+ cluster
+ for cluster in candidates
+ if features[f"pull_request:{cluster.canonical_pr_number}"].row.get("state") == "open"
+ and not bool(features[f"pull_request:{cluster.canonical_pr_number}"].row.get("draft"))
+ ]
+ pool = (
+ open_candidates
+ or [
+ cluster
+ for cluster in candidates
+ if bool(features[f"pull_request:{cluster.canonical_pr_number}"].row.get("merged"))
+ ]
+ or candidates
+ )
+ winner = min(
+ pool,
+ key=lambda 
cluster: ( + 0 + if features[f"pull_request:{cluster.canonical_pr_number}"].row.get("state") == "open" + and not bool(features[f"pull_request:{cluster.canonical_pr_number}"].row.get("draft")) + else 1, + -len(cluster.nodes), + -features[f"pull_request:{cluster.canonical_pr_number}"].explicit_issue_links, + -( + features[f"pull_request:{cluster.canonical_pr_number}"].discussion_activity + + features[f"pull_request:{cluster.canonical_pr_number}"].review_activity + ), + features[f"pull_request:{cluster.canonical_pr_number}"].diff_size, + _sort_timestamp( + features[f"pull_request:{cluster.canonical_pr_number}"].row.get("created_at") + ), + cluster.canonical_pr_number, + ), + ) + pr_number = winner.canonical_pr_number + assert pr_number is not None + pr_reason = winner.best_pr_reason + if pr_reason is None: + pr_reason = _best_pr_reason(pr_number, features, len(winner.nodes)) + assert pr_reason is not None + return BestPrEntry( + cluster_id=winner.cluster_id, + pr_number=pr_number, + reason=pr_reason, + score=round(float(winner.best_pr_score or 0.0), 3), + ) + + +def _resolve_source_node( + row: dict[str, Any], + issue_map: dict[int, dict[str, Any]], + pr_map: dict[int, dict[str, Any]], + comment_map: dict[int, dict[str, Any]], + review_map: dict[int, dict[str, Any]], + review_comment_map: dict[int, dict[str, Any]], +) -> str | None: + source_type = row.get("source_type") + source_number = row.get("source_number") + if source_type in {"issue", "pull_request"} and source_number is not None: + return _node_from_number(int(source_number), issue_map, pr_map) + source_id = row.get("source_github_id") + if source_type == "comment" and source_id is not None: + comment = comment_map.get(int(source_id)) + if comment and comment.get("parent_number") is not None: + parent_kind = comment.get("parent_kind") + if parent_kind in {"issue", "pull_request"}: + return _node_from_number(int(comment["parent_number"]), issue_map, pr_map) + if source_type == "review" and source_id is not None: + review = review_map.get(int(source_id)) + if review: + return _node_from_number(int(review["pull_request_number"]), issue_map, pr_map) + if source_type == "review_comment" and source_id is not None: + review_comment = review_comment_map.get(int(source_id)) + if review_comment: + return _node_from_number(int(review_comment["pull_request_number"]), issue_map, pr_map) + if source_number is None: + return None + return _node_from_number(int(source_number), issue_map, pr_map) + + +def _resolve_target_node( + repo: str, + row: dict[str, Any], + issue_map: dict[int, dict[str, Any]], + pr_map: dict[int, dict[str, Any]], +) -> str | None: + if ( + row.get("target_owner") != repo.split("/", 1)[0] + or row.get("target_repo") != repo.split("/", 1)[1] + ): + return None + target_number = row.get("target_number") + if target_number is None: + return None + return _node_from_number(int(target_number), issue_map, pr_map) + + +def _node_from_number( + number: int, issue_map: dict[int, dict[str, Any]], pr_map: dict[int, dict[str, Any]] +) -> str | None: + if number in issue_map: + return f"issue:{number}" + if number in pr_map: + return f"pull_request:{number}" + return None + + +def _connected_components( + features: dict[str, ArtifactFeature], + pairs: dict[tuple[str, str], set[str]], +) -> list[list[str]]: + adjacency: defaultdict[str, set[str]] = defaultdict(set) + for left, right in pairs: + adjacency[left].add(right) + adjacency[right].add(left) + visited: set[str] = set() + components: list[list[str]] = [] + for node in 
sorted(adjacency): + if node in visited: + continue + stack = [node] + component: list[str] = [] + while stack: + current = stack.pop() + if current in visited: + continue + visited.add(current) + component.append(current) + stack.extend(sorted(adjacency[current] - visited)) + components.append(sorted(component)) + return components + + +def _canonical_issue(issue_numbers: list[int], features: dict[str, ArtifactFeature]) -> int | None: + if not issue_numbers: + return None + return min( + issue_numbers, + key=lambda number: ( + 0 if features[f"issue:{number}"].row.get("state") == "open" else 1, + -features[f"issue:{number}"].inbound_references, + -features[f"issue:{number}"].discussion_activity, + _sort_timestamp(features[f"issue:{number}"].row.get("created_at")), + number, + ), + ) + + +def _canonical_pr(pr_numbers: list[int], features: dict[str, ArtifactFeature]) -> int | None: + if not pr_numbers: + return None + return min( + pr_numbers, + key=lambda number: ( + 0 if bool(features[f"pull_request:{number}"].row.get("merged")) else 1, + 0 + if features[f"pull_request:{number}"].row.get("state") == "open" + and not bool(features[f"pull_request:{number}"].row.get("draft")) + else 1, + -features[f"pull_request:{number}"].explicit_issue_links, + -( + features[f"pull_request:{number}"].discussion_activity + + features[f"pull_request:{number}"].review_activity + ), + features[f"pull_request:{number}"].diff_size, + _sort_timestamp(features[f"pull_request:{number}"].row.get("created_at")), + number, + ), + ) + + +def _cluster_status( + issue_numbers: list[int], pr_numbers: list[int], features: dict[str, ArtifactFeature] +) -> str: + if any(features[f"issue:{number}"].row.get("state") == "open" for number in issue_numbers): + return "open" + if any( + features[f"pull_request:{number}"].row.get("state") == "open" + and not bool(features[f"pull_request:{number}"].row.get("draft")) + for number in pr_numbers + ): + return "open" + if any(bool(features[f"pull_request:{number}"].row.get("merged")) for number in pr_numbers): + return "merged" + return "closed" + + +def _cluster_confidence(evidence_types: list[str]) -> float: + confidence = 0.45 + if "duplicate_reference" in evidence_types: + confidence += 0.25 + if "shared_issue_target" in evidence_types: + confidence += 0.2 + if "closing_reference" in evidence_types: + confidence += 0.1 + if "timeline:cross-referenced" in evidence_types: + confidence += 0.1 + if "soft_similarity" in evidence_types: + confidence += 0.05 + return min(confidence, 0.99) + + +def _cluster_summary( + issue_numbers: list[int], + pr_numbers: list[int], + target_issue_number: int | None, + evidence_types: list[str], +) -> str: + if issue_numbers and pr_numbers and target_issue_number is not None: + return f"Cluster of {len(issue_numbers)} issues and {len(pr_numbers)} PRs centered on issue #{target_issue_number}." + if pr_numbers and target_issue_number is not None: + return f"Cluster of {len(pr_numbers)} PRs targeting issue #{target_issue_number}." + if issue_numbers: + return f"Cluster of {len(issue_numbers)} related issues linked by {', '.join(evidence_types[:2]) or 'duplicate evidence'}." + return f"Cluster of {len(pr_numbers)} related pull requests linked by {', '.join(evidence_types[:2]) or 'shared evidence'}." 
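+
+
+# Worked example (illustrative only; all artifact numbers are invented): with
+# evidence = ["closing_reference", "shared_issue_target"],
+# _cluster_confidence(evidence) accumulates 0.45 + 0.2 + 0.1 == 0.75, and
+# _cluster_summary([118, 120], [301, 305], 120, evidence) renders
+# "Cluster of 2 issues and 2 PRs centered on issue #120."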
+ + +def _cluster_score( + issue_numbers: list[int], + pr_numbers: list[int], + features: dict[str, ArtifactFeature], + status: str, +) -> float: + cluster_size = len(issue_numbers) + len(pr_numbers) + has_mixed = 1 if issue_numbers and pr_numbers else 0 + duplicate_pressure = max(len(issue_numbers) - 1, 0) + max(len(pr_numbers) - 1, 0) + open_bonus = 1 if status == "open" else 0 + discussion = sum( + features[f"issue:{number}"].discussion_activity for number in issue_numbers + ) + sum( + features[f"pull_request:{number}"].discussion_activity + + features[f"pull_request:{number}"].review_activity + for number in pr_numbers + ) + return float( + cluster_size * 100 + has_mixed * 50 + duplicate_pressure * 25 + open_bonus * 20 + discussion + ) + + +def _issue_score( + number: int | None, features: dict[str, ArtifactFeature], cluster_size: int +) -> float | None: + if number is None: + return None + feature = features[f"issue:{number}"] + score = 0.0 + if feature.row.get("state") == "open": + score += 100.0 + score += cluster_size * 10.0 + score += feature.discussion_activity * 2.0 + score += feature.inbound_references + return score + + +def _pr_score( + number: int | None, features: dict[str, ArtifactFeature], cluster_size: int +) -> float | None: + if number is None: + return None + feature = features[f"pull_request:{number}"] + score = 0.0 + if feature.row.get("state") == "open" and not bool(feature.row.get("draft")): + score += 120.0 + elif bool(feature.row.get("merged")): + score += 60.0 + score += cluster_size * 10.0 + score += feature.explicit_issue_links * 5.0 + score += (feature.discussion_activity + feature.review_activity) * 2.0 + score -= feature.diff_size / 1000.0 + return score + + +def _canonical_issue_reason( + number: int | None, features: dict[str, ArtifactFeature], issue_numbers: list[int] +) -> str | None: + if number is None: + return None + feature = features[f"issue:{number}"] + return ( + f"Issue #{number} is canonical because it is {'open' if feature.row.get('state') == 'open' else 'closed'}, " + f"has {feature.inbound_references} inbound references, and has the strongest discussion signal in a cluster of {len(issue_numbers)} issues." + ) + + +def _canonical_pr_reason( + number: int | None, features: dict[str, ArtifactFeature], pr_numbers: list[int] +) -> str | None: + if number is None: + return None + feature = features[f"pull_request:{number}"] + review_signal = feature.discussion_activity + feature.review_activity + return ( + f"PR #{number} is canonical because it is {'merged' if feature.row.get('merged') else 'open' if feature.row.get('state') == 'open' else 'closed'}, " + f"links to {feature.explicit_issue_links} issues, and has {review_signal} review/discussion events across {len(pr_numbers)} related PRs." + ) + + +def _best_issue_reason( + number: int | None, features: dict[str, ArtifactFeature], cluster_size: int +) -> str | None: + if number is None: + return None + feature = features[f"issue:{number}"] + return ( + f"Issue #{number} is the strongest global issue candidate because it is {'open' if feature.row.get('state') == 'open' else 'closed'}, " + f"belongs to a cluster with {cluster_size} artifacts, and carries {feature.discussion_activity} discussion comments plus {feature.inbound_references} inbound references." 
+ ) + + +def _best_pr_reason( + number: int | None, features: dict[str, ArtifactFeature], cluster_size: int +) -> str | None: + if number is None: + return None + feature = features[f"pull_request:{number}"] + return ( + f"PR #{number} is the strongest global PR candidate because it is {'open' if feature.row.get('state') == 'open' else 'merged' if feature.row.get('merged') else 'closed'}, " + f"belongs to a cluster with {cluster_size} artifacts, links to {feature.explicit_issue_links} issues, and carries {feature.discussion_activity + feature.review_activity} review/discussion events." + ) + + +def _duplicate_issue_reason(cluster: ClusterRecord) -> str: + return f"Issues in {cluster.cluster_id} are treated as duplicates because they share {', '.join(cluster.evidence_types)} evidence." + + +def _duplicate_pr_reason(cluster: ClusterRecord) -> str: + if cluster.target_issue_number is not None: + return f"PRs in {cluster.cluster_id} are treated as duplicates because they converge on issue #{cluster.target_issue_number} with {', '.join(cluster.evidence_types)} evidence." + return f"PRs in {cluster.cluster_id} are treated as duplicates because they share {', '.join(cluster.evidence_types)} evidence." + + +def _cluster_pr_comparisons( + cluster: ClusterRecord, features: dict[str, ArtifactFeature] +) -> list[PrComparisonEntry]: + comparisons: list[PrComparisonEntry] = [] + numbers = sorted(cluster.pr_numbers) + for index, left_number in enumerate(numbers): + left = features[f"pull_request:{left_number}"] + for right_number in numbers[index + 1 :]: + right = features[f"pull_request:{right_number}"] + comparisons.append(_pr_comparison(left, right)) + return comparisons + + +def _pr_comparison(left: ArtifactFeature, right: ArtifactFeature) -> PrComparisonEntry: + shared_filenames = sorted(set(left.filenames) & set(right.filenames)) + size_similarity = _size_similarity(left.diff_size, right.diff_size) + file_overlap = _jaccard_sets(set(left.filenames), set(right.filenames)) + area_overlap, shared_file_areas = _file_area_overlap( + left.file_ranges_by_name, right.file_ranges_by_name + ) + patch_similarity = _jaccard_sets(set(left.patch_tokens), set(right.patch_tokens)) + code_similarity = ( + size_similarity * 0.20 + file_overlap * 0.30 + area_overlap * 0.35 + patch_similarity * 0.15 + ) + return PrComparisonEntry( + left_pr_number=left.number, + right_pr_number=right.number, + code_similarity=round(code_similarity, 3), + size_similarity=round(size_similarity, 3), + file_overlap=round(file_overlap, 3), + area_overlap=round(area_overlap, 3), + patch_similarity=round(patch_similarity, 3), + shared_filenames=shared_filenames, + shared_file_areas=shared_file_areas, + ) + + +def _cluster_item(feature: ArtifactFeature) -> dict[str, Any]: + return { + "node_id": feature.node_id, + "kind": feature.kind, + "number": feature.number, + "title": feature.row.get("title"), + "state": feature.row.get("state"), + "draft": feature.row.get("draft"), + "merged": feature.row.get("merged"), + "created_at": feature.row.get("created_at"), + "body_length": feature.body_length, + "discussion_activity": feature.discussion_activity, + "review_activity": feature.review_activity, + "inbound_references": feature.inbound_references, + "explicit_issue_links": feature.explicit_issue_links, + "explicit_issue_targets": feature.explicit_issue_targets, + "diff_size": feature.diff_size, + "filenames": feature.filenames[:20], + "diff_preview": feature.diff_preview, + } + + +async def _fast_agent_cluster_analysis( + prepared: 
PreparedLlmPacket, model: str +) -> ClusterAnalysisCallResult: + try: + from fast_agent import FastAgent + except Exception as exc: + return ClusterAnalysisCallResult( + analyst_result=None, + evaluator_result=None, + error_kind=_classify_llm_error(exc), + error_message=_summarize_llm_error(exc), + evaluator_used=False, + retried=False, + ) + + fast = FastAgent("slop-farmer-analysis") + + @fast.agent( + name="cluster_analyst", + instruction=CLUSTER_ANALYST_INSTRUCTION, + model=model, + use_history=False, + ) + async def analyst_stub() -> None: + return None + + @fast.agent( + name="cluster_evaluator", + instruction=CLUSTER_EVALUATOR_INSTRUCTION, + model=model, + use_history=False, + ) + async def evaluator_stub() -> None: + return None + + packet = prepared.packet + prompt = json.dumps(packet, indent=2, sort_keys=True) + try: + async with fast.run() as agent: + analyst_result, _ = await agent.cluster_analyst.structured( + prompt, ClusterAnalystResponse + ) + if analyst_result is None: + return ClusterAnalysisCallResult( + analyst_result=None, + evaluator_result=None, + error_kind=None, + error_message=None, + evaluator_used=False, + retried=False, + ) + if not _should_run_evaluator( + packet, + prepared.budget, + split=prepared.split, + aggressively_trimmed=prepared.aggressively_trimmed, + analyst_result=analyst_result, + ): + return ClusterAnalysisCallResult( + analyst_result=analyst_result, + evaluator_result=None, + error_kind=None, + error_message=None, + evaluator_used=False, + retried=False, + ) + evaluation_prompt = json.dumps( + {"packet": packet, "analyst_result": analyst_result.model_dump(mode="json")}, + indent=2, + sort_keys=True, + ) + try: + evaluation_result, _ = await agent.cluster_evaluator.structured( + evaluation_prompt, ClusterEvaluatorResponse + ) + except Exception as exc: + return ClusterAnalysisCallResult( + analyst_result=analyst_result, + evaluator_result=None, + error_kind=_classify_llm_error(exc), + error_message=_summarize_llm_error(exc), + evaluator_used=True, + retried=False, + ) + if evaluation_result is None or evaluation_result.accept: + return ClusterAnalysisCallResult( + analyst_result=analyst_result, + evaluator_result=evaluation_result, + error_kind=None, + error_message=None, + evaluator_used=True, + retried=False, + ) + retry_prompt = json.dumps( + { + "packet": packet, + "previous_result": analyst_result.model_dump(mode="json"), + "feedback": evaluation_result.feedback, + }, + indent=2, + sort_keys=True, + ) + try: + retry_result, _ = await agent.cluster_analyst.structured( + retry_prompt, ClusterAnalystResponse + ) + except Exception as exc: + return ClusterAnalysisCallResult( + analyst_result=analyst_result, + evaluator_result=evaluation_result, + error_kind=_classify_llm_error(exc), + error_message=_summarize_llm_error(exc), + evaluator_used=True, + retried=True, + ) + return ClusterAnalysisCallResult( + analyst_result=retry_result or analyst_result, + evaluator_result=evaluation_result, + error_kind=None, + error_message=None, + evaluator_used=True, + retried=True, + ) + except Exception as exc: + return ClusterAnalysisCallResult( + analyst_result=None, + evaluator_result=None, + error_kind=_classify_llm_error(exc), + error_message=_summarize_llm_error(exc), + evaluator_used=False, + retried=False, + ) + + +def _can_use_fast_agent() -> bool: + try: + import fast_agent # noqa: F401 + except Exception: + return False + return any(os.environ.get(name) for name in LLM_PROVIDER_ENV_VARS) + + +def _tokenize(text: str | None, *, remove_stopwords: bool) 
-> list[str]:
+ tokens = TOKEN_PATTERN.findall((text or "").lower())
+ if not remove_stopwords:
+ return tokens
+ return [token for token in tokens if token not in STOPWORDS]
+
+
+def _strip_pull_request_template(
+ body: str | None,
+ *,
+ settings: Any | None = None,
+) -> str:
+ return strip_pull_request_template(body, settings=settings)
+
+
+def _patch_ranges(patch: str) -> list[tuple[int, int]]:
+ ranges: list[tuple[int, int]] = []
+ for line in patch.splitlines():
+ match = HUNK_HEADER_PATTERN.match(line)
+ if match is None:
+ continue
+ start = int(match.group("start"))
+ count = int(match.group("count") or "1")
+ # A zero-length hunk (count == 0) still anchors a single line at its start.
+ end = start if count == 0 else start + count - 1
+ ranges.append((start, end))
+ return ranges
+
+
+def _patch_content_tokens(patch: str) -> list[str]:
+ lines: list[str] = []
+ for line in patch.splitlines():
+ # Skip the +++/--- file headers; keep only added or removed content lines.
+ if line.startswith("+++") or line.startswith("---"):
+ continue
+ if line.startswith("+") or line.startswith("-"):
+ lines.append(line[1:])
+ return _tokenize("\n".join(lines), remove_stopwords=True)
+
+
+def _size_similarity(left: int, right: int) -> float:
+ largest = max(left, right)
+ if largest <= 0:
+ return 1.0
+ return min(left, right) / largest
+
+
+def _file_area_overlap(
+ left_ranges_by_name: dict[str, list[tuple[int, int]]],
+ right_ranges_by_name: dict[str, list[tuple[int, int]]],
+) -> tuple[float, list[PrFileAreaEntry]]:
+ shared_names = sorted(set(left_ranges_by_name) & set(right_ranges_by_name))
+ if not shared_names:
+ return 0.0, []
+ total_overlap = 0
+ total_union = 0
+ entries: list[PrFileAreaEntry] = []
+ for filename in shared_names:
+ left_ranges = _merge_ranges(left_ranges_by_name.get(filename) or [])
+ right_ranges = _merge_ranges(right_ranges_by_name.get(filename) or [])
+ overlap = _ranges_overlap_size(left_ranges, right_ranges)
+ union = _ranges_size(_merge_ranges([*left_ranges, *right_ranges]))
+ total_overlap += overlap
+ total_union += union
+ entries.append(
+ PrFileAreaEntry(
+ filename=filename,
+ left_ranges=[[start, end] for start, end in left_ranges],
+ right_ranges=[[start, end] for start, end in right_ranges],
+ )
+ )
+ if total_union == 0:
+ return 0.0, entries
+ return total_overlap / total_union, entries
+
+
+def _merge_ranges(ranges: list[tuple[int, int]]) -> list[tuple[int, int]]:
+ if not ranges:
+ return []
+ merged: list[tuple[int, int]] = []
+ for start, end in sorted(ranges):
+ # Adjacent ranges (zero-line gap) are merged along with overlapping ones.
+ if not merged or start > merged[-1][1] + 1:
+ merged.append((start, end))
+ continue
+ merged[-1] = (merged[-1][0], max(merged[-1][1], end))
+ return merged
+
+
+def _ranges_size(ranges: list[tuple[int, int]]) -> int:
+ return sum(end - start + 1 for start, end in ranges)
+
+
+def _ranges_overlap_size(left: list[tuple[int, int]], right: list[tuple[int, int]]) -> int:
+ overlap = 0
+ left_index = 0
+ right_index = 0
+ while left_index < len(left) and right_index < len(right):
+ left_start, left_end = left[left_index]
+ right_start, right_end = right[right_index]
+ overlap_start = max(left_start, right_start)
+ overlap_end = min(left_end, right_end)
+ if overlap_start <= overlap_end:
+ overlap += overlap_end - overlap_start + 1
+ if left_end <= right_end:
+ left_index += 1
+ else:
+ right_index += 1
+ return overlap
+
+
+def _days_between(left: str | None, right: str | None) -> int:
+ if not left or not right:
+ # Treat missing timestamps as infinitely far apart.
+ return 10**9
+ return abs((_parse_dt(left) - _parse_dt(right)).days)
+
+
+def _parse_dt(value: str) -> datetime:
+ return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(UTC)
+
+
+# The sentinel sorts after any real ISO-8601 timestamp, so undated artifacts rank last.
+def _sort_timestamp(value: str | None) -> str:
+ 
return value or "9999-99-99T99:99:99Z" + + +def _pair_key(left: str, right: str) -> tuple[str, str]: + return (left, right) if left <= right else (right, left) + + +def _jaccard(left: list[str], right: list[str]) -> float: + return _jaccard_sets(set(left), set(right)) + + +def _jaccard_sets(left_set: set[str], right_set: set[str]) -> float: + if not left_set or not right_set: + return 0.0 + return len(left_set & right_set) / len(left_set | right_set) + + +def _cluster_id_from_nodes(nodes: list[str]) -> str: + numbers = sorted(int(node.split(":", 1)[1]) for node in nodes) + return f"cluster-{numbers[0]}-{len(nodes)}"
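+
+
+# Worked example (illustrative only): node ids compare lexically, so
+# _pair_key("pull_request:7", "issue:42") == ("issue:42", "pull_request:7"),
+# and _cluster_id_from_nodes(["issue:42", "pull_request:7"]) sorts the artifact
+# numbers to [7, 42] and yields "cluster-7-2" (lowest number plus node count).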