# pashto-language-resources / scripts / sync_resources.py
# musaw
# Sync main snapshot to Hugging Face (no local binary banner)
# 2f53244
"""Discover new Pashto-related resource candidates from public endpoints.
This script does not auto-merge into the main catalog. It writes candidates to
`resources/catalog/pending_candidates.json` for maintainer review.
Usage:
python scripts/sync_resources.py
python scripts/sync_resources.py --limit 20 --output resources/catalog/pending_candidates.json
"""
from __future__ import annotations
import argparse
import json
import re
import socket
import ssl
import time
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from html import unescape
from http.client import IncompleteRead
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
# Identification and retry policy for outbound HTTP requests.
USER_AGENT = "pashto-resource-sync/1.0"
MAX_FETCH_RETRIES = 4  # total attempts per URL, the first one included
RETRYABLE_HTTP_CODES = {429, 500, 502, 503, 504}

# Romanized spellings of the language name, used as search terms and markers.
PASHTO_QUERY_TERMS = ["pashto", "pukhto", "pushto", "pakhto"]
PASHTO_TEXT_MARKERS = ("pashto", "pukhto", "pushto", "pakhto")
# Spellings of the language name in Arabic script.
PASHTO_SCRIPT_MARKERS = ("پښتو", "پشتو")

# A romanized spelling that is not glued to other ASCII letters or digits.
PASHTO_WORD_RE = re.compile(
    r"(?<![A-Za-z0-9])(pashto|pukhto|pushto|pakhto)(?![A-Za-z0-9])",
    re.IGNORECASE,
)
# A romanized spelling immediately followed by a letter, e.g. "PashtoNLP".
# Lists the same four spellings as PASHTO_WORD_RE so that "PushtoNLP" is
# recognised just like "PashtoNLP".
# NOTE: re.IGNORECASE also applies to the [A-Z] lookahead, so any ASCII
# letter (upper or lower case) satisfies it.
PASHTO_CAMEL_RE = re.compile(
    r"(?<![A-Za-z0-9])(pashto|pukhto|pushto|pakhto)(?=[A-Z])",
    re.IGNORECASE,
)
# Language-code tokens: "ps", "ps_af", "pus", and "pbt" + optional "_"/"-" + "arab".
PASHTO_CODE_RE = re.compile(r"\b(ps(_af)?|pus|pbt[_-]?arab)\b", re.IGNORECASE)
# Names with a throw-away segment (test/tmp/trial/scratch) delimited by
# "-", "_", "/", a space, or the start/end of the string.
LOW_SIGNAL_RE = re.compile(r"(^|[-_/ ])(test|tmp|trial|scratch)([-_/ ]|$)", re.IGNORECASE)
def _slug(value: str) -> str:
value = value.lower()
value = re.sub(r"[^a-z0-9]+", "-", value)
value = re.sub(r"-+", "-", value).strip("-")
return value[:80] if value else "resource"
def _contains_pashto_marker(value: str) -> bool:
    """Report whether *value* carries any recognised Pashto marker.

    Checks, in order: a standalone romanized spelling, a romanized spelling
    glued to a following letter, an Arabic-script spelling, and finally a
    language-code token.  Empty or whitespace-only input yields ``False``.
    """
    cleaned = (value or "").strip()
    if not cleaned:
        return False
    if PASHTO_WORD_RE.search(cleaned) or PASHTO_CAMEL_RE.search(cleaned):
        return True
    for marker in PASHTO_SCRIPT_MARKERS:
        if marker in cleaned:
            return True
    return PASHTO_CODE_RE.search(cleaned.casefold()) is not None
def _is_pashto_centric(*values: str) -> bool:
    """Return ``True`` as soon as one of *values* contains a Pashto marker."""
    for text in values:
        if _contains_pashto_marker(text):
            return True
    return False
def _is_low_signal_name(value: str) -> bool:
    """Return ``True`` when *value* matches ``LOW_SIGNAL_RE`` (throw-away names)."""
    return LOW_SIGNAL_RE.search(value or "") is not None
def _strip_html(value: str) -> str:
text = re.sub(r"<[^>]+>", " ", value or "")
text = unescape(text)
return re.sub(r"\s+", " ", text).strip()
def _classify_repo_category(name_blob: str) -> str:
lowered = (name_blob or "").casefold()
code_tokens = ("toolkit", "library", "nlp", "asr", "tts", "ocr", "api", "code", "cli", "sdk")
if any(token in lowered for token in code_tokens):
return "code"
return "project"
def _parse_retry_after_seconds(retry_after: str | None) -> float | None:
if not retry_after:
return None
retry_after = retry_after.strip()
if not retry_after:
return None
if retry_after.isdigit():
return float(retry_after)
try:
retry_at = parsedate_to_datetime(retry_after)
except (TypeError, ValueError):
return None
now = datetime.now(timezone.utc)
if retry_at.tzinfo is None:
retry_at = retry_at.replace(tzinfo=timezone.utc)
return max(0.0, (retry_at - now).total_seconds())
def _is_ssl_cert_error(exc: BaseException) -> bool:
if isinstance(exc, ssl.SSLCertVerificationError):
return True
if isinstance(exc, URLError):
reason = exc.reason
if isinstance(reason, ssl.SSLCertVerificationError):
return True
return "CERTIFICATE_VERIFY_FAILED" in str(exc)
def _retryable_network_error(exc: BaseException) -> bool:
    """Decide whether a failed request is worth another attempt.

    Certificate verification failures are never retried.  Timeouts,
    truncated reads, connection resets, and every ``URLError`` (whatever
    its underlying reason) are treated as retryable; anything else is not.
    """
    if _is_ssl_cert_error(exc):
        return False
    transient_types = (
        TimeoutError,
        socket.timeout,
        IncompleteRead,
        ConnectionResetError,
        URLError,
    )
    return isinstance(exc, transient_types)
def _retry_delay(attempt: int, retry_after: str | None = None) -> float:
    """Pick how long to wait before the next attempt.

    A parseable ``Retry-After`` value wins and is clamped to 0-60 seconds.
    Without one, the delay doubles per attempt (1, 2, 4, ...) with a cap of
    30 seconds.
    """
    server_hint = _parse_retry_after_seconds(retry_after)
    if server_hint is None:
        return min(2 ** (attempt - 1), 30.0)
    return min(max(server_hint, 0.0), 60.0)
def _fetch_bytes(
    url: str,
    *,
    timeout: float = 20.0,
    ssl_context: ssl.SSLContext | None = None,
    source_name: str = "remote",
) -> bytes:
    """Download *url* and return the raw response body, retrying transient failures.

    At most ``MAX_FETCH_RETRIES`` attempts are made.  An HTTP error is
    retried only when its status is in ``RETRYABLE_HTTP_CODES`` (the delay
    takes the ``Retry-After`` header into account); any other exception is
    retried only when ``_retryable_network_error`` accepts it.  Every retry
    is announced on stdout and preceded by a sleep.

    Raises:
        The triggering exception, unchanged, when it is not retryable or
        no attempts remain.  ``RuntimeError`` if the loop finishes without
        returning or raising.
    """
    request = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
    failure: BaseException | None = None
    attempt = 0
    while attempt < MAX_FETCH_RETRIES:
        attempt += 1
        attempts_left = attempt < MAX_FETCH_RETRIES
        try:
            with urllib.request.urlopen(request, timeout=timeout, context=ssl_context) as response:
                return response.read()
        except HTTPError as http_exc:
            failure = http_exc
            if not (http_exc.code in RETRYABLE_HTTP_CODES and attempts_left):
                raise
            pause = _retry_delay(attempt, http_exc.headers.get("Retry-After"))
            print(
                f"[retry] {source_name} HTTP {http_exc.code} from {url}; "
                f"retrying in {pause:.1f}s ({attempt}/{MAX_FETCH_RETRIES})"
            )
            time.sleep(pause)
        except Exception as other_exc:  # noqa: BLE001
            failure = other_exc
            if not (_retryable_network_error(other_exc) and attempts_left):
                raise
            pause = _retry_delay(attempt)
            print(
                f"[retry] {source_name} network error from {url}: {other_exc}; "
                f"retrying in {pause:.1f}s ({attempt}/{MAX_FETCH_RETRIES})"
            )
            time.sleep(pause)
    # Only reached when the loop ends without a return or a re-raise.
    if failure is None:
        raise RuntimeError(f"{source_name} fetch failed unexpectedly for {url}")
    raise RuntimeError(f"{source_name} fetch failed after retries: {failure}") from failure
def _fetch_json(
    url: str,
    *,
    timeout: float = 20.0,
    ssl_context: ssl.SSLContext | None = None,
    source_name: str = "remote",
) -> Any:
    """Fetch *url* through ``_fetch_bytes`` and parse the body as UTF-8 JSON."""
    raw = _fetch_bytes(url, timeout=timeout, ssl_context=ssl_context, source_name=source_name)
    text = raw.decode("utf-8")
    return json.loads(text)
def _fetch_text(
    url: str,
    *,
    timeout: float = 20.0,
    ssl_context: ssl.SSLContext | None = None,
    source_name: str = "remote",
) -> str:
    """Fetch *url* through ``_fetch_bytes`` and decode it as UTF-8 text.

    Undecodable bytes are replaced rather than raising.
    """
    raw = _fetch_bytes(url, timeout=timeout, ssl_context=ssl_context, source_name=source_name)
    return raw.decode("utf-8", errors="replace")
def _candidate(
*,
rid: str,
title: str,
url: str,
category: str,
source: str,
summary: str,
evidence_text: str,
evidence_url: str,
markers: list[str],
tags: list[str],
) -> dict[str, Any]:
return {
"id": rid,
"title": title.strip(),
"url": url.strip(),
"category": category,
"source": source,
"status": "candidate",
"summary": summary.strip(),
"primary_use": "Needs maintainer review before promotion to verified catalog.",
"tasks": [],
"pashto_evidence": {
"evidence_text": evidence_text.strip(),
"evidence_url": evidence_url.strip(),
"markers": markers,
},
"tags": tags,
}
def fetch_huggingface(kind: str, limit: int) -> list[dict[str, Any]]:
    """Collect Pashto-related Hugging Face datasets or models as candidates.

    Args:
        kind: ``"datasets"`` or ``"models"``; any other value returns ``[]``.
        limit: Per-query result size and the cap on returned candidates.

    Returns:
        Candidate records for repositories whose id contains a Pashto
        marker and does not look like a throw-away name.

    Raises:
        RuntimeError: When every query failed and nothing was collected.
    """
    if kind not in {"datasets", "models"}:
        return []

    repos_by_id: dict[str, dict[str, Any]] = {}
    failures: list[str] = []
    for term in PASHTO_QUERY_TERMS:
        params = urllib.parse.urlencode({"search": term, "limit": str(limit)})
        endpoint = f"https://huggingface.co/api/{kind}?{params}"
        try:
            listing = _fetch_json(endpoint, source_name=f"huggingface-{kind}")
        except Exception as exc:  # noqa: BLE001
            failures.append(f"{term}: {exc}")
            continue
        for entry in listing:
            key = entry.get("id") or entry.get("modelId")
            if key:
                repos_by_id[key] = entry
    if failures and not repos_by_id:
        raise RuntimeError("; ".join(failures))

    is_dataset = kind == "datasets"
    category = "dataset" if is_dataset else "model"
    # Dataset repositories live under /datasets/; model repositories at the root.
    path_prefix = "datasets/" if is_dataset else ""

    results: list[dict[str, Any]] = []
    for repo_id in repos_by_id:
        if not _is_pashto_centric(repo_id) or _is_low_signal_name(repo_id):
            continue
        repo_url = f"https://huggingface.co/{path_prefix}{repo_id}"
        record = _candidate(
            rid=f"candidate-hf-{kind[:-1]}-{_slug(repo_id)}",
            title=repo_id,
            url=repo_url,
            category=category,
            source="huggingface",
            summary=f"Candidate {category} returned from Hugging Face search for Pashto.",
            evidence_text="Matched by Pashto keyword in Hugging Face search results.",
            evidence_url=repo_url,
            markers=["pashto"],
            tags=["pashto", "candidate", category],
        )
        results.append(record)
        if len(results) >= limit:
            break
    return results
def fetch_huggingface_spaces(limit: int) -> list[dict[str, Any]]:
    """Collect Pashto-related Hugging Face Spaces as ``project`` candidates.

    Runs one search per Pashto query term, merges the hits by space id, and
    keeps spaces whose id contains a Pashto marker and is not a throw-away
    name.  At most *limit* candidates are returned.

    Raises:
        RuntimeError: When every query failed and nothing was collected.
    """
    spaces_by_id: dict[str, dict[str, Any]] = {}
    failures: list[str] = []
    for term in PASHTO_QUERY_TERMS:
        params = urllib.parse.urlencode({"search": term, "limit": str(limit)})
        endpoint = f"https://huggingface.co/api/spaces?{params}"
        try:
            listing = _fetch_json(endpoint, source_name="huggingface-spaces")
        except Exception as exc:  # noqa: BLE001
            failures.append(f"{term}: {exc}")
            continue
        for entry in listing:
            entry_id = entry.get("id")
            if entry_id:
                spaces_by_id[entry_id] = entry
    if failures and not spaces_by_id:
        raise RuntimeError("; ".join(failures))

    results: list[dict[str, Any]] = []
    for space_id in spaces_by_id:
        if not _is_pashto_centric(space_id) or _is_low_signal_name(space_id):
            continue
        space_url = f"https://huggingface.co/spaces/{space_id}"
        record = _candidate(
            rid=f"candidate-hf-project-{_slug(space_id)}",
            title=space_id,
            url=space_url,
            category="project",
            source="huggingface",
            summary="Candidate project app returned from Hugging Face Spaces Pashto search.",
            evidence_text="Matched by Pashto keyword in Hugging Face Spaces search.",
            evidence_url=space_url,
            markers=["pashto"],
            tags=["pashto", "candidate", "project", "space"],
        )
        results.append(record)
        if len(results) >= limit:
            break
    return results
def fetch_kaggle_datasets(limit: int) -> list[dict[str, Any]]:
    """Collect Pashto-related Kaggle datasets as candidates.

    Queries the first page of the Kaggle dataset listing once per Pashto
    query term, de-duplicates hits by URL, and keeps datasets whose title or
    subtitle contains a Pashto marker.  At most *limit* candidates are
    returned.

    Raises:
        RuntimeError: When every query failed and nothing was collected.
    """
    # Public Kaggle dataset listing endpoint (no auth needed for list responses).
    collected: list[dict[str, Any]] = []
    known_urls: set[str] = set()
    failures: list[str] = []
    for term in PASHTO_QUERY_TERMS:
        params = urllib.parse.urlencode({"search": term, "page": "1"})
        endpoint = f"https://www.kaggle.com/api/v1/datasets/list?{params}"
        try:
            listing = _fetch_json(endpoint, source_name="kaggle-datasets")
        except Exception as exc:  # noqa: BLE001
            failures.append(f"{term}: {exc}")
            continue
        for entry in listing:
            link = (entry.get("urlNullable") or "").strip()
            if link and link not in known_urls:
                known_urls.add(link)
                collected.append(entry)
    if failures and not collected:
        raise RuntimeError("; ".join(failures))

    results: list[dict[str, Any]] = []
    for entry in collected:
        title = (entry.get("titleNullable") or "").strip()
        dataset_url = (entry.get("urlNullable") or "").strip()
        owner = (entry.get("ownerRefNullable") or "").strip()
        subtitle = (entry.get("subtitleNullable") or "").strip()
        if not (title and dataset_url):
            continue
        if not _is_pashto_centric(title, subtitle) or _is_low_signal_name(title):
            continue
        # The owner, when known, is part of the id.
        qualified_name = f"{owner}/{title}" if owner else title
        record = _candidate(
            rid=f"candidate-kaggle-dataset-{_slug(qualified_name)}",
            title=title,
            url=dataset_url,
            category="dataset",
            source="kaggle",
            summary=(subtitle or "Candidate Kaggle dataset returned from Pashto search.")[:240],
            evidence_text="Kaggle dataset title/subtitle includes Pashto keyword.",
            evidence_url=dataset_url,
            markers=["Pashto"],
            tags=["pashto", "candidate", "dataset", "kaggle"],
        )
        results.append(record)
        if len(results) >= limit:
            break
    return results
def fetch_github_pashto_repos(limit: int) -> list[dict[str, Any]]:
    """Collect Pashto-related GitHub repositories as candidates.

    Runs several repository searches (topics first, then keywords), merges
    the hits by ``full_name``, orders them by star count (highest first),
    and keeps repositories whose name, description, or topics contain a
    Pashto marker and whose full name is not a throw-away name.

    A failing query is recorded and skipped so that results from the other
    queries are still used, matching the behaviour of the other fetchers.

    Args:
        limit: Per-query page size and the cap on returned candidates.

    Raises:
        RuntimeError: When every query failed and nothing was collected.
    """
    # Query by topic first for high precision, then by keyword for recall.
    query_variants = [
        "topic:pashto",
        "topic:pukhto",
        "pashto in:name,description,readme",
        "pukhto in:name,description,readme",
        "pushto in:name,description,readme",
        "pakhto in:name,description,readme",
    ]
    combined: dict[str, dict[str, Any]] = {}
    errors: list[str] = []
    for query_text in query_variants:
        query = urllib.parse.urlencode(
            {"q": query_text, "sort": "stars", "order": "desc", "per_page": str(limit)}
        )
        url = f"https://api.github.com/search/repositories?{query}"
        try:
            payload = _fetch_json(
                url,
                timeout=30.0,
                source_name="github-repositories",
            )
        except Exception as exc:  # noqa: BLE001
            # One failed query (for example a rate-limited search request)
            # must not throw away what the other queries returned.
            errors.append(f"{query_text}: {exc}")
            continue
        for item in payload.get("items", []):
            full_name = item.get("full_name")
            html_url = item.get("html_url")
            if not full_name or not html_url:
                continue
            combined[full_name] = item
    if not combined and errors:
        raise RuntimeError("; ".join(errors))

    out: list[dict[str, Any]] = []
    ranked = sorted(
        combined.items(),
        # ``or 0`` keeps the sort working if the API reports a null star count.
        key=lambda kv: kv[1].get("stargazers_count") or 0,
        reverse=True,
    )
    for full_name, item in ranked:
        name_blob = " ".join(
            [
                full_name or "",
                item.get("name") or "",
                item.get("description") or "",
                " ".join(item.get("topics") or []),
            ]
        )
        if not _is_pashto_centric(name_blob):
            continue
        if _is_low_signal_name(full_name):
            continue
        html_url = item["html_url"]
        topics = item.get("topics") or []
        category = _classify_repo_category(name_blob)
        rid = f"candidate-gh-{category}-{_slug(full_name)}"
        description = (item.get("description") or "").strip()
        summary = description or "Candidate Pashto-related GitHub repository."
        out.append(
            _candidate(
                rid=rid,
                title=full_name,
                url=html_url,
                category=category,
                source="github",
                summary=summary[:240],
                evidence_text="Repository metadata (name/description/topics) includes Pashto markers.",
                evidence_url=html_url,
                markers=["pashto"],
                tags=["pashto", "candidate", category, "github", *(topics[:3])],
            )
        )
        if len(out) >= limit:
            break
    return out
def fetch_gitlab_pashto_projects(limit: int) -> list[dict[str, Any]]:
    """Collect Pashto-related GitLab projects as candidates.

    Searches gitlab.com once per Pashto query term, merges hits by project
    path, orders them by star count (highest first), and keeps projects
    whose name, description, or topics contain a Pashto marker.  At most
    *limit* candidates are returned.

    Raises:
        RuntimeError: When every query failed and nothing was collected.
    """
    projects: dict[str, dict[str, Any]] = {}
    failures: list[str] = []
    for term in PASHTO_QUERY_TERMS:
        params = urllib.parse.urlencode(
            {
                "search": term,
                "simple": "true",
                "order_by": "star_count",
                "sort": "desc",
                "per_page": str(limit),
            }
        )
        endpoint = f"https://gitlab.com/api/v4/projects?{params}"
        try:
            listing = _fetch_json(endpoint, timeout=30.0, source_name="gitlab-projects")
        except Exception as exc:  # noqa: BLE001
            failures.append(f"{term}: {exc}")
            continue
        for entry in listing:
            path = entry.get("path_with_namespace") or entry.get("name_with_namespace") or ""
            path = path.strip()
            if path:
                projects[path] = entry
    if failures and not projects:
        raise RuntimeError("; ".join(failures))

    def stars(pair: tuple[str, dict[str, Any]]) -> Any:
        return pair[1].get("star_count") or 0

    results: list[dict[str, Any]] = []
    for full_name, entry in sorted(projects.items(), key=stars, reverse=True):
        web_url = (entry.get("web_url") or "").strip()
        if not web_url:
            continue
        description = (entry.get("description") or "").strip()
        raw_topics = entry.get("topics") or []
        if not isinstance(raw_topics, list):
            raw_topics = []
        # Normalise topics to non-empty strings.
        topics = [str(topic).strip() for topic in raw_topics if str(topic).strip()]
        name_blob = " ".join([full_name, entry.get("name") or "", description, " ".join(topics)])
        if not _is_pashto_centric(name_blob) or _is_low_signal_name(full_name):
            continue
        category = _classify_repo_category(name_blob)
        summary = description or "Candidate Pashto-related GitLab repository."
        record = _candidate(
            rid=f"candidate-gitlab-{category}-{_slug(full_name)}",
            title=full_name,
            url=web_url,
            category=category,
            source="gitlab",
            summary=summary[:240],
            evidence_text="Project metadata (name/description/topics) includes Pashto markers.",
            evidence_url=web_url,
            markers=["pashto"],
            tags=["pashto", "candidate", category, "gitlab", *(topics[:3])],
        )
        results.append(record)
        if len(results) >= limit:
            break
    return results
def fetch_openalex_papers(limit: int) -> list[dict[str, Any]]:
    """Collect papers from OpenAlex whose title contains a Pashto marker.

    Searches once per Pashto query term and merges hits by OpenAlex work id.
    The candidate URL is the DOI when present, otherwise the primary
    landing page, otherwise the work id itself.  At most *limit* candidates
    are returned.

    Raises:
        RuntimeError: When every query failed and nothing was collected.
    """
    works: dict[str, dict[str, Any]] = {}
    failures: list[str] = []
    for term in PASHTO_QUERY_TERMS:
        params = urllib.parse.urlencode({"search": term, "per-page": str(limit)})
        endpoint = f"https://api.openalex.org/works?{params}"
        try:
            response = _fetch_json(endpoint, timeout=30.0, source_name="openalex")
        except Exception as exc:  # noqa: BLE001
            failures.append(f"{term}: {exc}")
            continue
        for work in response.get("results", []):
            work_id = (work.get("id") or "").strip()
            if work_id:
                works[work_id] = work
    if failures and not works:
        raise RuntimeError("; ".join(failures))

    results: list[dict[str, Any]] = []
    for work in works.values():
        title = (work.get("display_name") or "").strip()
        if not title:
            continue
        if not _is_pashto_centric(title) or _is_low_signal_name(title):
            continue
        doi = (work.get("doi") or "").strip()
        if doi and not doi.startswith("http"):
            # Bare DOIs become resolvable links.
            doi = f"https://doi.org/{doi}"
        location = work.get("primary_location") or {}
        landing = (location.get("landing_page_url") or "").strip()
        paper_url = doi or landing or (work.get("id") or "").strip()
        if not paper_url:
            continue
        record = _candidate(
            rid=f"candidate-openalex-{_slug(title)}",
            title=title,
            url=paper_url,
            category="paper",
            source="openalex",
            summary="Candidate paper returned from OpenAlex works search for Pashto.",
            evidence_text="Matched by explicit Pashto marker in title from OpenAlex works search.",
            evidence_url=paper_url,
            markers=["pashto"],
            tags=["pashto", "candidate", "paper", "openalex"],
        )
        results.append(record)
        if len(results) >= limit:
            break
    return results
def fetch_crossref_papers(limit: int) -> list[dict[str, Any]]:
    """Collect papers from Crossref whose title contains a Pashto marker.

    Runs a title query per Pashto query term and merges hits by DOI (or by
    title when no DOI is given).  The candidate URL is the record's ``URL``
    field, falling back to a doi.org link.  The abstract, stripped of
    markup, is used as the summary when present.  At most *limit*
    candidates are returned.

    Raises:
        RuntimeError: When every query failed and nothing was collected.
    """

    def first_title(record: dict[str, Any]) -> str:
        # The first entry of the record's "title" list is used.
        titles = record.get("title") or []
        head = titles[0] if isinstance(titles, list) and titles else ""
        return head.strip()

    records: dict[str, dict[str, Any]] = {}
    failures: list[str] = []
    for term in PASHTO_QUERY_TERMS:
        params = urllib.parse.urlencode({"query.title": term, "rows": str(limit)})
        endpoint = f"https://api.crossref.org/works?{params}"
        try:
            response = _fetch_json(endpoint, timeout=30.0, source_name="crossref")
        except Exception as exc:  # noqa: BLE001
            failures.append(f"{term}: {exc}")
            continue
        for record in response.get("message", {}).get("items", []):
            doi = (record.get("DOI") or "").strip()
            key = doi or first_title(record)
            if key:
                records[key] = record
    if failures and not records:
        raise RuntimeError("; ".join(failures))

    results: list[dict[str, Any]] = []
    for record in records.values():
        title = first_title(record)
        if not title:
            continue
        if not _is_pashto_centric(title) or _is_low_signal_name(title):
            continue
        doi = (record.get("DOI") or "").strip()
        paper_url = (record.get("URL") or "").strip()
        if doi and not paper_url:
            paper_url = f"https://doi.org/{doi}"
        if not paper_url:
            continue
        abstract = _strip_html(record.get("abstract") or "")
        entry = _candidate(
            rid=f"candidate-crossref-{_slug(title)}",
            title=title,
            url=paper_url,
            category="paper",
            source="crossref",
            summary=(abstract or "Candidate paper returned from Crossref search for Pashto.")[:240],
            evidence_text="Matched by explicit Pashto marker in title from Crossref search.",
            evidence_url=paper_url,
            markers=["pashto"],
            tags=["pashto", "candidate", "paper", "crossref"],
        )
        results.append(entry)
        if len(results) >= limit:
            break
    return results
def fetch_zenodo_records(limit: int) -> list[dict[str, Any]]:
    """Collect Zenodo records that mention Pashto in their title or description.

    Searches once per Pashto query term (most recent first) and merges hits
    by record id.  The Zenodo resource type is mapped onto the catalog
    categories, with ``"project"`` as the fallback.  At most *limit*
    candidates are returned.

    Raises:
        RuntimeError: When every query failed and nothing was collected.
    """
    records: dict[str, dict[str, Any]] = {}
    failures: list[str] = []
    for term in PASHTO_QUERY_TERMS:
        params = urllib.parse.urlencode({"q": term, "size": str(limit), "sort": "mostrecent"})
        endpoint = f"https://zenodo.org/api/records/?{params}"
        try:
            response = _fetch_json(endpoint, timeout=30.0, source_name="zenodo")
        except Exception as exc:  # noqa: BLE001
            failures.append(f"{term}: {exc}")
            continue
        for hit in response.get("hits", {}).get("hits", []):
            record_id = str(hit.get("id") or "").strip()
            if record_id:
                records[record_id] = hit
    if failures and not records:
        raise RuntimeError("; ".join(failures))

    type_to_category = {
        "dataset": "dataset",
        "software": "code",
        "publication": "paper",
        "poster": "project",
        "presentation": "project",
    }
    results: list[dict[str, Any]] = []
    for hit in records.values():
        metadata = hit.get("metadata") or {}
        title = (metadata.get("title") or "").strip()
        description = _strip_html(metadata.get("description") or "")
        if not title:
            continue
        if not _is_pashto_centric(title, description) or _is_low_signal_name(title):
            continue
        links = hit.get("links") or {}
        record_url = (links.get("self_html") or links.get("doi") or "").strip()
        if not record_url:
            # Last resort: build a resolver link from the DOI in the metadata.
            doi = (metadata.get("doi") or "").strip()
            if not doi:
                continue
            record_url = f"https://doi.org/{doi}"
        resource_type = (metadata.get("resource_type") or {}).get("type") or ""
        category = type_to_category.get(str(resource_type).casefold(), "project")
        summary = description or "Candidate resource returned from Zenodo search for Pashto."
        entry = _candidate(
            rid=f"candidate-zenodo-{category}-{_slug(title)}",
            title=title,
            url=record_url,
            category=category,
            source="zenodo",
            summary=summary[:240],
            evidence_text="Zenodo metadata includes Pashto markers in title or description.",
            evidence_url=record_url,
            markers=["pashto"],
            tags=["pashto", "candidate", category, "zenodo"],
        )
        results.append(entry)
        if len(results) >= limit:
            break
    return results
def fetch_dataverse_datasets(limit: int) -> list[dict[str, Any]]:
    """Collect Harvard Dataverse datasets that mention Pashto.

    Searches once per Pashto query term and merges hits by global id,
    identifier, or URL.  Datasets are kept when their name or description
    contains a Pashto marker.  Site-relative URLs are made absolute, and a
    persistent-id link is built when no URL is given.  At most *limit*
    candidates are returned.

    Raises:
        RuntimeError: When every query failed and nothing was collected.
    """
    base_url = "https://dataverse.harvard.edu"
    datasets: dict[str, dict[str, Any]] = {}
    failures: list[str] = []
    for term in PASHTO_QUERY_TERMS:
        params = urllib.parse.urlencode(
            {
                "q": term,
                "type": "dataset",
                "per_page": str(limit),
                "start": "0",
            }
        )
        endpoint = f"{base_url}/api/search?{params}"
        try:
            response = _fetch_json(endpoint, timeout=30.0, source_name="dataverse")
        except Exception as exc:  # noqa: BLE001
            failures.append(f"{term}: {exc}")
            continue
        for hit in response.get("data", {}).get("items", []):
            identity = hit.get("global_id") or hit.get("identifier") or hit.get("url") or ""
            identity = str(identity).strip()
            if identity:
                datasets[identity] = hit
    if failures and not datasets:
        raise RuntimeError("; ".join(failures))

    results: list[dict[str, Any]] = []
    for hit in datasets.values():
        title = (hit.get("name") or "").strip()
        description = (hit.get("description") or "").strip()
        if not title:
            continue
        if not _is_pashto_centric(title, description) or _is_low_signal_name(title):
            continue
        record_url = (hit.get("url") or "").strip()
        if record_url.startswith("/"):
            record_url = f"{base_url}{record_url}"
        if not record_url:
            global_id = (hit.get("global_id") or "").strip()
            if global_id:
                # ":" and "/" are left unescaped in the persistent id.
                escaped_id = urllib.parse.quote(global_id, safe=":/")
                record_url = f"{base_url}/dataset.xhtml?persistentId={escaped_id}"
        if not record_url:
            continue
        entry = _candidate(
            rid=f"candidate-dataverse-dataset-{_slug(title)}",
            title=title,
            url=record_url,
            category="dataset",
            source="dataverse",
            summary=(description or "Candidate dataset returned from Dataverse search for Pashto.")[:240],
            evidence_text="Dataverse metadata includes Pashto markers in dataset title or description.",
            evidence_url=record_url,
            markers=["pashto"],
            tags=["pashto", "candidate", "dataset", "dataverse"],
        )
        results.append(entry)
        if len(results) >= limit:
            break
    return results
def fetch_datacite_records(limit: int) -> list[dict[str, Any]]:
    """Collect DataCite DOI records that mention Pashto.

    Searches once per Pashto query term and merges hits by record id.  The
    first title and all descriptions (stripped of markup) are checked for
    Pashto markers.  ``resourceTypeGeneral`` is mapped to a catalog
    category, falling back to ``"project"``.  At most *limit* candidates
    are returned.

    Raises:
        RuntimeError: When every query failed and nothing was collected.
    """
    records: dict[str, dict[str, Any]] = {}
    failures: list[str] = []
    for term in PASHTO_QUERY_TERMS:
        params = urllib.parse.urlencode({"query": term, "page[size]": str(limit)})
        endpoint = f"https://api.datacite.org/dois?{params}"
        try:
            response = _fetch_json(endpoint, timeout=30.0, source_name="datacite")
        except Exception as exc:  # noqa: BLE001
            failures.append(f"{term}: {exc}")
            continue
        for hit in response.get("data", []):
            record_id = (hit.get("id") or "").strip()
            if record_id:
                records[record_id] = hit
    if failures and not records:
        raise RuntimeError("; ".join(failures))

    # Lower-cased resourceTypeGeneral -> catalog category.
    type_to_category = {
        "dataset": "dataset",
        "collection": "dataset",
        "software": "code",
        "journalarticle": "paper",
        "conferencepaper": "paper",
        "preprint": "paper",
        "text": "paper",
    }
    results: list[dict[str, Any]] = []
    for hit in records.values():
        attributes = hit.get("attributes") or {}

        title = ""
        titles = attributes.get("titles") or []
        if isinstance(titles, list) and titles:
            leading = titles[0] or {}
            if isinstance(leading, dict):
                title = (leading.get("title") or "").strip()
        if not title:
            continue

        raw_descriptions = attributes.get("descriptions") or []
        blocks = raw_descriptions if isinstance(raw_descriptions, list) else []
        cleaned: list[str] = []
        for block in blocks:
            if not isinstance(block, dict):
                continue
            text = (block.get("description") or "").strip()
            if text:
                cleaned.append(_strip_html(text))
        description_blob = " ".join(cleaned).strip()

        if not _is_pashto_centric(title, description_blob) or _is_low_signal_name(title):
            continue

        doi = (attributes.get("doi") or hit.get("id") or "").strip()
        record_url = (attributes.get("url") or "").strip()
        if doi and not record_url:
            record_url = f"https://doi.org/{doi}"
        if not record_url:
            continue

        general_type = str((attributes.get("types") or {}).get("resourceTypeGeneral") or "").casefold()
        category = type_to_category.get(general_type, "project")
        summary = description_blob or "Candidate record returned from DataCite DOI search for Pashto."
        entry = _candidate(
            rid=f"candidate-datacite-{category}-{_slug(title)}",
            title=title,
            url=record_url,
            category=category,
            source="datacite",
            summary=summary[:240],
            evidence_text="DataCite metadata includes Pashto markers in title or description.",
            evidence_url=record_url,
            markers=["pashto"],
            tags=["pashto", "candidate", category, "datacite"],
        )
        results.append(entry)
        if len(results) >= limit:
            break
    return results
def fetch_arxiv(limit: int) -> list[dict[str, Any]]:
    """Collect arXiv papers whose title contains a Pashto marker.

    Queries the arXiv Atom API once per Pashto query term.  A term whose
    request fails, or whose response is not well-formed XML, is recorded
    and skipped so the remaining terms are still processed.  When the
    failure is a TLS certificate verification error the request is retried
    once with certificate verification disabled.

    Titles and summaries are whitespace-normalised because Atom text may
    contain embedded line breaks.  Entries are de-duplicated by their
    ``atom:id`` link and at most *limit* candidates are returned.

    Raises:
        RuntimeError: When no term produced a parseable feed and at least
            one error was recorded.
    """
    roots: list[ET.Element] = []
    errors: list[str] = []
    for term in PASHTO_QUERY_TERMS:
        query = urllib.parse.urlencode(
            {"search_query": f"all:{term}", "start": "0", "max_results": str(limit)}
        )
        url = f"https://export.arxiv.org/api/query?{query}"
        try:
            xml_text = _fetch_text(url, timeout=30.0, source_name="arxiv")
        except Exception as exc:  # noqa: BLE001
            if not _is_ssl_cert_error(exc):
                errors.append(f"{term}: {exc}")
                continue
            # arXiv occasionally fails cert chain validation in some runner images.
            # SECURITY NOTE: this fallback disables TLS verification, so the
            # response is unauthenticated; results are only review candidates.
            insecure_context = ssl._create_unverified_context()
            print("[warn] arxiv SSL verification failed; retrying with unverified TLS context")
            try:
                xml_text = _fetch_text(
                    url,
                    timeout=30.0,
                    ssl_context=insecure_context,
                    source_name="arxiv",
                )
            except Exception as retry_exc:  # noqa: BLE001
                # The fallback may fail too; keep going with the other terms.
                errors.append(f"{term}: {retry_exc}")
                continue
        try:
            roots.append(ET.fromstring(xml_text))
        except ET.ParseError as exc:
            # A malformed feed for one term must not discard the others.
            errors.append(f"{term}: invalid XML response: {exc}")
    if not roots and errors:
        raise RuntimeError("; ".join(errors))

    ns = {"atom": "http://www.w3.org/2005/Atom"}

    def entry_text(entry: ET.Element, tag: str) -> str:
        # Collapse line breaks and repeated spaces inside the element text.
        raw = entry.findtext(f"atom:{tag}", default="", namespaces=ns) or ""
        return " ".join(raw.split())

    seen_links: set[str] = set()
    out: list[dict[str, Any]] = []
    for root in roots:
        for entry in root.findall("atom:entry", ns):
            title = entry_text(entry, "title")
            link = (entry.findtext("atom:id", default="", namespaces=ns) or "").strip()
            summary = entry_text(entry, "summary")
            if not title or not link:
                continue
            if link in seen_links:
                continue
            # Strict: keep only papers with explicit Pashto markers in title.
            if not _is_pashto_centric(title):
                continue
            if _is_low_signal_name(title):
                continue
            seen_links.add(link)
            rid = f"candidate-arxiv-{_slug(title)}"
            out.append(
                _candidate(
                    rid=rid,
                    title=title,
                    url=link,
                    category="paper",
                    source="arxiv",
                    summary=summary[:240] if summary else "Candidate paper returned from arXiv query for Pashto.",
                    evidence_text="Matched by Pashto marker in paper title from arXiv query results.",
                    evidence_url=link,
                    markers=["pashto"],
                    tags=["pashto", "candidate", "paper"],
                )
            )
            if len(out) >= limit:
                return out
    return out
def fetch_semantic_scholar(limit: int) -> list[dict[str, Any]]:
    """Collect Semantic Scholar papers whose title contains a Pashto marker.

    Searches once per Pashto query term and merges hits by stripped title.
    The candidate URL is the paper's ``url`` field, falling back to an
    arXiv abstract link built from ``externalIds``.  Candidates carry the
    source label ``"other"``.  At most *limit* candidates are returned.

    Raises:
        RuntimeError: When every query failed and nothing was collected.
    """
    requested_fields = "title,url,abstract,year,externalIds"
    papers: dict[str, dict[str, Any]] = {}
    failures: list[str] = []
    for term in PASHTO_QUERY_TERMS:
        params = urllib.parse.urlencode(
            {"query": term, "limit": str(limit), "fields": requested_fields}
        )
        endpoint = f"https://api.semanticscholar.org/graph/v1/paper/search?{params}"
        try:
            response = _fetch_json(endpoint, timeout=30.0, source_name="semantic-scholar")
        except Exception as exc:  # noqa: BLE001
            failures.append(f"{term}: {exc}")
            continue
        for paper in response.get("data", []):
            key = (paper.get("title") or "").strip()
            if key:
                papers[key] = paper
    if failures and not papers:
        raise RuntimeError("; ".join(failures))

    results: list[dict[str, Any]] = []
    # Keys are the stripped, non-empty titles of the stored papers.
    for title, paper in papers.items():
        # Strict: keep only papers with explicit Pashto markers in title.
        if not _is_pashto_centric(title) or _is_low_signal_name(title):
            continue
        paper_url = (paper.get("url") or "").strip()
        if not paper_url:
            arxiv_id = (paper.get("externalIds") or {}).get("ArXiv")
            if arxiv_id:
                paper_url = f"https://arxiv.org/abs/{arxiv_id}"
        if not paper_url:
            continue
        abstract = (paper.get("abstract") or "").strip()
        if abstract:
            summary = abstract[:240]
        else:
            summary = "Candidate paper returned from Semantic Scholar search for Pashto."
        entry = _candidate(
            rid=f"candidate-s2-{_slug(title)}",
            title=title,
            url=paper_url,
            category="paper",
            source="other",
            summary=summary,
            evidence_text="Matched by explicit Pashto marker in paper title from Semantic Scholar search.",
            evidence_url=paper_url,
            markers=["pashto"],
            tags=["pashto", "candidate", "paper"],
        )
        results.append(entry)
        if len(results) >= limit:
            break
    return results
def _dedupe_candidates(
candidates: list[dict[str, Any]],
existing_ids: set[str],
existing_urls: set[str],
) -> list[dict[str, Any]]:
unique: list[dict[str, Any]] = []
seen_ids = set(existing_ids)
seen_urls = set(existing_urls)
for item in candidates:
rid = item["id"]
url = item["url"].rstrip("/")
if rid in seen_ids or url in seen_urls:
continue
seen_ids.add(rid)
seen_urls.add(url)
unique.append(item)
return unique
def main() -> int:
    """Run every source fetcher and write the pending-candidates file.

    Reads the catalog to learn which ids and URLs already exist, runs each
    fetcher (recording, not raising, per-source failures), removes
    duplicates, sorts candidates by lower-cased title, and writes the JSON
    payload.  The output file is left untouched when nothing but the
    ``generated_on`` timestamp would change.

    Returns:
        ``0`` in every case that does not raise.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--catalog", default="resources/catalog/resources.json")
    cli.add_argument("--output", default="resources/catalog/pending_candidates.json")
    cli.add_argument("--limit", type=int, default=15)
    args = cli.parse_args()
    limit = args.limit

    catalog_path = Path(args.catalog)
    output_path = Path(args.output)
    catalog = json.loads(catalog_path.read_text(encoding="utf-8"))
    known = [entry for entry in catalog.get("resources", []) if isinstance(entry, dict)]
    existing_ids = {entry.get("id", "") for entry in known}
    existing_urls = {
        entry["url"].rstrip("/") for entry in known if isinstance(entry.get("url"), str)
    }

    fetch_steps = [
        ("kaggle-datasets", lambda: fetch_kaggle_datasets(limit)),
        ("huggingface-datasets", lambda: fetch_huggingface("datasets", limit)),
        ("huggingface-models", lambda: fetch_huggingface("models", limit)),
        ("huggingface-spaces", lambda: fetch_huggingface_spaces(limit)),
        ("github-repositories", lambda: fetch_github_pashto_repos(limit)),
        ("gitlab-projects", lambda: fetch_gitlab_pashto_projects(limit)),
        ("openalex", lambda: fetch_openalex_papers(limit)),
        ("crossref", lambda: fetch_crossref_papers(limit)),
        ("zenodo", lambda: fetch_zenodo_records(limit)),
        ("dataverse", lambda: fetch_dataverse_datasets(limit)),
        ("datacite", lambda: fetch_datacite_records(limit)),
        ("arxiv", lambda: fetch_arxiv(limit)),
        ("semantic-scholar", lambda: fetch_semantic_scholar(limit)),
    ]
    gathered: list[dict[str, Any]] = []
    source_errors: list[str] = []
    sources_used: list[str] = []
    for source_name, run_step in fetch_steps:
        # A failing source is reported in the payload instead of aborting the run.
        try:
            gathered.extend(run_step())
            sources_used.append(source_name)
        except Exception as exc:  # noqa: BLE001
            source_errors.append(f"{source_name}: {exc}")

    unique_candidates = _dedupe_candidates(gathered, existing_ids, existing_urls)
    unique_candidates.sort(key=lambda item: item["title"].lower())

    payload: dict[str, Any] = {
        "generated_on": datetime.now(timezone.utc).isoformat(),
        "sources": sources_used,
        "candidate_count": len(unique_candidates),
        "candidates": unique_candidates,
    }
    if source_errors:
        payload["errors"] = source_errors

    def without_timestamp(data: dict[str, Any]) -> dict[str, Any]:
        return {key: value for key, value in data.items() if key != "generated_on"}

    output_path.parent.mkdir(parents=True, exist_ok=True)
    if output_path.exists():
        try:
            previous = json.loads(output_path.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            previous = None
        # Skip the write when only the timestamp differs.
        if isinstance(previous, dict) and without_timestamp(previous) == without_timestamp(payload):
            print(
                f"Candidate sync complete: {len(unique_candidates)} new candidates, "
                f"{len(source_errors)} source errors, no file changes"
            )
            return 0

    serialized = json.dumps(payload, ensure_ascii=False, indent=2) + "\n"
    output_path.write_text(serialized, encoding="utf-8")
    print(
        f"Candidate sync complete: {len(unique_candidates)} new candidates, "
        f"{len(source_errors)} source errors"
    )
    return 0
if __name__ == "__main__":
raise SystemExit(main())