# nikhil_staging/src/data/sources/json_source.py
"""CSV source."""
from typing import Iterable

import duckdb
import pandas as pd
from pydantic import Field as PydanticField
from typing_extensions import override

from ...schema import Item
from ...utils import download_http_files
from ..duckdb_utils import duckdb_setup
from .source import Source, SourceSchema, schema_from_df

ROW_ID_COLUMN = '__row_id__'


class JSONDataset(Source):
  """JSON data loader

  Supports both JSON and JSONL.

  JSON files can live locally as a filepath, or point to an external URL.

  See the usage sketch at the bottom of this module.
  """  # noqa: D415, D400
  name = 'json'

  filepaths: list[str] = PydanticField(description='A list of filepaths to JSON files.')

  _source_schema: SourceSchema
  _df: pd.DataFrame

  @override
  def setup(self) -> None:
    # Download JSON files to the local cache if they are served over HTTP to speed up duckdb.
    filepaths = download_http_files(self.filepaths)

    con = duckdb.connect(database=':memory:')

    # DuckDB expects the s3 protocol: https://duckdb.org/docs/guides/import/s3_import.html.
    s3_filepaths = [path.replace('gs://', 's3://') for path in filepaths]

    # NOTE: We use duckdb here to increase parallelism for multiple files.
    self._df = con.execute(f"""
      {duckdb_setup(con)}
      SELECT * FROM read_json_auto(
        {s3_filepaths},
        IGNORE_ERRORS=true
      )
    """).df()

    # Create the source schema in setup() to share it between process() and source_schema().
    self._source_schema = schema_from_df(self._df, ROW_ID_COLUMN)

  @override
  def source_schema(self) -> SourceSchema:
    """Return the source schema."""
    return self._source_schema

  @override
  def process(self) -> Iterable[Item]:
    """Process the source upload request."""
    cols = self._df.columns.tolist()
    # itertuples() yields (index, *row_values); zip the values back with the column
    # names and prepend the row id so each yielded item carries a stable identifier.
    yield from ({
      ROW_ID_COLUMN: idx,
      **dict(zip(cols, item_vals)),
    } for idx, *item_vals in self._df.itertuples())
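

# A minimal usage sketch, under assumptions: the example filepath is hypothetical,
# and in practice this source is presumably driven by surrounding loader code
# rather than called directly.
#
#   source = JSONDataset(filepaths=['./data/example.jsonl'])
#   source.setup()                   # loads the files into an in-memory DataFrame
#   schema = source.source_schema()  # SourceSchema inferred from the DataFrame
#   for item in source.process():    # dicts like {'__row_id__': 0, 'field': value, ...}
#     ...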