cjber's picture
fix(updates to doc format):
5767260
import logging
from io import BytesIO
from pathlib import Path
from typing import Any
import polars as pl
import pypandoc
import requests
from pypdf import PdfReader
from tqdm import tqdm
from planning_ai.common.utils import Paths
def get_schema() -> dict[str, Any]:
return {
"id": pl.Int64,
"method": pl.String,
"text": pl.String,
"respondentpostcode": pl.String,
"attachments": pl.List(
pl.Struct(
[
pl.Field("id", pl.Int64),
pl.Field("url", pl.String),
pl.Field("published", pl.Boolean),
]
)
),
"representations": pl.List(
pl.Struct(
[
pl.Field("id", pl.Int64),
pl.Field("support/object", pl.Utf8),
pl.Field("document", pl.String),
pl.Field("documentelementid", pl.Int64),
pl.Field("documentelementtitle", pl.String),
pl.Field("summary", pl.String),
]
)
),
}
def process_files(files: list[Path], schema: dict[str, Any]) -> None:
dfs = [pl.read_json(file, schema=schema) for file in tqdm(files)]
(
pl.concat(dfs)
.explode("attachments")
.explode("representations")
.with_columns(
pl.col("attachments").name.map_fields(lambda x: f"attachments_{x}")
)
.unnest("attachments")
.with_columns(
pl.col("representations").name.map_fields(lambda x: f"representations_{x}")
)
.unnest("representations")
.write_parquet(Paths.STAGING / "gcpt3.parquet")
)
def download_attachments():
df = pl.read_parquet(Paths.STAGING / "gcpt3.parquet")
df.columns
existing_files = {f.stem for f in (Paths.RAW / "pdfs").glob("*.pdf")}
failed_files = set()
failed_file_path = Paths.RAW / "failed_downloads.txt"
if failed_file_path.exists():
with open(failed_file_path, "r") as file:
failed_files = set(file.read().splitlines())
for row in tqdm(
df.drop_nulls(subset="attachments_id")
.unique(subset="attachments_id")
.sample(shuffle=True, fraction=1)
.rows(named=True)
):
if (
str(row["attachments_id"]) in existing_files
or str(row["attachments_id"]) in failed_files
):
failed_files.add(row["attachments_id"])
continue
file_path = Paths.RAW / "pdfs" / f"{row['attachments_id']}.pdf"
try:
response = requests.get(row["attachments_url"], timeout=3)
response.raise_for_status()
response_type = response.headers.get("content-type")
if response_type == "application/pdf":
PdfReader(BytesIO(response.content)) # check if pdf is valid
with open(file_path, "wb") as f:
f.write(response.content)
print(f"Downloaded {row['attachments_url']} to {file_path}")
elif (
response_type
== "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
):
pypandoc.convert_file(response.content, "pdf", outputfile=file_path)
else:
logging.error(f"Response returned {response_type}, not saving.")
continue
except requests.RequestException as e:
logging.error(f"RequestException for {row['attachments_url']}: {e}")
failed_files.add(row["attachments_id"])
with open(failed_file_path, "a") as file:
file.write(f"{row['attachments_id']}\n")
print(f"Skipping {row['attachments_url']} due to error: {e}")
except Exception as e:
logging.error(f"Unexpected error for {row['attachments_url']}: {e}")
row["attachments_url"]
failed_files.add(row["attachments_id"])
with open(failed_file_path, "a") as file:
file.write(f"{row['attachments_id']}\n")
print(f"Unexpected error for {row['attachments_url']}: {e}")
def main() -> None:
files = list(Path(Paths.RAW / "gcpt3").glob("*.json"))
schema = get_schema()
process_files(files, schema)
download_attachments()
if __name__ == "__main__":
main()