File size: 4,417 Bytes
8d63d9f
5277da5
214b80b
 
 
 
5767260
8d63d9f
5277da5
214b80b
 
 
 
 
 
 
 
8eab782
 
8d63d9f
214b80b
 
 
 
7ece9c1
214b80b
 
 
 
 
 
 
 
 
7ece9c1
214b80b
7ece9c1
 
214b80b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8d63d9f
 
5767260
32ffa94
963aee4
8d63d9f
 
 
 
 
32ffa94
8d63d9f
 
 
 
 
 
 
 
2d24223
e9d4b1a
8d63d9f
963aee4
 
 
 
 
 
 
5767260
 
 
 
 
 
 
 
 
 
 
 
 
 
963aee4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8d63d9f
 
214b80b
 
 
 
963aee4
214b80b
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import logging
from io import BytesIO
from pathlib import Path
from typing import Any

import polars as pl
import pypandoc
import requests
from pypdf import PdfReader
from tqdm import tqdm

from planning_ai.common.utils import Paths


def get_schema() -> dict[str, Any]:
    """Return the polars schema applied when reading the raw JSON files.

    Returns:
        A mapping of top-level column name to polars dtype. The
        ``attachments`` and ``representations`` columns are lists of structs
        whose fields are declared below.
    """
    attachment_fields = (
        ("id", pl.Int64),
        ("url", pl.String),
        ("published", pl.Boolean),
    )
    representation_fields = (
        ("id", pl.Int64),
        ("support/object", pl.Utf8),
        ("document", pl.String),
        ("documentelementid", pl.Int64),
        ("documentelementtitle", pl.String),
        ("summary", pl.String),
    )

    attachment_dtype = pl.Struct(
        [pl.Field(name, dtype) for name, dtype in attachment_fields]
    )
    representation_dtype = pl.Struct(
        [pl.Field(name, dtype) for name, dtype in representation_fields]
    )

    schema: dict[str, Any] = {
        "id": pl.Int64,
        "method": pl.String,
        "text": pl.String,
        "respondentpostcode": pl.String,
        "attachments": pl.List(attachment_dtype),
        "representations": pl.List(representation_dtype),
    }
    return schema


def process_files(files: list[Path], schema: dict[str, Any]) -> None:
    """Flatten the JSON files into a single parquet file.

    Each file is read with ``schema`` and the frames are concatenated. The
    ``attachments`` and ``representations`` list columns are exploded, their
    struct fields are prefixed with the column name and unnested into
    top-level columns, and the result is written to
    ``Paths.STAGING / "gcpt3.parquet"``.

    Args:
        files: JSON files to read.
        schema: Polars schema passed to ``pl.read_json`` for every file.
    """
    nested_columns = ("attachments", "representations")

    def _prefix_with(column: str):
        # Build the field renamer for one struct column, e.g. "id" -> "attachments_id".
        return lambda x: f"{column}_{x}"

    frames = []
    for json_file in tqdm(files):
        frames.append(pl.read_json(json_file, schema=schema))

    combined = pl.concat(frames)

    # Explode both list columns first, then flatten each struct in turn.
    for column in nested_columns:
        combined = combined.explode(column)

    for column in nested_columns:
        renamed = pl.col(column).name.map_fields(_prefix_with(column))
        combined = combined.with_columns(renamed).unnest(column)

    combined.write_parquet(Paths.STAGING / "gcpt3.parquet")


def _record_failed_download(
    attachment_id: Any, failed_files: set[str], failed_file_path: Path
) -> None:
    """Remember a failed attachment id in memory and append it to the failure log."""
    # Ids are kept as strings so they compare equal to the lines read back
    # from the failure log on a later run.
    failed_files.add(str(attachment_id))
    with open(failed_file_path, "a") as file:
        file.write(f"{attachment_id}\n")


def _save_docx_as_pdf(content: bytes, file_path: Path) -> None:
    """Convert downloaded .docx bytes into a PDF written to ``file_path``."""
    import tempfile

    # pypandoc.convert_file takes the path of a source file, not raw bytes,
    # so the download is staged in a temporary directory first.
    with tempfile.TemporaryDirectory() as tmp_dir:
        docx_path = Path(tmp_dir) / "attachment.docx"
        docx_path.write_bytes(content)
        pypandoc.convert_file(
            str(docx_path), "pdf", format="docx", outputfile=str(file_path)
        )


def download_attachments(timeout: float = 3) -> None:
    """Download every attachment listed in the staged parquet file as a PDF.

    Reads ``Paths.STAGING / "gcpt3.parquet"`` and visits each distinct,
    non-null ``attachments_id`` in shuffled order. Ids that already have a
    file in ``Paths.RAW / "pdfs"`` or that are listed in
    ``Paths.RAW / "failed_downloads.txt"`` are skipped. PDF responses are
    validated with ``PdfReader`` before being saved; Word (.docx) responses
    are converted to PDF with pandoc; any other content type is logged and
    not saved. Ids whose download or processing raises are appended to the
    failure log.

    Args:
        timeout: Timeout in seconds passed to ``requests.get``.
    """
    df = pl.read_parquet(Paths.STAGING / "gcpt3.parquet")

    pdf_dir = Paths.RAW / "pdfs"
    # Without the directory every write below would fail and each attachment
    # would wrongly be recorded as a failed download.
    pdf_dir.mkdir(parents=True, exist_ok=True)
    existing_files = {f.stem for f in pdf_dir.glob("*.pdf")}

    failed_files: set[str] = set()
    failed_file_path = Paths.RAW / "failed_downloads.txt"
    if failed_file_path.exists():
        with open(failed_file_path, "r") as file:
            failed_files = set(file.read().splitlines())

    docx_type = (
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    )

    for row in tqdm(
        df.drop_nulls(subset="attachments_id")
        .unique(subset="attachments_id")
        .sample(shuffle=True, fraction=1)
        .rows(named=True)
    ):
        attachment_id = row["attachments_id"]
        attachment_url = row["attachments_url"]

        # Already downloaded, or known to fail: nothing to do.
        if str(attachment_id) in existing_files or str(attachment_id) in failed_files:
            continue

        file_path = pdf_dir / f"{attachment_id}.pdf"
        try:
            response = requests.get(attachment_url, timeout=timeout)
            response.raise_for_status()

            response_type = response.headers.get("content-type")
            # Drop parameters such as "; charset=..." and ignore case so that
            # valid PDF/docx responses are not rejected on formatting alone.
            media_type = (response_type or "").split(";", 1)[0].strip().lower()

            if media_type == "application/pdf":
                PdfReader(BytesIO(response.content))  # check if pdf is valid
                with open(file_path, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {attachment_url} to {file_path}")
            elif media_type == docx_type:
                _save_docx_as_pdf(response.content, file_path)
            else:
                logging.error(f"Response returned {response_type}, not saving.")
                continue

        except requests.RequestException as e:
            logging.error(f"RequestException for {attachment_url}: {e}")
            _record_failed_download(attachment_id, failed_files, failed_file_path)
            print(f"Skipping {attachment_url} due to error: {e}")

        except Exception as e:
            # Covers invalid PDFs, pandoc failures and file-system errors; the
            # id is recorded so later runs do not retry it.
            logging.error(f"Unexpected error for {attachment_url}: {e}")
            _record_failed_download(attachment_id, failed_files, failed_file_path)
            print(f"Unexpected error for {attachment_url}: {e}")


def main() -> None:
    """Run the pipeline: stage the raw JSON as parquet, then fetch attachments.

    Collects every ``*.json`` file under ``Paths.RAW / "gcpt3"``, passes them
    to ``process_files`` with the schema from ``get_schema``, and then calls
    ``download_attachments``.
    """
    source_dir = Path(Paths.RAW / "gcpt3")
    json_files = [*source_dir.glob("*.json")]
    process_files(json_files, get_schema())
    download_attachments()


# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()