"""
Enhanced NER Analysis Service - Cleaned and Optimized

Advanced Named Entity Recognition with Thai language support,
relationship extraction, and graph database exports.
"""

import os
import io
import json
import logging
import re
import csv
import tempfile
import zipfile
from datetime import datetime
from typing import Optional, List, Dict, Any, Union, Tuple
from pathlib import Path
from contextlib import asynccontextmanager
from collections import defaultdict
import xml.etree.ElementTree as ET

import httpx
import asyncpg
from azure.storage.blob import BlobServiceClient
from azure.core.credentials import AzureKeyCredential
from fastapi import FastAPI, File, UploadFile, HTTPException, Form, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel, HttpUrl, field_validator
import uvicorn
import docx
from azure.ai.inference import ChatCompletionsClient
from azure.ai.inference.models import SystemMessage, UserMessage
from openai import AzureOpenAI

try:
    from configs import get_config

    config = get_config().ner
    unified_config = get_config()
    print("✅ Using unified configuration")
except ImportError:
    print("⚠️ Unified config not available, using fallback configuration")

    from dotenv import load_dotenv

    load_dotenv()

    class FallbackConfig:
        HOST = os.getenv("HOST", "0.0.0.0")
        PORT = int(os.getenv("NER_PORT", "8500"))
        DEBUG = os.getenv("DEBUG", "False").lower() == "true"

        POSTGRES_HOST = os.getenv("POSTGRES_HOST", "")
        POSTGRES_PORT = int(os.getenv("POSTGRES_PORT", "5432"))
        POSTGRES_USER = os.getenv("POSTGRES_USER", "")
        POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "")
        POSTGRES_DATABASE = os.getenv("POSTGRES_DATABASE", "postgres")

        OCR_SERVICE_URL = os.getenv("OCR_SERVICE_URL", "http://localhost:8400")
        DEEPSEEK_ENDPOINT = os.getenv("DEEPSEEK_ENDPOINT", "")
        DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY", "")
        DEEPSEEK_MODEL = os.getenv("DEEPSEEK_MODEL", "DeepSeek-R1-0528")
        AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT", "")
        AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY", "")
        EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-large")

        AZURE_STORAGE_ACCOUNT_URL = os.getenv("AZURE_STORAGE_ACCOUNT_URL", "")
        AZURE_BLOB_SAS_TOKEN = os.getenv("AZURE_BLOB_SAS_TOKEN", "")
        BLOB_CONTAINER = os.getenv("BLOB_CONTAINER", "historylog")

        MAX_FILE_SIZE = 50 * 1024 * 1024
        MAX_TEXT_LENGTH = 100000

        SUPPORTED_TEXT_FORMATS = {'.txt', '.doc', '.docx', '.rtf'}
        SUPPORTED_OCR_FORMATS = {'.pdf', '.jpg', '.jpeg', '.png', '.tiff', '.bmp', '.gif'}

        ENTITY_TYPES = [
            "PERSON", "ORGANIZATION", "LOCATION", "DATE", "TIME", "MONEY", "PRODUCT", "EVENT",
            "VEHICLE", "SUSPICIOUS_OBJECT", "ILLEGAL_ACTIVITY", "EVIDENCE", "ILLEGAL_ITEM",
            "WEAPON", "DRUG", "CHEMICAL", "DOCUMENT", "PHONE_NUMBER", "ADDRESS", "EMAIL"
        ]

        RELATIONSHIP_TYPES = [
            "works_for", "founded", "located_in", "part_of", "associated_with", "owns", "manages",
            "ทำงานที่", "ก่อตั้ง", "ตั้งอยู่ที่", "เกี่ยวข้องกับ", "เป็นเจ้าของ",
            "arrested_by", "investigated_by", "confiscated_from", "used_in", "evidence_of",
            "จับกุมโดย", "สอบสวนโดย", "ยึดจาก", "หลักฐานของ"
        ]

    config = FallbackConfig()


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

EXPORT_DIR = Path("exports")
EXPORT_DIR.mkdir(exist_ok=True)

pg_pool = None
vector_available = False
clients = {}

class NERRequest(BaseModel):
    text: Optional[str] = None
    url: Optional[HttpUrl] = None
    extract_relationships: bool = True
    include_embeddings: bool = True
    include_summary: bool = True
    generate_graph_files: bool = True
    export_formats: List[str] = ["neo4j", "json", "graphml"]

    @field_validator('text')
    @classmethod
    def validate_text_length(cls, v):
        if v and len(v) > config.MAX_TEXT_LENGTH:
            raise ValueError(f"Text too long (max {config.MAX_TEXT_LENGTH} characters)")
        return v


class MultiInputRequest(BaseModel):
    texts: Optional[List[str]] = None
    urls: Optional[List[HttpUrl]] = None
    extract_relationships: bool = True
    include_embeddings: bool = True
    include_summary: bool = True
    combine_results: bool = True
    generate_graph_files: bool = True
    export_formats: List[str] = ["neo4j", "json", "graphml"]


class EntityResult(BaseModel):
    id: str
    text: str
    label: str
    confidence: float
    start_pos: int
    end_pos: int
    source_type: Optional[str] = None
    source_index: Optional[int] = None
    frequency: int = 1
    importance_score: float = 0.0
    metadata: Optional[Dict[str, Any]] = None


class RelationshipResult(BaseModel):
    id: str
    # The extraction pipeline identifies relationship endpoints by entity text and does
    # not always populate explicit entity IDs, so these default to empty strings to keep
    # response validation from failing; graph building resolves node IDs separately.
    source_entity_id: str = ""
    target_entity_id: str = ""
    source_entity: str
    target_entity: str
    relationship_type: str
    confidence: float
    strength: float
    context: str
    evidence_count: int = 1
    bidirectional: bool = False
    metadata: Optional[Dict[str, Any]] = None


class NodeResult(BaseModel):
    id: str
    label: str
    type: str
    confidence: float
    frequency: int = 1
    importance_score: float = 0.0
    properties: Dict[str, Any]


class LinkResult(BaseModel):
    id: str
    source: str
    target: str
    relationship: str
    confidence: float
    strength: float
    evidence_count: int = 1
    properties: Dict[str, Any]


class GraphData(BaseModel):
    nodes: List[NodeResult]
    links: List[LinkResult]
    metadata: Dict[str, Any]


class ExportFiles(BaseModel):
    neo4j_nodes: Optional[str] = None
    neo4j_relationships: Optional[str] = None
    json_export: Optional[str] = None
    graphml_export: Optional[str] = None
    csv_nodes: Optional[str] = None
    csv_edges: Optional[str] = None
    gexf_export: Optional[str] = None
    analysis_report: Optional[str] = None
    download_bundle: Optional[str] = None


class NERResponse(BaseModel):
    success: bool
    analysis_id: str
    source_text: str
    source_type: str
    language: str
    entities: List[EntityResult]
    keywords: List[str]
    relationships: List[RelationshipResult]
    summary: str
    embeddings: Optional[List[float]] = None
    graph_data: GraphData
    export_files: ExportFiles
    processing_time: float
    character_count: int
    word_count: int
    sentence_count: int
    entity_relationship_stats: Dict[str, Any]
    error: Optional[str] = None


class MultiNERResponse(BaseModel):
    success: bool
    analysis_id: str
    combined_analysis: NERResponse
    individual_analyses: List[NERResponse]
    processing_time: float
    total_sources: int
    error: Optional[str] = None

def generate_unique_id(prefix: str = "item") -> str:
    """Generate a unique ID from a millisecond timestamp plus a short random suffix"""
    # The random suffix prevents ID collisions when many entities or relationships are
    # created within the same millisecond (e.g. while looping over extraction results).
    return f"{prefix}_{int(datetime.utcnow().timestamp() * 1000)}_{os.urandom(4).hex()}"


def normalize_text(text: str) -> str:
    """Normalize text for comparison"""
    return re.sub(r'\s+', ' ', text.strip().lower())


def calculate_text_similarity(text1: str, text2: str) -> float:
    """Calculate basic text similarity (Jaccard overlap of whitespace tokens)"""
    norm1 = normalize_text(text1)
    norm2 = normalize_text(text2)

    if norm1 == norm2:
        return 1.0

    words1 = set(norm1.split())
    words2 = set(norm2.split())

    if not words1 and not words2:
        return 1.0
    if not words1 or not words2:
        return 0.0

    intersection = words1.intersection(words2)
    union = words1.union(words2)

    return len(intersection) / len(union) if union else 0.0

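# Example (illustrative, not executed): calculate_text_similarity() is a Jaccard overlap
# over whitespace tokens after normalization:
#
#   calculate_text_similarity("Acme Corp Ltd", "Acme Corp")  # -> 0.67 (2 shared / 3 total tokens)
#   calculate_text_similarity("Acme Corp", "acme  corp")     # -> 1.0  (case/whitespace normalized)
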
def deduplicate_entities(entities: List[Dict[str, Any]], similarity_threshold: float = 0.8) -> List[Dict[str, Any]]:
    """Remove duplicate entities based on text similarity"""
    if not entities:
        return []

    deduplicated = []
    processed_texts = set()

    for entity in entities:
        entity_text = entity.get('text', '').strip()
        normalized_text = normalize_text(entity_text)

        if not entity_text or normalized_text in processed_texts:
            continue

        is_duplicate = False
        for existing_entity in deduplicated:
            existing_text = existing_entity.get('text', '')
            similarity = calculate_text_similarity(entity_text, existing_text)

            if similarity >= similarity_threshold:
                if entity.get('confidence', 0) > existing_entity.get('confidence', 0):
                    deduplicated.remove(existing_entity)
                    break
                else:
                    is_duplicate = True
                    break

        if not is_duplicate:
            entity['id'] = entity.get('id', generate_unique_id('ent'))
            deduplicated.append(entity)
            processed_texts.add(normalized_text)

    return deduplicated

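# Example (illustrative): exact duplicates after normalization are dropped, and
# near-duplicates are merged only when Jaccard similarity >= similarity_threshold:
#
#   deduplicate_entities([
#       {"text": "ACME Corp", "label": "ORGANIZATION", "confidence": 0.7},
#       {"text": "acme corp", "label": "ORGANIZATION", "confidence": 0.9},
#   ])
#   # -> one ORGANIZATION entity ("ACME Corp"), now carrying a generated "id"
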
def detect_language(text: str) -> str:
    """Detect language from the ratio of Thai to Thai+Latin characters"""
    if not text:
        return "en"

    thai_chars = len(re.findall(r'[ก-๙]', text))
    english_chars = len(re.findall(r'[a-zA-Z]', text))
    total_chars = thai_chars + english_chars

    if total_chars == 0:
        return "en"

    thai_ratio = thai_chars / total_chars

    if thai_ratio > 0.3:
        return "th"
    elif thai_ratio > 0.1:
        return "mixed"
    else:
        return "en"


def get_text_stats(text: str) -> Dict[str, int]:
    """Get comprehensive text statistics"""
    return {
        "character_count": len(text),
        "word_count": len(text.split()),
        "sentence_count": len(re.findall(r'[.!?]+', text)),
        "paragraph_count": len([p for p in text.split('\n\n') if p.strip()]),
        "line_count": len(text.split('\n'))
    }

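# Example (illustrative): detect_language() uses the share of Thai characters among
# Thai+Latin characters -- above 30% -> "th", 10-30% -> "mixed", otherwise "en":
#
#   detect_language("ตำรวจจับกุมผู้ต้องหาที่กรุงเทพฯ")   # -> "th"
#   detect_language("Suspect arrested in Bangkok")      # -> "en"
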
def get_blob_client():
    if clients.get('blob') is None and config.AZURE_STORAGE_ACCOUNT_URL and config.AZURE_BLOB_SAS_TOKEN:
        try:
            clients['blob'] = BlobServiceClient(
                account_url=config.AZURE_STORAGE_ACCOUNT_URL,
                credential=config.AZURE_BLOB_SAS_TOKEN
            )
        except Exception as e:
            logger.error(f"Failed to initialize blob client: {e}")
    return clients.get('blob')


def get_deepseek_client():
    if clients.get('deepseek') is None and config.DEEPSEEK_ENDPOINT and config.DEEPSEEK_API_KEY:
        try:
            clients['deepseek'] = ChatCompletionsClient(
                endpoint=config.DEEPSEEK_ENDPOINT,
                credential=AzureKeyCredential(config.DEEPSEEK_API_KEY),
                api_version="2024-05-01-preview"
            )
        except Exception as e:
            logger.error(f"Failed to initialize DeepSeek client: {e}")
    return clients.get('deepseek')


def get_openai_client():
    if clients.get('openai') is None and config.AZURE_OPENAI_ENDPOINT and config.AZURE_OPENAI_API_KEY:
        try:
            clients['openai'] = AzureOpenAI(
                api_version="2024-12-01-preview",
                azure_endpoint=config.AZURE_OPENAI_ENDPOINT,
                api_key=config.AZURE_OPENAI_API_KEY
            )
        except Exception as e:
            logger.error(f"Failed to initialize OpenAI client: {e}")
    return clients.get('openai')

async def init_database():
    global pg_pool, vector_available

    logger.info("🔄 Connecting to database...")
    try:
        pg_pool = await asyncpg.create_pool(
            host=config.POSTGRES_HOST,
            port=config.POSTGRES_PORT,
            user=config.POSTGRES_USER,
            password=config.POSTGRES_PASSWORD,
            database=config.POSTGRES_DATABASE,
            ssl='require',
            min_size=2,
            max_size=10,
            command_timeout=60
        )

        async with pg_pool.acquire() as conn:
            logger.info("✅ Database connected")

            try:
                await conn.execute("CREATE EXTENSION IF NOT EXISTS vector;")
                await conn.fetchval("SELECT '[1,2,3]'::vector(3)")
                vector_available = True
                logger.info("✅ Vector extension available")
            except Exception:
                vector_available = False
                logger.info("⚠️ Vector extension not available (using JSONB)")

            await create_tables(conn)
            logger.info("✅ Database setup complete")

        return True
    except Exception as e:
        logger.error(f"❌ Database init failed: {e}")
        return False

async def create_tables(conn):
    """Create enhanced database tables for the ER model"""

    await conn.execute("""
        CREATE TABLE IF NOT EXISTS ner_analyses (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            analysis_id VARCHAR(255) UNIQUE NOT NULL,
            source_text TEXT NOT NULL,
            source_type VARCHAR(50) NOT NULL,
            language VARCHAR(10) DEFAULT 'en',
            entities JSONB NOT NULL DEFAULT '[]',
            keywords JSONB NOT NULL DEFAULT '[]',
            relationships JSONB NOT NULL DEFAULT '[]',
            summary TEXT DEFAULT '',
            embeddings JSONB DEFAULT '[]',
            graph_data JSONB DEFAULT '{}',
            export_files JSONB DEFAULT '{}',
            text_stats JSONB DEFAULT '{}',
            er_stats JSONB DEFAULT '{}',
            processing_time FLOAT DEFAULT 0,
            entity_types JSONB DEFAULT '[]',
            relationship_types JSONB DEFAULT '[]',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
    """)

    await conn.execute("""
        CREATE TABLE IF NOT EXISTS entities (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            entity_id VARCHAR(255) NOT NULL,
            analysis_id VARCHAR(255) NOT NULL,
            text VARCHAR(1000) NOT NULL,
            label VARCHAR(100) NOT NULL,
            confidence FLOAT DEFAULT 0,
            start_pos INTEGER DEFAULT 0,
            end_pos INTEGER DEFAULT 0,
            frequency INTEGER DEFAULT 1,
            importance_score FLOAT DEFAULT 0,
            metadata JSONB DEFAULT '{}',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (analysis_id) REFERENCES ner_analyses(analysis_id) ON DELETE CASCADE
        );
    """)

    await conn.execute("""
        CREATE TABLE IF NOT EXISTS relationships (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            relationship_id VARCHAR(255) NOT NULL,
            analysis_id VARCHAR(255) NOT NULL,
            source_entity_id VARCHAR(255) NOT NULL,
            target_entity_id VARCHAR(255) NOT NULL,
            source_entity VARCHAR(1000) NOT NULL,
            target_entity VARCHAR(1000) NOT NULL,
            relationship_type VARCHAR(200) NOT NULL,
            confidence FLOAT DEFAULT 0,
            strength FLOAT DEFAULT 0,
            context TEXT DEFAULT '',
            evidence_count INTEGER DEFAULT 1,
            bidirectional BOOLEAN DEFAULT FALSE,
            metadata JSONB DEFAULT '{}',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (analysis_id) REFERENCES ner_analyses(analysis_id) ON DELETE CASCADE
        );
    """)

    try:
        await conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_analysis_id ON ner_analyses(analysis_id);
            CREATE INDEX IF NOT EXISTS idx_entities_analysis ON entities(analysis_id);
            CREATE INDEX IF NOT EXISTS idx_relationships_analysis ON relationships(analysis_id);
        """)
    except Exception as e:
        logger.warning(f"Index creation skipped: {e}")

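# Example (illustrative): the per-row `entities` and `relationships` tables are created
# here for downstream querying, while save_to_database() below currently writes only the
# aggregated `ner_analyses` row. A stored analysis can be fetched back roughly like this
# (assumes the pool has been initialised):
#
#   async def fetch_analysis(analysis_id: str):
#       async with pg_pool.acquire() as conn:
#           row = await conn.fetchrow(
#               "SELECT analysis_id, language, entities, relationships, summary "
#               "FROM ner_analyses WHERE analysis_id = $1", analysis_id)
#           return dict(row) if row else None
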
def extract_text_from_file(file_content: bytes, filename: str) -> str:
    file_ext = Path(filename).suffix.lower()

    if file_ext == '.txt':
        return file_content.decode('utf-8', errors='ignore')
    elif file_ext == '.docx':
        doc = docx.Document(io.BytesIO(file_content))
        return '\n'.join([p.text for p in doc.paragraphs])
    else:
        return file_content.decode('utf-8', errors='ignore')


async def get_text_from_ocr(file_content: bytes, filename: str) -> str:
    try:
        async with httpx.AsyncClient(timeout=300) as client:
            files = {'file': (filename, file_content)}
            response = await client.post(f"{config.OCR_SERVICE_URL}/ocr/upload", files=files)
            if response.status_code == 200:
                return response.json().get('content', '')
    except Exception as e:
        logger.error(f"OCR service error: {e}")
    raise HTTPException(status_code=500, detail="OCR processing failed")


async def get_text_from_url(url: str) -> str:
    try:
        async with httpx.AsyncClient(timeout=300) as client:
            response = await client.post(f"{config.OCR_SERVICE_URL}/ocr/url",
                                         json={"url": str(url), "extract_images": True})
            if response.status_code == 200:
                return response.json().get('content', '')
    except Exception as e:
        logger.error(f"URL processing error: {e}")
    raise HTTPException(status_code=500, detail="URL processing failed")

async def analyze_with_deepseek(text: str, language: str = None) -> Dict[str, Any]:
    """Enhanced analysis with improved relationship extraction"""
    deepseek_client = get_deepseek_client()
    if not deepseek_client:
        logger.warning("DeepSeek not configured, using manual extraction")
        return extract_manual_entities_and_relationships(text, language)

    try:
        if not language:
            language = detect_language(text)

        if language == "th":
            system_prompt = """คุณเป็นผู้เชี่ยวชาญในการจดจำนามเอกลักษณ์และการสกัดความสัมพันธ์สำหรับภาษาไทย

วิเคราะห์ข้อความและสกัดข้อมูลดังนี้:
1. นามเอกลักษณ์ทุกประเภท (บุคคล องค์กร สถานที่ วันที่ เวลา เงิน ฯลฯ)
2. ความสัมพันธ์ระหว่างนามเอกลักษณ์ - ต้องสกัดทุกความสัมพันธ์ที่พบ
3. คำหลักสำคัญจากข้อความ
4. สรุปที่ครอบคลุม

ให้ผลลัพธ์เป็น JSON:
{
  "entities": [{"text": "ข้อความ", "label": "ประเภท", "confidence": 0.95, "start_pos": 0, "end_pos": 10}],
  "keywords": ["คำหลัก1", "คำหลัก2"],
  "relationships": [{"source_entity": "A", "target_entity": "B", "relationship_type": "ประเภท", "confidence": 0.9, "context": "บริบท"}],
  "summary": "สรุป"
}"""
        else:
            system_prompt = """You are an expert in Named Entity Recognition and relationship extraction.

Analyze the text and extract:
1. All named entities (people, organizations, locations, dates, money, etc.)
2. ALL relationships between entities - extract every relationship found
3. Important keywords from the text
4. Comprehensive summary

Return ONLY valid JSON:
{
  "entities": [{"text": "entity text", "label": "TYPE", "confidence": 0.95, "start_pos": 0, "end_pos": 10}],
  "keywords": ["keyword1", "keyword2"],
  "relationships": [{"source_entity": "Entity A", "target_entity": "Entity B", "relationship_type": "relationship_type", "confidence": 0.9, "context": "context"}],
  "summary": "Comprehensive summary"
}"""

        user_prompt = f"วิเคราะห์ข้อความนี้:\n\n{text[:8000]}" if language == "th" else f"Analyze this text:\n\n{text[:8000]}"

        response = deepseek_client.complete(
            messages=[
                SystemMessage(content=system_prompt),
                UserMessage(content=user_prompt)
            ],
            max_tokens=6000,
            model=config.DEEPSEEK_MODEL,
            temperature=0.1
        )

        result_text = response.choices[0].message.content.strip()

        # Extract the JSON object from the (possibly chatty) model output
        start_idx = result_text.find('{')
        end_idx = result_text.rfind('}') + 1
        if start_idx != -1 and end_idx > start_idx:
            json_text = result_text[start_idx:end_idx]
            try:
                json_result = json.loads(json_text)
                logger.info("✅ Successfully parsed JSON from DeepSeek")
            except json.JSONDecodeError:
                try:
                    fixed_json = json_text.replace("'", '"').replace('True', 'true').replace('False', 'false')
                    json_result = json.loads(fixed_json)
                    logger.info("✅ Successfully parsed fixed JSON")
                except json.JSONDecodeError:
                    json_result = None
        else:
            json_result = None

        if json_result:
            entities = deduplicate_entities(json_result.get('entities', []))
            keywords = json_result.get('keywords', [])
            relationships = json_result.get('relationships', [])
            summary = json_result.get('summary', '')

            if len(relationships) == 0 and len(entities) >= 2:
                logger.warning("No relationships found by DeepSeek, applying rule-based extraction")
                rule_based_relationships = extract_rule_based_relationships(entities, text, language)
                relationships.extend(rule_based_relationships)

            # Fill in fields the model may omit
            for rel in relationships:
                if 'id' not in rel:
                    rel['id'] = generate_unique_id('rel')
                if 'strength' not in rel:
                    rel['strength'] = rel.get('confidence', 0.8)
                if 'evidence_count' not in rel:
                    rel['evidence_count'] = 1
                if 'bidirectional' not in rel:
                    rel['bidirectional'] = False

            return {
                "entities": entities,
                "keywords": keywords[:20],
                "relationships": relationships,
                "summary": summary or f"Analysis of {len(text)} characters"
            }

        logger.warning("JSON parsing failed, using manual extraction")
        return extract_manual_entities_and_relationships(text, language)

    except Exception as e:
        logger.error(f"DeepSeek analysis error: {e}")
        return extract_manual_entities_and_relationships(text, language)

def extract_rule_based_relationships(entities: List[Dict], text: str, language: str) -> List[Dict]:
    """Extract relationships using a rule-based approach"""
    relationships = []

    if len(entities) < 2:
        return relationships

    if language == "th":
        patterns = [
            (r'(.+?)\s*ทำงาน(?:ที่|ใน|กับ)\s*(.+)', 'ทำงานที่'),
            (r'(.+?)\s*เป็น(?:เจ้าของ|ของ)\s*(.+)', 'เป็นเจ้าของ'),
            (r'(.+?)\s*ตั้งอยู่(?:ที่|ใน)\s*(.+)', 'ตั้งอยู่ที่'),
            (r'(.+?)\s*(?:จับกุม|จับ)\s*(.+)', 'จับกุมโดย'),
        ]
    else:
        patterns = [
            (r'(.+?)\s*(?:works?\s+(?:for|at|in)|employed\s+by)\s*(.+)', 'works_for'),
            (r'(.+?)\s*(?:owns?|possesses?)\s*(.+)', 'owns'),
            (r'(.+?)\s*(?:located\s+(?:in|at)|based\s+in)\s*(.+)', 'located_in'),
            (r'(.+?)\s*(?:arrested\s+by|detained\s+by)\s*(.+)', 'arrested_by'),
        ]

    for pattern, rel_type in patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE | re.UNICODE):
            source_text = match.group(1).strip()
            target_text = match.group(2).strip()

            source_entity = find_best_entity_match(source_text, entities)
            target_entity = find_best_entity_match(target_text, entities)

            if source_entity and target_entity and source_entity != target_entity:
                relationship = {
                    'id': generate_unique_id('rel'),
                    'source_entity': source_entity['text'],
                    'target_entity': target_entity['text'],
                    'relationship_type': rel_type,
                    'confidence': 0.7,
                    'strength': 0.7,
                    'context': match.group(0),
                    'evidence_count': 1,
                    'bidirectional': False,
                    'metadata': {'extraction_method': 'rule_based'}
                }
                relationships.append(relationship)

    return relationships

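# Example (illustrative): given entities for "Somchai" (PERSON) and "Acme Corp"
# (ORGANIZATION), the English pattern set maps the sentence
# "Somchai works for Acme Corp" to a relationship dict roughly like:
#
#   {"source_entity": "Somchai", "target_entity": "Acme Corp",
#    "relationship_type": "works_for", "confidence": 0.7, "strength": 0.7,
#    "context": "Somchai works for Acme Corp",
#    "metadata": {"extraction_method": "rule_based"}}
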
def find_best_entity_match(text: str, entities: List[Dict]) -> Optional[Dict]:
    """Find the best matching entity for given text"""
    text_norm = normalize_text(text)

    for entity in entities:
        if normalize_text(entity['text']) == text_norm:
            return entity

    best_match = None
    best_score = 0

    for entity in entities:
        score = calculate_text_similarity(text, entity['text'])
        if score > best_score and score > 0.6:
            best_score = score
            best_match = entity

    return best_match

def extract_manual_entities_and_relationships(text: str, language: str = None) -> Dict[str, Any]:
    """Enhanced manual extraction with relationship detection"""
    if not language:
        language = detect_language(text)

    entities = []
    keywords = []

    if language == "th":
        patterns = {
            'PERSON': [r'(?:คุณ|นาย|นาง|นางสาว|ดร\.?)\s*[ก-๙\w\s]+'],
            'ORGANIZATION': [r'บริษัท\s+[ก-๙\w\s]+(?:จำกัด|มหาชน)', r'สถานีตำรวจ[ก-๙\w\s]+'],
            'LOCATION': [r'จังหวัด[ก-๙\w\s]+', r'กรุงเทพมหานคร|กรุงเทพฯ?'],
            'MONEY': [r'\d+(?:,\d{3})*\s*(?:บาท|ล้านบาท|พันบาท)'],
            'DATE': [r'\d{1,2}\/\d{1,2}\/\d{4}'],
        }
        words = re.findall(r'[ก-๙]+', text)
        thai_stop_words = {'และ', 'หรือ', 'แต่', 'ใน', 'ที่', 'เพื่อ', 'กับ', 'จาก', 'โดย', 'ของ'}
        keywords = [word for word in words if word not in thai_stop_words and len(word) > 2]
    else:
        patterns = {
            'PERSON': [r'\b(?:Mr|Mrs|Ms|Dr|Prof)\.\s+[A-Z][a-zA-Z]+(?:\s+[A-Z][a-zA-Z]+)*'],
            'ORGANIZATION': [r'\b[A-Z][a-zA-Z]+\s+(?:Inc|Corp|Company|Ltd|Co|LLC|Corporation|Limited|University)\b'],
            'LOCATION': [r'\b(?:New York|Los Angeles|Chicago|Bangkok|London|Paris|Berlin)\b'],
            'MONEY': [r'\$[\d,]+\.?\d*', r'\b\d+(?:,\d{3})*\s*(?:dollars?|USD|million|billion)\b'],
            'DATE': [r'\b\d{1,2}\/\d{1,2}\/\d{4}\b'],
        }
        words = re.findall(r'\b[a-zA-Z]{3,}\b', text)
        english_stop_words = {'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
        keywords = [word.lower() for word in words if word.lower() not in english_stop_words]

    for label, pattern_list in patterns.items():
        for pattern in pattern_list:
            for match in re.finditer(pattern, text, re.UNICODE | re.IGNORECASE):
                entity_text = match.group().strip()
                if len(entity_text) > 1:
                    entities.append({
                        "id": generate_unique_id('ent'),
                        "text": entity_text,
                        "label": label,
                        "confidence": 0.8,
                        "start_pos": match.start(),
                        "end_pos": match.end(),
                        "frequency": 1,
                        "importance_score": 0.7,
                        "metadata": {"source": "manual_extraction"}
                    })

    entities = deduplicate_entities(entities)
    keywords = list(set(keywords))[:20]

    relationships = []
    if len(entities) >= 2:
        relationships = extract_rule_based_relationships(entities, text, language)

    summary = f"Analysis of {len(text)} characters found {len(entities)} entities and {len(relationships)} relationships"

    return {
        "entities": entities,
        "keywords": keywords,
        "relationships": relationships,
        "summary": summary
    }

async def generate_embeddings(text: str) -> List[float]:
    openai_client = get_openai_client()
    if not openai_client:
        return []

    try:
        response = openai_client.embeddings.create(
            input=[text[:8000]],
            model=config.EMBEDDING_MODEL,
            dimensions=1536
        )
        return response.data[0].embedding
    except Exception as e:
        logger.error(f"Embedding failed: {e}")
        return []

def create_enhanced_graph_data(entities: List[Dict], relationships: List[Dict]) -> GraphData:
    """Create enhanced graph data with comprehensive ER model"""
    nodes = []
    links = []
    entity_map = {}

    for entity in entities:
        node_id = entity.get('id', generate_unique_id('ent'))
        entity_map[entity['text']] = node_id

        node_properties = {
            "original_text": entity['text'],
            "entity_type": entity['label'],
            "confidence": entity.get('confidence', 0.0),
            "start_position": entity.get('start_pos', 0),
            "end_position": entity.get('end_pos', 0),
            "frequency": entity.get('frequency', 1),
            "importance_score": entity.get('importance_score', 0.0),
            "metadata": entity.get('metadata', {})
        }

        nodes.append(NodeResult(
            id=node_id,
            label=entity['text'],
            type=entity['label'],
            confidence=entity.get('confidence', 0.0),
            frequency=entity.get('frequency', 1),
            importance_score=entity.get('importance_score', 0.0),
            properties=node_properties
        ))

    for rel in relationships:
        source_id = entity_map.get(rel['source_entity'])
        target_id = entity_map.get(rel['target_entity'])

        if source_id and target_id:
            link_id = rel.get('id', generate_unique_id('link'))

            link_properties = {
                "relationship_type": rel['relationship_type'],
                "confidence": rel.get('confidence', 0.0),
                "strength": rel.get('strength', rel.get('confidence', 0.0)),
                "context": rel.get('context', ''),
                "evidence_count": rel.get('evidence_count', 1),
                "bidirectional": rel.get('bidirectional', False),
                "metadata": rel.get('metadata', {})
            }

            links.append(LinkResult(
                id=link_id,
                source=source_id,
                target=target_id,
                relationship=rel['relationship_type'],
                confidence=rel.get('confidence', 0.0),
                strength=rel.get('strength', rel.get('confidence', 0.0)),
                evidence_count=rel.get('evidence_count', 1),
                properties=link_properties
            ))

    entity_types = defaultdict(int)
    relationship_types = defaultdict(int)

    for entity in entities:
        entity_types[entity['label']] += 1

    for rel in relationships:
        relationship_types[rel['relationship_type']] += 1

    metadata = {
        "total_entities": len(entities),
        "total_relationships": len(relationships),
        "entity_type_distribution": dict(entity_types),
        "relationship_type_distribution": dict(relationship_types),
        "graph_density": len(relationships) / (len(entities) * (len(entities) - 1) / 2) if len(entities) > 1 else 0,
        "average_entity_confidence": sum(entity.get('confidence', 0) for entity in entities) / len(entities) if entities else 0,
        "average_relationship_confidence": sum(rel.get('confidence', 0) for rel in relationships) / len(relationships) if relationships else 0,
        "unique_entity_types": len(entity_types),
        "unique_relationship_types": len(relationship_types)
    }

    return GraphData(
        nodes=nodes,
        links=links,
        metadata=metadata
    )

async def generate_export_files(analysis_id: str, entities: List[Dict], relationships: List[Dict],
                                graph_data: GraphData, formats: List[str]) -> ExportFiles:
    """Generate export files for various formats"""

    export_files = ExportFiles()
    analysis_dir = EXPORT_DIR / analysis_id
    analysis_dir.mkdir(exist_ok=True)

    try:
        if "neo4j" in formats:
            nodes_file, rels_file = await generate_neo4j_csv(analysis_dir, entities, relationships)
            export_files.neo4j_nodes = str(nodes_file)
            export_files.neo4j_relationships = str(rels_file)

        if "json" in formats:
            json_file = await generate_json_export(analysis_dir, entities, relationships, graph_data)
            export_files.json_export = str(json_file)

        if "graphml" in formats:
            graphml_file = await generate_graphml_export(analysis_dir, entities, relationships)
            export_files.graphml_export = str(graphml_file)

        logger.info(f"✅ Generated export files for analysis {analysis_id}")

    except Exception as e:
        logger.error(f"❌ Export file generation failed: {e}")

    return export_files

async def generate_neo4j_csv(export_dir: Path, entities: List[Dict], relationships: List[Dict]) -> Tuple[Path, Path]:
    """Generate Neo4j compatible CSV files"""

    nodes_file = export_dir / "neo4j_nodes.csv"
    with open(nodes_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow([
            'nodeId:ID', 'text', 'label:LABEL', 'confidence:float',
            'frequency:int', 'importance:float'
        ])

        for entity in entities:
            writer.writerow([
                entity.get('id', generate_unique_id('ent')),
                entity['text'],
                entity['label'],
                entity.get('confidence', 0.0),
                entity.get('frequency', 1),
                entity.get('importance_score', 0.0)
            ])

    rels_file = export_dir / "neo4j_relationships.csv"
    entity_map = {entity['text']: entity.get('id', generate_unique_id('ent')) for entity in entities}

    with open(rels_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow([
            ':START_ID', ':END_ID', ':TYPE', 'confidence:float',
            'strength:float', 'context'
        ])

        for rel in relationships:
            source_id = entity_map.get(rel['source_entity'])
            target_id = entity_map.get(rel['target_entity'])

            if source_id and target_id:
                writer.writerow([
                    source_id,
                    target_id,
                    rel['relationship_type'].upper().replace(' ', '_'),
                    rel.get('confidence', 0.0),
                    rel.get('strength', rel.get('confidence', 0.0)),
                    rel.get('context', '')
                ])

    return nodes_file, rels_file

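# Example (illustrative sketch, assumes the `neo4j` Python driver and that the exported
# CSVs have been copied into the Neo4j server's import directory; an alternative is
# `neo4j-admin database import`, which understands the :ID/:LABEL/:TYPE header syntax):
#
#   from neo4j import GraphDatabase
#
#   driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
#   with driver.session() as session:
#       session.run("""
#           LOAD CSV WITH HEADERS FROM 'file:///neo4j_nodes.csv' AS row
#           MERGE (n:Entity {nodeId: row.`nodeId:ID`})
#           SET n.text = row.text, n.label = row.`label:LABEL`,
#               n.confidence = toFloat(row.`confidence:float`)
#       """)
#       session.run("""
#           LOAD CSV WITH HEADERS FROM 'file:///neo4j_relationships.csv' AS row
#           MATCH (a:Entity {nodeId: row.`:START_ID`}), (b:Entity {nodeId: row.`:END_ID`})
#           MERGE (a)-[r:RELATED_TO {type: row.`:TYPE`}]->(b)
#           SET r.confidence = toFloat(row.`confidence:float`)
#       """)
#   driver.close()
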
async def generate_json_export(export_dir: Path, entities: List[Dict], relationships: List[Dict], graph_data: GraphData) -> Path:
    """Generate comprehensive JSON export"""

    json_file = export_dir / "analysis_export.json"

    export_data = {
        "metadata": {
            "export_timestamp": datetime.utcnow().isoformat(),
            "format_version": "1.0",
            "total_entities": len(entities),
            "total_relationships": len(relationships)
        },
        "entities": entities,
        "relationships": relationships,
        # model_dump() is the Pydantic v2 replacement for the deprecated .dict()
        "graph_data": graph_data.model_dump(),
        "statistics": {
            "entity_types": list(set(e['label'] for e in entities)),
            "relationship_types": list(set(r['relationship_type'] for r in relationships)),
            "average_confidence": sum(e.get('confidence', 0) for e in entities) / len(entities) if entities else 0
        }
    }

    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(export_data, f, indent=2, ensure_ascii=False)

    return json_file

async def generate_graphml_export(export_dir: Path, entities: List[Dict], relationships: List[Dict]) -> Path:
    """Generate GraphML format"""

    graphml_file = export_dir / "graph_export.graphml"

    root = ET.Element('graphml')
    root.set('xmlns', 'http://graphml.graphdrawing.org/xmlns')

    ET.SubElement(root, 'key', id='label', **{'for': 'node', 'attr.name': 'label', 'attr.type': 'string'})
    ET.SubElement(root, 'key', id='type', **{'for': 'node', 'attr.name': 'type', 'attr.type': 'string'})
    ET.SubElement(root, 'key', id='rel_type', **{'for': 'edge', 'attr.name': 'relationship', 'attr.type': 'string'})

    graph = ET.SubElement(root, 'graph', id='G', edgedefault='directed')

    entity_map = {}
    for entity in entities:
        node_id = entity.get('id', generate_unique_id('ent'))
        entity_map[entity['text']] = node_id

        node = ET.SubElement(graph, 'node', id=node_id)

        label_data = ET.SubElement(node, 'data', key='label')
        label_data.text = entity['text']

        type_data = ET.SubElement(node, 'data', key='type')
        type_data.text = entity['label']

    for i, rel in enumerate(relationships):
        source_id = entity_map.get(rel['source_entity'])
        target_id = entity_map.get(rel['target_entity'])

        if source_id and target_id:
            edge = ET.SubElement(graph, 'edge', id=f"e{i}", source=source_id, target=target_id)

            rel_data = ET.SubElement(edge, 'data', key='rel_type')
            rel_data.text = rel['relationship_type']

    tree = ET.ElementTree(root)
    tree.write(graphml_file, encoding='utf-8', xml_declaration=True)

    return graphml_file

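# Example (illustrative, requires the optional `networkx` package): the GraphML export
# can be loaded back as a directed graph for further analysis:
#
#   import networkx as nx
#
#   g = nx.read_graphml("exports/<analysis_id>/graph_export.graphml")
#   print(g.number_of_nodes(), g.number_of_edges())
#   print(sorted(nx.degree_centrality(g).items(), key=lambda kv: -kv[1])[:5])
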
def calculate_er_stats(entities: List[Dict], relationships: List[Dict]) -> Dict[str, Any]:
    """Calculate Entity-Relationship statistics"""

    if not entities:
        return {}

    entity_types = defaultdict(int)
    relationship_types = defaultdict(int)

    for entity in entities:
        entity_types[entity['label']] += 1

    for rel in relationships:
        relationship_types[rel['relationship_type']] += 1

    return {
        "total_entities": len(entities),
        "total_relationships": len(relationships),
        "entity_type_distribution": dict(entity_types),
        "relationship_type_distribution": dict(relationship_types),
        "graph_density": len(relationships) / (len(entities) * (len(entities) - 1) / 2) if len(entities) > 1 else 0,
        "unique_entity_types": len(entity_types),
        "unique_relationship_types": len(relationship_types)
    }

async def save_to_database(data: Dict[str, Any]) -> bool:
    if not pg_pool:
        logger.error("No database pool available")
        return False

    try:
        async with pg_pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO ner_analyses (
                    analysis_id, source_text, source_type, language, entities, keywords,
                    relationships, summary, embeddings, graph_data, export_files, text_stats,
                    er_stats, processing_time, entity_types, relationship_types
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
                ON CONFLICT (analysis_id) DO UPDATE SET
                    entities = EXCLUDED.entities,
                    relationships = EXCLUDED.relationships,
                    summary = EXCLUDED.summary
                """,
                data['analysis_id'],
                data['source_text'][:10000],
                data['source_type'],
                data['language'],
                json.dumps(data['entities'], ensure_ascii=False),
                json.dumps(data['keywords'], ensure_ascii=False),
                json.dumps(data['relationships'], ensure_ascii=False),
                data['summary'],
                json.dumps(data.get('embeddings', [])),
                json.dumps(data.get('graph_data', {}), ensure_ascii=False, default=str),
                json.dumps(data.get('export_files', {}), ensure_ascii=False, default=str),
                json.dumps(data.get('text_stats', {})),
                json.dumps(data.get('er_stats', {})),
                float(data.get('processing_time', 0)),
                json.dumps(list(set(entity.get('label', '') for entity in data.get('entities', [])))),
                json.dumps(list(set(rel.get('relationship_type', '') for rel in data.get('relationships', []))))
            )

        logger.info(f"✅ Analysis {data['analysis_id']} saved to database")
        return True
    except Exception as e:
        logger.error(f"❌ DB save failed for {data.get('analysis_id', 'unknown')}: {e}")
        return False

async def save_to_blob(analysis_id: str, data: Dict[str, Any]) -> bool:
    blob_client = get_blob_client()
    if not blob_client:
        return False

    try:
        blob_name = f"ner_analysis/{analysis_id}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
        blob_client_obj = blob_client.get_blob_client(container=config.BLOB_CONTAINER, blob=blob_name)
        blob_client_obj.upload_blob(json.dumps(data, indent=2, ensure_ascii=False, default=str), overwrite=True)
        return True
    except Exception as e:
        logger.error(f"Blob save failed: {e}")
        return False

@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("🚀 Starting Enhanced NER Analysis Service...")

    logger.info("🔄 Database initialization...")
    db_ok = await init_database()
    if not db_ok:
        logger.error("❌ Database initialization failed!")
        raise RuntimeError("Database initialization failed")

    logger.info("🔄 Initializing API clients...")
    get_deepseek_client()
    get_openai_client()
    get_blob_client()

    logger.info("🔄 Creating export directories...")
    EXPORT_DIR.mkdir(exist_ok=True)

    logger.info("🎉 Enhanced NER Analysis Service is ready!")
    logger.info(f"📡 Server running on http://{config.HOST}:{config.PORT}")

    yield

    logger.info("🛑 Shutting down...")
    if pg_pool:
        await pg_pool.close()
        logger.info("✅ Database connections closed")


app = FastAPI(
    title="Enhanced NER Analysis Service",
    description="Advanced Named Entity Recognition with relationship extraction and graph exports",
    version="2.0.0",
    lifespan=lifespan
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/")
|
|
async def root():
|
|
deepseek_available = bool(config.DEEPSEEK_ENDPOINT and config.DEEPSEEK_API_KEY)
|
|
openai_available = bool(config.AZURE_OPENAI_ENDPOINT and config.AZURE_OPENAI_API_KEY)
|
|
blob_available = bool(config.AZURE_STORAGE_ACCOUNT_URL and config.AZURE_BLOB_SAS_TOKEN)
|
|
|
|
return {
|
|
"message": "Enhanced NER Analysis Service",
|
|
"version": "2.0.0",
|
|
"status": "operational",
|
|
"supported_entities": config.ENTITY_TYPES,
|
|
"supported_relationships": config.RELATIONSHIP_TYPES[:10],
|
|
"export_formats": ["neo4j", "json", "graphml"],
|
|
"features": {
|
|
"ner_analysis": True,
|
|
"relationship_extraction": True,
|
|
"thai_language_support": True,
|
|
"graph_database_export": True,
|
|
"embedding_generation": openai_available,
|
|
"deepseek_analysis": deepseek_available,
|
|
"blob_storage": blob_available
|
|
}
|
|
}
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
deepseek_available = bool(config.DEEPSEEK_ENDPOINT and config.DEEPSEEK_API_KEY)
|
|
openai_available = bool(config.AZURE_OPENAI_ENDPOINT and config.AZURE_OPENAI_API_KEY)
|
|
blob_available = bool(config.AZURE_STORAGE_ACCOUNT_URL and config.AZURE_BLOB_SAS_TOKEN)
|
|
|
|
return {
|
|
"status": "healthy",
|
|
"service": "NER Analysis Service",
|
|
"version": "2.0.0",
|
|
"database": pg_pool is not None,
|
|
"vector_extension": vector_available,
|
|
"deepseek": deepseek_available,
|
|
"openai": openai_available,
|
|
"blob_storage": blob_available,
|
|
"supported_entity_count": len(config.ENTITY_TYPES),
|
|
"supported_relationship_count": len(config.RELATIONSHIP_TYPES),
|
|
"export_formats": ["neo4j", "json", "graphml"]
|
|
}
|
|
|
|
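# Example (illustrative client call for the /analyze/text endpoint defined below,
# assuming the service runs on the default port 8500):
#
#   import httpx
#
#   resp = httpx.post(
#       "http://localhost:8500/analyze/text",
#       json={"text": "Somchai works for Acme Corp in Bangkok.",
#             "export_formats": ["neo4j", "json"]},
#       timeout=120,
#   )
#   result = resp.json()
#   print(result["entities"], result["relationships"], result["summary"])
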
@app.post("/analyze/text", response_model=NERResponse)
|
|
async def analyze_text(request: NERRequest, background_tasks: BackgroundTasks):
|
|
"""Analyze text for entities and relationships"""
|
|
start_time = datetime.utcnow()
|
|
analysis_id = f"text_{int(start_time.timestamp())}"
|
|
|
|
if not request.text or not request.text.strip():
|
|
raise HTTPException(status_code=400, detail="Text is required")
|
|
|
|
try:
|
|
language = detect_language(request.text)
|
|
text_stats = get_text_stats(request.text)
|
|
|
|
|
|
analysis_result = await analyze_with_deepseek(request.text, language)
|
|
|
|
|
|
embeddings = []
|
|
if request.include_embeddings:
|
|
embeddings = await generate_embeddings(request.text)
|
|
|
|
|
|
graph_data = create_enhanced_graph_data(
|
|
analysis_result.get('entities', []),
|
|
analysis_result.get('relationships', [])
|
|
)
|
|
|
|
|
|
er_stats = calculate_er_stats(
|
|
analysis_result.get('entities', []),
|
|
analysis_result.get('relationships', [])
|
|
)
|
|
|
|
|
|
export_files = ExportFiles()
|
|
if request.generate_graph_files:
|
|
export_files = await generate_export_files(
|
|
analysis_id,
|
|
analysis_result.get('entities', []),
|
|
analysis_result.get('relationships', []),
|
|
graph_data,
|
|
request.export_formats
|
|
)
|
|
|
|
processing_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
response_data = {
|
|
"analysis_id": analysis_id,
|
|
"source_text": request.text,
|
|
"source_type": "text_input",
|
|
"language": language,
|
|
"entities": analysis_result.get('entities', []),
|
|
"keywords": analysis_result.get('keywords', []),
|
|
"relationships": analysis_result.get('relationships', []),
|
|
"summary": analysis_result.get('summary', ''),
|
|
"embeddings": embeddings,
|
|
"graph_data": graph_data,
|
|
"export_files": export_files,
|
|
"text_stats": text_stats,
|
|
"er_stats": er_stats,
|
|
"processing_time": processing_time,
|
|
"character_count": text_stats["character_count"],
|
|
"word_count": text_stats["word_count"],
|
|
"sentence_count": text_stats["sentence_count"]
|
|
}
|
|
|
|
|
|
background_tasks.add_task(save_to_database, response_data)
|
|
background_tasks.add_task(save_to_blob, analysis_id, response_data)
|
|
|
|
return NERResponse(
|
|
success=True,
|
|
entity_relationship_stats=er_stats,
|
|
**response_data
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Text analysis failed: {e}")
|
|
return NERResponse(
|
|
success=False,
|
|
analysis_id=analysis_id,
|
|
source_text=request.text[:1000],
|
|
source_type="text_input",
|
|
language="unknown",
|
|
entities=[],
|
|
keywords=[],
|
|
relationships=[],
|
|
summary="",
|
|
graph_data=GraphData(nodes=[], links=[], metadata={}),
|
|
export_files=ExportFiles(),
|
|
processing_time=(datetime.utcnow() - start_time).total_seconds(),
|
|
character_count=0,
|
|
word_count=0,
|
|
sentence_count=0,
|
|
entity_relationship_stats={},
|
|
error=str(e)
|
|
)
|
|
|
|
@app.post("/analyze/file", response_model=NERResponse)
|
|
async def analyze_file(
|
|
file: UploadFile = File(...),
|
|
extract_relationships: bool = Form(True),
|
|
include_embeddings: bool = Form(True),
|
|
include_summary: bool = Form(True),
|
|
generate_graph_files: bool = Form(True),
|
|
export_formats: str = Form("neo4j,json"),
|
|
background_tasks: BackgroundTasks = None
|
|
):
|
|
"""Analyze uploaded file for entities and relationships"""
|
|
start_time = datetime.utcnow()
|
|
analysis_id = f"file_{int(start_time.timestamp())}"
|
|
|
|
if not file.filename:
|
|
raise HTTPException(status_code=400, detail="No filename")
|
|
|
|
try:
|
|
file_content = await file.read()
|
|
if len(file_content) > config.MAX_FILE_SIZE:
|
|
raise HTTPException(status_code=400, detail="File too large")
|
|
|
|
file_ext = Path(file.filename).suffix.lower()
|
|
export_format_list = export_formats.split(',') if export_formats else ["json"]
|
|
|
|
if file_ext in config.SUPPORTED_TEXT_FORMATS:
|
|
text = extract_text_from_file(file_content, file.filename)
|
|
source_type = "text_file"
|
|
elif file_ext in config.SUPPORTED_OCR_FORMATS:
|
|
text = await get_text_from_ocr(file_content, file.filename)
|
|
source_type = "ocr_file"
|
|
else:
|
|
raise HTTPException(status_code=400, detail=f"Unsupported format: {file_ext}")
|
|
|
|
if not text.strip():
|
|
raise HTTPException(status_code=400, detail="No text extracted")
|
|
|
|
language = detect_language(text)
|
|
text_stats = get_text_stats(text)
|
|
|
|
|
|
analysis_result = await analyze_with_deepseek(text, language)
|
|
|
|
|
|
embeddings = []
|
|
if include_embeddings:
|
|
embeddings = await generate_embeddings(text)
|
|
|
|
|
|
graph_data = create_enhanced_graph_data(
|
|
analysis_result.get('entities', []),
|
|
analysis_result.get('relationships', [])
|
|
)
|
|
|
|
|
|
er_stats = calculate_er_stats(
|
|
analysis_result.get('entities', []),
|
|
analysis_result.get('relationships', [])
|
|
)
|
|
|
|
|
|
export_files = ExportFiles()
|
|
if generate_graph_files:
|
|
export_files = await generate_export_files(
|
|
analysis_id,
|
|
analysis_result.get('entities', []),
|
|
analysis_result.get('relationships', []),
|
|
graph_data,
|
|
export_format_list
|
|
)
|
|
|
|
processing_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
response_data = {
|
|
"analysis_id": analysis_id,
|
|
"source_text": text,
|
|
"source_type": source_type,
|
|
"language": language,
|
|
"entities": analysis_result.get('entities', []),
|
|
"keywords": analysis_result.get('keywords', []),
|
|
"relationships": analysis_result.get('relationships', []),
|
|
"summary": analysis_result.get('summary', ''),
|
|
"embeddings": embeddings,
|
|
"graph_data": graph_data,
|
|
"export_files": export_files,
|
|
"text_stats": text_stats,
|
|
"er_stats": er_stats,
|
|
"processing_time": processing_time,
|
|
"character_count": text_stats["character_count"],
|
|
"word_count": text_stats["word_count"],
|
|
"sentence_count": text_stats["sentence_count"]
|
|
}
|
|
|
|
|
|
if background_tasks:
|
|
background_tasks.add_task(save_to_database, response_data)
|
|
background_tasks.add_task(save_to_blob, analysis_id, response_data)
|
|
|
|
return NERResponse(
|
|
success=True,
|
|
entity_relationship_stats=er_stats,
|
|
**response_data
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"File analysis failed: {e}")
|
|
return NERResponse(
|
|
success=False,
|
|
analysis_id=analysis_id,
|
|
source_text="",
|
|
source_type="file_input",
|
|
language="unknown",
|
|
entities=[],
|
|
keywords=[],
|
|
relationships=[],
|
|
summary="",
|
|
graph_data=GraphData(nodes=[], links=[], metadata={}),
|
|
export_files=ExportFiles(),
|
|
processing_time=(datetime.utcnow() - start_time).total_seconds(),
|
|
character_count=0,
|
|
word_count=0,
|
|
sentence_count=0,
|
|
entity_relationship_stats={},
|
|
error=str(e)
|
|
)
|
|
|
|
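# Example (illustrative client call for /analyze/file above): uploading a PDF so the
# configured OCR service extracts the text before NER runs:
#
#   import httpx
#
#   with open("report.pdf", "rb") as f:
#       resp = httpx.post(
#           "http://localhost:8500/analyze/file",
#           files={"file": ("report.pdf", f, "application/pdf")},
#           data={"export_formats": "neo4j,json", "include_embeddings": "false"},
#           timeout=300,
#       )
#   print(resp.json()["analysis_id"])
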
@app.post("/analyze/url", response_model=NERResponse)
|
|
async def analyze_url(request: NERRequest, background_tasks: BackgroundTasks):
|
|
"""Analyze URL content for entities and relationships"""
|
|
start_time = datetime.utcnow()
|
|
analysis_id = f"url_{int(start_time.timestamp())}"
|
|
|
|
if not request.url:
|
|
raise HTTPException(status_code=400, detail="URL is required")
|
|
|
|
try:
|
|
text = await get_text_from_url(str(request.url))
|
|
|
|
if not text.strip():
|
|
raise HTTPException(status_code=400, detail="No text extracted from URL")
|
|
|
|
language = detect_language(text)
|
|
text_stats = get_text_stats(text)
|
|
|
|
|
|
analysis_result = await analyze_with_deepseek(text, language)
|
|
|
|
|
|
embeddings = []
|
|
if request.include_embeddings:
|
|
embeddings = await generate_embeddings(text)
|
|
|
|
|
|
graph_data = create_enhanced_graph_data(
|
|
analysis_result.get('entities', []),
|
|
analysis_result.get('relationships', [])
|
|
)
|
|
|
|
|
|
er_stats = calculate_er_stats(
|
|
analysis_result.get('entities', []),
|
|
analysis_result.get('relationships', [])
|
|
)
|
|
|
|
|
|
export_files = ExportFiles()
|
|
if request.generate_graph_files:
|
|
export_files = await generate_export_files(
|
|
analysis_id,
|
|
analysis_result.get('entities', []),
|
|
analysis_result.get('relationships', []),
|
|
graph_data,
|
|
request.export_formats
|
|
)
|
|
|
|
processing_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
response_data = {
|
|
"analysis_id": analysis_id,
|
|
"source_text": text,
|
|
"source_type": "url_content",
|
|
"language": language,
|
|
"entities": analysis_result.get('entities', []),
|
|
"keywords": analysis_result.get('keywords', []),
|
|
"relationships": analysis_result.get('relationships', []),
|
|
"summary": analysis_result.get('summary', ''),
|
|
"embeddings": embeddings,
|
|
"graph_data": graph_data,
|
|
"export_files": export_files,
|
|
"text_stats": text_stats,
|
|
"er_stats": er_stats,
|
|
"processing_time": processing_time,
|
|
"character_count": text_stats["character_count"],
|
|
"word_count": text_stats["word_count"],
|
|
"sentence_count": text_stats["sentence_count"]
|
|
}
|
|
|
|
|
|
background_tasks.add_task(save_to_database, response_data)
|
|
background_tasks.add_task(save_to_blob, analysis_id, response_data)
|
|
|
|
return NERResponse(
|
|
success=True,
|
|
entity_relationship_stats=er_stats,
|
|
**response_data
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"URL analysis failed: {e}")
|
|
return NERResponse(
|
|
success=False,
|
|
analysis_id=analysis_id,
|
|
source_text="",
|
|
source_type="url_content",
|
|
language="unknown",
|
|
entities=[],
|
|
keywords=[],
|
|
relationships=[],
|
|
summary="",
|
|
graph_data=GraphData(nodes=[], links=[], metadata={}),
|
|
export_files=ExportFiles(),
|
|
processing_time=(datetime.utcnow() - start_time).total_seconds(),
|
|
character_count=0,
|
|
word_count=0,
|
|
sentence_count=0,
|
|
entity_relationship_stats={},
|
|
error=str(e)
|
|
)
|
|
|
|
@app.get("/download/{analysis_id}/{file_type}")
|
|
async def download_export_file(analysis_id: str, file_type: str):
|
|
"""Download specific export file for an analysis"""
|
|
try:
|
|
analysis_dir = EXPORT_DIR / analysis_id
|
|
|
|
if not analysis_dir.exists():
|
|
raise HTTPException(status_code=404, detail=f"Analysis {analysis_id} not found")
|
|
|
|
file_mapping = {
|
|
"neo4j_nodes": "neo4j_nodes.csv",
|
|
"neo4j_relationships": "neo4j_relationships.csv",
|
|
"json": "analysis_export.json",
|
|
"graphml": "graph_export.graphml"
|
|
}
|
|
|
|
if file_type not in file_mapping:
|
|
raise HTTPException(status_code=400, detail=f"Invalid file type: {file_type}")
|
|
|
|
file_path = analysis_dir / file_mapping[file_type]
|
|
|
|
if not file_path.exists():
|
|
raise HTTPException(status_code=404, detail=f"File {file_type} not found")
|
|
|
|
return FileResponse(path=file_path, filename=file_mapping[file_type])
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Download failed for {analysis_id}/{file_type}: {e}")
|
|
raise HTTPException(status_code=500, detail=f"Download failed: {str(e)}")
|
|
|
|
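# Example (illustrative): fetching the Neo4j node CSV generated for an analysis via the
# download endpoint above (replace <analysis_id> with an ID from a previous response):
#
#   import httpx
#
#   resp = httpx.get("http://localhost:8500/download/<analysis_id>/neo4j_nodes")
#   resp.raise_for_status()
#   with open("neo4j_nodes.csv", "wb") as f:
#       f.write(resp.content)
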
@app.get("/entity-types")
|
|
async def get_entity_types():
|
|
"""Get all supported entity types"""
|
|
return {
|
|
"success": True,
|
|
"entity_types": config.ENTITY_TYPES,
|
|
"total_count": len(config.ENTITY_TYPES)
|
|
}
|
|
|
|
@app.get("/relationship-types")
|
|
async def get_relationship_types():
|
|
"""Get all supported relationship types"""
|
|
return {
|
|
"success": True,
|
|
"relationship_types": config.RELATIONSHIP_TYPES,
|
|
"total_count": len(config.RELATIONSHIP_TYPES)
|
|
}
|
|
|
|
if __name__ == "__main__":
    print("🔧 Loading enhanced NER configuration...")
    print(f"🌐 Will start server on {config.HOST}:{config.PORT}")
    print(f"🏷️ Enhanced with {len(config.ENTITY_TYPES)} entity types")
    print(f"🔗 Enhanced with {len(config.RELATIONSHIP_TYPES)} relationship types")

    uvicorn.run(
        "ner_service:app",
        host=config.HOST,
        port=config.PORT,
        reload=config.DEBUG,
        log_level="info"
    )