Rifqi Hafizuddin commited on
Commit
be9bbd9
·
1 Parent(s): f86da27

[NOTICKET] fix query now use orchestrator msg, rework db pipeline replace ingestion logic

Browse files
src/api/v1/chat.py CHANGED
@@ -242,7 +242,7 @@ async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
242
  results=retrieval_objects,
243
  user_id=request.user_id,
244
  db=db,
245
- question=request.message,
246
  )
247
  query_context = _format_query_results(query_results)
248
  if query_context:
 
242
  results=retrieval_objects,
243
  user_id=request.user_id,
244
  db=db,
245
+ question=intent_result.get("search_query") or request.message,
246
  )
247
  query_context = _format_query_results(query_results)
248
  if query_context:
src/pipeline/db_pipeline/db_pipeline_service.py CHANGED
@@ -195,7 +195,20 @@ class DbPipelineService:
195
  all_docs.extend(docs)
196
  logger.info("profiled table", table=table_name, count=len(docs))
197
 
198
- # Delete only after all docs are ready
 
 
 
 
 
 
 
 
 
 
 
 
 
199
  async with _pgvector_engine.begin() as conn:
200
  result = await conn.execute(
201
  text(
@@ -203,16 +216,14 @@ class DbPipelineService:
203
  "WHERE cmetadata->>'user_id' = :user_id "
204
  " AND cmetadata->>'source_type' = 'database' "
205
  " AND cmetadata->>'database_client_id' = :client_id "
 
206
  " AND collection_id = ("
207
  " SELECT uuid FROM langchain_pg_collection WHERE name = 'document_embeddings'"
208
  " )"
209
  ),
210
- {"user_id": user_id, "client_id": client_id},
211
  )
212
- logger.info("cleared old db embeddings", user_id=user_id, deleted=result.rowcount)
213
-
214
- if all_docs:
215
- await vector_store.aadd_documents(all_docs)
216
 
217
  logger.info("db pipeline complete", user_id=user_id, total=len(all_docs))
218
  return len(all_docs)
 
195
  all_docs.extend(docs)
196
  logger.info("profiled table", table=table_name, count=len(docs))
197
 
198
+ # Insert new chunks first; only delete stale chunks after the insert succeeds.
199
+ # Prevents data loss if aadd_documents fails — old embeddings stay queryable
200
+ # until they're proven replaceable. Stale rows are identified by an older
201
+ # updated_at than this run.
202
+ if not all_docs:
203
+ logger.warning(
204
+ "no docs produced from schema; skipping delete to preserve existing embeddings",
205
+ user_id=user_id,
206
+ client_id=client_id,
207
+ )
208
+ return 0
209
+
210
+ await vector_store.aadd_documents(all_docs)
211
+
212
  async with _pgvector_engine.begin() as conn:
213
  result = await conn.execute(
214
  text(
 
216
  "WHERE cmetadata->>'user_id' = :user_id "
217
  " AND cmetadata->>'source_type' = 'database' "
218
  " AND cmetadata->>'database_client_id' = :client_id "
219
+ " AND cmetadata->>'updated_at' < :updated_at "
220
  " AND collection_id = ("
221
  " SELECT uuid FROM langchain_pg_collection WHERE name = 'document_embeddings'"
222
  " )"
223
  ),
224
+ {"user_id": user_id, "client_id": client_id, "updated_at": updated_at},
225
  )
226
+ logger.info("cleared stale db embeddings", user_id=user_id, deleted=result.rowcount)
 
 
 
227
 
228
  logger.info("db pipeline complete", user_id=user_id, total=len(all_docs))
229
  return len(all_docs)