# Source: Demo-Agentic-Service-Data-Eyond — src/knowledge/processing_service.py
# (scrape header preserved as comment: author ishaq101, commit bef5e76
#  "[NOTICKET] Demo agentic agent" — original lines were not valid Python)
"""Service for processing documents and ingesting to vector store."""
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document as LangChainDocument
from src.db.postgres.vector_store import get_vector_store
from src.storage.az_blob.az_blob import blob_storage
from src.db.postgres.models import Document as DBDocument
from src.config.settings import settings
from sqlalchemy.ext.asyncio import AsyncSession
from src.middlewares.logging import get_logger
from azure.ai.documentintelligence.aio import DocumentIntelligenceClient
from azure.core.credentials import AzureKeyCredential
from typing import List
import pypdf
import docx
from io import BytesIO
# Module-level structured logger for the knowledge-processing pipeline;
# get_logger is the project's logging factory (see src.middlewares.logging).
logger = get_logger("knowledge_processing")
class KnowledgeProcessingService:
    """Service for processing documents and ingesting to vector store.

    Supported file types: ``pdf`` (per-page extraction, with ``page_label``
    metadata), ``docx``, and ``txt``. Extracted text is chunked with a
    RecursiveCharacterTextSplitter and stored via the project vector store.
    """

    def __init__(self):
        # 1000-char chunks with 200-char overlap; overlap keeps context
        # continuous across chunk boundaries for retrieval.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len
        )

    async def process_document(self, db_doc: DBDocument, db: AsyncSession) -> int:
        """Process a stored document and ingest its chunks to the vector store.

        Args:
            db_doc: Database record describing the uploaded document
                (provides ``blob_name``, ``file_type``, and metadata fields).
            db: Async database session. Not used in the visible code path;
                kept for interface compatibility with callers.

        Returns:
            Number of chunks ingested.

        Raises:
            ValueError: If no text could be extracted from the document.
            Exception: Any downstream failure is logged and re-raised.
        """
        try:
            logger.info(f"Processing document {db_doc.id}")
            content = await blob_storage.download_file(db_doc.blob_name)
            if db_doc.file_type == "pdf":
                documents = await self._build_pdf_documents(content, db_doc)
            else:
                text = self._extract_text(content, db_doc.file_type)
                if not text.strip():
                    raise ValueError("No text extracted from document")
                documents = self._split_into_documents(text, db_doc)
            if not documents:
                raise ValueError("No text extracted from document")
            vector_store = get_vector_store()
            await vector_store.aadd_documents(documents)
            logger.info(f"Processed {db_doc.id}: {len(documents)} chunks ingested")
            return len(documents)
        except Exception as e:
            # Boundary handler: log with context, then propagate so the
            # caller can mark the document as failed.
            logger.error(f"Failed to process document {db_doc.id}", error=str(e))
            raise

    def _split_into_documents(
        self,
        text: str,
        db_doc: DBDocument,
        base_index: int = 0,
        page_label=None,
    ) -> List[LangChainDocument]:
        """Split *text* into chunks and wrap each as a LangChainDocument.

        Args:
            text: Raw text to chunk.
            db_doc: Source document record; supplies id/user_id/filename
                for chunk metadata.
            base_index: Offset added to each chunk's ``chunk_index`` so
                indices stay globally sequential across multi-page documents.
            page_label: Optional page number; included in metadata when set.

        Returns:
            One LangChainDocument per non-empty chunk of *text*.
        """
        documents: List[LangChainDocument] = []
        for i, chunk in enumerate(self.text_splitter.split_text(text)):
            metadata = {
                "document_id": db_doc.id,
                "user_id": db_doc.user_id,
                "filename": db_doc.filename,
                "chunk_index": base_index + i,
            }
            if page_label is not None:
                metadata["page_label"] = page_label
            documents.append(
                LangChainDocument(page_content=chunk, metadata=metadata)
            )
        return documents

    async def _build_pdf_documents(
        self, content: bytes, db_doc: DBDocument
    ) -> List[LangChainDocument]:
        """Build LangChain documents from PDF with page_label metadata.

        Uses Azure Document Intelligence (per-page) when credentials are
        present, falls back to pypdf (also per-page) otherwise. Blank pages
        are skipped; chunk indices remain sequential across pages.
        """
        documents: List[LangChainDocument] = []
        if settings.azureai_docintel_endpoint and settings.azureai_docintel_key:
            async with DocumentIntelligenceClient(
                endpoint=settings.azureai_docintel_endpoint,
                credential=AzureKeyCredential(settings.azureai_docintel_key),
            ) as client:
                poller = await client.begin_analyze_document(
                    model_id="prebuilt-read",
                    body=BytesIO(content),
                    content_type="application/pdf",
                )
                result = await poller.result()
                logger.info(f"Azure DI extracted {len(result.pages or [])} pages")
                for page in result.pages or []:
                    # Reassemble page text from DI line objects.
                    page_text = "\n".join(
                        line.content for line in (page.lines or [])
                    )
                    if not page_text.strip():
                        continue
                    documents.extend(self._split_into_documents(
                        page_text,
                        db_doc,
                        base_index=len(documents),
                        page_label=page.page_number,
                    ))
        else:
            logger.warning("Azure DI not configured, using pypdf")
            pdf_reader = pypdf.PdfReader(BytesIO(content))
            for page_num, page in enumerate(pdf_reader.pages, start=1):
                page_text = page.extract_text() or ""
                if not page_text.strip():
                    continue
                documents.extend(self._split_into_documents(
                    page_text,
                    db_doc,
                    base_index=len(documents),
                    page_label=page_num,
                ))
        return documents

    def _extract_text(self, content: bytes, file_type: str) -> str:
        """Extract text from DOCX or TXT content.

        Raises:
            ValueError: For any file type other than ``docx`` or ``txt``.
        """
        if file_type == "docx":
            doc = docx.Document(BytesIO(content))
            return "\n".join(p.text for p in doc.paragraphs)
        elif file_type == "txt":
            return content.decode("utf-8")
        else:
            raise ValueError(f"Unsupported file type: {file_type}")
# Module-level singleton; import this instance rather than constructing a
# new service per request. (NOTE(review): callers are outside this file —
# presumably API routes or background workers; confirm usage.)
knowledge_processor = KnowledgeProcessingService()