"""
etsi_client.py - ETSI document download helpers for ApplyCRs.
Provides:
    CRFetcher - CR TDoc downloads via docbox.etsi.org
    TSFetcher - TS DOCX downloads via portal.etsi.org WKI chain
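
Example (credentials from the environment; the document ids are illustrative,
not taken from a real TDoc list or deliver tree):
    cr = CRFetcher(os.environ["EOL_USER"], os.environ["EOL_PASSWORD"])
    cr_url = cr.search_document("SET(25)000123")
    ts = TSFetcher(os.environ["EOL_USER"], os.environ["EOL_PASSWORD"])
    docx_path = ts.search_document_docx("103 666", version="1.2.1")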
"""
import datetime
import json
import os
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin
import requests
import urllib3
from bs4 import BeautifulSoup
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def _get_proxies() -> dict:
"""Return a requests-compatible proxies dict from $http_proxy / $HTTP_PROXY."""
proxy = os.environ.get("http_proxy") or os.environ.get("HTTP_PROXY") or ""
if not proxy:
return {}
return {"http": proxy, "https": proxy}
class CRFetcher:
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/136.0.0.0 Safari/537.36"
)
}
def __init__(self, eol_user: str, eol_password: str):
self.eol_user = eol_user
self.eol_password = eol_password
self.main_ftp_url = "https://docbox.etsi.org/SET"
        req_data = self.connect()
        if req_data["error"]:
            raise RuntimeError(req_data["message"])
        self.session = req_data["session"]
def connect(self):
session = requests.Session()
session.headers.update(self.HEADERS)
session.proxies.update(_get_proxies())
        # Seed DNN session cookies: docbox requires the portal session to be
        # initialised with domain=docbox.etsi.org so the .DOTNETNUKE cookie
        # is scoped to .etsi.org and accepted by docbox.etsi.org as well.
login_redir_url = (
"https://portal.etsi.org/LoginRedirection.aspx"
"?domain=docbox.etsi.org&ReturnUrl=/"
)
session.get(login_redir_url, verify=False, timeout=15)
req = session.post(
"https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
data=json.dumps({"username": self.eol_user, "password": self.eol_password}),
headers={
"Content-Type": "application/json; charset=UTF-8",
"Referer": login_redir_url,
},
verify=False,
allow_redirects=False,
timeout=15,
)
        if req.text.strip() == "Failed":
return {
"error": True,
"session": session,
"message": "Login failed! Check your credentials",
}
self.session = session
return {"error": False, "session": session, "message": "Login successful"}
def download_document(self, url: str) -> bytes:
"""Download a docbox file using the authenticated session.
        If the session has expired the portal redirects to LoginRedirection;
        we detect this and re-authenticate before retrying.
"""
resp = self.session.get(url, verify=False, timeout=30, allow_redirects=True)
if resp.url and "LoginRedirection" in resp.url:
self.connect()
resp = self.session.get(url, verify=False, timeout=30, allow_redirects=True)
return resp.content
    def get_workgroup(self, doc: str):
        """Map a TDoc id to (main_tsg, workgroup, doc); a None triple if unknown."""
        # Longest prefixes first: "SET"/"SCP" would also match the WG-specific ones.
        if doc.startswith(("SETREQ", "SCPREQ")):
            main_tsg = "SET-WG-R"
        elif doc.startswith(("SETTEC", "SCPTEC")):
            main_tsg = "SET-WG-T"
        elif doc.startswith(("SET", "SCP")):
            main_tsg = "SET"
        else:
            return None, None, None
        match = re.search(r"\(([^)]+)\)", doc)
        if match is None:  # no "(..)" group in the id to derive the folder from
            return None, None, None
        workgroup = "20" + match.group(1)  # e.g. "SET(25)..." -> "2025"
        return main_tsg, workgroup, doc
def find_workgroup_url(self, main_tsg, workgroup):
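        """Find the contributions folder for a workgroup under 05-CONTRIBUTIONS.
        Falls back to the bare "{workgroup}" path when no listing entry matches."""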
url = f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS"
response = self.session.get(url, verify=False, timeout=15)
if "LoginRedirection" in response.url:
self.connect()
response = self.session.get(url, verify=False, timeout=15)
soup = BeautifulSoup(response.text, "html.parser")
for item in soup.find_all("tr"):
link = item.find("a")
if link and workgroup in link.get_text():
return (
f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{link.get_text()}"
)
return f"{self.main_ftp_url}/{main_tsg}/05-CONTRIBUTIONS/{workgroup}"
def get_docs_from_url(self, url):
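        """List link texts from a docbox directory listing (empty list on error)."""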
try:
response = self.session.get(url, verify=False, timeout=15)
soup = BeautifulSoup(response.text, "html.parser")
return [item.get_text() for item in soup.select("tr td a")]
except Exception as e:
print(f"Error accessing {url}: {e}")
return []
def search_document(self, doc_id: str):
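        """Resolve a TDoc id to its docbox URL, scanning the workgroup folder
        and one level of subfolders; returns an error string when nothing matches."""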
original = doc_id
main_tsg, workgroup, doc = self.get_workgroup(doc_id)
urls = []
if main_tsg:
wg_url = self.find_workgroup_url(main_tsg, workgroup)
if wg_url:
entries = self.get_docs_from_url(wg_url)
for entry in entries:
                    if doc.lower() in entry.lower() or original in entry:
doc_url = f"{wg_url}/{entry}"
urls.append(doc_url)
elif "." not in entry.rstrip("/"):
sub_url = f"{wg_url}/{entry}"
files = self.get_docs_from_url(sub_url)
for f in files:
                            if doc.lower() in f.lower() or original in f:
urls.append(f"{sub_url}/{f}")
        if not urls:
            return f"Document {doc_id} not found"
        return urls[-1]  # when several entries match, prefer the last one listed
class TSFetcher:
def __init__(self, eol_user: str, eol_password: str):
self.eol_user = eol_user
self.eol_password = eol_password
self.main_url = "https://www.etsi.org/deliver/etsi_ts"
self.second_url = "https://www.etsi.org/deliver/etsi_tr"
self.headers = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/136.0.0.0 Safari/537.36"
)
}
def get_spec_path(self, doc_id: str):
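        """Map a spec id to its path under the deliver tree. Specs are grouped
        into hundred-wide folders, e.g. "103 666-2" (an illustrative id) maps
        to "103600_103699/10366602"."""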
if "-" in doc_id:
position, part = doc_id.split("-")
else:
position, part = doc_id, None
position = position.replace(" ", "")
if part:
if len(part) == 1:
part = "0" + part
spec_folder = position + part if part is not None else position
return (
f"{int(position) - (int(position) % 100)}_"
f"{int(position) - (int(position) % 100) + 99}/{spec_folder}"
)
def get_docs_from_url(self, url):
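        """List link texts from a deliver directory listing, skipping the first
        (parent-directory) anchor; returns an empty list on error."""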
try:
response = requests.get(
url, verify=False, timeout=15, proxies=_get_proxies()
)
soup = BeautifulSoup(response.text, "html.parser")
docs = [item.get_text() for item in soup.find_all("a")][1:]
return docs
except Exception as e:
print(f"Error accessing {url}: {e}")
return []
def _normalise_version(self, version: str) -> str:
"""Normalise a user-supplied version string to ETSI zero-padded format.
'17.6.0' -> '17.06.00' (the '_60' release suffix is ignored during matching)
Already-normalised strings like '17.06.00' are returned unchanged."""
parts = version.strip("/").split(".")
if len(parts) == 3:
try:
return f"{int(parts[0]):02d}.{int(parts[1]):02d}.{int(parts[2]):02d}"
except ValueError:
pass
return version.strip("/")
def _pick_release(self, releases: list, version: str = None) -> str:
"""Return the release folder matching version, or the latest if not found/specified."""
if version:
target = self._normalise_version(version)
for r in releases:
folder = r.strip("/").split("_")[0]
if folder == target:
return r
return releases[-1]
    def search_document(self, doc_id: str, version: str = None):
        """Return the URL of the spec's PDF on www.etsi.org, or an error string."""
        spec_path = self.get_spec_path(doc_id)
        # Try the TS deliver tree first, then fall back to the TR tree.
        for base_url in (f"{self.main_url}/{spec_path}/", f"{self.second_url}/{spec_path}/"):
            print(base_url)
            releases = self.get_docs_from_url(base_url)
            if not releases:
                continue
            release = self._pick_release(releases, version)
            files = self.get_docs_from_url(base_url + release)
            for f in files:
                if f.endswith(".pdf"):
                    return base_url + release + "/" + f
        return f"Specification {doc_id} not found"
def _get_wki_id_candidates(self, doc_id: str, version: str = None) -> tuple:
"""Return (candidates, version_str) for a spec version (best match first)."""
if version:
version_str = version
else:
pdf_url = self.search_document(doc_id)
if "not found" in pdf_url.lower():
return [], ""
parts = pdf_url.rstrip("/").split("/")
version_folder = parts[-2] # e.g. "18.04.00_60"
v_parts = version_folder.split("_")[0].split(".") # ["18", "04", "00"]
try:
version_str = f"{int(v_parts[0])}.{int(v_parts[1])}.{int(v_parts[2])}"
except (ValueError, IndexError):
return [], ""
def fetch_candidates():
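            # Query the public standards-search backend behind
            # https://www.etsi.org/standards; the parameters mirror what the UI sends.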
spec_num = doc_id.split("-")[0].replace(" ", "")
            today = datetime.date.today().isoformat()
base_params = {
"format": "json",
"page": "1",
"title": "1",
"etsiNumber": "1",
"content": "1",
"version": "0",
"onApproval": "1",
"published": "1",
"withdrawn": "1",
"historical": "1",
"isCurrent": "1",
"superseded": "1",
"startDate": "1988-01-15",
"endDate": today,
"harmonized": "0",
"keyword": "",
"TB": "",
"stdType": "",
"frequency": "",
"mandate": "",
"collection": "",
"sort": "1",
}
# ETSI UI sends capital-V version; try both to be safe
queries = [
f"{doc_id} V{version_str}", # e.g. "104 005 V1.2.1" (UI format)
f"{doc_id} v{version_str}", # e.g. "104 005 v1.2.1"
doc_id, # e.g. "104 005" (wider net)
]
seen = {}
for query in queries:
params = {**base_params, "search": query}
try:
resp = requests.get(
"https://www.etsi.org/custom/standardssearch/data.php",
params=params,
headers={
**self.headers,
"Referer": "https://www.etsi.org/standards/",
},
verify=False,
timeout=15,
proxies=_get_proxies(),
)
data = resp.json()
if data and isinstance(data, list):
hits = [
str(item["wki_id"])
for item in data
if "wki_id" in item and spec_num in json.dumps(item)
]
for h in hits:
seen[h] = None
if hits:
print(f" wki_id search query={query!r} β†’ {len(hits)} hit(s)")
break
except Exception as e:
print(f"Error getting wki_id for {doc_id} (query={query!r}): {e}")
return list(seen.keys())
candidates = list(dict.fromkeys(fetch_candidates()))
return candidates, version_str
def _authenticate_eol(self) -> requests.Session:
"""Create a requests.Session authenticated to the ETSI EOL portal."""
session = requests.Session()
session.headers.update({"User-Agent": self.headers["User-Agent"]})
session.proxies.update(_get_proxies())
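        # Same cookie-seeding chain as CRFetcher.connect: LoginRedirection scopes
        # the .DOTNETNUKE cookie so both portal.etsi.org and docbox accept it.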
login_redir_url = (
"https://portal.etsi.org/LoginRedirection.aspx"
"?domain=docbox.etsi.org&ReturnUrl=/"
)
session.get(login_redir_url, verify=False, timeout=15)
login_resp = session.post(
"https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
data=json.dumps({"username": self.eol_user, "password": self.eol_password}),
headers={
"Content-Type": "application/json; charset=UTF-8",
"Referer": login_redir_url,
},
verify=False,
allow_redirects=False,
timeout=15,
)
if login_resp.text.strip() == "Failed":
raise RuntimeError(
"ETSI EOL login failed β€” check EOL_USER / EOL_PASSWORD"
)
return session
def search_document_docx(self, doc_id: str, version: str = None) -> str:
"""Download an ETSI spec as DOCX and return the local file path."""
candidates, version_str = self._get_wki_id_candidates(doc_id, version)
if not candidates:
return f"Specification {doc_id} not found"
try:
version_tag = "".join(f"{int(p):02d}" for p in version_str.split("."))
except (ValueError, AttributeError):
version_tag = ""
auth_session = self._authenticate_eol()
def try_wki(wki_id):
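            """Follow the portal's one-time WKI download chain for a single wki_id;
            return a local DOCX path on success, or None to try the next candidate."""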
print(f"Trying wki_id={wki_id} for {doc_id}")
session = requests.Session()
session.headers.update({"User-Agent": self.headers["User-Agent"]})
session.proxies.update(_get_proxies())
session.cookies.update(auth_session.cookies)
# Step 1: LogonRedirection.asp registers the download intent server-side,
# generates a one-time profile_id, then 302s to NTaccount.asp.
# allow_redirects=True means the final response IS the NTaccount.asp page.
            # Do NOT call NTaccount.asp again: a second call invalidates profile_id A
# and the server rejects the new profile_id B with "Your identifier is wrong".
r_logon = session.get(
f"https://portal.etsi.org/webapp/workprogram/LogonRedirection.asp"
f"?wki_id={wki_id}",
verify=False,
timeout=15,
allow_redirects=True,
)
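            # NTaccount.asp answers with an HTML meta-refresh; requests does not
            # follow those, so extract the URL= target and fetch it ourselves.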
meta_match = re.search(r"URL=([^\"'\s>]+)", r_logon.text)
if not meta_match:
print(
f" wki_id={wki_id}: authentication failed "
f"(no URL= in NTaccount.asp), trying next"
)
return None
meta_url = urljoin(r_logon.url, meta_match.group(1))
r2 = session.get(meta_url, allow_redirects=False, verify=False, timeout=15)
if r2.status_code != 302:
print(
f" wki_id={wki_id}: unexpected status {r2.status_code}, trying next"
)
return None
location2 = r2.headers.get("Location", "")
if "processerror" in location2.lower():
print(f" wki_id={wki_id}: portal rejected ({location2}), trying next")
return None
copy_url = urljoin("https://portal.etsi.org/", location2)
r3 = session.get(copy_url, allow_redirects=False, verify=False, timeout=15)
if r3.status_code == 302:
location3 = r3.headers.get("Location", "")
final_url = urljoin("https://portal.etsi.org/webapp/ewp/", location3)
r4 = session.get(final_url, verify=False, timeout=15)
else:
r4 = r3
docx_urls = re.findall(
r'href=["\']([^"\']*\.docx)["\']', r4.text, re.IGNORECASE
)
if not docx_urls:
print(f" wki_id={wki_id}: DOCX not found in page, trying next")
return None
spec_num = doc_id.split("-")[0].replace(" ", "")
matching_urls = [u for u in docx_urls if spec_num in u.split("/")[-1]]
if not matching_urls:
print(
f" wki_id={wki_id}: DOCX spec mismatch "
f"(expected {spec_num}), trying next"
)
return None
if version_tag:
version_candidates = [
version_tag, # "010201"
f"v{version_tag}", # "v010201"
version_str.replace(".", ""), # "121"
version_str, # "1.2.1"
version_str.replace(".", "_"), # "1_2_1"
]
versioned_urls = []
for tag in version_candidates:
versioned_urls = [
u for u in matching_urls if tag in u.split("/")[-1]
]
if versioned_urls:
break
if not versioned_urls:
found_names = [u.split("/")[-1] for u in matching_urls]
                    # Decode the available version from the first filename (e.g. v160500 -> 16.5.0)
avail_ver = None
if found_names:
m = re.search(r'v(\d{6})p?', found_names[0])
if m:
t = m.group(1)
avail_ver = f"{int(t[0:2])}.{int(t[2:4])}.{int(t[4:6])}"
if avail_ver:
print(
f"\n *** WARNING ***\n"
f" TS {doc_id} v{version_str} is not available on the ETSI portal.\n"
f" Portal has v{avail_ver} (file: {found_names[0]}).\n"
f" Options: target v{avail_ver} in your CR, or drop the TS DOCX manually.\n"
)
else:
print(
f" wki_id={wki_id}: version tag not in filenames {found_names}, "
f"rejecting (wrong version would be downloaded)"
)
return None
matching_urls = versioned_urls
docx_url = matching_urls[0]
            dl = session.get(
                docx_url,
                headers={"Referer": r4.url},
                verify=False,
                timeout=60,
            )
            if dl.status_code != 200:
                print(f" wki_id={wki_id}: download failed (HTTP {dl.status_code})")
                return None
            filename = docx_url.split("/")[-1]
            tmp_path = f"/tmp/{filename}"
            with open(tmp_path, "wb") as f:
                f.write(dl.content)
print(f" wki_id={wki_id}: success")
return tmp_path
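        # Probe all candidate wki_ids concurrently; the first successful download
        # wins, and still-queued probes are cancelled (running ones simply finish).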
executor = ThreadPoolExecutor(max_workers=min(len(candidates), 4))
try:
futures = {executor.submit(try_wki, wki_id): wki_id for wki_id in candidates}
for future in as_completed(futures):
result = future.result()
if result is not None:
for f in futures:
f.cancel()
return result
finally:
executor.shutdown(wait=False)
return f"Specification {doc_id}: all {len(candidates)} wki_id candidate(s) rejected"