Commit 8daf9b5
Parent(s): b59ef76

feat: add sheet-level chunk on CSV/XLSX ingestion
Each tabular file now produces a sheet-level chunk (chunk_level=sheet)
alongside per-column chunks, enabling schema-aware retrieval without
loading the full Parquet at search time.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
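
The retrieval side is not part of this commit; as a minimal sketch, assuming search results come back as LangChain Documents, the sheet-level chunks can be separated from per-column ones by the new chunk_level metadata:

    from langchain_core.documents import Document

    def split_by_chunk_level(results: list[Document]) -> tuple[list[Document], list[Document]]:
        """Partition retrieved chunks into sheet-level and column-level groups."""
        sheets = [d for d in results if d.metadata.get("chunk_level") == "sheet"]
        columns = [d for d in results if d.metadata.get("chunk_level") == "column"]
        return sheets, columns

A sheet chunk's metadata["data"] carries column_names and row_count, so schema questions can be answered from the chunk alone, without fetching the Parquet blob.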
src/knowledge/processing_service.py CHANGED
@@ -156,6 +156,7 @@ class KnowledgeProcessingService:
                 metadata={
                     "user_id": db_doc.user_id,
                     "source_type": "document",
+                    "chunk_level": "column",
                     "updated_at": datetime.now(_JAKARTA_TZ).isoformat(),
                     "data": {
                         "document_id": db_doc.id,
@@ -169,12 +170,40 @@ class KnowledgeProcessingService:
             ))
         return documents

+    def _to_sheet_document(
+        self, df: pd.DataFrame, db_doc: DBDocument, sheet_name: str | None, source_name: str
+    ) -> LangChainDocument:
+        col_summary = ", ".join(f"{c} ({df[c].dtype})" for c in df.columns)
+        text = (
+            f"Source: {source_name} ({len(df)} rows)\n"
+            f"Columns ({len(df.columns)}): {col_summary}"
+        )
+        return LangChainDocument(
+            page_content=text,
+            metadata={
+                "user_id": db_doc.user_id,
+                "source_type": "document",
+                "chunk_level": "sheet",
+                "updated_at": datetime.now(_JAKARTA_TZ).isoformat(),
+                "data": {
+                    "document_id": db_doc.id,
+                    "filename": db_doc.filename,
+                    "file_type": db_doc.file_type,
+                    "sheet_name": sheet_name,
+                    "column_names": list(df.columns),
+                    "row_count": len(df),
+                },
+            },
+        )
+
     async def _build_csv_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
         """Profile each column of a CSV file and upload Parquet to Azure Blob."""
         df = pd.read_csv(BytesIO(content))
         await upload_parquet(df, db_doc.user_id, db_doc.id)
         logger.info(f"Uploaded Parquet for CSV {db_doc.id}")
-
+        docs = self._profile_dataframe(df, db_doc.filename, db_doc)
+        docs.append(self._to_sheet_document(df, db_doc, sheet_name=None, source_name=db_doc.filename))
+        return docs

     async def _build_excel_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
         """Profile each column of every sheet in an Excel file and upload one Parquet per sheet."""
@@ -185,7 +214,9 @@ class KnowledgeProcessingService:
             docs = self._profile_dataframe(df, source_name, db_doc)
             for doc in docs:
                 doc.metadata["data"]["sheet_name"] = sheet_name
+                doc.metadata["chunk_level"] = "column"
             documents.extend(docs)
+            documents.append(self._to_sheet_document(df, db_doc, sheet_name, source_name))
             await upload_parquet(df, db_doc.user_id, db_doc.id, sheet_name)
             logger.info(f"Uploaded Parquet for sheet '{sheet_name}' of {db_doc.id}")
         return documents
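
For reference, the page_content that _to_sheet_document builds can be previewed standalone; a small sketch (the frame and the sales.xlsx name are made up for illustration):

    import pandas as pd

    # Hypothetical two-column frame standing in for one parsed sheet.
    df = pd.DataFrame({"region": ["ID", "SG"], "revenue": [1200.5, 980.0]})

    # Same summary format as _to_sheet_document in the diff above.
    col_summary = ", ".join(f"{c} ({df[c].dtype})" for c in df.columns)
    text = (
        f"Source: sales.xlsx ({len(df)} rows)\n"
        f"Columns ({len(df.columns)}): {col_summary}"
    )
    print(text)
    # Source: sales.xlsx (2 rows)
    # Columns (2): region (object), revenue (float64)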