#!/usr/bin/env python3
"""
Enhanced NER Analysis Service - Cleaned and Optimized

Advanced Named Entity Recognition with Thai language support,
relationship extraction, and graph database exports.
"""

import os
import io
import json
import logging
import re
import csv
import tempfile
import zipfile
from datetime import datetime
from typing import Optional, List, Dict, Any, Union, Tuple
from pathlib import Path
from contextlib import asynccontextmanager
from collections import defaultdict
import xml.etree.ElementTree as ET

import httpx
import asyncpg
from azure.storage.blob import BlobServiceClient
from azure.core.credentials import AzureKeyCredential
from fastapi import FastAPI, File, UploadFile, HTTPException, Form, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel, HttpUrl, field_validator
import uvicorn
import docx
from azure.ai.inference import ChatCompletionsClient
from azure.ai.inference.models import SystemMessage, UserMessage
from openai import AzureOpenAI

# Import unified configuration
try:
    from configs import get_config
    config = get_config().ner
    unified_config = get_config()
    print("✅ Using unified configuration")
except ImportError:
    print("⚠️ Unified config not available, using fallback configuration")

    # Fallback configuration
    from dotenv import load_dotenv
    load_dotenv()

    class FallbackConfig:
        HOST = os.getenv("HOST", "0.0.0.0")
        PORT = int(os.getenv("NER_PORT", "8500"))
        DEBUG = os.getenv("DEBUG", "False").lower() == "true"

        # Database
        POSTGRES_HOST = os.getenv("POSTGRES_HOST", "")
        POSTGRES_PORT = int(os.getenv("POSTGRES_PORT", "5432"))
        POSTGRES_USER = os.getenv("POSTGRES_USER", "")
        POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "")
        POSTGRES_DATABASE = os.getenv("POSTGRES_DATABASE", "postgres")

        # APIs
        OCR_SERVICE_URL = os.getenv("OCR_SERVICE_URL", "http://localhost:8400")
        DEEPSEEK_ENDPOINT = os.getenv("DEEPSEEK_ENDPOINT", "")
        DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY", "")
        DEEPSEEK_MODEL = os.getenv("DEEPSEEK_MODEL", "DeepSeek-R1-0528")
        AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT", "")
        AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY", "")
        EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-large")

        # Storage
        AZURE_STORAGE_ACCOUNT_URL = os.getenv("AZURE_STORAGE_ACCOUNT_URL", "")
        AZURE_BLOB_SAS_TOKEN = os.getenv("AZURE_BLOB_SAS_TOKEN", "")
        BLOB_CONTAINER = os.getenv("BLOB_CONTAINER", "historylog")

        # Limits
        MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB
        MAX_TEXT_LENGTH = 100000  # 100KB
        SUPPORTED_TEXT_FORMATS = {'.txt', '.doc', '.docx', '.rtf'}
        SUPPORTED_OCR_FORMATS = {'.pdf', '.jpg', '.jpeg', '.png', '.tiff', '.bmp', '.gif'}

        ENTITY_TYPES = [
            "PERSON", "ORGANIZATION", "LOCATION", "DATE", "TIME", "MONEY",
            "PRODUCT", "EVENT", "VEHICLE", "SUSPICIOUS_OBJECT", "ILLEGAL_ACTIVITY",
            "EVIDENCE", "ILLEGAL_ITEM", "WEAPON", "DRUG", "CHEMICAL", "DOCUMENT",
            "PHONE_NUMBER", "ADDRESS", "EMAIL"
        ]
        RELATIONSHIP_TYPES = [
            "works_for", "founded", "located_in", "part_of", "associated_with",
            "owns", "manages",
            "ทำงานที่", "ก่อตั้ง", "ตั้งอยู่ที่", "เกี่ยวข้องกับ", "เป็นเจ้าของ",
            "arrested_by", "investigated_by", "confiscated_from", "used_in", "evidence_of",
            "จับกุมโดย", "สอบสวนโดย", "ยึดจาก", "หลักฐานของ"
        ]

    config = FallbackConfig()

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Export directories
EXPORT_DIR = Path("exports")
EXPORT_DIR.mkdir(exist_ok=True)

# Global variables
pg_pool = None
vector_available = False
clients = {}


# Pydantic Models
class NERRequest(BaseModel):
    text: Optional[str] = None
    url: Optional[HttpUrl] = None
    extract_relationships: bool = True
    include_embeddings: bool = True
    include_summary: bool = True
    generate_graph_files: bool = True
    export_formats: List[str] = ["neo4j", "json", "graphml"]

    @field_validator('text')
    @classmethod
    def validate_text_length(cls, v):
        if v and len(v) > config.MAX_TEXT_LENGTH:
            raise ValueError(f"Text too long (max {config.MAX_TEXT_LENGTH} characters)")
        return v


class MultiInputRequest(BaseModel):
    texts: Optional[List[str]] = None
    urls: Optional[List[HttpUrl]] = None
    extract_relationships: bool = True
    include_embeddings: bool = True
    include_summary: bool = True
    combine_results: bool = True
    generate_graph_files: bool = True
    export_formats: List[str] = ["neo4j", "json", "graphml"]


class EntityResult(BaseModel):
    id: str
    text: str
    label: str
    confidence: float
    start_pos: int
    end_pos: int
    source_type: Optional[str] = None
    source_index: Optional[int] = None
    frequency: int = 1
    importance_score: float = 0.0
    metadata: Optional[Dict[str, Any]] = None


class RelationshipResult(BaseModel):
    id: str
    source_entity_id: str
    target_entity_id: str
    source_entity: str
    target_entity: str
    relationship_type: str
    confidence: float
    strength: float
    context: str
    evidence_count: int = 1
    bidirectional: bool = False
    metadata: Optional[Dict[str, Any]] = None


class NodeResult(BaseModel):
    id: str
    label: str
    type: str
    confidence: float
    frequency: int = 1
    importance_score: float = 0.0
    properties: Dict[str, Any]


class LinkResult(BaseModel):
    id: str
    source: str
    target: str
    relationship: str
    confidence: float
    strength: float
    evidence_count: int = 1
    properties: Dict[str, Any]


class GraphData(BaseModel):
    nodes: List[NodeResult]
    links: List[LinkResult]
    metadata: Dict[str, Any]


class ExportFiles(BaseModel):
    neo4j_nodes: Optional[str] = None
    neo4j_relationships: Optional[str] = None
    json_export: Optional[str] = None
    graphml_export: Optional[str] = None
    csv_nodes: Optional[str] = None
    csv_edges: Optional[str] = None
    gexf_export: Optional[str] = None
    analysis_report: Optional[str] = None
    download_bundle: Optional[str] = None


class NERResponse(BaseModel):
    success: bool
    analysis_id: str
    source_text: str
    source_type: str
    language: str
    entities: List[EntityResult]
    keywords: List[str]
    relationships: List[RelationshipResult]
    summary: str
    embeddings: Optional[List[float]] = None
    graph_data: GraphData
    export_files: ExportFiles
    processing_time: float
    character_count: int
    word_count: int
    sentence_count: int
    entity_relationship_stats: Dict[str, Any]
    error: Optional[str] = None


class MultiNERResponse(BaseModel):
    success: bool
    analysis_id: str
    combined_analysis: NERResponse
    individual_analyses: List[NERResponse]
    processing_time: float
    total_sources: int
    error: Optional[str] = None


# Utility Functions
def generate_unique_id(prefix: str = "item") -> str:
    """Generate unique ID with timestamp"""
    return f"{prefix}_{int(datetime.utcnow().timestamp() * 1000)}"


def normalize_text(text: str) -> str:
    """Normalize text for comparison"""
    return re.sub(r'\s+', ' ', text.strip().lower())


def calculate_text_similarity(text1: str, text2: str) -> float:
    """Calculate basic text similarity"""
    norm1 = normalize_text(text1)
    norm2 = normalize_text(text2)
    if norm1 == norm2:
        return 1.0

    words1 = set(norm1.split())
    words2 = set(norm2.split())
    if not words1 and not words2:
        return 1.0
    if not words1 or not words2:
        return 0.0

    intersection = words1.intersection(words2)
    union = words1.union(words2)
    return len(intersection) / len(union) if union else 0.0


def deduplicate_entities(entities: List[Dict[str, Any]], similarity_threshold: float = 0.8) -> List[Dict[str, Any]]:
    """Remove duplicate entities based on text similarity"""
    if not entities:
        return []

    deduplicated = []
    processed_texts = set()

    for entity in entities:
        entity_text = entity.get('text', '').strip()
        normalized_text = normalize_text(entity_text)

        if not entity_text or normalized_text in processed_texts:
            continue

        is_duplicate = False
        for existing_entity in deduplicated:
            existing_text = existing_entity.get('text', '')
            similarity = calculate_text_similarity(entity_text, existing_text)
            if similarity >= similarity_threshold:
                if entity.get('confidence', 0) > existing_entity.get('confidence', 0):
                    deduplicated.remove(existing_entity)
                    break
                else:
                    is_duplicate = True
                    break

        if not is_duplicate:
            entity['id'] = entity.get('id', generate_unique_id('ent'))
            deduplicated.append(entity)
            processed_texts.add(normalized_text)

    return deduplicated


def detect_language(text: str) -> str:
    """Enhanced language detection"""
    if not text:
        return "en"

    thai_chars = len(re.findall(r'[ก-๙]', text))
    english_chars = len(re.findall(r'[a-zA-Z]', text))
    total_chars = thai_chars + english_chars

    if total_chars == 0:
        return "en"

    thai_ratio = thai_chars / total_chars
    if thai_ratio > 0.3:
        return "th"
    elif thai_ratio > 0.1:
        return "mixed"
    else:
        return "en"


def get_text_stats(text: str) -> Dict[str, int]:
    """Get comprehensive text statistics"""
    return {
        "character_count": len(text),
        "word_count": len(text.split()),
        "sentence_count": len(re.findall(r'[.!?]+', text)),
        "paragraph_count": len([p for p in text.split('\n\n') if p.strip()]),
        "line_count": len(text.split('\n'))
    }


# Client Management
def get_blob_client():
    if clients.get('blob') is None and config.AZURE_STORAGE_ACCOUNT_URL and config.AZURE_BLOB_SAS_TOKEN:
        try:
            clients['blob'] = BlobServiceClient(
                account_url=config.AZURE_STORAGE_ACCOUNT_URL,
                credential=config.AZURE_BLOB_SAS_TOKEN
            )
        except Exception as e:
            logger.error(f"Failed to initialize blob client: {e}")
    return clients.get('blob')


def get_deepseek_client():
    if clients.get('deepseek') is None and config.DEEPSEEK_ENDPOINT and config.DEEPSEEK_API_KEY:
        try:
            clients['deepseek'] = ChatCompletionsClient(
                endpoint=config.DEEPSEEK_ENDPOINT,
                credential=AzureKeyCredential(config.DEEPSEEK_API_KEY),
                api_version="2024-05-01-preview"
            )
        except Exception as e:
            logger.error(f"Failed to initialize DeepSeek client: {e}")
    return clients.get('deepseek')


def get_openai_client():
    if clients.get('openai') is None and config.AZURE_OPENAI_ENDPOINT and config.AZURE_OPENAI_API_KEY:
        try:
            clients['openai'] = AzureOpenAI(
                api_version="2024-12-01-preview",
                azure_endpoint=config.AZURE_OPENAI_ENDPOINT,
                api_key=config.AZURE_OPENAI_API_KEY
            )
        except Exception as e:
            logger.error(f"Failed to initialize OpenAI client: {e}")
    return clients.get('openai')


# Database Operations
async def init_database():
    global pg_pool, vector_available
    logger.info("🔄 Connecting to database...")
    try:
        pg_pool = await asyncpg.create_pool(
            host=config.POSTGRES_HOST,
            port=config.POSTGRES_PORT,
            user=config.POSTGRES_USER,
            password=config.POSTGRES_PASSWORD,
            database=config.POSTGRES_DATABASE,
            ssl='require',
            min_size=2,
            max_size=10,
            command_timeout=60
        )
        async with pg_pool.acquire() as conn:
            logger.info("✅ Database connected")

            # Check vector extension
            try:
                await conn.execute("CREATE EXTENSION IF NOT EXISTS vector;")
                await conn.fetchval("SELECT '[1,2,3]'::vector(3)")
                vector_available = True
                logger.info("✅ Vector extension available")
            except Exception:
                vector_available = False
                logger.info("⚠️ Vector extension not available (using JSONB)")

            # Create tables
            await create_tables(conn)
            logger.info("✅ Database setup complete")
            return True
    except Exception as e:
        logger.error(f"❌ Database init failed: {e}")
        return False


async def create_tables(conn):
    """Create enhanced database tables for ER model"""
    await conn.execute("""
        CREATE TABLE IF NOT EXISTS ner_analyses (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            analysis_id VARCHAR(255) UNIQUE NOT NULL,
            source_text TEXT NOT NULL,
            source_type VARCHAR(50) NOT NULL,
            language VARCHAR(10) DEFAULT 'en',
            entities JSONB NOT NULL DEFAULT '[]',
            keywords JSONB NOT NULL DEFAULT '[]',
            relationships JSONB NOT NULL DEFAULT '[]',
            summary TEXT DEFAULT '',
            embeddings JSONB DEFAULT '[]',
            graph_data JSONB DEFAULT '{}',
            export_files JSONB DEFAULT '{}',
            text_stats JSONB DEFAULT '{}',
            er_stats JSONB DEFAULT '{}',
            processing_time FLOAT DEFAULT 0,
            entity_types JSONB DEFAULT '[]',
            relationship_types JSONB DEFAULT '[]',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
    """)

    await conn.execute("""
        CREATE TABLE IF NOT EXISTS entities (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            entity_id VARCHAR(255) NOT NULL,
            analysis_id VARCHAR(255) NOT NULL,
            text VARCHAR(1000) NOT NULL,
            label VARCHAR(100) NOT NULL,
            confidence FLOAT DEFAULT 0,
            start_pos INTEGER DEFAULT 0,
            end_pos INTEGER DEFAULT 0,
            frequency INTEGER DEFAULT 1,
            importance_score FLOAT DEFAULT 0,
            metadata JSONB DEFAULT '{}',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (analysis_id) REFERENCES ner_analyses(analysis_id) ON DELETE CASCADE
        );
    """)

    await conn.execute("""
        CREATE TABLE IF NOT EXISTS relationships (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            relationship_id VARCHAR(255) NOT NULL,
            analysis_id VARCHAR(255) NOT NULL,
            source_entity_id VARCHAR(255) NOT NULL,
            target_entity_id VARCHAR(255) NOT NULL,
            source_entity VARCHAR(1000) NOT NULL,
            target_entity VARCHAR(1000) NOT NULL,
            relationship_type VARCHAR(200) NOT NULL,
            confidence FLOAT DEFAULT 0,
            strength FLOAT DEFAULT 0,
            context TEXT DEFAULT '',
            evidence_count INTEGER DEFAULT 1,
            bidirectional BOOLEAN DEFAULT FALSE,
            metadata JSONB DEFAULT '{}',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (analysis_id) REFERENCES ner_analyses(analysis_id) ON DELETE CASCADE
        );
    """)

    # Create indexes
    try:
        await conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_analysis_id ON ner_analyses(analysis_id);
            CREATE INDEX IF NOT EXISTS idx_entities_analysis ON entities(analysis_id);
            CREATE INDEX IF NOT EXISTS idx_relationships_analysis ON relationships(analysis_id);
        """)
    except Exception as e:
        logger.warning(f"Index creation skipped: {e}")


# Text Extraction
def extract_text_from_file(file_content: bytes, filename: str) -> str:
    file_ext = Path(filename).suffix.lower()
    if file_ext == '.txt':
        return file_content.decode('utf-8', errors='ignore')
    elif file_ext == '.docx':
        doc = docx.Document(io.BytesIO(file_content))
        return '\n'.join([p.text for p in doc.paragraphs])
    else:
        return file_content.decode('utf-8', errors='ignore')


async def get_text_from_ocr(file_content: bytes, filename: str) -> str:
    try:
        async with httpx.AsyncClient(timeout=300) as client:
            files = {'file': (filename, file_content)}
            response = await client.post(f"{config.OCR_SERVICE_URL}/ocr/upload", files=files)
            if response.status_code == 200:
                return response.json().get('content', '')
    except Exception as e:
        logger.error(f"OCR service error: {e}")
    raise HTTPException(status_code=500, detail="OCR processing failed")


async def get_text_from_url(url: str) -> str:
    try:
        async with httpx.AsyncClient(timeout=300) as client:
            response = await client.post(
                f"{config.OCR_SERVICE_URL}/ocr/url",
                json={"url": str(url), "extract_images": True}
            )
            if response.status_code == 200:
                return response.json().get('content', '')
    except Exception as e:
        logger.error(f"URL processing error: {e}")
    raise HTTPException(status_code=500, detail="URL processing failed")


# Enhanced NER and Relationship Analysis
async def analyze_with_deepseek(text: str, language: str = None) -> Dict[str, Any]:
    """Enhanced analysis with improved relationship extraction"""
    deepseek_client = get_deepseek_client()
    if not deepseek_client:
        logger.warning("DeepSeek not configured, using manual extraction")
        return extract_manual_entities_and_relationships(text, language)

    try:
        if not language:
            language = detect_language(text)

        if language == "th":
            system_prompt = """คุณเป็นผู้เชี่ยวชาญในการจดจำนามเอกลักษณ์และการสกัดความสัมพันธ์สำหรับภาษาไทย

วิเคราะห์ข้อความและสกัดข้อมูลดังนี้:
1. นามเอกลักษณ์ทุกประเภท (บุคคล องค์กร สถานที่ วันที่ เวลา เงิน ฯลฯ)
2. ความสัมพันธ์ระหว่างนามเอกลักษณ์ - ต้องสกัดทุกความสัมพันธ์ที่พบ
3. คำหลักสำคัญจากข้อความ
4. สรุปที่ครอบคลุม

ให้ผลลัพธ์เป็น JSON:
{
  "entities": [{"text": "ข้อความ", "label": "ประเภท", "confidence": 0.95, "start_pos": 0, "end_pos": 10}],
  "keywords": ["คำหลัก1", "คำหลัก2"],
  "relationships": [{"source_entity": "A", "target_entity": "B", "relationship_type": "ประเภท", "confidence": 0.9, "context": "บริบท"}],
  "summary": "สรุป"
}"""
        else:
            system_prompt = """You are an expert in Named Entity Recognition and relationship extraction.

Analyze the text and extract:
1. All named entities (people, organizations, locations, dates, money, etc.)
2. ALL relationships between entities - extract every relationship found
3. Important keywords from the text
4. Comprehensive summary

Return ONLY valid JSON:
{
  "entities": [{"text": "entity text", "label": "TYPE", "confidence": 0.95, "start_pos": 0, "end_pos": 10}],
  "keywords": ["keyword1", "keyword2"],
  "relationships": [{"source_entity": "Entity A", "target_entity": "Entity B", "relationship_type": "relationship_type", "confidence": 0.9, "context": "context"}],
  "summary": "Comprehensive summary"
}"""

        user_prompt = f"วิเคราะห์ข้อความนี้:\n\n{text[:8000]}" if language == "th" else f"Analyze this text:\n\n{text[:8000]}"

        response = deepseek_client.complete(
            messages=[
                SystemMessage(content=system_prompt),
                UserMessage(content=user_prompt)
            ],
            max_tokens=6000,
            model=config.DEEPSEEK_MODEL,
            temperature=0.1
        )

        result_text = response.choices[0].message.content.strip()

        # Extract JSON from response
        start_idx = result_text.find('{')
        end_idx = result_text.rfind('}') + 1

        if start_idx != -1 and end_idx > start_idx:
            json_text = result_text[start_idx:end_idx]
            try:
                json_result = json.loads(json_text)
                logger.info("✅ Successfully parsed JSON from DeepSeek")
            except json.JSONDecodeError:
                try:
                    fixed_json = json_text.replace("'", '"').replace('True', 'true').replace('False', 'false')
                    json_result = json.loads(fixed_json)
                    logger.info("✅ Successfully parsed fixed JSON")
                except json.JSONDecodeError:
                    json_result = None
        else:
            json_result = None

        if json_result:
            entities = deduplicate_entities(json_result.get('entities', []))
            keywords = json_result.get('keywords', [])
            relationships = json_result.get('relationships', [])
            summary = json_result.get('summary', '')

            # Ensure relationships are extracted
            if len(relationships) == 0 and len(entities) >= 2:
                logger.warning("No relationships found by DeepSeek, applying rule-based extraction")
                rule_based_relationships = extract_rule_based_relationships(entities, text, language)
                relationships.extend(rule_based_relationships)

            # Enhance relationships with IDs
            for rel in relationships:
                if 'id' not in rel:
                    rel['id'] = generate_unique_id('rel')
                if 'strength' not in rel:
                    rel['strength'] = rel.get('confidence', 0.8)
                if 'evidence_count' not in rel:
                    rel['evidence_count'] = 1
                if 'bidirectional' not in rel:
                    rel['bidirectional'] = False

            return {
                "entities": entities,
                "keywords": keywords[:20],
                "relationships": relationships,
                "summary": summary or f"Analysis of {len(text)} characters"
            }

        logger.warning("JSON parsing failed, using manual extraction")
        return extract_manual_entities_and_relationships(text, language)

    except Exception as e:
        logger.error(f"DeepSeek analysis error: {e}")
        return extract_manual_entities_and_relationships(text, language)


def extract_rule_based_relationships(entities: List[Dict], text: str, language: str) -> List[Dict]:
    """Extract relationships using rule-based approach"""
    relationships = []
    if len(entities) < 2:
        return relationships

    # Define relationship patterns
    if language == "th":
        patterns = [
            (r'(.+?)\s*ทำงาน(?:ที่|ใน|กับ)\s*(.+)', 'ทำงานที่'),
            (r'(.+?)\s*เป็น(?:เจ้าของ|ของ)\s*(.+)', 'เป็นเจ้าของ'),
            (r'(.+?)\s*ตั้งอยู่(?:ที่|ใน)\s*(.+)', 'ตั้งอยู่ที่'),
            (r'(.+?)\s*(?:จับกุม|จับ)\s*(.+)', 'จับกุมโดย'),
        ]
    else:
        patterns = [
            (r'(.+?)\s*(?:works?\s+(?:for|at|in)|employed\s+by)\s*(.+)', 'works_for'),
            (r'(.+?)\s*(?:owns?|possesses?)\s*(.+)', 'owns'),
            (r'(.+?)\s*(?:located\s+(?:in|at)|based\s+in)\s*(.+)', 'located_in'),
            (r'(.+?)\s*(?:arrested\s+by|detained\s+by)\s*(.+)', 'arrested_by'),
        ]

    for pattern, rel_type in patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE | re.UNICODE):
            source_text = match.group(1).strip()
            target_text = match.group(2).strip()

            source_entity = find_best_entity_match(source_text, entities)
            target_entity = find_best_entity_match(target_text, entities)

            if source_entity and target_entity and source_entity != target_entity:
                relationship = {
                    'id': generate_unique_id('rel'),
                    'source_entity': source_entity['text'],
                    'target_entity': target_entity['text'],
                    'relationship_type': rel_type,
                    'confidence': 0.7,
                    'strength': 0.7,
                    'context': match.group(0),
                    'evidence_count': 1,
                    'bidirectional': False,
                    'metadata': {'extraction_method': 'rule_based'}
                }
                relationships.append(relationship)

    return relationships


def find_best_entity_match(text: str, entities: List[Dict]) -> Optional[Dict]:
    """Find the best matching entity for given text"""
    text_norm = normalize_text(text)

    for entity in entities:
        if normalize_text(entity['text']) == text_norm:
            return entity

    best_match = None
    best_score = 0
    for entity in entities:
        score = calculate_text_similarity(text, entity['text'])
        if score > best_score and score > 0.6:
            best_score = score
            best_match = entity
    return best_match


def extract_manual_entities_and_relationships(text: str, language: str = None) -> Dict[str, Any]:
    """Enhanced manual extraction with relationship detection"""
    if not language:
        language = detect_language(text)

    entities = []
    keywords = []

    # Enhanced patterns for different languages
    if language == "th":
        patterns = {
            'PERSON': [r'(?:คุณ|นาย|นาง|นางสาว|ดร\.?)\s*[ก-๙\w\s]+'],
            'ORGANIZATION': [r'บริษัท\s+[ก-๙\w\s]+(?:จำกัด|มหาชน)', r'สถานีตำรวจ[ก-๙\w\s]+'],
            'LOCATION': [r'จังหวัด[ก-๙\w\s]+', r'กรุงเทพมหานคร|กรุงเทพฯ?'],
            'MONEY': [r'\d+(?:,\d{3})*\s*(?:บาท|ล้านบาท|พันบาท)'],
            'DATE': [r'\d{1,2}\/\d{1,2}\/\d{4}'],
        }
        words = re.findall(r'[ก-๙]+', text)
        thai_stop_words = {'และ', 'หรือ', 'แต่', 'ใน', 'ที่', 'เพื่อ', 'กับ', 'จาก', 'โดย', 'ของ'}
        keywords = [word for word in words if word not in thai_stop_words and len(word) > 2]
    else:
        patterns = {
            'PERSON': [r'\b(?:Mr|Mrs|Ms|Dr|Prof)\.\s+[A-Z][a-zA-Z]+(?:\s+[A-Z][a-zA-Z]+)*'],
            'ORGANIZATION': [r'\b[A-Z][a-zA-Z]+\s+(?:Inc|Corp|Company|Ltd|Co|LLC|Corporation|Limited|University)\b'],
            'LOCATION': [r'\b(?:New York|Los Angeles|Chicago|Bangkok|London|Paris|Berlin)\b'],
            'MONEY': [r'\$[\d,]+\.?\d*', r'\b\d+(?:,\d{3})*\s*(?:dollars?|USD|million|billion)\b'],
            'DATE': [r'\b\d{1,2}\/\d{1,2}\/\d{4}\b'],
        }
        words = re.findall(r'\b[a-zA-Z]{3,}\b', text)
        english_stop_words = {'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
        keywords = [word.lower() for word in words if word.lower() not in english_stop_words]

    # Extract entities
    for label, pattern_list in patterns.items():
        for pattern in pattern_list:
            for match in re.finditer(pattern, text, re.UNICODE | re.IGNORECASE):
                entity_text = match.group().strip()
                if len(entity_text) > 1:
                    entities.append({
                        "id": generate_unique_id('ent'),
                        "text": entity_text,
                        "label": label,
                        "confidence": 0.8,
                        "start_pos": match.start(),
                        "end_pos": match.end(),
                        "frequency": 1,
                        "importance_score": 0.7,
                        "metadata": {"source": "manual_extraction"}
                    })

    # Deduplicate
    entities = deduplicate_entities(entities)
    keywords = list(set(keywords))[:20]

    # Extract relationships
    relationships = []
    if len(entities) >= 2:
        relationships = extract_rule_based_relationships(entities, text, language)

    summary = f"Analysis of {len(text)} characters found {len(entities)} entities and {len(relationships)} relationships"

    return {
        "entities": entities,
        "keywords": keywords,
        "relationships": relationships,
        "summary": summary
    }


async def generate_embeddings(text: str) -> List[float]:
    openai_client = get_openai_client()
    if not openai_client:
        return []
    try:
        response = openai_client.embeddings.create(
            input=[text[:8000]],
            model=config.EMBEDDING_MODEL,
            dimensions=1536
        )
        return response.data[0].embedding
    except Exception as e:
        logger.error(f"Embedding failed: {e}")
        return []


def create_enhanced_graph_data(entities: List[Dict], relationships: List[Dict]) -> GraphData:
    """Create enhanced graph data with comprehensive ER model"""
    nodes = []
    links = []
    entity_map = {}

    # Create nodes
    for entity in entities:
        node_id = entity.get('id', generate_unique_id('ent'))
        entity_map[entity['text']] = node_id

        node_properties = {
            "original_text": entity['text'],
            "entity_type": entity['label'],
            "confidence": entity.get('confidence', 0.0),
            "start_position": entity.get('start_pos', 0),
            "end_position": entity.get('end_pos', 0),
            "frequency": entity.get('frequency', 1),
            "importance_score": entity.get('importance_score', 0.0),
            "metadata": entity.get('metadata', {})
        }

        nodes.append(NodeResult(
            id=node_id,
            label=entity['text'],
            type=entity['label'],
            confidence=entity.get('confidence', 0.0),
            frequency=entity.get('frequency', 1),
            importance_score=entity.get('importance_score', 0.0),
            properties=node_properties
        ))

    # Create links
    for rel in relationships:
        source_id = entity_map.get(rel['source_entity'])
        target_id = entity_map.get(rel['target_entity'])

        if source_id and target_id:
            link_id = rel.get('id', generate_unique_id('link'))

            link_properties = {
                "relationship_type": rel['relationship_type'],
                "confidence": rel.get('confidence', 0.0),
                "strength": rel.get('strength', rel.get('confidence', 0.0)),
                "context": rel.get('context', ''),
                "evidence_count": rel.get('evidence_count', 1),
                "bidirectional": rel.get('bidirectional', False),
                "metadata": rel.get('metadata', {})
            }
            links.append(LinkResult(
                id=link_id,
                source=source_id,
                target=target_id,
                relationship=rel['relationship_type'],
                confidence=rel.get('confidence', 0.0),
                strength=rel.get('strength', rel.get('confidence', 0.0)),
                evidence_count=rel.get('evidence_count', 1),
                properties=link_properties
            ))

    # Calculate metadata
    entity_types = defaultdict(int)
    relationship_types = defaultdict(int)
    for entity in entities:
        entity_types[entity['label']] += 1
    for rel in relationships:
        relationship_types[rel['relationship_type']] += 1

    metadata = {
        "total_entities": len(entities),
        "total_relationships": len(relationships),
        "entity_type_distribution": dict(entity_types),
        "relationship_type_distribution": dict(relationship_types),
        "graph_density": len(relationships) / (len(entities) * (len(entities) - 1) / 2) if len(entities) > 1 else 0,
        "average_entity_confidence": sum(entity.get('confidence', 0) for entity in entities) / len(entities) if entities else 0,
        "average_relationship_confidence": sum(rel.get('confidence', 0) for rel in relationships) / len(relationships) if relationships else 0,
        "unique_entity_types": len(entity_types),
        "unique_relationship_types": len(relationship_types)
    }

    return GraphData(
        nodes=nodes,
        links=links,
        metadata=metadata
    )


# Export Functions (simplified)
async def generate_export_files(analysis_id: str, entities: List[Dict], relationships: List[Dict],
                                graph_data: GraphData, formats: List[str]) -> ExportFiles:
    """Generate export files for various formats"""
    export_files = ExportFiles()
    analysis_dir = EXPORT_DIR / analysis_id
    analysis_dir.mkdir(exist_ok=True)

    try:
        if "neo4j" in formats:
            nodes_file, rels_file = await generate_neo4j_csv(analysis_dir, entities, relationships)
            export_files.neo4j_nodes = str(nodes_file)
            export_files.neo4j_relationships = str(rels_file)

        if "json" in formats:
            json_file = await generate_json_export(analysis_dir, entities, relationships, graph_data)
            export_files.json_export = str(json_file)

        if "graphml" in formats:
            graphml_file = await generate_graphml_export(analysis_dir, entities, relationships)
            export_files.graphml_export = str(graphml_file)

        logger.info(f"✅ Generated export files for analysis {analysis_id}")
    except Exception as e:
        logger.error(f"❌ Export file generation failed: {e}")

    return export_files


async def generate_neo4j_csv(export_dir: Path, entities: List[Dict], relationships: List[Dict]) -> Tuple[Path, Path]:
    """Generate Neo4j compatible CSV files"""
    nodes_file = export_dir / "neo4j_nodes.csv"
    with open(nodes_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow([
            'nodeId:ID', 'text', 'label:LABEL', 'confidence:float', 'frequency:int', 'importance:float'
        ])
        for entity in entities:
            writer.writerow([
                entity.get('id', generate_unique_id('ent')),
                entity['text'],
                entity['label'],
                entity.get('confidence', 0.0),
                entity.get('frequency', 1),
                entity.get('importance_score', 0.0)
            ])

    rels_file = export_dir / "neo4j_relationships.csv"
    entity_map = {entity['text']: entity.get('id', generate_unique_id('ent')) for entity in entities}
    with open(rels_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow([
            ':START_ID', ':END_ID', ':TYPE', 'confidence:float', 'strength:float', 'context'
        ])
        for rel in relationships:
            source_id = entity_map.get(rel['source_entity'])
            target_id = entity_map.get(rel['target_entity'])
            if source_id and target_id:
                writer.writerow([
                    source_id,
                    target_id,
                    rel['relationship_type'].upper().replace(' ', '_'),
                    rel.get('confidence', 0.0),
                    rel.get('strength', rel.get('confidence', 0.0)),
                    rel.get('context', '')
                ])

    return nodes_file, rels_file

async def generate_json_export(export_dir: Path, entities: List[Dict], relationships: List[Dict],
                               graph_data: GraphData) -> Path:
    """Generate comprehensive JSON export"""
    json_file = export_dir / "analysis_export.json"

    export_data = {
        "metadata": {
            "export_timestamp": datetime.utcnow().isoformat(),
            "format_version": "1.0",
            "total_entities": len(entities),
            "total_relationships": len(relationships)
        },
        "entities": entities,
        "relationships": relationships,
        "graph_data": graph_data.dict(),
        "statistics": {
            "entity_types": list(set(e['label'] for e in entities)),
            "relationship_types": list(set(r['relationship_type'] for r in relationships)),
            "average_confidence": sum(e.get('confidence', 0) for e in entities) / len(entities) if entities else 0
        }
    }

    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(export_data, f, indent=2, ensure_ascii=False)

    return json_file


async def generate_graphml_export(export_dir: Path, entities: List[Dict], relationships: List[Dict]) -> Path:
    """Generate GraphML format"""
    graphml_file = export_dir / "graph_export.graphml"

    # Create GraphML structure
    root = ET.Element('graphml')
    root.set('xmlns', 'http://graphml.graphdrawing.org/xmlns')

    # Define attributes
    ET.SubElement(root, 'key', id='label', **{'for': 'node', 'attr.name': 'label', 'attr.type': 'string'})
    ET.SubElement(root, 'key', id='type', **{'for': 'node', 'attr.name': 'type', 'attr.type': 'string'})
    ET.SubElement(root, 'key', id='rel_type', **{'for': 'edge', 'attr.name': 'relationship', 'attr.type': 'string'})

    graph = ET.SubElement(root, 'graph', id='G', edgedefault='directed')

    # Add nodes
    entity_map = {}
    for entity in entities:
        node_id = entity.get('id', generate_unique_id('ent'))
        entity_map[entity['text']] = node_id

        node = ET.SubElement(graph, 'node', id=node_id)
        label_data = ET.SubElement(node, 'data', key='label')
        label_data.text = entity['text']
        type_data = ET.SubElement(node, 'data', key='type')
        type_data.text = entity['label']

    # Add edges
    for i, rel in enumerate(relationships):
        source_id = entity_map.get(rel['source_entity'])
        target_id = entity_map.get(rel['target_entity'])
        if source_id and target_id:
            edge = ET.SubElement(graph, 'edge', id=f"e{i}", source=source_id, target=target_id)
            rel_data = ET.SubElement(edge, 'data', key='rel_type')
            rel_data.text = rel['relationship_type']

    # Write to file
    tree = ET.ElementTree(root)
    tree.write(graphml_file, encoding='utf-8', xml_declaration=True)

    return graphml_file


def calculate_er_stats(entities: List[Dict], relationships: List[Dict]) -> Dict[str, Any]:
    """Calculate Entity-Relationship statistics"""
    if not entities:
        return {}

    entity_types = defaultdict(int)
    relationship_types = defaultdict(int)
    for entity in entities:
        entity_types[entity['label']] += 1
    for rel in relationships:
        relationship_types[rel['relationship_type']] += 1

    return {
        "total_entities": len(entities),
        "total_relationships": len(relationships),
        "entity_type_distribution": dict(entity_types),
        "relationship_type_distribution": dict(relationship_types),
        "graph_density": len(relationships) / (len(entities) * (len(entities) - 1) / 2) if len(entities) > 1 else 0,
        "unique_entity_types": len(entity_types),
        "unique_relationship_types": len(relationship_types)
    }


async def save_to_database(data: Dict[str, Any]) -> bool:
    if not pg_pool:
        logger.error("No database pool available")
        return False
    try:
        async with pg_pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO ner_analyses (
                    analysis_id, source_text, source_type, language, entities, keywords,
                    relationships, summary, embeddings, graph_data, export_files,
                    text_stats, er_stats, processing_time, entity_types, relationship_types
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
                ON CONFLICT (analysis_id) DO UPDATE SET
                    entities = EXCLUDED.entities,
                    relationships = EXCLUDED.relationships,
                    summary = EXCLUDED.summary
            """,
                data['analysis_id'],
                data['source_text'][:10000],
                data['source_type'],
                data['language'],
                json.dumps(data['entities'], ensure_ascii=False),
                json.dumps(data['keywords'], ensure_ascii=False),
                json.dumps(data['relationships'], ensure_ascii=False),
                data['summary'],
                json.dumps(data.get('embeddings', [])),
                json.dumps(data.get('graph_data', {}), ensure_ascii=False, default=str),
                json.dumps(data.get('export_files', {}), ensure_ascii=False, default=str),
                json.dumps(data.get('text_stats', {})),
                json.dumps(data.get('er_stats', {})),
                float(data.get('processing_time', 0)),
                json.dumps(list(set(entity.get('label', '') for entity in data.get('entities', [])))),
                json.dumps(list(set(rel.get('relationship_type', '') for rel in data.get('relationships', []))))
            )
        logger.info(f"✅ Analysis {data['analysis_id']} saved to database")
        return True
    except Exception as e:
        logger.error(f"❌ DB save failed for {data.get('analysis_id', 'unknown')}: {e}")
        return False


async def save_to_blob(analysis_id: str, data: Dict[str, Any]) -> bool:
    blob_client = get_blob_client()
    if not blob_client:
        return False
    try:
        blob_name = f"ner_analysis/{analysis_id}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
        blob_client_obj = blob_client.get_blob_client(container=config.BLOB_CONTAINER, blob=blob_name)
        blob_client_obj.upload_blob(json.dumps(data, indent=2, ensure_ascii=False, default=str), overwrite=True)
        return True
    except Exception as e:
        logger.error(f"Blob save failed: {e}")
        return False


# App Lifecycle
@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("🚀 Starting Enhanced NER Analysis Service...")

    logger.info("🔄 Database initialization...")
    db_ok = await init_database()
    if not db_ok:
        logger.error("❌ Database initialization failed!")
        raise RuntimeError("Database initialization failed")

    logger.info("🔄 Initializing API clients...")
    get_deepseek_client()
    get_openai_client()
    get_blob_client()

    logger.info("🔄 Creating export directories...")
    EXPORT_DIR.mkdir(exist_ok=True)

    logger.info("🎉 Enhanced NER Analysis Service is ready!")
    logger.info(f"📡 Server running on http://{config.HOST}:{config.PORT}")

    yield

    logger.info("🛑 Shutting down...")
    if pg_pool:
        await pg_pool.close()
        logger.info("✅ Database connections closed")


# FastAPI App
app = FastAPI(
    title="Enhanced NER Analysis Service",
    description="Advanced Named Entity Recognition with relationship extraction and graph exports",
    version="2.0.0",
    lifespan=lifespan
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# API Endpoints
@app.get("/")
async def root():
    deepseek_available = bool(config.DEEPSEEK_ENDPOINT and config.DEEPSEEK_API_KEY)
    openai_available = bool(config.AZURE_OPENAI_ENDPOINT and config.AZURE_OPENAI_API_KEY)
    blob_available = bool(config.AZURE_STORAGE_ACCOUNT_URL and config.AZURE_BLOB_SAS_TOKEN)
    return {
        "message": "Enhanced NER Analysis Service",
        "version": "2.0.0",
        "status": "operational",
        "supported_entities": config.ENTITY_TYPES,
        "supported_relationships": config.RELATIONSHIP_TYPES[:10],
        "export_formats": ["neo4j", "json", "graphml"],
        "features": {
            "ner_analysis": True,
            "relationship_extraction": True,
            "thai_language_support": True,
            "graph_database_export": True,
            "embedding_generation": openai_available,
            "deepseek_analysis": deepseek_available,
            "blob_storage": blob_available
        }
    }


@app.get("/health")
async def health():
    deepseek_available = bool(config.DEEPSEEK_ENDPOINT and config.DEEPSEEK_API_KEY)
    openai_available = bool(config.AZURE_OPENAI_ENDPOINT and config.AZURE_OPENAI_API_KEY)
    blob_available = bool(config.AZURE_STORAGE_ACCOUNT_URL and config.AZURE_BLOB_SAS_TOKEN)
    return {
        "status": "healthy",
        "service": "NER Analysis Service",
        "version": "2.0.0",
        "database": pg_pool is not None,
        "vector_extension": vector_available,
        "deepseek": deepseek_available,
        "openai": openai_available,
        "blob_storage": blob_available,
        "supported_entity_count": len(config.ENTITY_TYPES),
        "supported_relationship_count": len(config.RELATIONSHIP_TYPES),
        "export_formats": ["neo4j", "json", "graphml"]
    }


@app.post("/analyze/text", response_model=NERResponse)
async def analyze_text(request: NERRequest, background_tasks: BackgroundTasks):
    """Analyze text for entities and relationships"""
    start_time = datetime.utcnow()
    analysis_id = f"text_{int(start_time.timestamp())}"

    if not request.text or not request.text.strip():
        raise HTTPException(status_code=400, detail="Text is required")

    try:
        language = detect_language(request.text)
        text_stats = get_text_stats(request.text)

        # Enhanced analysis
        analysis_result = await analyze_with_deepseek(request.text, language)

        # Generate embeddings if requested
        embeddings = []
        if request.include_embeddings:
            embeddings = await generate_embeddings(request.text)

        # Create enhanced graph
        graph_data = create_enhanced_graph_data(
            analysis_result.get('entities', []),
            analysis_result.get('relationships', [])
        )

        # Calculate ER statistics
        er_stats = calculate_er_stats(
            analysis_result.get('entities', []),
            analysis_result.get('relationships', [])
        )

        # Generate export files if requested
        export_files = ExportFiles()
        if request.generate_graph_files:
            export_files = await generate_export_files(
                analysis_id,
                analysis_result.get('entities', []),
                analysis_result.get('relationships', []),
                graph_data,
                request.export_formats
            )

        processing_time = (datetime.utcnow() - start_time).total_seconds()

        response_data = {
            "analysis_id": analysis_id,
            "source_text": request.text,
            "source_type": "text_input",
            "language": language,
            "entities": analysis_result.get('entities', []),
            "keywords": analysis_result.get('keywords', []),
            "relationships": analysis_result.get('relationships', []),
            "summary": analysis_result.get('summary', ''),
            "embeddings": embeddings,
            "graph_data": graph_data,
            "export_files": export_files,
            "text_stats": text_stats,
            "er_stats": er_stats,
            "processing_time": processing_time,
            "character_count": text_stats["character_count"],
            "word_count": text_stats["word_count"],
            "sentence_count": text_stats["sentence_count"]
        }

        # Save to database in background
        background_tasks.add_task(save_to_database, response_data)
        background_tasks.add_task(save_to_blob, analysis_id, response_data)

        return NERResponse(
            success=True,
            entity_relationship_stats=er_stats,
            **response_data
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Text analysis failed: {e}")
        return NERResponse(
            success=False,
            analysis_id=analysis_id,
            source_text=request.text[:1000],
            source_type="text_input",
            language="unknown",
            entities=[],
            keywords=[],
            relationships=[],
            summary="",
            graph_data=GraphData(nodes=[], links=[], metadata={}),
            export_files=ExportFiles(),
            processing_time=(datetime.utcnow() - start_time).total_seconds(),
            character_count=0,
            word_count=0,
            sentence_count=0,
            entity_relationship_stats={},
            error=str(e)
        )

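# --- Illustrative client call (comment only, not executed) ------------------
# A minimal sketch of exercising the /analyze/text endpoint above with httpx.
# The host/port assume the fallback defaults (NER_PORT=8500) and the payload
# fields mirror the NERRequest model; the sample text is a placeholder.
#
#   import httpx
#   payload = {
#       "text": "Somchai works for Acme Co., Ltd. in Bangkok.",
#       "extract_relationships": True,
#       "include_embeddings": False,
#       "export_formats": ["neo4j", "json"],
#   }
#   resp = httpx.post("http://localhost:8500/analyze/text", json=payload, timeout=300)
#   print(resp.json()["entities"], resp.json()["relationships"])
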
@app.post("/analyze/file", response_model=NERResponse)
async def analyze_file(
    file: UploadFile = File(...),
    extract_relationships: bool = Form(True),
    include_embeddings: bool = Form(True),
    include_summary: bool = Form(True),
    generate_graph_files: bool = Form(True),
    export_formats: str = Form("neo4j,json"),
    background_tasks: BackgroundTasks = None
):
    """Analyze uploaded file for entities and relationships"""
    start_time = datetime.utcnow()
    analysis_id = f"file_{int(start_time.timestamp())}"

    if not file.filename:
        raise HTTPException(status_code=400, detail="No filename")

    try:
        file_content = await file.read()
        if len(file_content) > config.MAX_FILE_SIZE:
            raise HTTPException(status_code=400, detail="File too large")

        file_ext = Path(file.filename).suffix.lower()
        export_format_list = export_formats.split(',') if export_formats else ["json"]

        if file_ext in config.SUPPORTED_TEXT_FORMATS:
            text = extract_text_from_file(file_content, file.filename)
            source_type = "text_file"
        elif file_ext in config.SUPPORTED_OCR_FORMATS:
            text = await get_text_from_ocr(file_content, file.filename)
            source_type = "ocr_file"
        else:
            raise HTTPException(status_code=400, detail=f"Unsupported format: {file_ext}")

        if not text.strip():
            raise HTTPException(status_code=400, detail="No text extracted")

        language = detect_language(text)
        text_stats = get_text_stats(text)

        # Enhanced analysis
        analysis_result = await analyze_with_deepseek(text, language)

        # Generate embeddings
        embeddings = []
        if include_embeddings:
            embeddings = await generate_embeddings(text)

        # Create enhanced graph
        graph_data = create_enhanced_graph_data(
            analysis_result.get('entities', []),
            analysis_result.get('relationships', [])
        )

        # Calculate ER statistics
        er_stats = calculate_er_stats(
            analysis_result.get('entities', []),
            analysis_result.get('relationships', [])
        )

        # Generate export files
        export_files = ExportFiles()
        if generate_graph_files:
            export_files = await generate_export_files(
                analysis_id,
                analysis_result.get('entities', []),
                analysis_result.get('relationships', []),
                graph_data,
                export_format_list
            )

        processing_time = (datetime.utcnow() - start_time).total_seconds()

        response_data = {
            "analysis_id": analysis_id,
            "source_text": text,
            "source_type": source_type,
            "language": language,
            "entities": analysis_result.get('entities', []),
            "keywords": analysis_result.get('keywords', []),
            "relationships": analysis_result.get('relationships', []),
            "summary": analysis_result.get('summary', ''),
            "embeddings": embeddings,
            "graph_data": graph_data,
            "export_files": export_files,
            "text_stats": text_stats,
            "er_stats": er_stats,
            "processing_time": processing_time,
            "character_count": text_stats["character_count"],
            "word_count": text_stats["word_count"],
            "sentence_count": text_stats["sentence_count"]
        }

        # Save in background
        if background_tasks:
            background_tasks.add_task(save_to_database, response_data)
            background_tasks.add_task(save_to_blob, analysis_id, response_data)

        return NERResponse(
            success=True,
            entity_relationship_stats=er_stats,
            **response_data
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"File analysis failed: {e}")
        return NERResponse(
            success=False,
            analysis_id=analysis_id,
            source_text="",
            source_type="file_input",
            language="unknown",
            entities=[],
            keywords=[],
            relationships=[],
            summary="",
            graph_data=GraphData(nodes=[], links=[], metadata={}),
            export_files=ExportFiles(),
            processing_time=(datetime.utcnow() - start_time).total_seconds(),
            character_count=0,
            word_count=0,
            sentence_count=0,
            entity_relationship_stats={},
            error=str(e)
        )

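# --- Illustrative client call (comment only, not executed) ------------------
# A minimal sketch of uploading a document to /analyze/file with httpx; the
# multipart field name and form fields mirror the parameters declared above.
# The file name and form values are placeholders.
#
#   import httpx
#   with open("report.docx", "rb") as fh:
#       resp = httpx.post(
#           "http://localhost:8500/analyze/file",
#           files={"file": ("report.docx", fh)},
#           data={"include_embeddings": "false", "export_formats": "neo4j,json"},
#           timeout=600,
#       )
#   print(resp.json()["analysis_id"], len(resp.json()["entities"]))
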
@app.post("/analyze/url", response_model=NERResponse)
async def analyze_url(request: NERRequest, background_tasks: BackgroundTasks):
    """Analyze URL content for entities and relationships"""
    start_time = datetime.utcnow()
    analysis_id = f"url_{int(start_time.timestamp())}"

    if not request.url:
        raise HTTPException(status_code=400, detail="URL is required")

    try:
        text = await get_text_from_url(str(request.url))
        if not text.strip():
            raise HTTPException(status_code=400, detail="No text extracted from URL")

        language = detect_language(text)
        text_stats = get_text_stats(text)

        # Enhanced analysis
        analysis_result = await analyze_with_deepseek(text, language)

        # Generate embeddings
        embeddings = []
        if request.include_embeddings:
            embeddings = await generate_embeddings(text)

        # Create enhanced graph
        graph_data = create_enhanced_graph_data(
            analysis_result.get('entities', []),
            analysis_result.get('relationships', [])
        )

        # Calculate ER statistics
        er_stats = calculate_er_stats(
            analysis_result.get('entities', []),
            analysis_result.get('relationships', [])
        )

        # Generate export files
        export_files = ExportFiles()
        if request.generate_graph_files:
            export_files = await generate_export_files(
                analysis_id,
                analysis_result.get('entities', []),
                analysis_result.get('relationships', []),
                graph_data,
                request.export_formats
            )

        processing_time = (datetime.utcnow() - start_time).total_seconds()

        response_data = {
            "analysis_id": analysis_id,
            "source_text": text,
            "source_type": "url_content",
            "language": language,
            "entities": analysis_result.get('entities', []),
            "keywords": analysis_result.get('keywords', []),
            "relationships": analysis_result.get('relationships', []),
            "summary": analysis_result.get('summary', ''),
            "embeddings": embeddings,
            "graph_data": graph_data,
            "export_files": export_files,
            "text_stats": text_stats,
            "er_stats": er_stats,
            "processing_time": processing_time,
            "character_count": text_stats["character_count"],
            "word_count": text_stats["word_count"],
            "sentence_count": text_stats["sentence_count"]
        }

        # Save in background
        background_tasks.add_task(save_to_database, response_data)
        background_tasks.add_task(save_to_blob, analysis_id, response_data)

        return NERResponse(
            success=True,
            entity_relationship_stats=er_stats,
            **response_data
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"URL analysis failed: {e}")
        return NERResponse(
            success=False,
            analysis_id=analysis_id,
            source_text="",
            source_type="url_content",
            language="unknown",
            entities=[],
            keywords=[],
            relationships=[],
            summary="",
            graph_data=GraphData(nodes=[], links=[], metadata={}),
            export_files=ExportFiles(),
            processing_time=(datetime.utcnow() - start_time).total_seconds(),
            character_count=0,
            word_count=0,
            sentence_count=0,
            entity_relationship_stats={},
            error=str(e)
        )


@app.get("/download/{analysis_id}/{file_type}")
async def download_export_file(analysis_id: str, file_type: str):
    """Download specific export file for an analysis"""
    try:
        analysis_dir = EXPORT_DIR / analysis_id
        if not analysis_dir.exists():
            raise HTTPException(status_code=404, detail=f"Analysis {analysis_id} not found")

        file_mapping = {
            "neo4j_nodes": "neo4j_nodes.csv",
            "neo4j_relationships": "neo4j_relationships.csv",
            "json": "analysis_export.json",
            "graphml": "graph_export.graphml"
        }
        if file_type not in file_mapping:
            raise HTTPException(status_code=400, detail=f"Invalid file type: {file_type}")

        file_path = analysis_dir / file_mapping[file_type]
        if not file_path.exists():
            raise HTTPException(status_code=404, detail=f"File {file_type} not found")
        return FileResponse(path=file_path, filename=file_mapping[file_type])
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Download failed for {analysis_id}/{file_type}: {e}")
        raise HTTPException(status_code=500, detail=f"Download failed: {str(e)}")


@app.get("/entity-types")
async def get_entity_types():
    """Get all supported entity types"""
    return {
        "success": True,
        "entity_types": config.ENTITY_TYPES,
        "total_count": len(config.ENTITY_TYPES)
    }


@app.get("/relationship-types")
async def get_relationship_types():
    """Get all supported relationship types"""
    return {
        "success": True,
        "relationship_types": config.RELATIONSHIP_TYPES,
        "total_count": len(config.RELATIONSHIP_TYPES)
    }


if __name__ == "__main__":
    print("🔧 Loading enhanced NER configuration...")
    print(f"🌐 Will start server on {config.HOST}:{config.PORT}")
    print(f"🏷️ Enhanced with {len(config.ENTITY_TYPES)} entity types")
    print(f"🔗 Enhanced with {len(config.RELATIONSHIP_TYPES)} relationship types")

    uvicorn.run(
        "ner_service:app",
        host=config.HOST,
        port=config.PORT,
        reload=config.DEBUG,
        log_level="info"
    )
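
# --- Illustrative follow-up (comment only, not executed) --------------------
# A hedged sketch of fetching the generated Neo4j CSVs via the /download
# endpoint above and bulk-importing them. The neo4j-admin syntax varies by
# Neo4j version; treat the last line as an assumption to adapt, not a
# verified invocation.
#
#   curl -o nodes.csv http://localhost:8500/download/<analysis_id>/neo4j_nodes
#   curl -o rels.csv  http://localhost:8500/download/<analysis_id>/neo4j_relationships
#   neo4j-admin import --nodes=nodes.csv --relationships=rels.csv   # Neo4j 4.x style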