File size: 12,892 Bytes
9145e48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
import logging
import asyncio
from typing import Dict, Any, Optional
import tempfile
import os
from pathlib import Path
import uuid

from core.document_parser import DocumentParser
from core.chunker import TextChunker
from core.text_preprocessor import TextPreprocessor
from services.vector_store_service import VectorStoreService
from services.document_store_service import DocumentStoreService
from services.embedding_service import EmbeddingService
from services.ocr_service import OCRService

logger = logging.getLogger(__name__)

class IngestionTool:
    """Coordinates the full document ingestion pipeline.

    Ties together parsing, chunking, embedding generation, and storage so a
    document (local file, URL, raw text, or a whole directory) ends up in the
    document store and, as embedded chunks, in the vector store.
    """

    def __init__(self, vector_store: VectorStoreService, document_store: DocumentStoreService, 
             embedding_service: EmbeddingService, ocr_service: OCRService):
        self.vector_store = vector_store
        self.document_store = document_store
        self.embedding_service = embedding_service
        self.ocr_service = ocr_service
        
        self.document_parser = DocumentParser()
        # Pass OCR service to document parser so image-based formats can be read.
        self.document_parser.ocr_service = ocr_service
        
        self.text_chunker = TextChunker()
        self.text_preprocessor = TextPreprocessor()
    
    async def process_document(self, file_path: str, file_type: str, task_id: Optional[str] = None) -> Dict[str, Any]:
        """Process a document through the full ingestion pipeline.

        Args:
            file_path: Path to the file on disk.
            file_type: File type hint (currently unused by the parser, which
                infers type from the filename; kept for interface stability).
            task_id: Optional identifier for tracking; generated if omitted.

        Returns:
            A result dict with at least ``success`` and ``task_id``; on
            success it also carries document/chunk statistics, on failure an
            ``error`` message.
        """
        if task_id is None:
            task_id = str(uuid.uuid4())
        
        try:
            logger.info(f"Starting document processing for {file_path}")
            
            # Step 1: Parse the document.
            filename = Path(file_path).name
            document = await self.document_parser.parse_document(file_path, filename)
            
            if not document.content:
                # Fixed: log message previously contained a broken placeholder
                # instead of the filename.
                logger.warning(f"No content extracted from document {filename}")
                return {
                    "success": False,
                    "error": "No content could be extracted from the document",
                    "task_id": task_id
                }
            
            # Step 2: Store the raw document.
            await self.document_store.store_document(document)
            
            # Step 3: Chunk the content and generate embeddings.
            chunks = await self._create_and_embed_chunks(document)
            
            if not chunks:
                logger.warning(f"No chunks created for document {document.id}")
                return {
                    "success": False,
                    "error": "Failed to create text chunks",
                    "task_id": task_id,
                    "document_id": document.id
                }
            
            # Step 4: Persist the embedded chunks in the vector store.
            success = await self.vector_store.add_chunks(chunks)
            
            if not success:
                logger.error(f"Failed to store embeddings for document {document.id}")
                return {
                    "success": False,
                    "error": "Failed to store embeddings",
                    "task_id": task_id,
                    "document_id": document.id
                }
            
            logger.info(f"Successfully processed document {document.id} with {len(chunks)} chunks")
            
            return {
                "success": True,
                "task_id": task_id,
                "document_id": document.id,
                "filename": document.filename,
                "chunks_created": len(chunks),
                "content_length": len(document.content),
                "doc_type": document.doc_type.value,
                # Fixed: message previously contained a broken placeholder.
                "message": f"Successfully processed {document.filename}"
            }
            
        except Exception as e:
            logger.error(f"Error processing document {file_path}: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "task_id": task_id,
                "message": f"Failed to process document: {str(e)}"
            }
    
    async def _create_and_embed_chunks(self, document) -> list:
        """Create text chunks for *document* and attach embeddings.

        Returns a list of chunks that received an embedding; an empty list
        signals failure (errors are logged, never raised).
        """
        try:
            # Step 1: Split the document content into chunks.
            chunks = self.text_chunker.chunk_document(
                document.id, 
                document.content, 
                method="recursive"
            )
            
            if not chunks:
                return []
            
            # Step 2: Optimize chunk sizes/overlap for the embedding model.
            optimized_chunks = self.text_chunker.optimize_chunks_for_embedding(chunks)
            
            # Step 3: Generate embeddings for all chunk texts in one batch.
            texts = [chunk.content for chunk in optimized_chunks]
            embeddings = await self.embedding_service.generate_embeddings(texts)
            
            # Step 4: Attach embeddings; chunks beyond the number of returned
            # embeddings are dropped rather than stored without a vector.
            embedded_chunks = []
            for i, chunk in enumerate(optimized_chunks):
                if i < len(embeddings):
                    chunk.embedding = embeddings[i]
                    embedded_chunks.append(chunk)
            
            return embedded_chunks
            
        except Exception as e:
            logger.error(f"Error creating and embedding chunks: {str(e)}")
            return []
    
    async def process_url(self, url: str, task_id: Optional[str] = None) -> Dict[str, Any]:
        """Download a document from *url* and run it through the pipeline.

        The file is fetched to a temporary location (preserving the original
        filename in the suffix so type detection works), processed, and the
        temporary file is always cleaned up afterwards.
        """
        try:
            import requests
            from urllib.parse import urlparse
            
            # Download the file. requests is synchronous; run it in the
            # default executor so the event loop is not blocked for the
            # duration of the download.
            loop = asyncio.get_event_loop()
            response = await loop.run_in_executor(
                None, lambda: requests.get(url, timeout=30)
            )
            response.raise_for_status()
            
            # Determine a filename from the URL path (content-type is not
            # consulted here).
            parsed_url = urlparse(url)
            filename = Path(parsed_url.path).name or "downloaded_file"
            
            # Create a temporary file. Fixed: the suffix previously contained
            # a broken placeholder; carrying the real filename preserves the
            # extension the parser uses for type detection.
            with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") as tmp_file:
                tmp_file.write(response.content)
                tmp_file_path = tmp_file.name
            
            try:
                # Process the downloaded file; file_type is inferred from the
                # filename by the parser, so pass an empty hint.
                result = await self.process_document(tmp_file_path, "", task_id)
                result["source_url"] = url
                return result
            finally:
                # Always clean up the temporary file.
                if os.path.exists(tmp_file_path):
                    os.unlink(tmp_file_path)
                    
        except Exception as e:
            logger.error(f"Error processing URL {url}: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "task_id": task_id or str(uuid.uuid4()),
                "source_url": url
            }
    
    async def process_text_content(self, content: str, filename: str = "text_content.txt", 
                                 task_id: Optional[str] = None) -> Dict[str, Any]:
        """Ingest raw text directly, bypassing file parsing.

        A Document is synthesized around *content*, stored, chunked, and
        embedded. Note: the call reports success even when no chunks could be
        created (``chunks_created`` will be 0).
        """
        try:
            from core.models import Document, DocumentType
            from datetime import datetime
            
            # Synthesize a document object for the raw text.
            # NOTE(review): utcnow() returns a naive datetime and is
            # deprecated in newer Python; kept for compatibility with however
            # Document timestamps are consumed downstream — confirm before
            # switching to timezone-aware datetimes.
            document = Document(
                id=str(uuid.uuid4()),
                filename=filename,
                content=content,
                doc_type=DocumentType.TEXT,
                file_size=len(content.encode('utf-8')),
                created_at=datetime.utcnow(),
                metadata={
                    "source": "direct_text_input",
                    "content_length": len(content),
                    "word_count": len(content.split())
                }
            )
            
            # Store the document.
            await self.document_store.store_document(document)
            
            # Chunk and embed the content.
            chunks = await self._create_and_embed_chunks(document)
            
            if chunks:
                await self.vector_store.add_chunks(chunks)
            
            return {
                "success": True,
                "task_id": task_id or str(uuid.uuid4()),
                "document_id": document.id,
                "filename": filename,
                "chunks_created": len(chunks),
                "content_length": len(content),
                "message": f"Successfully processed text content"
            }
            
        except Exception as e:
            logger.error(f"Error processing text content: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "task_id": task_id or str(uuid.uuid4())
            }
    
    async def reprocess_document(self, document_id: str, task_id: Optional[str] = None) -> Dict[str, Any]:
        """Re-chunk and re-embed an already-stored document.

        Useful after changing the chunking or embedding configuration: the
        document's existing vectors are deleted and rebuilt from the stored
        content.
        """
        try:
            # Fetch the stored document.
            document = await self.document_store.get_document(document_id)
            
            if not document:
                return {
                    "success": False,
                    "error": f"Document {document_id} not found",
                    "task_id": task_id or str(uuid.uuid4())
                }
            
            # Remove existing chunks from the vector store before rebuilding.
            await self.vector_store.delete_document(document_id)
            
            # Recreate and embed chunks from the stored content.
            chunks = await self._create_and_embed_chunks(document)
            
            if chunks:
                await self.vector_store.add_chunks(chunks)
            
            return {
                "success": True,
                "task_id": task_id or str(uuid.uuid4()),
                "document_id": document_id,
                "filename": document.filename,
                "chunks_created": len(chunks),
                "message": f"Successfully reprocessed {document.filename}"
            }
            
        except Exception as e:
            logger.error(f"Error reprocessing document {document_id}: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "task_id": task_id or str(uuid.uuid4()),
                "document_id": document_id
            }
    
    async def batch_process_directory(self, directory_path: str, task_id: Optional[str] = None) -> Dict[str, Any]:
        """Process every supported file found directly in *directory_path*.

        Files are processed sequentially; per-file failures are recorded in
        ``results`` and counted in ``failed`` without aborting the batch.
        """
        try:
            directory = Path(directory_path)
            if not directory.exists() or not directory.is_dir():
                return {
                    "success": False,
                    "error": f"Directory {directory_path} does not exist",
                    "task_id": task_id or str(uuid.uuid4())
                }
            
            # Supported file extensions (non-recursive glob below).
            supported_extensions = {'.txt', '.pdf', '.docx', '.png', '.jpg', '.jpeg', '.bmp', '.tiff'}
            
            # Find all supported files, matching both lower- and upper-case
            # extensions.
            files_to_process = []
            for ext in supported_extensions:
                files_to_process.extend(directory.glob(f"*{ext}"))
                files_to_process.extend(directory.glob(f"*{ext.upper()}"))
            
            # Fixed: on case-insensitive filesystems the two globs above can
            # return the same file twice — de-duplicate (sorted for a
            # deterministic processing order).
            files_to_process = sorted(set(files_to_process))
            
            if not files_to_process:
                return {
                    "success": False,
                    "error": "No supported files found in directory",
                    "task_id": task_id or str(uuid.uuid4())
                }
            
            # Process files one at a time, collecting per-file results.
            results = []
            successful = 0
            failed = 0
            
            for file_path in files_to_process:
                try:
                    result = await self.process_document(str(file_path), file_path.suffix)
                    results.append(result)
                    
                    if result.get("success"):
                        successful += 1
                    else:
                        failed += 1
                        
                except Exception as e:
                    # A single bad file must not abort the whole batch.
                    failed += 1
                    results.append({
                        "success": False,
                        "error": str(e),
                        "filename": file_path.name
                    })
            
            return {
                "success": True,
                "task_id": task_id or str(uuid.uuid4()),
                "directory": str(directory),
                "total_files": len(files_to_process),
                "successful": successful,
                "failed": failed,
                "results": results,
                "message": f"Processed {successful}/{len(files_to_process)} files successfully"
            }
            
        except Exception as e:
            logger.error(f"Error batch processing directory {directory_path}: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "task_id": task_id or str(uuid.uuid4())
            }