File size: 31,219 Bytes
963ae98
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
#!/usr/bin/env python3
"""

Unified AI Services Application

Coordinates NER, OCR, and RAG services with combined workflows

"""

import asyncio
import io
import json
import logging
import os
import signal
import subprocess
import sys
import tempfile
import time
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any, Union

import httpx
import uvicorn
from fastapi import FastAPI, File, UploadFile, HTTPException, Form, BackgroundTasks, Query, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel, HttpUrl
import psutil

# Import our configuration
from configs import get_config, validate_environment

# Load the application configuration once, at import time. Every service URL,
# port and CORS setting used further down is read from this object.
config = get_config()

# Root logging: INFO and above, each line prefixed with timestamp, logger name
# and level.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)

# Child processes launched by start_service(), keyed by service name.
service_processes: Dict[str, subprocess.Popen] = dict()
# Per-service health flag; start_service() sets it to True once the service
# answers its /health endpoint.
service_health: Dict[str, bool] = dict()

# Pydantic Models for Unified API
class ServiceStatus(BaseModel):
    """Status snapshot of one managed service, as built by get_service_status()."""
    name: str  # service name, e.g. "ner", "ocr" or "rag"
    status: str  # "running" when the health probe passed, otherwise "down"
    port: int  # local port whose /health endpoint was probed
    health: bool  # True when GET http://localhost:<port>/health returned HTTP 200
    uptime: Optional[float] = None  # seconds since the managed child process was created; None if not tracked/running
    response_time: Optional[float] = None  # seconds spent on the health probe

class UnifiedAnalysisRequest(BaseModel):
    """Request body for POST /analyze/unified.

    Either ``text`` or ``url`` must be supplied; when both are set the text is
    analyzed. The NER flags below are forwarded to the NER service, and the
    ``rag_*`` fields are only used when ``enable_rag_indexing`` is True.
    """
    text: Optional[str] = None  # raw text to analyze
    url: Optional[HttpUrl] = None  # page to analyze when no text is given
    # Options forwarded unchanged to the NER service
    extract_relationships: bool = True
    include_embeddings: bool = True
    include_summary: bool = True
    generate_graph_files: bool = True
    export_formats: List[str] = ["neo4j", "json", "graphml"]
    enable_rag_indexing: bool = False  # also index the input in the RAG service after a successful NER run
    rag_title: Optional[str] = None  # defaults to "NER Analysis <analysis_id>"
    rag_keywords: Optional[List[str]] = None  # defaults to the "keywords" returned by NER
    rag_metadata: Optional[Dict[str, Any]] = None  # merged with NER-derived metadata (analysis id, counts)

class CombinedSearchRequest(BaseModel):
    """Request body for POST /search/combined: a RAG search plus optional NER over the top hits."""
    query: str  # search query sent to the RAG service
    limit: int = 10  # result limit forwarded to the RAG /search call
    similarity_threshold: float = 0.2  # similarity cut-off forwarded as-is to the RAG /search call
    include_ner_analysis: bool = True  # run NER over the text of the first few search hits
    ner_export_formats: List[str] = ["json"]  # export formats requested from NER for those hits

class UnifiedResponse(BaseModel):
    """Envelope returned by the combined workflows (/analyze/unified, /search/combined)."""
    success: bool  # False when the workflow raised; see ``error``
    service_calls: List[str]  # labels of the downstream calls that succeeded, in call order
    ner_analysis: Optional[Dict[str, Any]] = None  # NER service response, passed through
    rag_document: Optional[Dict[str, Any]] = None  # RAG indexing response, if indexing ran and succeeded
    search_results: Optional[Dict[str, Any]] = None  # RAG search response plus an "ner_analyses" list
    processing_time: float  # wall-clock seconds spent handling the request
    error: Optional[str] = None  # str() of the exception when success is False

# Service Management Functions
async def start_service(service_name: str, script_path: str, port: int,
                        startup_timeout: float = 30.0) -> bool:
    """Launch a service script as a child process and wait until it is healthy.

    If *port* is already in use, the service is assumed to be running already
    and nothing is spawned.

    Args:
        service_name: Key used in ``service_processes`` / ``service_health``.
        script_path: Python script executed with the current interpreter.
        port: Port the service listens on; its /health endpoint is polled.
        startup_timeout: Seconds to wait for the service to report healthy.

    Returns:
        True if the service is (or already was) healthy, False if it exited
        during startup, did not become healthy in time, or could not be
        spawned.
    """
    try:
        logger.info(f"πŸš€ Starting {service_name} service on port {port}")

        if is_port_in_use(port):
            logger.warning(f"Port {port} is already in use. Assuming {service_name} is already running.")
            return True

        # Give the child its own process group (Windows) / session (POSIX) so
        # that stop_all_services() can signal the whole group at shutdown.
        command = [sys.executable, script_path]
        if sys.platform == "win32":
            process = subprocess.Popen(command, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        else:
            process = subprocess.Popen(command, preexec_fn=os.setsid)

        service_processes[service_name] = process

        # Poll against a wall-clock deadline: a single health probe can take
        # up to several seconds, so counting iterations would overshoot.
        deadline = time.monotonic() + startup_timeout
        while time.monotonic() < deadline:
            await asyncio.sleep(1)
            if await check_service_health(service_name, port):
                logger.info(f"βœ… {service_name} service started successfully")
                service_health[service_name] = True
                return True
            exit_code = process.poll()
            if exit_code is not None:
                # The script died (import error, bind failure, ...); waiting
                # out the rest of the timeout cannot help.
                logger.error(f"❌ {service_name} service exited during startup with code {exit_code}")
                return False

        # The process is left registered in service_processes, so
        # stop_all_services() still terminates it at shutdown.
        logger.error(f"❌ {service_name} service failed to start within timeout")
        return False

    except Exception as e:
        logger.error(f"❌ Failed to start {service_name} service: {e}")
        return False

def is_port_in_use(port: int) -> bool:
    """Return True if a socket on this host already uses local *port*.

    The psutil connection table (TCP and UDP, IPv4 and IPv6) is checked first.
    If it cannot be read, a TCP connection to 127.0.0.1:*port* is tried
    instead. One example: psutil may raise ``AccessDenied`` for unprivileged
    users on some platforms.
    """
    try:
        connections = psutil.net_connections(kind="inet")
    except (psutil.Error, OSError) as exc:
        logger.debug(f"Cannot list connections ({exc}); probing port {port} directly")
        return _port_accepts_connections(port)
    # ``laddr`` is an empty tuple for sockets without a bound local address.
    # Skip those instead of letting an AttributeError abort the whole scan.
    return any(conn.laddr and conn.laddr.port == port for conn in connections)


def _port_accepts_connections(port: int, host: str = "127.0.0.1", timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within *timeout* seconds."""
    import socket

    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

async def check_service_health(service_name: str, port: int) -> bool:
    """Return True if GET http://localhost:<port>/health answers HTTP 200 within 5 s.

    Any failure (connection refused, timeout, protocol error, ...) counts as
    unhealthy. ``service_name`` is used only for debug logging.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"http://localhost:{port}/health",
                timeout=5.0
            )
    except Exception as exc:
        # Best-effort probe. Unlike a bare ``except:``, this lets
        # asyncio.CancelledError and KeyboardInterrupt propagate.
        logger.debug(f"Health check for {service_name} on port {port} failed: {exc}")
        return False
    return response.status_code == 200

async def get_service_status(service_name: str, port: int) -> ServiceStatus:
    """Probe one service and return its health, probe latency and process uptime.

    ``uptime`` is only filled in for services this app spawned itself (present
    in ``service_processes``) whose process is still running.
    """
    # perf_counter is monotonic and high-resolution, so it suits interval timing.
    probe_started = time.perf_counter()
    health = await check_service_health(service_name, port)
    response_time = time.perf_counter() - probe_started

    uptime = None
    process = service_processes.get(service_name)
    if process is not None and process.poll() is None:  # spawned by us and still running
        try:
            uptime = time.time() - psutil.Process(process.pid).create_time()
        except psutil.Error:
            # The process vanished or became inaccessible after poll().
            uptime = None

    return ServiceStatus(
        name=service_name,
        status="running" if health else "down",
        port=port,
        health=health,
        uptime=uptime,
        response_time=response_time
    )

async def stop_all_services():
    """Terminate every child process started by start_service().

    Each running child gets a graceful signal sent to its whole process group:
    CTRL_BREAK_EVENT on Windows, SIGTERM on POSIX. The child then has 10
    seconds to exit before it is force-killed. The waits run in a worker
    thread so the event loop is not blocked. Errors for one service are
    logged and do not prevent stopping the others.
    """
    logger.info("πŸ›‘ Stopping all services...")
    loop = asyncio.get_running_loop()

    for service_name, process in list(service_processes.items()):
        try:
            if process.poll() is not None:  # already exited
                continue

            logger.info(f"Stopping {service_name}...")
            if sys.platform == "win32":
                process.send_signal(signal.CTRL_BREAK_EVENT)
            else:
                os.killpg(os.getpgid(process.pid), signal.SIGTERM)

            try:
                # Popen.wait blocks, so run it off the event loop.
                await loop.run_in_executor(None, process.wait, 10)
            except subprocess.TimeoutExpired:
                logger.warning(f"Force killing {service_name}")
                if sys.platform == "win32":
                    process.kill()
                else:
                    # The child was started with setsid, so kill its whole
                    # group rather than only the leader.
                    os.killpg(os.getpgid(process.pid), signal.SIGKILL)
                # Reap the killed child so it does not linger as a zombie.
                await loop.run_in_executor(None, process.wait)

            logger.info(f"βœ… {service_name} stopped")
        except Exception as e:
            logger.error(f"Error stopping {service_name}: {e}")

# Service Communication Functions
async def _call_service(base_url: str, label: str, endpoint: str,
                        method: str = "GET", **kwargs) -> Dict[str, Any]:
    """Send one request to a downstream service and return its decoded JSON body.

    Args:
        base_url: Service root URL; *endpoint* is appended verbatim.
        label: Service name used in error messages ("NER", "OCR", "RAG").
        endpoint: Path starting with "/".
        method: HTTP method.
        **kwargs: Forwarded to ``httpx.AsyncClient.request`` (json=, files=, ...).

    Returns:
        The parsed JSON body, or ``{}`` for a 2xx reply with an empty body.

    Raises:
        HTTPException: with the upstream status for non-2xx replies, 502 when a
            2xx body is not valid JSON, and 503 when the service is unreachable.
    """
    url = f"{base_url}{endpoint}"
    try:
        async with httpx.AsyncClient(timeout=300.0) as client:
            response = await client.request(method, url, **kwargs)
    except httpx.RequestError as e:
        raise HTTPException(status_code=503, detail=f"{label} service unavailable: {e}") from e

    if not 200 <= response.status_code < 300:
        raise HTTPException(status_code=response.status_code, detail=response.text)
    if not response.content:  # e.g. 204 No Content
        return {}
    try:
        return response.json()
    except ValueError as e:  # json.JSONDecodeError subclasses ValueError
        raise HTTPException(
            status_code=502, detail=f"{label} service returned invalid JSON: {e}"
        ) from e


async def call_ner_service(endpoint: str, method: str = "GET", **kwargs) -> Dict[str, Any]:
    """Call an NER service endpoint (see _call_service for return/raise contract)."""
    return await _call_service(config.NER_SERVICE_URL, "NER", endpoint, method, **kwargs)


async def call_ocr_service(endpoint: str, method: str = "GET", **kwargs) -> Dict[str, Any]:
    """Call an OCR service endpoint (see _call_service for return/raise contract)."""
    return await _call_service(config.OCR_SERVICE_URL, "OCR", endpoint, method, **kwargs)


async def call_rag_service(endpoint: str, method: str = "GET", **kwargs) -> Dict[str, Any]:
    """Call a RAG service endpoint (see _call_service for return/raise contract)."""
    return await _call_service(config.RAG_SERVICE_URL, "RAG", endpoint, method, **kwargs)

# Application Lifecycle
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the managed services before serving and stop them afterwards.

    Raises:
        RuntimeError: if environment validation fails or no service could be
            started. Any child processes that were already spawned are still
            stopped in that case.
    """
    logger.info("πŸš€ Starting Unified AI Services Application")

    # Print configuration summary
    config.print_configuration_summary()

    # Validate environment
    if not validate_environment():
        logger.error("❌ Environment validation failed. Please check your configuration.")
        raise RuntimeError("Invalid environment configuration")

    # (name, script path relative to the working directory, port)
    service_definitions = [
        ("ocr", "services/ocr_service.py", config.ocr.PORT),
        ("rag", "services/rag_service.py", config.rag.PORT),
        ("ner", "services/ner_service.py", config.ner.PORT)
    ]

    started_services = []
    try:
        for service_name, script_path, port in service_definitions:
            if not os.path.exists(script_path):
                logger.warning(f"Service script not found: {script_path}")
                continue
            if await start_service(service_name, script_path, port):
                started_services.append(service_name)
            else:
                logger.error(f"Failed to start {service_name} service")

        if not started_services:
            logger.error("❌ No services could be started")
            raise RuntimeError("Failed to start any services")

        logger.info(f"βœ… Started {len(started_services)} services: {', '.join(started_services)}")

        # Yield control to the application
        yield
    finally:
        # Runs on normal shutdown AND on failed startup. Children that were
        # spawned but never became healthy live in their own process group,
        # so they could otherwise outlive this process.
        await stop_all_services()
        logger.info("🏁 Unified AI Services Application shutdown complete")

# FastAPI Application
app = FastAPI(
    title="Unified AI Services",
    description="Coordinated NER, OCR, and RAG services with combined workflows",
    version="1.0.0",
    lifespan=lifespan
)


def _parse_allowed_origins(raw: Any) -> List[str]:
    """Normalize the configured CORS origins into a list of origin strings.

    Accepted forms:
    - ``"*"``, an empty value or ``None``: ``["*"]``.
    - A JSON array string, e.g. ``'["https://a.example"]'``.
    - A JSON string literal.
    - A plain comma-separated string.
    - An already-parsed list or tuple.

    A value that is not JSON is read as comma-separated rather than silently
    widening CORS to every origin.
    """
    if raw is None:
        return ["*"]
    if isinstance(raw, (list, tuple)):
        return [str(origin) for origin in raw]
    text = str(raw).strip()
    if not text or text == "*":
        return ["*"]
    try:
        parsed = json.loads(text)
    except ValueError:
        return [part.strip() for part in text.split(",") if part.strip()]
    if isinstance(parsed, str):
        return [parsed]
    if isinstance(parsed, list):
        return [str(origin) for origin in parsed]
    logger.warning(f"Unsupported ALLOWED_ORIGINS value {raw!r}; allowing no cross-origin requests")
    return []


# CORS configuration
allowed_origins = _parse_allowed_origins(config.ner.ALLOWED_ORIGINS)

app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    # NOTE(review): with a "*" origin list plus credentials, Starlette may echo
    # any request Origin back. Prefer an explicit origin list in production.
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Main API Endpoints
@app.get("/")
async def root():
    """Describe the unified app: version, backing service URLs and main entry points."""
    backing_services = {
        "ner": f"{config.NER_SERVICE_URL}",
        "ocr": f"{config.OCR_SERVICE_URL}",
        "rag": f"{config.RAG_SERVICE_URL}",
    }
    entry_points = dict(
        status="/status",
        analyze="/analyze",
        search="/search",
        combined="/combined/*",
    )
    return {
        "message": "Unified AI Services",
        "version": "1.0.0",
        "services": backing_services,
        "unified_endpoints": entry_points,
    }

@app.get("/health")
async def unified_health():
    """Aggregate health of the NER, OCR and RAG services.

    The three probes run concurrently, so the endpoint takes roughly as long
    as the slowest probe rather than the sum of all three. Overall status is
    "healthy" only when every service is healthy, otherwise "degraded".
    """
    services = [
        ("ner", config.ner.PORT),
        ("ocr", config.ocr.PORT),
        ("rag", config.rag.PORT)
    ]

    # gather preserves argument order, so the list below stays ner/ocr/rag.
    statuses = await asyncio.gather(
        *(get_service_status(service_name, port) for service_name, port in services)
    )
    overall_healthy = all(status.health for status in statuses)

    return {
        "status": "healthy" if overall_healthy else "degraded",
        "services": [status.dict() for status in statuses],
        # Naive UTC ISO-8601 timestamp: same format as the deprecated
        # datetime.utcnow().isoformat().
        "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
        "configuration": {
            "ner_url": config.NER_SERVICE_URL,
            "ocr_url": config.OCR_SERVICE_URL,
            "rag_url": config.RAG_SERVICE_URL
        }
    }

@app.get("/status")
async def detailed_status():
    """Return each service's own /health payload plus the unified app's status.

    Services are probed one after another on localhost with a 10 s timeout.
    A non-200 reply becomes ``{"status": "error", ...}``. Any exception, such
    as a refused connection or an unparsable body, becomes
    ``{"status": "unreachable", ...}``.
    """
    async def probe(port):
        try:
            async with httpx.AsyncClient() as client:
                reply = await client.get(f"http://localhost:{port}/health", timeout=10.0)
                if reply.status_code != 200:
                    return {"status": "error", "error": f"HTTP {reply.status_code}"}
                return reply.json()
        except Exception as e:
            return {"status": "unreachable", "error": str(e)}

    targets = (
        ("ner", config.ner.PORT),
        ("ocr", config.ocr.PORT),
        ("rag", config.rag.PORT),
    )
    per_service = {name: await probe(port) for name, port in targets}

    # Module-level start_time marks when this module was imported.
    uptime = time.time() - start_time if 'start_time' in globals() else 0
    return {
        "unified_app": {
            "status": "running",
            "port": config.MAIN_PORT,
            "uptime": uptime,
        },
        "services": per_service,
        "configuration_valid": validate_environment(),
    }

# Unified Analysis Endpoints
async def _index_in_rag(
    request: UnifiedAnalysisRequest,
    ner_result: Dict[str, Any],
    service_calls: List[str],
) -> Optional[Dict[str, Any]]:
    """Index the analyzed input in the RAG service and return its JSON reply.

    Text input is uploaded in memory as ``ner_analysis.txt`` to
    ``/documents/upload``. URL input is sent to ``/documents/url``. On success
    the call label ("rag_upload" or "rag_url") is appended to *service_calls*.
    A non-200 reply is logged and yields None. Transport and decode errors
    propagate to the caller.
    """
    rag_data = {
        "title": request.rag_title or f"NER Analysis {ner_result.get('analysis_id', 'unknown')}",
        "keywords": request.rag_keywords or ner_result.get("keywords", []),
        "metadata": {
            **(request.rag_metadata or {}),
            "ner_analysis_id": ner_result.get("analysis_id"),
            "entity_count": len(ner_result.get("entities", [])),
            "relationship_count": len(ner_result.get("relationships", [])),
        },
    }

    async with httpx.AsyncClient(timeout=300.0) as client:
        if request.text:
            # Encode explicitly as UTF-8. A temp file opened in text mode would
            # use the platform's default encoding and newline translation,
            # which can fail or alter non-ASCII input.
            files = {"file": ("ner_analysis.txt", request.text.encode("utf-8"), "text/plain")}
            form_data = {
                "title": rag_data["title"],
                "keywords": json.dumps(rag_data["keywords"]),
                "metadata": json.dumps(rag_data["metadata"]),
            }
            response = await client.post(
                f"{config.RAG_SERVICE_URL}/documents/upload",
                files=files,
                data=form_data,
            )
            call_label = "rag_upload"
        elif request.url:
            response = await client.post(
                f"{config.RAG_SERVICE_URL}/documents/url",
                json={
                    "url": str(request.url),
                    **rag_data,
                    "extract_images": True,
                },
            )
            call_label = "rag_url"
        else:
            return None

    if response.status_code != 200:
        logger.warning(f"RAG indexing returned HTTP {response.status_code}; continuing without RAG result")
        return None
    rag_result = response.json()
    service_calls.append(call_label)
    return rag_result


@app.post("/analyze/unified")
async def unified_analysis(request: UnifiedAnalysisRequest):
    """Run NER on the given text or URL, then optionally index it in the RAG service.

    Raises:
        HTTPException(400): if neither ``text`` nor ``url`` is provided.

    Downstream failures are not raised. They are reported as a
    ``UnifiedResponse`` with ``success=False`` and ``error`` set. A RAG
    indexing failure is only logged; the NER result is still returned.
    """
    # Validate before the try block so the 400 reaches the client instead of
    # being turned into a success=False body by the handler below.
    if not request.text and not request.url:
        raise HTTPException(status_code=400, detail="Either text or url must be provided")

    started = time.time()
    service_calls: List[str] = []

    try:
        ner_data = {
            "text": request.text,
            "url": str(request.url) if request.url else None,
            "extract_relationships": request.extract_relationships,
            "include_embeddings": request.include_embeddings,
            "include_summary": request.include_summary,
            "generate_graph_files": request.generate_graph_files,
            "export_formats": request.export_formats,
        }
        # Drop unset inputs so the NER service only sees what was provided.
        ner_data = {k: v for k, v in ner_data.items() if v is not None}

        if request.text:
            ner_result = await call_ner_service("/analyze/text", "POST", json=ner_data)
            service_calls.append("ner_text")
        else:
            ner_result = await call_ner_service("/analyze/url", "POST", json=ner_data)
            service_calls.append("ner_url")

        rag_result = None
        if request.enable_rag_indexing and ner_result.get("success"):
            try:
                rag_result = await _index_in_rag(request, ner_result, service_calls)
            except Exception as e:
                # Best-effort: continue without a RAG result.
                logger.warning(f"RAG indexing failed: {e}")

        return UnifiedResponse(
            success=True,
            service_calls=service_calls,
            ner_analysis=ner_result,
            rag_document=rag_result,
            processing_time=time.time() - started,
        )

    except Exception as e:
        logger.error(f"Unified analysis failed: {e}")
        return UnifiedResponse(
            success=False,
            service_calls=service_calls,
            processing_time=time.time() - started,
            error=str(e),
        )

# Number of top-ranked search hits that get an NER pass in /search/combined.
_COMBINED_SEARCH_NER_TOP_K = 3


def _hit_text(hit: Any) -> str:
    """Return ``hit["chunk"]["content"]``, or "" when any level is missing or not a dict."""
    if not isinstance(hit, dict):
        return ""
    chunk = hit.get("chunk")
    if not isinstance(chunk, dict):
        return ""
    return chunk.get("content") or ""


@app.post("/search/combined")
async def combined_search(request: CombinedSearchRequest):
    """Search the RAG service, then optionally run NER over the top hits.

    The NER calls for the first ``_COMBINED_SEARCH_NER_TOP_K`` hits that have
    text run concurrently. A failing NER call is logged and skipped. A failing
    search is reported as a ``UnifiedResponse`` with ``success=False``.
    """
    started = time.time()
    service_calls: List[str] = []

    try:
        search_data = {
            "query": request.query,
            "limit": request.limit,
            "similarity_threshold": request.similarity_threshold,
        }
        search_result = await call_rag_service("/search", "POST", json=search_data)
        service_calls.append("rag_search")

        async def analyze_hit(index: int, text: str) -> Optional[Dict[str, Any]]:
            ner_data = {
                "text": text,
                "extract_relationships": True,
                "include_embeddings": False,
                "include_summary": False,
                "generate_graph_files": False,
                "export_formats": request.ner_export_formats,
            }
            try:
                return await call_ner_service("/analyze/text", "POST", json=ner_data)
            except Exception as e:
                logger.warning(f"NER analysis failed for result {index}: {e}")
                return None

        ner_results = []
        if request.include_ner_analysis and search_result.get("results"):
            top_hits = search_result["results"][:_COMBINED_SEARCH_NER_TOP_K]
            pending = [(i, text) for i, text in enumerate(map(_hit_text, top_hits)) if text]
            analyses = await asyncio.gather(*(analyze_hit(i, text) for i, text in pending))
            # gather preserves order, so indices line up with `pending`.
            for (i, _), ner_result in zip(pending, analyses):
                if ner_result is None:
                    continue
                ner_results.append({
                    "result_index": i,
                    "ner_analysis": ner_result,
                })
                service_calls.append(f"ner_text_{i}")

        return UnifiedResponse(
            success=True,
            service_calls=service_calls,
            search_results={
                **search_result,
                "ner_analyses": ner_results,
            },
            processing_time=time.time() - started,
        )

    except Exception as e:
        logger.error(f"Combined search failed: {e}")
        return UnifiedResponse(
            success=False,
            service_calls=service_calls,
            processing_time=time.time() - started,
            error=str(e),
        )

# Service Proxy Endpoints
async def _proxy_request(base_url: str, label: str, path: str, request: Request) -> Response:
    """Forward *request* to ``{base_url}/{path}`` and relay the upstream reply.

    Forwarding rules:
    - Query parameters, including repeated keys, are forwarded for every method.
    - GET requests are forwarded without a body or headers.
    - multipart/form-data bodies are rebuilt field by field; uploaded files
      are read into memory and repeated fields are preserved.
    - Other bodies are forwarded verbatim with the incoming headers, minus Host.

    The upstream status code, raw body bytes and content type are returned
    unchanged, so error statuses and binary downloads survive the proxy.

    Raises:
        HTTPException(503): if the upstream service cannot be reached.
    """
    url = f"{base_url}/{path}"
    params = request.query_params.multi_items()
    try:
        async with httpx.AsyncClient(timeout=300.0) as client:
            if request.method == "GET":
                upstream = await client.get(url, params=params)
            elif "multipart/form-data" in request.headers.get("content-type", ""):
                form = await request.form()
                files = []
                data: Dict[str, Any] = {}
                for key, value in form.multi_items():
                    if hasattr(value, "read"):  # uploaded file
                        files.append((key, (value.filename, await value.read(), value.content_type)))
                    else:
                        # httpx encodes list values as repeated form fields.
                        data.setdefault(key, []).append(value)
                upstream = await client.request(
                    request.method, url, params=params, files=files, data=data
                )
            else:
                body = await request.body()
                headers = {k: v for k, v in request.headers.items() if k.lower() != "host"}
                upstream = await client.request(
                    request.method, url, params=params, content=body, headers=headers
                )
    except httpx.RequestError as e:
        raise HTTPException(status_code=503, detail=f"{label} service unavailable: {e}") from e

    return Response(
        content=upstream.content,
        status_code=upstream.status_code,
        media_type=upstream.headers.get("content-type"),
    )


# `request` must be annotated as Request: FastAPI injects the incoming request
# only for that annotation. An unannotated parameter is parsed as a required
# query parameter.
@app.api_route("/ner/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def ner_proxy(path: str, request: Request):
    """Proxy any request under /ner/ to the NER service."""
    return await _proxy_request(config.NER_SERVICE_URL, "NER", path, request)


@app.api_route("/ocr/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def ocr_proxy(path: str, request: Request):
    """Proxy any request under /ocr/ to the OCR service."""
    return await _proxy_request(config.OCR_SERVICE_URL, "OCR", path, request)


@app.api_route("/rag/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def rag_proxy(path: str, request: Request):
    """Proxy any request under /rag/ to the RAG service."""
    return await _proxy_request(config.RAG_SERVICE_URL, "RAG", path, request)

# Convenience endpoints (direct service access)
@app.get("/analyze/text")
@app.post("/analyze/text")
async def analyze_text_direct(request: Request = None):
    """Direct access to NER text analysis.

    POST forwards the JSON body to the NER service's /analyze/text. GET
    returns a short usage hint. The ``Request`` annotation is what makes
    FastAPI inject the incoming request; an unannotated parameter would be
    parsed as a query string value.
    """
    if request is not None and request.method == "POST":
        return await call_ner_service("/analyze/text", "POST", json=await request.json())
    return {"message": "Use POST method with text data"}

@app.get("/documents")
async def list_documents():
    """Return the RAG service's document listing unchanged (proxied GET /documents)."""
    listing = await call_rag_service("/documents", "GET")
    return listing

@app.post("/search")
async def search_direct(request: Request):
    """Forward the JSON body to the RAG service's POST /search and return its reply.

    The ``Request`` annotation makes FastAPI inject the incoming request. An
    unannotated parameter would be treated as a required query parameter, and
    every call would be rejected with 422.
    """
    return await call_rag_service("/search", "POST", json=await request.json())

# Utility endpoints
@app.get("/services")
async def list_services():
    """Catalogue the backing services (URL, description, endpoints) and the unified app's own routes."""
    ner_info = {
        "url": config.NER_SERVICE_URL,
        "description": "Named Entity Recognition with relationship extraction",
        "endpoints": [
            "/analyze/text",
            "/analyze/file",
            "/analyze/url",
            "/analyze/multi",
            "/download/{analysis_id}/{file_type}",
            "/statistics",
            "/entity-types",
            "/relationship-types",
        ],
    }
    ocr_info = {
        "url": config.OCR_SERVICE_URL,
        "description": "Optical Character Recognition with document processing",
        "endpoints": ["/ocr/upload", "/ocr/url", "/ocr/analyze"],
    }
    rag_info = {
        "url": config.RAG_SERVICE_URL,
        "description": "Retrieval-Augmented Generation with vector search",
        "endpoints": ["/documents/upload", "/documents/url", "/search", "/documents", "/documents/{id}"],
    }
    unified_info = {
        "url": f"http://localhost:{config.MAIN_PORT}",
        "description": "Unified interface for combined workflows",
        "endpoints": ["/analyze/unified", "/search/combined", "/ner/*", "/ocr/*", "/rag/*"],
    }
    return {
        "services": {"ner": ner_info, "ocr": ocr_info, "rag": rag_info},
        "unified": unified_info,
    }

# Signal handlers for graceful shutdown
# Strong references to shutdown tasks scheduled by signal_handler. The event
# loop keeps only weak references to tasks, so an unreferenced task could be
# garbage-collected before it finishes.
_shutdown_tasks = set()


def signal_handler(signum, frame):
    """Handle SIGINT/SIGTERM by stopping the managed child services.

    If an event loop is running (signal received while serving), the cleanup
    is scheduled as a task. Without a running loop, for example before the
    server starts or after it has exited, the cleanup runs synchronously. The
    signal's default outcome is then reproduced: KeyboardInterrupt for SIGINT,
    process exit for other signals.
    """
    logger.info(f"Received signal {signum}, initiating graceful shutdown...")
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        asyncio.run(stop_all_services())
        if signum == signal.SIGINT:
            raise KeyboardInterrupt
        raise SystemExit(128 + signum)

    task = loop.create_task(stop_all_services())
    _shutdown_tasks.add(task)
    task.add_done_callback(_shutdown_tasks.discard)


# Register signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# Module import time; /status reports uptime relative to this.
start_time = time.time()

if __name__ == "__main__":
    print("πŸš€ Starting Unified AI Services Application")
    print("=" * 50)

    # Validate configuration before starting
    if not validate_environment():
        print("❌ Configuration validation failed!")
        print("Please check your .env file and ensure all required services are configured.")
        sys.exit(1)

    print(f"🌐 Main application will run on: http://{config.MAIN_HOST}:{config.MAIN_PORT}")
    print("πŸ“Š Services will be started automatically:")
    print(f"   β€’ NER Service: http://localhost:{config.ner.PORT}")
    print(f"   β€’ OCR Service: http://localhost:{config.ocr.PORT}")
    print(f"   β€’ RAG Service: http://localhost:{config.rag.PORT}")
    print("")
    print("🎯 Available endpoints:")
    print("   β€’ Main API: /")
    print("   β€’ Health Check: /health")
    print("   β€’ Unified Analysis: /analyze/unified")
    print("   β€’ Combined Search: /search/combined")
    print("   β€’ Service Proxies: /ner/*, /ocr/*, /rag/*")
    print("")
    print("πŸ“– API Documentation: /docs")
    print("")

    # uvicorn needs an "<module>:<attribute>" import string when reload is
    # enabled. Derive the module name from this file rather than hard-coding
    # "app", so the script keeps working if it is renamed.
    app_import_string = f"{Path(__file__).stem}:app"
    try:
        uvicorn.run(
            app_import_string,
            host=config.MAIN_HOST,
            port=config.MAIN_PORT,
            reload=config.ner.DEBUG,
            log_level="info"
        )
    except KeyboardInterrupt:
        print("\nπŸ›‘ Shutting down gracefully...")
    # Child services are stopped by the lifespan handler when the server exits.