Spaces:
Runtime error
Runtime error
| import json | |
| from typing import List | |
| from langchain.docstore.document import Document | |
| from langchain.document_loaders.base import BaseLoader | |
| from langchain.utils import stringify_dict | |
| class AirbyteJSONLoader(BaseLoader): | |
| """Load local `Airbyte` json files.""" | |
| def __init__(self, file_path: str): | |
| """Initialize with a file path. This should start with '/tmp/airbyte_local/'.""" | |
| self.file_path = file_path | |
| """Path to the directory containing the json files.""" | |
| def load(self) -> List[Document]: | |
| text = "" | |
| for line in open(self.file_path, "r"): | |
| data = json.loads(line)["_airbyte_data"] | |
| text += stringify_dict(data) | |
| metadata = {"source": self.file_path} | |
| return [Document(page_content=text, metadata=metadata)] | |