sofhiaazzhr commited on
Commit
959b1b0
·
1 Parent(s): 5f86993

[NOTICKET][doc] remove column filter and fallback cap for full-schema approach

Browse files
Files changed (1) hide show
  1. src/query/executors/tabular.py +10 -18
src/query/executors/tabular.py CHANGED
@@ -1,9 +1,13 @@
1
  """Executor for tabular document sources (source_type="document", file_type csv/xlsx).
2
 
 
 
 
 
3
  Flow:
4
  1. Group RetrievalResult chunks by (document_id, sheet_name).
5
  2. Per group: download Parquet from Azure Blob → pandas DataFrame.
6
- 3. Build schema context from DataFrame columns + sample values.
7
  4. LLM decides operation (groupby_sum, filter, top_n, etc.) via structured output.
8
  5. Pandas runs the operation; retry up to 3x on error with feedback to LLM.
9
  6. Fallback to raw rows if all retries fail.
@@ -28,7 +32,6 @@ logger = get_logger("tabular_executor")
28
 
29
 
30
  class _GroupInfo(TypedDict):
31
- columns: list[str]
32
  filename: str
33
  file_type: str
34
 
@@ -225,7 +228,7 @@ class TabularExecutor(BaseExecutor):
225
  if not tabular:
226
  return []
227
 
228
- # Group by (document_id, sheet_name) collect relevant column names
229
  groups: dict[tuple[str, str | None], _GroupInfo] = {}
230
  for r in tabular:
231
  data = r.metadata.get("data", {})
@@ -233,29 +236,18 @@ class TabularExecutor(BaseExecutor):
233
  if not doc_id:
234
  continue
235
  sheet_name = data.get("sheet_name") # None for CSV
236
- col_name = data.get("column_name")
237
- filename = data.get("filename", "")
238
- file_type = data.get("file_type", "")
239
-
240
  key = (doc_id, sheet_name)
241
  if key not in groups:
242
  groups[key] = {
243
- "columns": [],
244
- "filename": filename,
245
- "file_type": file_type,
246
  }
247
- if col_name and col_name not in groups[key]["columns"]:
248
- groups[key]["columns"].append(col_name)
249
 
250
  async def _process_group(
251
  doc_id: str, sheet_name: str | None, info: _GroupInfo
252
  ) -> QueryResult | None:
253
  try:
254
  df = await download_parquet(user_id, doc_id, sheet_name)
255
- if info["columns"]:
256
- valid_cols = [c for c in info["columns"] if c in df.columns]
257
- if valid_cols:
258
- df = df[valid_cols]
259
  df_result = await self._query_with_agent(df, question, limit)
260
 
261
  table_label = info["filename"]
@@ -321,9 +313,9 @@ class TabularExecutor(BaseExecutor):
321
  prev_error = str(e)
322
  logger.warning("tabular agent error", attempt=attempt + 1, error=prev_error)
323
 
324
- # Fallback: return raw rows
325
  logger.warning("tabular agent failed after retries, returning raw rows")
326
- return df.head(limit)[df.columns[:20]]
327
 
328
 
329
  tabular_executor = TabularExecutor()
 
1
  """Executor for tabular document sources (source_type="document", file_type csv/xlsx).
2
 
3
+ Receives sheet-level RetrievalResults from SchemaRetriever (each result
4
+ represents a relevant sheet, with its full column list available via
5
+ data.column_names in metadata).
6
+
7
  Flow:
8
  1. Group RetrievalResult chunks by (document_id, sheet_name).
9
  2. Per group: download Parquet from Azure Blob → pandas DataFrame.
10
+ 3. Build schema context from full DataFrame columns + sample values.
11
  4. LLM decides operation (groupby_sum, filter, top_n, etc.) via structured output.
12
  5. Pandas runs the operation; retry up to 3x on error with feedback to LLM.
13
  6. Fallback to raw rows if all retries fail.
 
32
 
33
 
34
  class _GroupInfo(TypedDict):
 
35
  filename: str
36
  file_type: str
37
 
 
228
  if not tabular:
229
  return []
230
 
231
+ # Group by (document_id, sheet_name) one parquet download per group
232
  groups: dict[tuple[str, str | None], _GroupInfo] = {}
233
  for r in tabular:
234
  data = r.metadata.get("data", {})
 
236
  if not doc_id:
237
  continue
238
  sheet_name = data.get("sheet_name") # None for CSV
 
 
 
 
239
  key = (doc_id, sheet_name)
240
  if key not in groups:
241
  groups[key] = {
242
+ "filename": data.get("filename", ""),
243
+ "file_type": data.get("file_type", ""),
 
244
  }
 
 
245
 
246
  async def _process_group(
247
  doc_id: str, sheet_name: str | None, info: _GroupInfo
248
  ) -> QueryResult | None:
249
  try:
250
  df = await download_parquet(user_id, doc_id, sheet_name)
 
 
 
 
251
  df_result = await self._query_with_agent(df, question, limit)
252
 
253
  table_label = info["filename"]
 
313
  prev_error = str(e)
314
  logger.warning("tabular agent error", attempt=attempt + 1, error=prev_error)
315
 
316
+ # Fallback: return raw rows (all columns — chat.py caps rows at 20 before LLM)
317
  logger.warning("tabular agent failed after retries, returning raw rows")
318
+ return df.head(limit)
319
 
320
 
321
  tabular_executor = TabularExecutor()