nikhil_no_persistent / lilac /sources /parquet_source.py
nsthorat-lilac's picture
Duplicate from lilacai/nikhil_staging
bfc0ec6
raw
history blame
1.26 kB
"""Parquet source."""
from typing import Iterable, Optional
import pyarrow as pa
import pyarrow.parquet as pq
from pydantic import Field
from typing_extensions import override
from ..schema import Item, arrow_schema_to_schema
from .source import Source, SourceSchema
class ParquetSource(Source):
"""Parquet source."""
name = 'parquet'
filepaths: list[str] = Field(description='A list of paths to parquet files.')
_source_schema: Optional[SourceSchema] = None
_table: Optional[pa.Table] = None
@override
def setup(self) -> None:
assert self.filepaths, 'filepaths must be specified.'
self._table = pa.concat_tables([pq.read_table(f) for f in self.filepaths])
self._source_schema = SourceSchema(
fields=arrow_schema_to_schema(pq.read_schema(self.filepaths[0])).fields,
num_items=self._table.num_rows)
@override
def source_schema(self) -> SourceSchema:
"""Return the source schema."""
assert self._source_schema is not None, 'setup() must be called first.'
return self._source_schema
@override
def process(self) -> Iterable[Item]:
"""Process the source upload request."""
assert self._table is not None, 'setup() must be called first.'
for row in self._table.to_pylist():
yield row