Spaces:
Sleeping
Sleeping
File size: 4,417 Bytes
8d63d9f 5277da5 214b80b 5767260 8d63d9f 5277da5 214b80b 8eab782 8d63d9f 214b80b 7ece9c1 214b80b 7ece9c1 214b80b 7ece9c1 214b80b 8d63d9f 5767260 32ffa94 963aee4 8d63d9f 32ffa94 8d63d9f 2d24223 e9d4b1a 8d63d9f 963aee4 5767260 963aee4 8d63d9f 214b80b 963aee4 214b80b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
import logging
from io import BytesIO
from pathlib import Path
from typing import Any
import polars as pl
import pypandoc
import requests
from pypdf import PdfReader
from tqdm import tqdm
from planning_ai.common.utils import Paths
def get_schema() -> dict[str, Any]:
    """Return the polars schema used to parse the raw gcpt3 JSON files.

    Each record carries two nested list-of-struct columns: the files
    attached to a response and the individual representations made in it.
    """
    attachment_struct = pl.Struct(
        [
            pl.Field("id", pl.Int64),
            pl.Field("url", pl.String),
            pl.Field("published", pl.Boolean),
        ]
    )
    representation_struct = pl.Struct(
        [
            pl.Field("id", pl.Int64),
            pl.Field("support/object", pl.Utf8),
            pl.Field("document", pl.String),
            pl.Field("documentelementid", pl.Int64),
            pl.Field("documentelementtitle", pl.String),
            pl.Field("summary", pl.String),
        ]
    )
    return {
        "id": pl.Int64,
        "method": pl.String,
        "text": pl.String,
        "respondentpostcode": pl.String,
        "attachments": pl.List(attachment_struct),
        "representations": pl.List(representation_struct),
    }
def process_files(files: list[Path], schema: dict[str, Any]) -> None:
    """Read the raw gcpt3 JSON files, flatten the nested columns, and
    write the combined result to ``Paths.STAGING / "gcpt3.parquet"``.

    Both ``attachments`` and ``representations`` are exploded and
    unnested, with their struct fields prefixed by the column name
    (e.g. ``attachments_id``) to avoid collisions after unnesting.
    """
    frames = [pl.read_json(path, schema=schema) for path in tqdm(files)]
    combined = (
        pl.concat(frames)
        .explode("attachments")
        .explode("representations")
    )
    for column in ("attachments", "representations"):
        # Bind `column` as a default arg so the lambda does not
        # late-bind to the loop variable.
        combined = combined.with_columns(
            pl.col(column).name.map_fields(
                lambda field, prefix=column: f"{prefix}_{field}"
            )
        ).unnest(column)
    combined.write_parquet(Paths.STAGING / "gcpt3.parquet")
def download_attachments():
    """Download every unique attachment referenced in the staged parquet.

    PDF responses are validated with pypdf and written to
    ``Paths.RAW / "pdfs" / "<attachment_id>.pdf"``. ``.docx`` responses are
    staged to disk and converted to PDF with pandoc. Any id that fails is
    appended to ``Paths.RAW / "failed_downloads.txt"`` so later runs skip it.
    """
    df = pl.read_parquet(Paths.STAGING / "gcpt3.parquet")
    existing_files = {f.stem for f in (Paths.RAW / "pdfs").glob("*.pdf")}

    failed_file_path = Paths.RAW / "failed_downloads.txt"
    failed_files: set[str] = set()
    if failed_file_path.exists():
        with open(failed_file_path, "r") as file:
            failed_files = set(file.read().splitlines())

    rows = (
        df.drop_nulls(subset="attachments_id")
        .unique(subset="attachments_id")
        .sample(shuffle=True, fraction=1)  # randomize order across runs
        .rows(named=True)
    )
    for row in tqdm(rows):
        # Normalize once: the failed-downloads file stores string ids, so
        # membership tests and additions must use str consistently.
        attachment_id = str(row["attachments_id"])
        if attachment_id in existing_files or attachment_id in failed_files:
            continue
        url = row["attachments_url"]
        file_path = Paths.RAW / "pdfs" / f"{attachment_id}.pdf"
        try:
            response = requests.get(url, timeout=3)
            response.raise_for_status()
            response_type = response.headers.get("content-type")
            if response_type == "application/pdf":
                PdfReader(BytesIO(response.content))  # check if pdf is valid
                with open(file_path, "wb") as f:
                    f.write(response.content)
                logging.info(f"Downloaded {url} to {file_path}")
            elif (
                response_type
                == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
            ):
                # pypandoc.convert_file expects a path on disk, not raw
                # bytes: stage the .docx beside the target, then clean up.
                docx_path = file_path.with_suffix(".docx")
                docx_path.write_bytes(response.content)
                try:
                    pypandoc.convert_file(
                        str(docx_path), "pdf", outputfile=str(file_path)
                    )
                finally:
                    docx_path.unlink(missing_ok=True)
            else:
                logging.error(f"Response returned {response_type}, not saving.")
        except requests.RequestException as e:
            logging.error(f"RequestException for {url}: {e}")
            _record_failed_download(failed_files, failed_file_path, attachment_id)
        except Exception as e:
            logging.error(f"Unexpected error for {url}: {e}")
            _record_failed_download(failed_files, failed_file_path, attachment_id)


def _record_failed_download(
    failed_files: set[str], failed_file_path: Path, attachment_id: str
) -> None:
    """Remember a failed attachment id both in memory and on disk."""
    failed_files.add(attachment_id)
    with open(failed_file_path, "a") as file:
        file.write(f"{attachment_id}\n")
def main() -> None:
    """Stage the raw gcpt3 JSON files, then fetch their attachments."""
    schema = get_schema()
    json_files = list(Path(Paths.RAW / "gcpt3").glob("*.json"))
    process_files(json_files, schema)
    download_attachments()


if __name__ == "__main__":
    main()
|