mtyrrell committed
Commit 707cb4c · Parent(s): 01327a1

refactor for orchestrator

Files changed (2):
  1. app/main.py +43 -243
  2. requirements.txt +1 -4
app/main.py CHANGED
@@ -1,14 +1,8 @@
 import gradio as gr
-from fastapi import FastAPI, UploadFile, File, HTTPException
-from pydantic import BaseModel
-from typing import Optional, Dict, Any, List
-import uvicorn
 import os
 import hashlib
 import logging
 from datetime import datetime
-from contextlib import asynccontextmanager
-import json
 import re
 from pathlib import Path
 
@@ -27,29 +21,8 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(
 logger = logging.getLogger(__name__)
 
 # Models
-class IngestRequest(BaseModel):
-    doc_id: str
-    file_content: bytes
-    filename: str
-    content_type: str
 
-class IngestResponse(BaseModel):
-    doc_id: str
-    chunks_indexed: int
-    status: str
-    metadata: Dict[str, Any]
-
-class DocumentChunk(BaseModel):
-    doc_id: str
-    chunk_id: str
-    content: str
-    metadata: Dict[str, Any]
-
-# Global storage for processed documents
-DOCUMENT_STORE: Dict[str, List[DocumentChunk]] = {}
-DOCUMENT_METADATA: Dict[str, Dict[str, Any]] = {}
-
-def extract_text_from_pdf_bytes(file_content: bytes) -> tuple[str, Dict[str, Any]]:
+def extract_text_from_pdf_bytes(file_content: bytes) -> tuple[str, dict]:
     """Extract text from PDF bytes (in memory)"""
     try:
         from io import BytesIO
@@ -66,7 +39,7 @@ def extract_text_from_pdf_bytes(file_content: bytes) -> tuple[str, Dict[str, Any
         logger.error(f"PDF extraction error: {str(e)}")
         raise Exception(f"Failed to extract text from PDF: {str(e)}")
 
-def extract_text_from_docx_bytes(file_content: bytes) -> tuple[str, Dict[str, Any]]:
+def extract_text_from_docx_bytes(file_content: bytes) -> tuple[str, dict]:
     """Extract text from DOCX bytes (in memory)"""
     try:
         from io import BytesIO
@@ -84,8 +57,8 @@ def extract_text_from_docx_bytes(file_content: bytes) -> tuple[str, Dict[str, An
         logger.error(f"DOCX extraction error: {str(e)}")
         raise Exception(f"Failed to extract text from DOCX: {str(e)}")
 
-def clean_and_chunk_text(text: str, doc_id: str) -> List[DocumentChunk]:
-    """Clean text and split into chunks"""
+def clean_and_chunk_text(text: str) -> str:
+    """Clean text and split into chunks, returning formatted context"""
     # Basic text cleaning
     text = re.sub(r'\n+', '\n', text)
     text = re.sub(r'\s+', ' ', text)
@@ -109,37 +82,22 @@ def clean_and_chunk_text(text: str, doc_id: str) -> List[DocumentChunk]:
     chunks = text_splitter.split_text(text)
 
     # Create DocumentChunk objects
-    document_chunks = []
+    context_parts = []
     for i, chunk_text in enumerate(chunks):
-        chunk = DocumentChunk(
-            doc_id=doc_id,
-            chunk_id=f"{doc_id}_chunk_{i}",
-            content=chunk_text,
-            metadata={
-                "chunk_index": i,
-                "chunk_length": len(chunk_text),
-                "created_at": datetime.now().isoformat()
-            }
-        )
-        document_chunks.append(chunk)
+        context_parts.append(f"[Chunk {i+1}]: {chunk_text}")
 
-    return document_chunks
-
-def generate_doc_id(filename: str, content: bytes) -> str:
-    """Generate unique document ID"""
-    content_hash = hashlib.md5(content).hexdigest()[:8]
-    clean_name = re.sub(r'[^a-zA-Z0-9._-]', '_', filename)
-    name_without_ext = os.path.splitext(clean_name)[0]
-    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
-    return f"{timestamp}_{name_without_ext}_{content_hash}"
+    return "\n\n".join(context_parts)
 
-def process_document(file_content: bytes, filename: str) -> IngestResponse:
-    """Main document processing function - now processes in memory"""
-    start_time = datetime.now()
+def ingest(file):
+    """Main ingestion function - processes file and returns context directly"""
+    if file is None:
+        return "No file uploaded", ""
 
     try:
-        # Generate document ID
-        doc_id = generate_doc_id(filename, file_content)
+        with open(file.name, 'rb') as f:
+            file_content = f.read()
+
+        filename = os.path.basename(file.name)
 
         # Extract text based on file type (in memory)
         file_extension = os.path.splitext(filename)[1].lower()
@@ -152,196 +110,38 @@ def process_document(file_content: bytes, filename: str) -> IngestResponse:
             raise ValueError(f"Unsupported file type: {file_extension}")
 
         # Clean and chunk text
-        chunks = clean_and_chunk_text(text, doc_id)
+        context = clean_and_chunk_text(text)
 
-        # Store chunks and metadata in memory only
-        DOCUMENT_STORE[doc_id] = chunks
-        processing_time = (datetime.now() - start_time).total_seconds()
-        DOCUMENT_METADATA[doc_id] = {
-            "filename": filename,
-            "doc_id": doc_id,
-            "file_type": file_extension,
-            "processing_time": processing_time,
-            "total_text_length": len(text),
-            "chunks_count": len(chunks),
-            "processed_at": datetime.now().isoformat(),
-            "status": "ready"
-        }
+        logger.info(f"Successfully processed document {filename}: {len(text)} characters")
 
-        logger.info(f"Successfully processed document {doc_id}: {len(chunks)} chunks")
-
-        return IngestResponse(
-            doc_id=doc_id,
-            chunks_indexed=len(chunks),
-            status="ready",
-            metadata=DOCUMENT_METADATA[doc_id]
-        )
+        return context
 
     except Exception as e:
         logger.error(f"Document processing failed: {str(e)}")
-        raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}")
-
-def get_document_context(doc_id: str, max_chunks: int = 10) -> str:
-    """Retrieve document context for a given doc_id"""
-    if doc_id not in DOCUMENT_STORE:
-        return f"Document {doc_id} not found."
-
-    chunks = DOCUMENT_STORE[doc_id][:max_chunks]
-    context_parts = []
-
-    for chunk in chunks:
-        context_parts.append(f"[Chunk {chunk.metadata['chunk_index']}]: {chunk.content}")
-
-    return "\n\n".join(context_parts)
-
-# Gradio functions
-def gradio_upload_and_process(file):
-    """Process uploaded file through Gradio"""
-    if file is None:
-        return "No file uploaded", ""
-
-    try:
-        with open(file.name, 'rb') as f:
-            file_content = f.read()
-
-        filename = os.path.basename(file.name)
-        result = process_document(file_content, filename)
-
-        response_text = f"""
-Document ID: {result.doc_id}
-Chunks created: {result.chunks_indexed}
-Processing time: {result.metadata['processing_time']:.2f}s
-Total text length: {result.metadata['total_text_length']} characters
-File type: {result.metadata['file_type']}"""
-
-        # Get chunks for display
-        chunks = DOCUMENT_STORE.get(result.doc_id, [])
-        chunks_display = ""
-        if chunks:
-            for i, chunk in enumerate(chunks):  # Show first 5 chunks
-                chunks_display += f"chunk: {i+1}\n"
-                chunks_display += f"length: {len(chunk.content)}\n"
-                chunks_display += f"content: {chunk.content}\n\n"
-
-        return response_text, chunks_display
-
-    except Exception as e:
-        error_msg = f"Error processing document: {str(e)}"
-        logger.error(error_msg)
-        return error_msg, ""
-
-# Create simplified Gradio interface
-def create_gradio_interface():
-    with gr.Blocks(title="ChatFed Ingestion Module") as demo:
-        gr.Markdown("# ChatFed Ingestion Module")
-        gr.Markdown("Chunks PDF or DOCX files using LangChain RecursiveCharacterTextSplitter. Intended for use in RAG pipelines as an MCP server with other ChatFed modules.")
-
-        with gr.Row():
-            with gr.Column():
-                file_input = gr.File(label="Upload PDF or DOCX file for testing", file_types=[".pdf", ".docx"])
-                process_btn = gr.Button("Process Document")
-
-            with gr.Column():
-                result_output = gr.Textbox(label="Processing Result", lines=4)
-
-
-        with gr.Row():
-            chunks_output = gr.Textbox(label="Processed Chunks", lines=15)
-
-        process_btn.click(
-            fn=gradio_upload_and_process,
-            inputs=[file_input],
-            outputs=[result_output, chunks_output]
-        )
-
-    return demo
-
-# FastAPI setup
-@asynccontextmanager
-async def lifespan(app: FastAPI):
-    logger.info("Document Ingestion Module starting up...")
-    yield
-    logger.info("Document Ingestion Module shutting down...")
-
-app = FastAPI(title="ChatFed Document Ingestion", version="1.0.0", lifespan=lifespan)
-
-@app.get("/health")
-async def health_check():
-    return {"status": "healthy", "documents_processed": len(DOCUMENT_METADATA)}
-
-@app.get("/")
-async def root():
-    return {
-        "message": "ChatFed Document Ingestion API",
-        "endpoints": {
-            "health": "/health",
-            "ingest": "/ingest",
-            "context": "/context/{doc_id}",
-            "documents": "/documents"
-        }
-    }
-
-@app.post("/ingest")
-async def ingest_endpoint(file: UploadFile = File(...)):
-    """Ingest a document file"""
-    try:
-        file_content = await file.read()
-        result = process_document(file_content, file.filename)
-        return result
-    except Exception as e:
-        raise HTTPException(status_code=500, detail=str(e))
-
-@app.get("/context/{doc_id}")
-async def get_context_endpoint(doc_id: str, max_chunks: int = 10):
-    """Get context for a specific document"""
-    try:
-        context = get_document_context(doc_id, max_chunks)
-        return {
-            "doc_id": doc_id,
-            "context": context,
-            "metadata": DOCUMENT_METADATA.get(doc_id, {})
-        }
-    except Exception as e:
-        raise HTTPException(status_code=404, detail=str(e))
-
-@app.get("/documents")
-async def list_documents_endpoint():
-    """List all processed documents"""
-    return {
-        "documents": list(DOCUMENT_METADATA.keys()),
-        "metadata": DOCUMENT_METADATA
-    }
-
-@app.post("/context")
-async def get_context_simple(doc_id: str, max_chunks: int = 10):
-    """Simple context endpoint for orchestrator integration"""
-    try:
-        context = get_document_context(doc_id, max_chunks)
-        return {"context": context}
-    except Exception as e:
-        raise HTTPException(status_code=404, detail=str(e))
+        raise Exception(f"Processing failed: {str(e)}")
 
 if __name__ == "__main__":
-    # Create and launch Gradio interface
-    demo = create_gradio_interface()
-
-    # Run both FastAPI and Gradio
-    import threading
-
-    def run_gradio():
-        demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True, share=False, quiet=True)
-
-    def run_fastapi():
-        uvicorn.run(app, host="0.0.0.0", port=7863, log_level="info")
-
-    # Start both services
-    gradio_thread = threading.Thread(target=run_gradio, daemon=True)
-    fastapi_thread = threading.Thread(target=run_fastapi, daemon=True)
-
-    gradio_thread.start()
-    fastapi_thread.start()
-
-    try:
-        gradio_thread.join()
-    except KeyboardInterrupt:
-        logger.info("Shutting down...")
+    ui = gr.Interface(
+        fn=ingest,
+        inputs=gr.File(
+            label="Document Upload",
+            file_types=[".pdf", ".docx"],
+            info="Upload a PDF or DOCX file to extract and chunk text for use as context"
+        ),
+        outputs=gr.Textbox(
+            label="Processed Context",
+            lines=15,
+            show_copy_button=True,
+            info="Chunked document content ready for use as context in RAG pipelines"
+        ),
+        title="ChatFed Ingestion Module",
+        description="Processes PDF or DOCX files and returns chunked text context. Intended for use in RAG pipelines as an MCP server with other ChatFed modules (i.e. context supplied to generation service).",
+        api_name="ingest"
+    )
+
+    ui.launch(
+        server_name="0.0.0.0",
+        server_port=7860,
+        mcp_server=True,
+        show_error=True
+    )
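
With this refactor the module exposes a single Gradio endpoint (api_name="ingest") and, via mcp_server=True, an MCP tool, so an orchestrator calls Gradio directly instead of the removed FastAPI routes. Below is a minimal sketch of one way another ChatFed service might invoke it over the Gradio client API, assuming the container is reachable at http://localhost:7860 and that the gradio_client package (with its handle_file helper) is installed; the host and file name are illustrative, not part of this commit.

from gradio_client import Client, handle_file

# Assumed address of the running ingestion container (port 7860 from ui.launch above)
client = Client("http://localhost:7860")

# api_name="/ingest" matches the api_name set on gr.Interface in this commit
context = client.predict(
    handle_file("report.pdf"),  # hypothetical local document to ingest
    api_name="/ingest",
)

# `context` is the chunked text ("[Chunk 1]: ..." sections) that an orchestrator
# can pass straight to a generation service as RAG context
print(context[:500])

The same function should also be discoverable as an MCP tool when the installed Gradio version supports mcp_server=True.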
requirements.txt CHANGED
@@ -1,4 +1,4 @@
-fastapi==0.104.1
+# fastapi==0.104.1
 uvicorn[standard]==0.24.0
 gradio==4.44.0
 pydantic==2.5.2
@@ -11,7 +11,4 @@ python-docx==1.1.0
 # LangChain text splitters (standalone package)
 langchain-text-splitters==0.0.1
 
-# Utilities
-python-dotenv==1.0.0
-
 