# FoundationalASSIST / Code / find_duplicate_problem_id.py
# Provenance (Hugging Face dataset page header): uploaded by martinakaduc
# via huggingface_hub, revision 6256eb9 (verified). These lines were page
# chrome, not Python; kept as comments so the file remains parseable.
#!/usr/bin/env python3
"""Export rows that have duplicated problem_id values.
Default input:
- ../Data/Problems.csv
Default output:
- ../Results/Problems_duplicated_problem_id.csv
The output includes all original columns plus:
- duplicate_group_id
- duplicate_problem_id_count
- distinct_problem_body_count
"""
from __future__ import annotations
import argparse
import csv
import html
import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Set
# Pre-compiled patterns: one for HTML tags, one for runs of whitespace.
_TAG_RE = re.compile(r"<[^>]+>")
_WS_RE = re.compile(r"\s+")


def normalize_body(text: str, strip_html: bool, collapse_whitespace: bool) -> str:
    """Canonicalize a Problem Body string for distinct-body counting.

    HTML entities are always unescaped. Tag stripping and whitespace
    collapsing are optional passes controlled by the flags; a ``None``
    input is treated as the empty string.
    """
    result = html.unescape("" if text is None else text)
    if strip_html:
        result = _TAG_RE.sub("", result)
    if collapse_whitespace:
        # Squash each whitespace run to one space, then trim the ends.
        result = _WS_RE.sub(" ", result).strip()
    return result
def _group_rows_by_id(rows: List[dict], id_column: str) -> Dict[str, List[int]]:
    """Map each non-empty problem id to the indices of the rows bearing it.

    Rows whose id cell is missing or blank are skipped. Dict insertion
    order preserves the first-seen order of ids.
    """
    groups: Dict[str, List[int]] = defaultdict(list)
    for idx, row in enumerate(rows):
        # `or ""` guards against None cells: csv.DictReader fills missing
        # fields of short rows with None, and str(None) == "None" would
        # lump all short rows into one bogus duplicate group.
        problem_id = str(row.get(id_column) or "").strip()
        if problem_id:
            groups[problem_id].append(idx)
    return groups


def _distinct_body_count(
    rows: List[dict],
    row_idxs: List[int],
    body_column: str,
    *,
    strip_html: bool,
    collapse_whitespace: bool,
) -> int:
    """Count distinct normalized Problem Body values among the given rows."""
    bodies: Set[str] = {
        normalize_body(
            rows[i].get(body_column, ""),
            strip_html=strip_html,
            collapse_whitespace=collapse_whitespace,
        )
        for i in row_idxs
    }
    return len(bodies)


def main() -> None:
    """Export rows whose problem_id is duplicated, annotated with group stats.

    Reads the input CSV, groups rows by id, keeps only groups of size > 1,
    and writes every member row plus three extra columns:
    duplicate_group_id, duplicate_problem_id_count,
    distinct_problem_body_count.

    Raises:
        ValueError: if the id or body column is absent from the input header.
    """
    parser = argparse.ArgumentParser(
        description="Find rows in Problems.csv where problem_id is duplicated."
    )
    parser.add_argument(
        "--input-csv",
        type=Path,
        default=Path("../Data/Problems.csv"),
        help="Path to Problems.csv",
    )
    parser.add_argument(
        "--output-csv",
        type=Path,
        default=Path("../Results/Problems_duplicated_problem_id.csv"),
        help="Output CSV path",
    )
    parser.add_argument(
        "--id-column",
        type=str,
        default="problem_id",
        help="Column name for problem identifier",
    )
    parser.add_argument(
        "--body-column",
        type=str,
        default="Problem Body",
        help="Column name for problem statement text",
    )
    parser.add_argument(
        "--strip-html",
        action="store_true",
        help="Strip HTML tags before counting distinct problem bodies",
    )
    parser.add_argument(
        "--collapse-whitespace",
        action="store_true",
        help="Collapse runs of whitespace before counting distinct problem bodies",
    )
    args = parser.parse_args()
    input_csv = args.input_csv.resolve()
    output_csv = args.output_csv.resolve()

    with input_csv.open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        fieldnames = reader.fieldnames or []
        if args.id_column not in fieldnames:
            raise ValueError(f"Missing id column '{args.id_column}' in {input_csv}")
        if args.body_column not in fieldnames:
            raise ValueError(f"Missing body column '{args.body_column}' in {input_csv}")
        rows: List[dict] = list(reader)

    groups = _group_rows_by_id(rows, args.id_column)
    duplicate_ids = [pid for pid, row_idxs in groups.items() if len(row_idxs) > 1]
    # Preserve first-seen order of duplicate groups.
    duplicate_ids.sort(key=lambda pid: groups[pid][0])

    output_rows: List[dict] = []
    for group_num, pid in enumerate(duplicate_ids, start=1):
        row_idxs = groups[pid]
        n_bodies = _distinct_body_count(
            rows,
            row_idxs,
            args.body_column,
            strip_html=args.strip_html,
            collapse_whitespace=args.collapse_whitespace,
        )
        for row_idx in row_idxs:
            out_row = dict(rows[row_idx])
            out_row["duplicate_group_id"] = str(group_num)
            out_row["duplicate_problem_id_count"] = str(len(row_idxs))
            out_row["distinct_problem_body_count"] = str(n_bodies)
            output_rows.append(out_row)

    output_csv.parent.mkdir(parents=True, exist_ok=True)
    output_fieldnames = fieldnames + [
        "duplicate_group_id",
        "duplicate_problem_id_count",
        "distinct_problem_body_count",
    ]
    with output_csv.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=output_fieldnames)
        writer.writeheader()
        writer.writerows(output_rows)

    print(f"Input rows: {len(rows)}")
    print(f"Duplicated {args.id_column} groups: {len(duplicate_ids)}")
    print(f"Output rows: {len(output_rows)}")
    print(f"Wrote: {output_csv}")
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()