| | """Discover new Pashto-related resource candidates from public endpoints. |
| | |
| | This script does not auto-merge into the main catalog. It writes candidates to |
| | `resources/catalog/pending_candidates.json` for maintainer review. |
| | |
| | Usage: |
| | python scripts/sync_resources.py |
| | python scripts/sync_resources.py --limit 20 --output resources/catalog/pending_candidates.json |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import argparse |
| | import json |
| | import re |
| | import socket |
| | import ssl |
| | import time |
| | import urllib.parse |
| | import urllib.request |
| | import xml.etree.ElementTree as ET |
| | from datetime import datetime, timezone |
| | from email.utils import parsedate_to_datetime |
| | from html import unescape |
| | from http.client import IncompleteRead |
| | from pathlib import Path |
| | from typing import Any |
| | from urllib.error import HTTPError, URLError |
| |
|
| |
|
| | USER_AGENT = "pashto-resource-sync/1.0" |
| | MAX_FETCH_RETRIES = 4 |
| | RETRYABLE_HTTP_CODES = {429, 500, 502, 503, 504} |
| | PASHTO_QUERY_TERMS = ["pashto", "pukhto", "pushto", "pakhto"] |
| | PASHTO_TEXT_MARKERS = ("pashto", "pukhto", "pushto", "pakhto") |
| | PASHTO_SCRIPT_MARKERS = ("پښتو", "پشتو") |
| | PASHTO_WORD_RE = re.compile( |
| | r"(?<![A-Za-z0-9])(pashto|pukhto|pushto|pakhto)(?![A-Za-z0-9])", |
| | re.IGNORECASE, |
| | ) |
| | PASHTO_CAMEL_RE = re.compile( |
| | r"(?<![A-Za-z0-9])(pashto|pukhto|pakhto)(?=[A-Z])", |
| | re.IGNORECASE, |
| | ) |
| | PASHTO_CODE_RE = re.compile(r"\b(ps(_af)?|pus|pbt[_-]?arab)\b", re.IGNORECASE) |
| | LOW_SIGNAL_RE = re.compile(r"(^|[-_/ ])(test|tmp|trial|scratch)([-_/ ]|$)", re.IGNORECASE) |
| |
|
| |
|
| | def _slug(value: str) -> str: |
| | value = value.lower() |
| | value = re.sub(r"[^a-z0-9]+", "-", value) |
| | value = re.sub(r"-+", "-", value).strip("-") |
| | return value[:80] if value else "resource" |
| |
|
| |
|
| | def _contains_pashto_marker(value: str) -> bool: |
| | text = (value or "").strip() |
| | if not text: |
| | return False |
| | if PASHTO_WORD_RE.search(text): |
| | return True |
| | if PASHTO_CAMEL_RE.search(text): |
| | return True |
| | if any(marker in text for marker in PASHTO_SCRIPT_MARKERS): |
| | return True |
| | lowered = text.casefold() |
| | return bool(PASHTO_CODE_RE.search(lowered)) |
| |
|
| |
|
| | def _is_pashto_centric(*values: str) -> bool: |
| | return any(_contains_pashto_marker(value) for value in values) |
| |
|
| |
|
| | def _is_low_signal_name(value: str) -> bool: |
| | return bool(LOW_SIGNAL_RE.search(value or "")) |
| |
|
| |
|
| | def _strip_html(value: str) -> str: |
| | text = re.sub(r"<[^>]+>", " ", value or "") |
| | text = unescape(text) |
| | return re.sub(r"\s+", " ", text).strip() |
| |
|
| |
|
| | def _classify_repo_category(name_blob: str) -> str: |
| | lowered = (name_blob or "").casefold() |
| | code_tokens = ("toolkit", "library", "nlp", "asr", "tts", "ocr", "api", "code", "cli", "sdk") |
| | if any(token in lowered for token in code_tokens): |
| | return "code" |
| | return "project" |
| |
|
| |
|
| | def _parse_retry_after_seconds(retry_after: str | None) -> float | None: |
| | if not retry_after: |
| | return None |
| |
|
| | retry_after = retry_after.strip() |
| | if not retry_after: |
| | return None |
| |
|
| | if retry_after.isdigit(): |
| | return float(retry_after) |
| |
|
| | try: |
| | retry_at = parsedate_to_datetime(retry_after) |
| | except (TypeError, ValueError): |
| | return None |
| |
|
| | now = datetime.now(timezone.utc) |
| | if retry_at.tzinfo is None: |
| | retry_at = retry_at.replace(tzinfo=timezone.utc) |
| | return max(0.0, (retry_at - now).total_seconds()) |
| |
|
| |
|
| | def _is_ssl_cert_error(exc: BaseException) -> bool: |
| | if isinstance(exc, ssl.SSLCertVerificationError): |
| | return True |
| | if isinstance(exc, URLError): |
| | reason = exc.reason |
| | if isinstance(reason, ssl.SSLCertVerificationError): |
| | return True |
| | return "CERTIFICATE_VERIFY_FAILED" in str(exc) |
| |
|
| |
|
| | def _retryable_network_error(exc: BaseException) -> bool: |
| | if _is_ssl_cert_error(exc): |
| | return False |
| | if isinstance(exc, (TimeoutError, socket.timeout, IncompleteRead, ConnectionResetError)): |
| | return True |
| | if isinstance(exc, URLError): |
| | reason = exc.reason |
| | if isinstance(reason, (TimeoutError, socket.timeout, IncompleteRead, ConnectionResetError)): |
| | return True |
| | return True |
| | return False |
| |
|
| |
|
| | def _retry_delay(attempt: int, retry_after: str | None = None) -> float: |
| | parsed = _parse_retry_after_seconds(retry_after) |
| | if parsed is not None: |
| | return min(max(parsed, 0.0), 60.0) |
| | return min(2 ** (attempt - 1), 30.0) |
| |
|
| |
|
| | def _fetch_bytes( |
| | url: str, |
| | *, |
| | timeout: float = 20.0, |
| | ssl_context: ssl.SSLContext | None = None, |
| | source_name: str = "remote", |
| | ) -> bytes: |
| | req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT}) |
| | last_exc: BaseException | None = None |
| |
|
| | for attempt in range(1, MAX_FETCH_RETRIES + 1): |
| | try: |
| | with urllib.request.urlopen(req, timeout=timeout, context=ssl_context) as response: |
| | return response.read() |
| | except HTTPError as exc: |
| | last_exc = exc |
| | if exc.code in RETRYABLE_HTTP_CODES and attempt < MAX_FETCH_RETRIES: |
| | delay = _retry_delay(attempt, exc.headers.get("Retry-After")) |
| | print( |
| | f"[retry] {source_name} HTTP {exc.code} from {url}; " |
| | f"retrying in {delay:.1f}s ({attempt}/{MAX_FETCH_RETRIES})" |
| | ) |
| | time.sleep(delay) |
| | continue |
| | raise |
| | except Exception as exc: |
| | last_exc = exc |
| | if _retryable_network_error(exc) and attempt < MAX_FETCH_RETRIES: |
| | delay = _retry_delay(attempt) |
| | print( |
| | f"[retry] {source_name} network error from {url}: {exc}; " |
| | f"retrying in {delay:.1f}s ({attempt}/{MAX_FETCH_RETRIES})" |
| | ) |
| | time.sleep(delay) |
| | continue |
| | raise |
| |
|
| | if last_exc is not None: |
| | raise RuntimeError(f"{source_name} fetch failed after retries: {last_exc}") from last_exc |
| | raise RuntimeError(f"{source_name} fetch failed unexpectedly for {url}") |
| |
|
| |
|
| | def _fetch_json( |
| | url: str, |
| | *, |
| | timeout: float = 20.0, |
| | ssl_context: ssl.SSLContext | None = None, |
| | source_name: str = "remote", |
| | ) -> Any: |
| | payload = _fetch_bytes( |
| | url, |
| | timeout=timeout, |
| | ssl_context=ssl_context, |
| | source_name=source_name, |
| | ) |
| | return json.loads(payload.decode("utf-8")) |
| |
|
| |
|
| | def _fetch_text( |
| | url: str, |
| | *, |
| | timeout: float = 20.0, |
| | ssl_context: ssl.SSLContext | None = None, |
| | source_name: str = "remote", |
| | ) -> str: |
| | payload = _fetch_bytes( |
| | url, |
| | timeout=timeout, |
| | ssl_context=ssl_context, |
| | source_name=source_name, |
| | ) |
| | return payload.decode("utf-8", errors="replace") |
| |
|
| |
|
| | def _candidate( |
| | *, |
| | rid: str, |
| | title: str, |
| | url: str, |
| | category: str, |
| | source: str, |
| | summary: str, |
| | evidence_text: str, |
| | evidence_url: str, |
| | markers: list[str], |
| | tags: list[str], |
| | ) -> dict[str, Any]: |
| | return { |
| | "id": rid, |
| | "title": title.strip(), |
| | "url": url.strip(), |
| | "category": category, |
| | "source": source, |
| | "status": "candidate", |
| | "summary": summary.strip(), |
| | "primary_use": "Needs maintainer review before promotion to verified catalog.", |
| | "tasks": [], |
| | "pashto_evidence": { |
| | "evidence_text": evidence_text.strip(), |
| | "evidence_url": evidence_url.strip(), |
| | "markers": markers, |
| | }, |
| | "tags": tags, |
| | } |
| |
|
| |
|
| | def fetch_huggingface(kind: str, limit: int) -> list[dict[str, Any]]: |
| | if kind not in {"datasets", "models"}: |
| | return [] |
| |
|
| | combined: dict[str, dict[str, Any]] = {} |
| | errors: list[str] = [] |
| | for term in PASHTO_QUERY_TERMS: |
| | query = urllib.parse.urlencode({"search": term, "limit": str(limit)}) |
| | url = f"https://huggingface.co/api/{kind}?{query}" |
| | try: |
| | payload = _fetch_json(url, source_name=f"huggingface-{kind}") |
| | except Exception as exc: |
| | errors.append(f"{term}: {exc}") |
| | continue |
| | for item in payload: |
| | repo_id = item.get("id") or item.get("modelId") |
| | if not repo_id: |
| | continue |
| | combined[repo_id] = item |
| |
|
| | if not combined and errors: |
| | raise RuntimeError("; ".join(errors)) |
| |
|
| | category = "dataset" if kind == "datasets" else "model" |
| | out: list[dict[str, Any]] = [] |
| | for item in combined.values(): |
| | repo_id = item.get("id") or item.get("modelId") |
| | if not repo_id: |
| | continue |
| | if not _is_pashto_centric(repo_id): |
| | continue |
| | if _is_low_signal_name(repo_id): |
| | continue |
| | repo_url = f"https://huggingface.co/{'datasets/' if kind == 'datasets' else ''}{repo_id}" |
| | rid = f"candidate-hf-{kind[:-1]}-{_slug(repo_id)}" |
| | out.append( |
| | _candidate( |
| | rid=rid, |
| | title=repo_id, |
| | url=repo_url, |
| | category=category, |
| | source="huggingface", |
| | summary=f"Candidate {category} returned from Hugging Face search for Pashto.", |
| | evidence_text="Matched by Pashto keyword in Hugging Face search results.", |
| | evidence_url=repo_url, |
| | markers=["pashto"], |
| | tags=["pashto", "candidate", category], |
| | ) |
| | ) |
| | if len(out) >= limit: |
| | break |
| | return out |
| |
|
| |
|
| | def fetch_huggingface_spaces(limit: int) -> list[dict[str, Any]]: |
| | combined: dict[str, dict[str, Any]] = {} |
| | errors: list[str] = [] |
| | for term in PASHTO_QUERY_TERMS: |
| | query = urllib.parse.urlencode({"search": term, "limit": str(limit)}) |
| | url = f"https://huggingface.co/api/spaces?{query}" |
| | try: |
| | payload = _fetch_json(url, source_name="huggingface-spaces") |
| | except Exception as exc: |
| | errors.append(f"{term}: {exc}") |
| | continue |
| | for item in payload: |
| | space_id = item.get("id") |
| | if not space_id: |
| | continue |
| | combined[space_id] = item |
| |
|
| | if not combined and errors: |
| | raise RuntimeError("; ".join(errors)) |
| |
|
| | out: list[dict[str, Any]] = [] |
| | for item in combined.values(): |
| | space_id = item.get("id") |
| | if not space_id: |
| | continue |
| | if not _is_pashto_centric(space_id): |
| | continue |
| | if _is_low_signal_name(space_id): |
| | continue |
| | space_url = f"https://huggingface.co/spaces/{space_id}" |
| | rid = f"candidate-hf-project-{_slug(space_id)}" |
| | summary = "Candidate project app returned from Hugging Face Spaces Pashto search." |
| | out.append( |
| | _candidate( |
| | rid=rid, |
| | title=space_id, |
| | url=space_url, |
| | category="project", |
| | source="huggingface", |
| | summary=summary, |
| | evidence_text="Matched by Pashto keyword in Hugging Face Spaces search.", |
| | evidence_url=space_url, |
| | markers=["pashto"], |
| | tags=["pashto", "candidate", "project", "space"], |
| | ) |
| | ) |
| | if len(out) >= limit: |
| | break |
| | return out |
| |
|
| |
|
| | def fetch_kaggle_datasets(limit: int) -> list[dict[str, Any]]: |
| | |
| | combined: list[dict[str, Any]] = [] |
| | seen_urls: set[str] = set() |
| | errors: list[str] = [] |
| | for term in PASHTO_QUERY_TERMS: |
| | query = urllib.parse.urlencode({"search": term, "page": "1"}) |
| | url = f"https://www.kaggle.com/api/v1/datasets/list?{query}" |
| | try: |
| | payload = _fetch_json(url, source_name="kaggle-datasets") |
| | except Exception as exc: |
| | errors.append(f"{term}: {exc}") |
| | continue |
| | for item in payload: |
| | dataset_url = (item.get("urlNullable") or "").strip() |
| | if not dataset_url or dataset_url in seen_urls: |
| | continue |
| | seen_urls.add(dataset_url) |
| | combined.append(item) |
| |
|
| | if not combined and errors: |
| | raise RuntimeError("; ".join(errors)) |
| |
|
| | out: list[dict[str, Any]] = [] |
| | for item in combined: |
| | title = (item.get("titleNullable") or "").strip() |
| | dataset_url = (item.get("urlNullable") or "").strip() |
| | owner = (item.get("ownerRefNullable") or "").strip() |
| | subtitle = (item.get("subtitleNullable") or "").strip() |
| | if not title or not dataset_url: |
| | continue |
| |
|
| | if not _is_pashto_centric(title, subtitle): |
| | continue |
| | if _is_low_signal_name(title): |
| | continue |
| |
|
| | owner_prefix = f"{owner}/" if owner else "" |
| | rid = f"candidate-kaggle-dataset-{_slug(owner_prefix + title)}" |
| | out.append( |
| | _candidate( |
| | rid=rid, |
| | title=title, |
| | url=dataset_url, |
| | category="dataset", |
| | source="kaggle", |
| | summary=(subtitle or "Candidate Kaggle dataset returned from Pashto search.")[:240], |
| | evidence_text="Kaggle dataset title/subtitle includes Pashto keyword.", |
| | evidence_url=dataset_url, |
| | markers=["Pashto"], |
| | tags=["pashto", "candidate", "dataset", "kaggle"], |
| | ) |
| | ) |
| | if len(out) >= limit: |
| | break |
| | return out |
| |
|
| |
|
| | def fetch_github_pashto_repos(limit: int) -> list[dict[str, Any]]: |
| | |
| | query_variants = [ |
| | "topic:pashto", |
| | "topic:pukhto", |
| | "pashto in:name,description,readme", |
| | "pukhto in:name,description,readme", |
| | "pushto in:name,description,readme", |
| | "pakhto in:name,description,readme", |
| | ] |
| |
|
| | combined: dict[str, dict[str, Any]] = {} |
| | for query_text in query_variants: |
| | query = urllib.parse.urlencode( |
| | {"q": query_text, "sort": "stars", "order": "desc", "per_page": str(limit)} |
| | ) |
| | url = f"https://api.github.com/search/repositories?{query}" |
| | payload = _fetch_json( |
| | url, |
| | timeout=30.0, |
| | source_name="github-repositories", |
| | ) |
| | for item in payload.get("items", []): |
| | full_name = item.get("full_name") |
| | html_url = item.get("html_url") |
| | if not full_name or not html_url: |
| | continue |
| | combined[full_name] = item |
| |
|
| | out: list[dict[str, Any]] = [] |
| | for full_name, item in sorted(combined.items(), key=lambda kv: kv[1].get("stargazers_count", 0), reverse=True): |
| | name_blob = " ".join( |
| | [ |
| | full_name or "", |
| | item.get("name") or "", |
| | item.get("description") or "", |
| | " ".join(item.get("topics") or []), |
| | ] |
| | ) |
| | if not _is_pashto_centric(name_blob): |
| | continue |
| | if _is_low_signal_name(full_name): |
| | continue |
| |
|
| | html_url = item["html_url"] |
| | topics = item.get("topics") or [] |
| | category = _classify_repo_category(name_blob) |
| |
|
| | rid = f"candidate-gh-{category}-{_slug(full_name)}" |
| | description = (item.get("description") or "").strip() |
| | summary = description or "Candidate Pashto-related GitHub repository." |
| | out.append( |
| | _candidate( |
| | rid=rid, |
| | title=full_name, |
| | url=html_url, |
| | category=category, |
| | source="github", |
| | summary=summary[:240] if summary else "Candidate Pashto-related GitHub repository.", |
| | evidence_text="Repository metadata (name/description/topics) includes Pashto markers.", |
| | evidence_url=html_url, |
| | markers=["pashto"], |
| | tags=["pashto", "candidate", category, "github", *(topics[:3])], |
| | ) |
| | ) |
| | if len(out) >= limit: |
| | break |
| | return out |
| |
|
| |
|
| | def fetch_gitlab_pashto_projects(limit: int) -> list[dict[str, Any]]: |
| | combined: dict[str, dict[str, Any]] = {} |
| | errors: list[str] = [] |
| | for term in PASHTO_QUERY_TERMS: |
| | query = urllib.parse.urlencode( |
| | { |
| | "search": term, |
| | "simple": "true", |
| | "order_by": "star_count", |
| | "sort": "desc", |
| | "per_page": str(limit), |
| | } |
| | ) |
| | url = f"https://gitlab.com/api/v4/projects?{query}" |
| | try: |
| | payload = _fetch_json(url, timeout=30.0, source_name="gitlab-projects") |
| | except Exception as exc: |
| | errors.append(f"{term}: {exc}") |
| | continue |
| | for item in payload: |
| | full_name = (item.get("path_with_namespace") or item.get("name_with_namespace") or "").strip() |
| | if not full_name: |
| | continue |
| | combined[full_name] = item |
| |
|
| | if not combined and errors: |
| | raise RuntimeError("; ".join(errors)) |
| |
|
| | out: list[dict[str, Any]] = [] |
| | sorted_items = sorted( |
| | combined.items(), |
| | key=lambda kv: kv[1].get("star_count") or 0, |
| | reverse=True, |
| | ) |
| | for full_name, item in sorted_items: |
| | web_url = (item.get("web_url") or "").strip() |
| | if not web_url: |
| | continue |
| |
|
| | description = (item.get("description") or "").strip() |
| | topics = item.get("topics") or [] |
| | if not isinstance(topics, list): |
| | topics = [] |
| | topics = [str(topic).strip() for topic in topics if str(topic).strip()] |
| | name_blob = " ".join([full_name, item.get("name") or "", description, " ".join(topics)]) |
| | if not _is_pashto_centric(name_blob): |
| | continue |
| | if _is_low_signal_name(full_name): |
| | continue |
| |
|
| | category = _classify_repo_category(name_blob) |
| | rid = f"candidate-gitlab-{category}-{_slug(full_name)}" |
| | summary = description or "Candidate Pashto-related GitLab repository." |
| | out.append( |
| | _candidate( |
| | rid=rid, |
| | title=full_name, |
| | url=web_url, |
| | category=category, |
| | source="gitlab", |
| | summary=summary[:240] if summary else "Candidate Pashto-related GitLab repository.", |
| | evidence_text="Project metadata (name/description/topics) includes Pashto markers.", |
| | evidence_url=web_url, |
| | markers=["pashto"], |
| | tags=["pashto", "candidate", category, "gitlab", *(topics[:3])], |
| | ) |
| | ) |
| | if len(out) >= limit: |
| | break |
| | return out |
| |
|
| |
|
| | def fetch_openalex_papers(limit: int) -> list[dict[str, Any]]: |
| | combined: dict[str, dict[str, Any]] = {} |
| | errors: list[str] = [] |
| | for term in PASHTO_QUERY_TERMS: |
| | query = urllib.parse.urlencode({"search": term, "per-page": str(limit)}) |
| | url = f"https://api.openalex.org/works?{query}" |
| | try: |
| | payload = _fetch_json(url, timeout=30.0, source_name="openalex") |
| | except Exception as exc: |
| | errors.append(f"{term}: {exc}") |
| | continue |
| | for item in payload.get("results", []): |
| | work_id = (item.get("id") or "").strip() |
| | if not work_id: |
| | continue |
| | combined[work_id] = item |
| |
|
| | if not combined and errors: |
| | raise RuntimeError("; ".join(errors)) |
| |
|
| | out: list[dict[str, Any]] = [] |
| | for item in combined.values(): |
| | title = (item.get("display_name") or "").strip() |
| | if not title: |
| | continue |
| | if not _is_pashto_centric(title): |
| | continue |
| | if _is_low_signal_name(title): |
| | continue |
| |
|
| | doi = (item.get("doi") or "").strip() |
| | if doi and not doi.startswith("http"): |
| | doi = f"https://doi.org/{doi}" |
| | primary = item.get("primary_location") or {} |
| | landing = (primary.get("landing_page_url") or "").strip() |
| | paper_url = doi or landing or (item.get("id") or "").strip() |
| | if not paper_url: |
| | continue |
| |
|
| | rid = f"candidate-openalex-{_slug(title)}" |
| | out.append( |
| | _candidate( |
| | rid=rid, |
| | title=title, |
| | url=paper_url, |
| | category="paper", |
| | source="openalex", |
| | summary="Candidate paper returned from OpenAlex works search for Pashto.", |
| | evidence_text="Matched by explicit Pashto marker in title from OpenAlex works search.", |
| | evidence_url=paper_url, |
| | markers=["pashto"], |
| | tags=["pashto", "candidate", "paper", "openalex"], |
| | ) |
| | ) |
| | if len(out) >= limit: |
| | break |
| | return out |
| |
|
| |
|
| | def fetch_crossref_papers(limit: int) -> list[dict[str, Any]]: |
| | combined: dict[str, dict[str, Any]] = {} |
| | errors: list[str] = [] |
| | for term in PASHTO_QUERY_TERMS: |
| | query = urllib.parse.urlencode({"query.title": term, "rows": str(limit)}) |
| | url = f"https://api.crossref.org/works?{query}" |
| | try: |
| | payload = _fetch_json(url, timeout=30.0, source_name="crossref") |
| | except Exception as exc: |
| | errors.append(f"{term}: {exc}") |
| | continue |
| | for item in payload.get("message", {}).get("items", []): |
| | doi = (item.get("DOI") or "").strip() |
| | title_list = item.get("title") or [] |
| | title = (title_list[0] if isinstance(title_list, list) and title_list else "").strip() |
| | key = doi or title |
| | if not key: |
| | continue |
| | combined[key] = item |
| |
|
| | if not combined and errors: |
| | raise RuntimeError("; ".join(errors)) |
| |
|
| | out: list[dict[str, Any]] = [] |
| | for item in combined.values(): |
| | title_list = item.get("title") or [] |
| | title = (title_list[0] if isinstance(title_list, list) and title_list else "").strip() |
| | if not title: |
| | continue |
| | if not _is_pashto_centric(title): |
| | continue |
| | if _is_low_signal_name(title): |
| | continue |
| |
|
| | doi = (item.get("DOI") or "").strip() |
| | paper_url = (item.get("URL") or "").strip() |
| | if not paper_url and doi: |
| | paper_url = f"https://doi.org/{doi}" |
| | if not paper_url: |
| | continue |
| |
|
| | abstract = _strip_html(item.get("abstract") or "") |
| | rid = f"candidate-crossref-{_slug(title)}" |
| | out.append( |
| | _candidate( |
| | rid=rid, |
| | title=title, |
| | url=paper_url, |
| | category="paper", |
| | source="crossref", |
| | summary=(abstract or "Candidate paper returned from Crossref search for Pashto.")[:240], |
| | evidence_text="Matched by explicit Pashto marker in title from Crossref search.", |
| | evidence_url=paper_url, |
| | markers=["pashto"], |
| | tags=["pashto", "candidate", "paper", "crossref"], |
| | ) |
| | ) |
| | if len(out) >= limit: |
| | break |
| | return out |
| |
|
| |
|
| | def fetch_zenodo_records(limit: int) -> list[dict[str, Any]]: |
| | combined: dict[str, dict[str, Any]] = {} |
| | errors: list[str] = [] |
| | for term in PASHTO_QUERY_TERMS: |
| | query = urllib.parse.urlencode({"q": term, "size": str(limit), "sort": "mostrecent"}) |
| | url = f"https://zenodo.org/api/records/?{query}" |
| | try: |
| | payload = _fetch_json(url, timeout=30.0, source_name="zenodo") |
| | except Exception as exc: |
| | errors.append(f"{term}: {exc}") |
| | continue |
| | for item in payload.get("hits", {}).get("hits", []): |
| | record_id = str(item.get("id") or "").strip() |
| | if not record_id: |
| | continue |
| | combined[record_id] = item |
| |
|
| | if not combined and errors: |
| | raise RuntimeError("; ".join(errors)) |
| |
|
| | category_map = { |
| | "dataset": "dataset", |
| | "software": "code", |
| | "publication": "paper", |
| | "poster": "project", |
| | "presentation": "project", |
| | } |
| |
|
| | out: list[dict[str, Any]] = [] |
| | for item in combined.values(): |
| | metadata = item.get("metadata") or {} |
| | title = (metadata.get("title") or "").strip() |
| | description = _strip_html(metadata.get("description") or "") |
| | if not title: |
| | continue |
| | if not _is_pashto_centric(title, description): |
| | continue |
| | if _is_low_signal_name(title): |
| | continue |
| |
|
| | links = item.get("links") or {} |
| | record_url = (links.get("self_html") or links.get("doi") or "").strip() |
| | if not record_url: |
| | doi = (metadata.get("doi") or "").strip() |
| | if doi: |
| | record_url = f"https://doi.org/{doi}" |
| | if not record_url: |
| | continue |
| |
|
| | rtype = (metadata.get("resource_type") or {}).get("type") or "" |
| | category = category_map.get(str(rtype).casefold(), "project") |
| | rid = f"candidate-zenodo-{category}-{_slug(title)}" |
| | summary = description or "Candidate resource returned from Zenodo search for Pashto." |
| | out.append( |
| | _candidate( |
| | rid=rid, |
| | title=title, |
| | url=record_url, |
| | category=category, |
| | source="zenodo", |
| | summary=summary[:240], |
| | evidence_text="Zenodo metadata includes Pashto markers in title or description.", |
| | evidence_url=record_url, |
| | markers=["pashto"], |
| | tags=["pashto", "candidate", category, "zenodo"], |
| | ) |
| | ) |
| | if len(out) >= limit: |
| | break |
| | return out |
| |
|
| |
|
| | def fetch_dataverse_datasets(limit: int) -> list[dict[str, Any]]: |
| | combined: dict[str, dict[str, Any]] = {} |
| | errors: list[str] = [] |
| | base_url = "https://dataverse.harvard.edu" |
| | for term in PASHTO_QUERY_TERMS: |
| | query = urllib.parse.urlencode( |
| | { |
| | "q": term, |
| | "type": "dataset", |
| | "per_page": str(limit), |
| | "start": "0", |
| | } |
| | ) |
| | url = f"{base_url}/api/search?{query}" |
| | try: |
| | payload = _fetch_json(url, timeout=30.0, source_name="dataverse") |
| | except Exception as exc: |
| | errors.append(f"{term}: {exc}") |
| | continue |
| | for item in payload.get("data", {}).get("items", []): |
| | key = str(item.get("global_id") or item.get("identifier") or item.get("url") or "").strip() |
| | if not key: |
| | continue |
| | combined[key] = item |
| |
|
| | if not combined and errors: |
| | raise RuntimeError("; ".join(errors)) |
| |
|
| | out: list[dict[str, Any]] = [] |
| | for item in combined.values(): |
| | title = (item.get("name") or "").strip() |
| | description = (item.get("description") or "").strip() |
| | if not title: |
| | continue |
| | if not _is_pashto_centric(title, description): |
| | continue |
| | if _is_low_signal_name(title): |
| | continue |
| |
|
| | record_url = (item.get("url") or "").strip() |
| | if record_url and record_url.startswith("/"): |
| | record_url = f"{base_url}{record_url}" |
| | if not record_url: |
| | global_id = (item.get("global_id") or "").strip() |
| | if global_id: |
| | escaped_id = urllib.parse.quote(global_id, safe=":/") |
| | record_url = f"{base_url}/dataset.xhtml?persistentId={escaped_id}" |
| | if not record_url: |
| | continue |
| |
|
| | rid = f"candidate-dataverse-dataset-{_slug(title)}" |
| | out.append( |
| | _candidate( |
| | rid=rid, |
| | title=title, |
| | url=record_url, |
| | category="dataset", |
| | source="dataverse", |
| | summary=(description or "Candidate dataset returned from Dataverse search for Pashto.")[:240], |
| | evidence_text="Dataverse metadata includes Pashto markers in dataset title or description.", |
| | evidence_url=record_url, |
| | markers=["pashto"], |
| | tags=["pashto", "candidate", "dataset", "dataverse"], |
| | ) |
| | ) |
| | if len(out) >= limit: |
| | break |
| | return out |
| |
|
| |
|
| | def fetch_datacite_records(limit: int) -> list[dict[str, Any]]: |
| | combined: dict[str, dict[str, Any]] = {} |
| | errors: list[str] = [] |
| | for term in PASHTO_QUERY_TERMS: |
| | query = urllib.parse.urlencode( |
| | { |
| | "query": term, |
| | "page[size]": str(limit), |
| | } |
| | ) |
| | url = f"https://api.datacite.org/dois?{query}" |
| | try: |
| | payload = _fetch_json(url, timeout=30.0, source_name="datacite") |
| | except Exception as exc: |
| | errors.append(f"{term}: {exc}") |
| | continue |
| | for item in payload.get("data", []): |
| | record_id = (item.get("id") or "").strip() |
| | if not record_id: |
| | continue |
| | combined[record_id] = item |
| |
|
| | if not combined and errors: |
| | raise RuntimeError("; ".join(errors)) |
| |
|
| | dataset_types = {"dataset", "collection"} |
| | software_types = {"software"} |
| | paper_types = {"journalarticle", "conferencepaper", "preprint", "text"} |
| |
|
| | out: list[dict[str, Any]] = [] |
| | for item in combined.values(): |
| | attributes = item.get("attributes") or {} |
| | titles = attributes.get("titles") or [] |
| | title = "" |
| | if isinstance(titles, list) and titles: |
| | first = titles[0] or {} |
| | if isinstance(first, dict): |
| | title = (first.get("title") or "").strip() |
| | if not title: |
| | continue |
| | description_items = attributes.get("descriptions") or [] |
| | descriptions: list[str] = [] |
| | if isinstance(description_items, list): |
| | for block in description_items: |
| | if isinstance(block, dict): |
| | value = (block.get("description") or "").strip() |
| | if value: |
| | descriptions.append(_strip_html(value)) |
| | description_blob = " ".join(descriptions).strip() |
| | if not _is_pashto_centric(title, description_blob): |
| | continue |
| | if _is_low_signal_name(title): |
| | continue |
| |
|
| | doi = (attributes.get("doi") or item.get("id") or "").strip() |
| | record_url = (attributes.get("url") or "").strip() |
| | if not record_url and doi: |
| | record_url = f"https://doi.org/{doi}" |
| | if not record_url: |
| | continue |
| |
|
| | general_type = str((attributes.get("types") or {}).get("resourceTypeGeneral") or "").casefold() |
| | if general_type in dataset_types: |
| | category = "dataset" |
| | elif general_type in software_types: |
| | category = "code" |
| | elif general_type in paper_types: |
| | category = "paper" |
| | else: |
| | category = "project" |
| |
|
| | rid = f"candidate-datacite-{category}-{_slug(title)}" |
| | summary = description_blob or "Candidate record returned from DataCite DOI search for Pashto." |
| | out.append( |
| | _candidate( |
| | rid=rid, |
| | title=title, |
| | url=record_url, |
| | category=category, |
| | source="datacite", |
| | summary=summary[:240], |
| | evidence_text="DataCite metadata includes Pashto markers in title or description.", |
| | evidence_url=record_url, |
| | markers=["pashto"], |
| | tags=["pashto", "candidate", category, "datacite"], |
| | ) |
| | ) |
| | if len(out) >= limit: |
| | break |
| | return out |
| |
|
| |
|
| | def fetch_arxiv(limit: int) -> list[dict[str, Any]]: |
| | roots: list[ET.Element] = [] |
| | errors: list[str] = [] |
| | for term in PASHTO_QUERY_TERMS: |
| | query = urllib.parse.urlencode( |
| | {"search_query": f"all:{term}", "start": "0", "max_results": str(limit)} |
| | ) |
| | url = f"https://export.arxiv.org/api/query?{query}" |
| | try: |
| | xml_text = _fetch_text(url, timeout=30.0, source_name="arxiv") |
| | except Exception as exc: |
| | if not _is_ssl_cert_error(exc): |
| | errors.append(f"{term}: {exc}") |
| | continue |
| | |
| | insecure_context = ssl._create_unverified_context() |
| | print("[warn] arxiv SSL verification failed; retrying with unverified TLS context") |
| | xml_text = _fetch_text( |
| | url, |
| | timeout=30.0, |
| | ssl_context=insecure_context, |
| | source_name="arxiv", |
| | ) |
| | roots.append(ET.fromstring(xml_text)) |
| |
|
| | if not roots and errors: |
| | raise RuntimeError("; ".join(errors)) |
| |
|
| | ns = {"atom": "http://www.w3.org/2005/Atom"} |
| |
|
| | seen_links: set[str] = set() |
| | out: list[dict[str, Any]] = [] |
| | for root in roots: |
| | for entry in root.findall("atom:entry", ns): |
| | title = (entry.findtext("atom:title", default="", namespaces=ns) or "").strip() |
| | link = (entry.findtext("atom:id", default="", namespaces=ns) or "").strip() |
| | summary = (entry.findtext("atom:summary", default="", namespaces=ns) or "").strip() |
| | if not title or not link: |
| | continue |
| | if link in seen_links: |
| | continue |
| | |
| | if not _is_pashto_centric(title): |
| | continue |
| | if _is_low_signal_name(title): |
| | continue |
| |
|
| | seen_links.add(link) |
| | rid = f"candidate-arxiv-{_slug(title)}" |
| | out.append( |
| | _candidate( |
| | rid=rid, |
| | title=title, |
| | url=link, |
| | category="paper", |
| | source="arxiv", |
| | summary=summary[:240] if summary else "Candidate paper returned from arXiv query for Pashto.", |
| | evidence_text="Matched by Pashto marker in paper title from arXiv query results.", |
| | evidence_url=link, |
| | markers=["pashto"], |
| | tags=["pashto", "candidate", "paper"], |
| | ) |
| | ) |
| | if len(out) >= limit: |
| | return out |
| | return out |
| |
|
| |
|
| | def fetch_semantic_scholar(limit: int) -> list[dict[str, Any]]: |
| | fields = "title,url,abstract,year,externalIds" |
| | combined: dict[str, dict[str, Any]] = {} |
| | errors: list[str] = [] |
| | for term in PASHTO_QUERY_TERMS: |
| | query = urllib.parse.urlencode( |
| | {"query": term, "limit": str(limit), "fields": fields} |
| | ) |
| | url = f"https://api.semanticscholar.org/graph/v1/paper/search?{query}" |
| | try: |
| | payload = _fetch_json( |
| | url, |
| | timeout=30.0, |
| | source_name="semantic-scholar", |
| | ) |
| | except Exception as exc: |
| | errors.append(f"{term}: {exc}") |
| | continue |
| | for item in payload.get("data", []): |
| | title = (item.get("title") or "").strip() |
| | if not title: |
| | continue |
| | combined[title] = item |
| |
|
| | if not combined and errors: |
| | raise RuntimeError("; ".join(errors)) |
| |
|
| | out: list[dict[str, Any]] = [] |
| | for item in combined.values(): |
| | title = (item.get("title") or "").strip() |
| | if not title: |
| | continue |
| | |
| | if not _is_pashto_centric(title): |
| | continue |
| | if _is_low_signal_name(title): |
| | continue |
| | paper_url = (item.get("url") or "").strip() |
| | if not paper_url: |
| | ext = item.get("externalIds") or {} |
| | arxiv_id = ext.get("ArXiv") |
| | if arxiv_id: |
| | paper_url = f"https://arxiv.org/abs/{arxiv_id}" |
| | if not paper_url: |
| | continue |
| |
|
| | summary = (item.get("abstract") or "").strip() |
| | rid = f"candidate-s2-{_slug(title)}" |
| | out.append( |
| | _candidate( |
| | rid=rid, |
| | title=title, |
| | url=paper_url, |
| | category="paper", |
| | source="other", |
| | summary=summary[:240] if summary else "Candidate paper returned from Semantic Scholar search for Pashto.", |
| | evidence_text="Matched by explicit Pashto marker in paper title from Semantic Scholar search.", |
| | evidence_url=paper_url, |
| | markers=["pashto"], |
| | tags=["pashto", "candidate", "paper"], |
| | ) |
| | ) |
| | if len(out) >= limit: |
| | break |
| | return out |
| |
|
| |
|
| | def _dedupe_candidates( |
| | candidates: list[dict[str, Any]], |
| | existing_ids: set[str], |
| | existing_urls: set[str], |
| | ) -> list[dict[str, Any]]: |
| | unique: list[dict[str, Any]] = [] |
| | seen_ids = set(existing_ids) |
| | seen_urls = set(existing_urls) |
| |
|
| | for item in candidates: |
| | rid = item["id"] |
| | url = item["url"].rstrip("/") |
| | if rid in seen_ids or url in seen_urls: |
| | continue |
| | seen_ids.add(rid) |
| | seen_urls.add(url) |
| | unique.append(item) |
| | return unique |
| |
|
| |
|
| | def main() -> int: |
| | parser = argparse.ArgumentParser() |
| | parser.add_argument("--catalog", default="resources/catalog/resources.json") |
| | parser.add_argument("--output", default="resources/catalog/pending_candidates.json") |
| | parser.add_argument("--limit", type=int, default=15) |
| | args = parser.parse_args() |
| |
|
| | catalog_path = Path(args.catalog) |
| | output_path = Path(args.output) |
| |
|
| | catalog = json.loads(catalog_path.read_text(encoding="utf-8")) |
| | resources = catalog.get("resources", []) |
| | existing_ids = {resource.get("id", "") for resource in resources if isinstance(resource, dict)} |
| | existing_urls = { |
| | resource.get("url", "").rstrip("/") |
| | for resource in resources |
| | if isinstance(resource, dict) and isinstance(resource.get("url"), str) |
| | } |
| |
|
| | all_candidates: list[dict[str, Any]] = [] |
| | source_errors: list[str] = [] |
| | sources_used: list[str] = [] |
| |
|
| | fetch_steps = [ |
| | ("kaggle-datasets", lambda: fetch_kaggle_datasets(args.limit)), |
| | ("huggingface-datasets", lambda: fetch_huggingface("datasets", args.limit)), |
| | ("huggingface-models", lambda: fetch_huggingface("models", args.limit)), |
| | ("huggingface-spaces", lambda: fetch_huggingface_spaces(args.limit)), |
| | ("github-repositories", lambda: fetch_github_pashto_repos(args.limit)), |
| | ("gitlab-projects", lambda: fetch_gitlab_pashto_projects(args.limit)), |
| | ("openalex", lambda: fetch_openalex_papers(args.limit)), |
| | ("crossref", lambda: fetch_crossref_papers(args.limit)), |
| | ("zenodo", lambda: fetch_zenodo_records(args.limit)), |
| | ("dataverse", lambda: fetch_dataverse_datasets(args.limit)), |
| | ("datacite", lambda: fetch_datacite_records(args.limit)), |
| | ("arxiv", lambda: fetch_arxiv(args.limit)), |
| | ("semantic-scholar", lambda: fetch_semantic_scholar(args.limit)), |
| | ] |
| |
|
| | for source_name, step in fetch_steps: |
| | try: |
| | results = step() |
| | all_candidates.extend(results) |
| | sources_used.append(source_name) |
| | except Exception as exc: |
| | source_errors.append(f"{source_name}: {exc}") |
| |
|
| | unique_candidates = _dedupe_candidates(all_candidates, existing_ids, existing_urls) |
| | unique_candidates = sorted(unique_candidates, key=lambda item: item["title"].lower()) |
| |
|
| | payload: dict[str, Any] = { |
| | "generated_on": datetime.now(timezone.utc).isoformat(), |
| | "sources": sources_used, |
| | "candidate_count": len(unique_candidates), |
| | "candidates": unique_candidates, |
| | } |
| | if source_errors: |
| | payload["errors"] = source_errors |
| |
|
| | output_path.parent.mkdir(parents=True, exist_ok=True) |
| | if output_path.exists(): |
| | try: |
| | old_payload = json.loads(output_path.read_text(encoding="utf-8")) |
| | except json.JSONDecodeError: |
| | old_payload = None |
| | if isinstance(old_payload, dict): |
| | old_compare = {key: value for key, value in old_payload.items() if key != "generated_on"} |
| | new_compare = {key: value for key, value in payload.items() if key != "generated_on"} |
| | if old_compare == new_compare: |
| | print( |
| | f"Candidate sync complete: {len(unique_candidates)} new candidates, " |
| | f"{len(source_errors)} source errors, no file changes" |
| | ) |
| | return 0 |
| |
|
| | output_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") |
| |
|
| | print( |
| | f"Candidate sync complete: {len(unique_candidates)} new candidates, " |
| | f"{len(source_errors)} source errors" |
| | ) |
| | return 0 |
| |
|
| |
|
| | if __name__ == "__main__": |
| | raise SystemExit(main()) |
| |
|