# Source: ApplyCRs / scripts / fetch_crs.py (Hugging Face Space, commit 7eedaf8, author heymenn)
#!/usr/bin/env python3
"""
fetch_crs.py — Download CRs and TSs from a 3GPP/ETSI Excel contribution list.
Usage:
python3 fetch_crs.py <excel_path> <person_name> [--output-dir DIR]
Steps:
1. Parse Excel, filter Accepted CRs by person name
2. Download CR DOCXs via docfinder /find/tdoc/download
3. Parse CR cover pages to extract target TS spec + version
4. Download TS DOCXs via docfinder /find/docx
5. Print summary report
"""
import argparse
import os
import re
import sys
import time
import zipfile
from pathlib import Path
import requests
# Docfinder service endpoint (a Hugging Face Space) used for all downloads.
BASE_URL = "https://organizedprogrammers-docfinder.hf.space"
# Honour the conventional lowercase proxy environment variables.
# requests treats a None value as "no proxy for this scheme".
_proxy = os.environ.get("http_proxy") or None
PROXIES = {"http": _proxy, "https": os.environ.get("https_proxy") or None}
# ---------------------------------------------------------------------------
# Path helpers
# ---------------------------------------------------------------------------
def wsl_path(p: str) -> str:
    """Convert Windows path (C:\\...) to WSL path (/mnt/c/...) if needed.

    Paths that do not start with a drive-letter prefix are returned
    unchanged (after stripping surrounding whitespace).
    """
    p = p.strip()
    has_drive_prefix = len(p) >= 2 and p[0].isalpha() and p[1] == ":"
    if not has_drive_prefix:
        return p
    drive = p[0].lower()
    tail = p[2:].replace("\\", "/")
    return f"/mnt/{drive}{tail}"
# ---------------------------------------------------------------------------
# Step 1 β€” Parse Excel
# ---------------------------------------------------------------------------
def parse_excel(excel_path: str, person_name: str):
    """
    Return list of (uid, title) for Accepted CRs matching person_name.

    Dispatches on the file extension; handles both .xls and .xlsx.
    """
    path = Path(wsl_path(excel_path))
    suffix = path.suffix.lower()
    # Dispatch table instead of an if/elif chain.
    parsers = {".xls": _parse_xls, ".xlsx": _parse_xlsx}
    parser = parsers.get(suffix)
    if parser is None:
        raise ValueError(f"Unsupported file extension: {suffix!r}. Expected .xls or .xlsx")
    return parser(path, person_name)
def _name_pattern(name: str) -> re.Pattern:
return re.compile(r"\b" + re.escape(name) + r"\b", re.IGNORECASE)
def _parse_xls(path: Path, person_name: str):
    """Parse a legacy .xls contribution list with xlrd.

    Returns a list of (uid, title) tuples for rows where Type == "CR",
    Status == "Accepted" and SubmittedBy contains person_name as a whole word.
    """
    try:
        import xlrd
    except ImportError:
        sys.exit("ERROR: xlrd is not installed. Run: pip install xlrd")
    wb = xlrd.open_workbook(str(path))
    # Try "Contributions" sheet first, fall back to first sheet
    try:
        ws = wb.sheet_by_name("Contributions")
    except xlrd.XLRDError:
        ws = wb.sheet_by_index(0)
    # Row 0 is headers; row 1 is an empty duplicate — skip both below
    headers = [str(ws.cell_value(0, c)).strip() for c in range(ws.ncols)]
    col = {h: i for i, h in enumerate(headers)}

    def find_col(*names):
        # BUG FIX: the previous `col.get(a) or col.get(b)` chain treated a
        # column at index 0 as missing (0 is falsy) and fell through to the
        # next candidate — or to None, raising a bogus "not found" error.
        return next((col[n] for n in names if n in col), None)

    uid_col = find_col("Uid", "UID", "uid")
    type_col = find_col("Type", "type")
    status_col = find_col("Status", "status")
    by_col = find_col("SubmittedBy", "Submitted By", "submittedby")
    title_col = find_col("Title", "title")
    for name, c in [("Uid", uid_col), ("Type", type_col),
                    ("Status", status_col), ("SubmittedBy", by_col)]:
        if c is None:
            raise ValueError(f"Column {name!r} not found. Available: {list(col.keys())}")
    pattern = _name_pattern(person_name)
    results = []
    for r in range(2, ws.nrows):  # skip header + empty duplicate
        uid = str(ws.cell_value(r, uid_col)).strip()
        if not uid:
            continue  # trailing empty rows (consistent with _parse_xlsx)
        if str(ws.cell_value(r, type_col)).strip() != "CR":
            continue
        if str(ws.cell_value(r, status_col)).strip() != "Accepted":
            continue
        if not pattern.search(str(ws.cell_value(r, by_col)).strip()):
            continue
        title = str(ws.cell_value(r, title_col)).strip() if title_col is not None else ""
        results.append((uid, title))
    return results
def _parse_xlsx(path: Path, person_name: str):
    """Parse a modern .xlsx contribution list with openpyxl.

    Returns a list of (uid, title) tuples for rows where Type == "CR",
    Status == "Accepted" and SubmittedBy contains person_name as a whole word.
    """
    try:
        import openpyxl
    except ImportError:
        sys.exit("ERROR: openpyxl is not installed. Run: pip install openpyxl")
    wb = openpyxl.load_workbook(str(path), read_only=True, data_only=True)
    ws = wb["Contributions"] if "Contributions" in wb.sheetnames else wb.active
    rows = iter(ws.iter_rows(values_only=True))
    # Row 0: headers
    header_row = next(rows)
    headers = [str(h).strip() if h is not None else "" for h in header_row]
    col = {h: i for i, h in enumerate(headers)}
    # Row 1: empty duplicate — skip
    next(rows, None)

    def find_col(*names):
        # BUG FIX: the previous `col.get(a) or col.get(b)` chain treated a
        # column at index 0 as missing (0 is falsy) and fell through to the
        # next candidate — or to None, raising a bogus "not found" error.
        return next((col[n] for n in names if n in col), None)

    uid_col = find_col("Uid", "UID", "uid")
    type_col = find_col("Type", "type")
    status_col = find_col("Status", "status")
    by_col = find_col("SubmittedBy", "Submitted By", "submittedby")
    title_col = find_col("Title", "title")
    for name, c in [("Uid", uid_col), ("Type", type_col),
                    ("Status", status_col), ("SubmittedBy", by_col)]:
        if c is None:
            raise ValueError(f"Column {name!r} not found. Available: {list(col.keys())}")
    pattern = _name_pattern(person_name)

    def cell(row, c):
        # Read-only rows can be shorter than the header row.
        v = row[c] if c < len(row) else None
        return str(v).strip() if v is not None else ""

    results = []
    for row in rows:
        uid = cell(row, uid_col)
        if not uid:
            continue  # trailing empty rows emitted by read-only mode
        if cell(row, type_col) != "CR":
            continue
        if cell(row, status_col) != "Accepted":
            continue
        if not pattern.search(cell(row, by_col)):
            continue
        title = cell(row, title_col) if title_col is not None else ""
        results.append((uid, title))
    return results
# ---------------------------------------------------------------------------
# Step 2 β€” Download CR DOCXs
# ---------------------------------------------------------------------------
def download_cr(uid: str, cr_dir: Path):
    """
    Download CR DOCX for the given UID.

    Returns:
        (docx_path, note) — docx_path is the file to use for parsing,
        note is a human-readable string for the summary.
        Returns (None, error_msg) on failure.
    """
    dest = cr_dir / f"{uid}.docx"
    extracted_path = cr_dir / f"{uid}_extracted.docx"
    # BUG FIX: when a previous run received a ZIP-wrapped response, `dest`
    # holds the raw ZIP container (saved under a .docx name) and the real
    # document is the extracted file. Checking only `dest` handed the ZIP
    # back to the cover-page parser on every re-run.
    if extracted_path.exists():
        return extracted_path, "already existed"
    if dest.exists():
        return dest, "already existed"
    try:
        resp = requests.post(
            f"{BASE_URL}/find/tdoc/download",
            json={"doc_id": uid},
            proxies=PROXIES,
            timeout=60,
        )
    except requests.RequestException as e:
        return None, f"network error: {e}"
    if not resp.ok:
        return None, f"HTTP {resp.status_code}"
    content = resp.content
    if not content:
        return None, "empty response"
    dest.write_bytes(content)
    # A DOCX is itself a ZIP, so the PK magic alone is ambiguous: probe the
    # archive for nested .docx entries to detect a wrapping ZIP container.
    if content[:4] == b"PK\x03\x04":
        try:
            with zipfile.ZipFile(dest) as zf:
                docx_entries = [n for n in zf.namelist() if n.endswith(".docx")]
                if docx_entries:
                    with zf.open(docx_entries[0]) as src, open(extracted_path, "wb") as dst:
                        dst.write(src.read())
                    return extracted_path, "extracted from ZIP"
        except zipfile.BadZipFile:
            pass  # Not actually a ZIP despite magic bytes — treat as raw DOCX
    return dest, "downloaded"
# ---------------------------------------------------------------------------
# Step 3 β€” Parse CR Cover Pages
# ---------------------------------------------------------------------------
SPEC_PATTERN = re.compile(r"^\d{3}\s\d{3}$")
VERSION_PATTERN = re.compile(r"^\d+\.\d+\.\d+$")
def parse_cr_cover(docx_path: Path):
"""
Parse the CR cover table (tables[0]) to extract (spec_number, version).
Returns (spec_number, version) e.g. ("102 221", "18.3.0")
Returns (None, None) if parsing fails.
"""
try:
from docx import Document
except ImportError:
sys.exit("ERROR: python-docx is not installed. Run: pip install python-docx")
try:
doc = Document(str(docx_path))
except Exception as e:
return None, None
if not doc.tables:
return None, None
table = doc.tables[0]
# Collect all non-empty cell texts in order
cells = []
for row in table.rows:
for cell in row.cells:
text = cell.text.strip()
if text:
cells.append(text)
spec_number = None
version = None
for i, text in enumerate(cells):
# Look for spec number: "NNN NNN" pattern
if SPEC_PATTERN.match(text) and spec_number is None:
spec_number = text
# Look for version: cell immediately after "Current version:"
if text == "Current version:" and i + 1 < len(cells):
candidate = cells[i + 1]
if VERSION_PATTERN.match(candidate):
version = candidate
# Also accept "Current version" without colon
if text in ("Current version:", "Current version") and version is None:
if i + 1 < len(cells) and VERSION_PATTERN.match(cells[i + 1]):
version = cells[i + 1]
return spec_number, version
# ---------------------------------------------------------------------------
# Step 4 β€” Download TS DOCXs
# ---------------------------------------------------------------------------
def _is_html(resp: requests.Response) -> bool:
    """Return True if the response body is an HTML page (e.g. HF Space loading page)."""
    if "text/html" in resp.headers.get("content-type", ""):
        return True
    # Fall back to sniffing the first bytes of the payload.
    head = resp.content[:5].lower()
    return head in (b"<!doc", b"<html")
def download_ts(spec_number: str, version: str, ts_dir: Path,
                max_retries: int = 3, retry_delay: int = 10):
    """
    Download TS DOCX for spec_number (e.g. "102 221") and version (e.g. "18.3.0").

    Retries up to max_retries times when the HF Space returns an HTML loading
    page instead of the DOCX binary (happens on cold-start / brief restarts).

    Returns (filename, note) or (None, error_msg).
    """
    spec_no_space = spec_number.replace(" ", "")
    filename = f"ts_{spec_no_space}_v{version}.docx"
    dest = ts_dir / filename
    if dest.exists():
        return filename, "already existed"
    last_error = "no attempts made"  # only surfaced if max_retries < 1
    for attempt in range(1, max_retries + 1):
        try:
            resp = requests.post(
                f"{BASE_URL}/find/docx",
                json={"doc_id": spec_number, "version": version},
                proxies=PROXIES,
                timeout=120,
            )
        except requests.RequestException as e:
            return None, f"network error: {e}"
        if not resp.ok:
            return None, f"HTTP {resp.status_code}"
        content = resp.content
        if not content:
            return None, "empty response"
        # Detect HTML splash page (HF Space cold-start) — retry after a delay
        if _is_html(resp):
            last_error = f"got HTML instead of DOCX (attempt {attempt}/{max_retries})"
            if attempt < max_retries:
                print(f"\n [retry in {retry_delay}s β€” HF Space loading…]", flush=True)
                time.sleep(retry_delay)
                continue
            # BUG FIX: this path previously reported "invalid file (not a
            # ZIP/DOCX …)" and ignored last_error, misdiagnosing the real
            # failure mode (HTML splash page on every attempt).
            return None, f"{last_error} — HF Space may still be starting up"
        # Good binary response
        dest.write_bytes(content)
        if content[:2] != b"PK":
            # Not a ZIP container, so it cannot be a DOCX — discard it.
            dest.unlink()
            return None, f"invalid file (not a ZIP/DOCX, starts with {content[:4]!r})"
        # Verify the TS contains the expected spec number in its first paragraph
        try:
            import docx as _docx
            _doc = _docx.Document(dest)
            first_para = _doc.paragraphs[0].text if _doc.paragraphs else ''
            if spec_no_space not in first_para.replace(' ', ''):
                dest.unlink()
                return None, f"wrong TS returned by API: got {first_para[:80]!r} (expected spec {spec_no_space})"
        except Exception:
            pass  # Best-effort check only — trust the ZIP check above
        note = "downloaded" if attempt == 1 else f"downloaded (after {attempt} attempts)"
        return filename, note
    return None, last_error
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """CLI entry point: parse args, then run steps 1-5 and print a summary."""
    parser = argparse.ArgumentParser(
        description="Download CRs and TSs from a 3GPP/ETSI Excel contribution list."
    )
    parser.add_argument("excel_path", help="Path to .xls or .xlsx contribution list")
    parser.add_argument("person_name", help="Name to search for in SubmittedBy column")
    parser.add_argument(
        "--output-dir",
        default=str(Path.home() / "CR_Processing"),
        help="Base output directory (default: ~/CR_Processing)",
    )
    args = parser.parse_args()
    excel_path = wsl_path(args.excel_path)
    person_name = args.person_name
    output_dir = Path(wsl_path(args.output_dir)).expanduser()
    cr_dir = output_dir / "CRs"
    ts_dir = output_dir / "TS"
    cr_dir.mkdir(parents=True, exist_ok=True)
    ts_dir.mkdir(parents=True, exist_ok=True)
    # --- Step 1: Parse Excel ---
    print(f"Parsing Excel: {excel_path}")
    print(f"Filtering for: {person_name!r} | Type=CR | Status=Accepted\n")
    try:
        cr_list = parse_excel(excel_path, person_name)
    except Exception as e:
        sys.exit(f"ERROR parsing Excel: {e}")
    print(f"Found {len(cr_list)} matching CR(s).\n")
    if not cr_list:
        print("Nothing to download.")
        return
    # --- Step 2: Download CR DOCXs ---
    print("Downloading CRs...")
    cr_results = []  # list of (uid, docx_path_or_None, note)
    for uid, title in cr_list:
        print(f" [{uid}] ", end="", flush=True)
        docx_path, note = download_cr(uid, cr_dir)
        cr_results.append((uid, docx_path, note))
        if docx_path:
            print(f"OK ({note}) β€” {docx_path.name}")
        else:
            print(f"FAILED β€” {note}")
    print()
    # --- Step 3: Parse cover pages ---
    print("Parsing CR cover pages...")
    # Group UIDs by (spec, version) so each TS is downloaded only once.
    ts_targets = {}  # (spec_number, version) -> list of uids
    for uid, docx_path, note in cr_results:
        if docx_path is None:
            continue
        spec_number, version = parse_cr_cover(docx_path)
        if spec_number and version:
            key = (spec_number, version)
            ts_targets.setdefault(key, []).append(uid)
            print(f" [{uid}] β†’ TS {spec_number} v{version}")
        else:
            print(f" [{uid}] WARNING: could not parse cover page (spec/version not found)")
    print()
    # --- Step 4: Download TSs ---
    print("Downloading TSs...")
    ts_results = []  # list of (spec_number, version, filename_or_None, note)
    for (spec_number, version), uids in ts_targets.items():
        print(f" [TS {spec_number} v{version}] ", end="", flush=True)
        filename, note = download_ts(spec_number, version, ts_dir)
        ts_results.append((spec_number, version, filename, note))
        if filename:
            # BUG FIX: previously printed the literal "(unknown)" instead of
            # the downloaded filename.
            print(f"OK ({note}) β€” {filename}")
        else:
            print(f"FAILED β€” {note}")
    print()
    # --- Step 5: Summary ---
    print("=" * 50)
    print("=== fetch-crs summary ===")
    print(f"Person: {person_name}")
    print(f"Excel: {excel_path}")
    print(f"CRs found: {len(cr_list)} (Accepted, Type=CR)")
    print()
    print("CRs downloaded:")
    for uid, docx_path, note in cr_results:
        if docx_path:
            print(f" βœ“ {docx_path.name} [{note}]")
        else:
            print(f" βœ— {uid} β€” {note}")
    print()
    print("TSs downloaded:")
    for spec_number, version, filename, note in ts_results:
        if filename:
            # BUG FIX: previously printed the literal "(unknown)" instead of
            # the downloaded filename.
            print(f" βœ“ {filename} [{note}]")
        else:
            print(f" βœ— ts_{spec_number.replace(' ', '')} v{version} β€” {note}")
    print()
    print(f"Output: {output_dir}/")


if __name__ == "__main__":
    main()