#!/usr/bin/env python3
"""
Enhanced NER Analysis Service - Cleaned and Optimized
Advanced Named Entity Recognition with Thai language support,
relationship extraction, and graph database exports
"""
import os
import io
import json
import logging
import re
import csv
import tempfile
import zipfile
from datetime import datetime
from typing import Optional, List, Dict, Any, Union, Tuple
from pathlib import Path
from contextlib import asynccontextmanager
from collections import defaultdict
import xml.etree.ElementTree as ET
import httpx
import asyncpg
from azure.storage.blob import BlobServiceClient
from azure.core.credentials import AzureKeyCredential
from fastapi import FastAPI, File, UploadFile, HTTPException, Form, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel, HttpUrl, field_validator
import uvicorn
import docx
from azure.ai.inference import ChatCompletionsClient
from azure.ai.inference.models import SystemMessage, UserMessage
from openai import AzureOpenAI
# Import unified configuration
try:
from configs import get_config
config = get_config().ner
unified_config = get_config()
print("✅ Using unified configuration")
except ImportError:
print("⚠️ Unified config not available, using fallback configuration")
# Fallback configuration
from dotenv import load_dotenv
load_dotenv()
class FallbackConfig:
HOST = os.getenv("HOST", "0.0.0.0")
PORT = int(os.getenv("NER_PORT", "8500"))
DEBUG = os.getenv("DEBUG", "False").lower() == "true"
# Database
POSTGRES_HOST = os.getenv("POSTGRES_HOST", "")
POSTGRES_PORT = int(os.getenv("POSTGRES_PORT", "5432"))
POSTGRES_USER = os.getenv("POSTGRES_USER", "")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "")
POSTGRES_DATABASE = os.getenv("POSTGRES_DATABASE", "postgres")
# APIs
OCR_SERVICE_URL = os.getenv("OCR_SERVICE_URL", "http://localhost:8400")
DEEPSEEK_ENDPOINT = os.getenv("DEEPSEEK_ENDPOINT", "")
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY", "")
DEEPSEEK_MODEL = os.getenv("DEEPSEEK_MODEL", "DeepSeek-R1-0528")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT", "")
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY", "")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-large")
# Storage
AZURE_STORAGE_ACCOUNT_URL = os.getenv("AZURE_STORAGE_ACCOUNT_URL", "")
AZURE_BLOB_SAS_TOKEN = os.getenv("AZURE_BLOB_SAS_TOKEN", "")
BLOB_CONTAINER = os.getenv("BLOB_CONTAINER", "historylog")
# Limits
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB
        MAX_TEXT_LENGTH = 100000  # 100,000 characters
SUPPORTED_TEXT_FORMATS = {'.txt', '.doc', '.docx', '.rtf'}
SUPPORTED_OCR_FORMATS = {'.pdf', '.jpg', '.jpeg', '.png', '.tiff', '.bmp', '.gif'}
ENTITY_TYPES = [
"PERSON", "ORGANIZATION", "LOCATION", "DATE", "TIME", "MONEY", "PRODUCT", "EVENT",
"VEHICLE", "SUSPICIOUS_OBJECT", "ILLEGAL_ACTIVITY", "EVIDENCE", "ILLEGAL_ITEM",
"WEAPON", "DRUG", "CHEMICAL", "DOCUMENT", "PHONE_NUMBER", "ADDRESS", "EMAIL"
]
RELATIONSHIP_TYPES = [
"works_for", "founded", "located_in", "part_of", "associated_with", "owns", "manages",
"ทำงานที่", "ก่อตั้ง", "ตั้งอยู่ที่", "เกี่ยวข้องกับ", "เป็นเจ้าของ",
"arrested_by", "investigated_by", "confiscated_from", "used_in", "evidence_of",
"จับกุมโดย", "สอบสวนโดย", "ยึดจาก", "หลักฐานของ"
]
config = FallbackConfig()
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Export directories
EXPORT_DIR = Path("exports")
EXPORT_DIR.mkdir(exist_ok=True)
# Global variables
pg_pool = None
vector_available = False
clients = {}
# Pydantic Models
class NERRequest(BaseModel):
text: Optional[str] = None
url: Optional[HttpUrl] = None
extract_relationships: bool = True
include_embeddings: bool = True
include_summary: bool = True
generate_graph_files: bool = True
export_formats: List[str] = ["neo4j", "json", "graphml"]
@field_validator('text')
@classmethod
def validate_text_length(cls, v):
if v and len(v) > config.MAX_TEXT_LENGTH:
raise ValueError(f"Text too long (max {config.MAX_TEXT_LENGTH} characters)")
return v
class MultiInputRequest(BaseModel):
texts: Optional[List[str]] = None
urls: Optional[List[HttpUrl]] = None
extract_relationships: bool = True
include_embeddings: bool = True
include_summary: bool = True
combine_results: bool = True
generate_graph_files: bool = True
export_formats: List[str] = ["neo4j", "json", "graphml"]
class EntityResult(BaseModel):
id: str
text: str
label: str
confidence: float
start_pos: int
end_pos: int
source_type: Optional[str] = None
source_index: Optional[int] = None
frequency: int = 1
importance_score: float = 0.0
metadata: Optional[Dict[str, Any]] = None
class RelationshipResult(BaseModel):
id: str
source_entity_id: str
target_entity_id: str
source_entity: str
target_entity: str
relationship_type: str
confidence: float
strength: float
context: str
evidence_count: int = 1
bidirectional: bool = False
metadata: Optional[Dict[str, Any]] = None
class NodeResult(BaseModel):
id: str
label: str
type: str
confidence: float
frequency: int = 1
importance_score: float = 0.0
properties: Dict[str, Any]
class LinkResult(BaseModel):
id: str
source: str
target: str
relationship: str
confidence: float
strength: float
evidence_count: int = 1
properties: Dict[str, Any]
class GraphData(BaseModel):
nodes: List[NodeResult]
links: List[LinkResult]
metadata: Dict[str, Any]
class ExportFiles(BaseModel):
neo4j_nodes: Optional[str] = None
neo4j_relationships: Optional[str] = None
json_export: Optional[str] = None
graphml_export: Optional[str] = None
csv_nodes: Optional[str] = None
csv_edges: Optional[str] = None
gexf_export: Optional[str] = None
analysis_report: Optional[str] = None
download_bundle: Optional[str] = None
class NERResponse(BaseModel):
success: bool
analysis_id: str
source_text: str
source_type: str
language: str
entities: List[EntityResult]
keywords: List[str]
relationships: List[RelationshipResult]
summary: str
embeddings: Optional[List[float]] = None
graph_data: GraphData
export_files: ExportFiles
processing_time: float
character_count: int
word_count: int
sentence_count: int
entity_relationship_stats: Dict[str, Any]
error: Optional[str] = None
class MultiNERResponse(BaseModel):
success: bool
analysis_id: str
combined_analysis: NERResponse
individual_analyses: List[NERResponse]
processing_time: float
total_sources: int
error: Optional[str] = None
# Utility Functions
def generate_unique_id(prefix: str = "item") -> str:
    """Generate a unique ID from a millisecond timestamp plus a random suffix
    (the timestamp alone collides when many IDs are created within the same millisecond)."""
    return f"{prefix}_{int(datetime.utcnow().timestamp() * 1000)}_{os.urandom(4).hex()}"
def normalize_text(text: str) -> str:
"""Normalize text for comparison"""
return re.sub(r'\s+', ' ', text.strip().lower())
def calculate_text_similarity(text1: str, text2: str) -> float:
"""Calculate basic text similarity"""
norm1 = normalize_text(text1)
norm2 = normalize_text(text2)
if norm1 == norm2:
return 1.0
words1 = set(norm1.split())
words2 = set(norm2.split())
if not words1 and not words2:
return 1.0
if not words1 or not words2:
return 0.0
intersection = words1.intersection(words2)
union = words1.union(words2)
return len(intersection) / len(union) if union else 0.0
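# Worked example for calculate_text_similarity (illustrative): comparing
# "Acme Corp Ltd" with "Acme Corp" compares the word sets {"acme", "corp", "ltd"}
# and {"acme", "corp"}; the Jaccard similarity is 2/3 ≈ 0.67, below the default
# deduplication threshold of 0.8 used in deduplicate_entities below.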
def deduplicate_entities(entities: List[Dict[str, Any]], similarity_threshold: float = 0.8) -> List[Dict[str, Any]]:
"""Remove duplicate entities based on text similarity"""
if not entities:
return []
deduplicated = []
processed_texts = set()
for entity in entities:
entity_text = entity.get('text', '').strip()
normalized_text = normalize_text(entity_text)
if not entity_text or normalized_text in processed_texts:
continue
is_duplicate = False
for existing_entity in deduplicated:
existing_text = existing_entity.get('text', '')
similarity = calculate_text_similarity(entity_text, existing_text)
if similarity >= similarity_threshold:
if entity.get('confidence', 0) > existing_entity.get('confidence', 0):
deduplicated.remove(existing_entity)
break
else:
is_duplicate = True
break
if not is_duplicate:
entity['id'] = entity.get('id', generate_unique_id('ent'))
deduplicated.append(entity)
processed_texts.add(normalized_text)
return deduplicated
def detect_language(text: str) -> str:
"""Enhanced language detection"""
if not text:
return "en"
thai_chars = len(re.findall(r'[ก-๙]', text))
english_chars = len(re.findall(r'[a-zA-Z]', text))
total_chars = thai_chars + english_chars
if total_chars == 0:
return "en"
thai_ratio = thai_chars / total_chars
if thai_ratio > 0.3:
return "th"
elif thai_ratio > 0.1:
return "mixed"
else:
return "en"
def get_text_stats(text: str) -> Dict[str, int]:
"""Get comprehensive text statistics"""
return {
"character_count": len(text),
"word_count": len(text.split()),
"sentence_count": len(re.findall(r'[.!?]+', text)),
"paragraph_count": len([p for p in text.split('\n\n') if p.strip()]),
"line_count": len(text.split('\n'))
}
# Client Management
def get_blob_client():
if clients.get('blob') is None and config.AZURE_STORAGE_ACCOUNT_URL and config.AZURE_BLOB_SAS_TOKEN:
try:
clients['blob'] = BlobServiceClient(
account_url=config.AZURE_STORAGE_ACCOUNT_URL,
credential=config.AZURE_BLOB_SAS_TOKEN
)
except Exception as e:
logger.error(f"Failed to initialize blob client: {e}")
return clients.get('blob')
def get_deepseek_client():
if clients.get('deepseek') is None and config.DEEPSEEK_ENDPOINT and config.DEEPSEEK_API_KEY:
try:
clients['deepseek'] = ChatCompletionsClient(
endpoint=config.DEEPSEEK_ENDPOINT,
credential=AzureKeyCredential(config.DEEPSEEK_API_KEY),
api_version="2024-05-01-preview"
)
except Exception as e:
logger.error(f"Failed to initialize DeepSeek client: {e}")
return clients.get('deepseek')
def get_openai_client():
if clients.get('openai') is None and config.AZURE_OPENAI_ENDPOINT and config.AZURE_OPENAI_API_KEY:
try:
clients['openai'] = AzureOpenAI(
api_version="2024-12-01-preview",
azure_endpoint=config.AZURE_OPENAI_ENDPOINT,
api_key=config.AZURE_OPENAI_API_KEY
)
except Exception as e:
logger.error(f"Failed to initialize OpenAI client: {e}")
return clients.get('openai')
# Database Operations
async def init_database():
global pg_pool, vector_available
logger.info("🔄 Connecting to database...")
try:
pg_pool = await asyncpg.create_pool(
host=config.POSTGRES_HOST,
port=config.POSTGRES_PORT,
user=config.POSTGRES_USER,
password=config.POSTGRES_PASSWORD,
database=config.POSTGRES_DATABASE,
ssl='require',
min_size=2,
max_size=10,
command_timeout=60
)
async with pg_pool.acquire() as conn:
logger.info("✅ Database connected")
# Check vector extension
try:
await conn.execute("CREATE EXTENSION IF NOT EXISTS vector;")
await conn.fetchval("SELECT '[1,2,3]'::vector(3)")
vector_available = True
logger.info("✅ Vector extension available")
            except Exception:
vector_available = False
logger.info("⚠️ Vector extension not available (using JSONB)")
# Create tables
await create_tables(conn)
logger.info("✅ Database setup complete")
return True
except Exception as e:
logger.error(f"❌ Database init failed: {e}")
return False
async def create_tables(conn):
"""Create enhanced database tables for ER model"""
await conn.execute("""
CREATE TABLE IF NOT EXISTS ner_analyses (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
analysis_id VARCHAR(255) UNIQUE NOT NULL,
source_text TEXT NOT NULL,
source_type VARCHAR(50) NOT NULL,
language VARCHAR(10) DEFAULT 'en',
entities JSONB NOT NULL DEFAULT '[]',
keywords JSONB NOT NULL DEFAULT '[]',
relationships JSONB NOT NULL DEFAULT '[]',
summary TEXT DEFAULT '',
embeddings JSONB DEFAULT '[]',
graph_data JSONB DEFAULT '{}',
export_files JSONB DEFAULT '{}',
text_stats JSONB DEFAULT '{}',
er_stats JSONB DEFAULT '{}',
processing_time FLOAT DEFAULT 0,
entity_types JSONB DEFAULT '[]',
relationship_types JSONB DEFAULT '[]',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""")
await conn.execute("""
CREATE TABLE IF NOT EXISTS entities (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
entity_id VARCHAR(255) NOT NULL,
analysis_id VARCHAR(255) NOT NULL,
text VARCHAR(1000) NOT NULL,
label VARCHAR(100) NOT NULL,
confidence FLOAT DEFAULT 0,
start_pos INTEGER DEFAULT 0,
end_pos INTEGER DEFAULT 0,
frequency INTEGER DEFAULT 1,
importance_score FLOAT DEFAULT 0,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (analysis_id) REFERENCES ner_analyses(analysis_id) ON DELETE CASCADE
);
""")
await conn.execute("""
CREATE TABLE IF NOT EXISTS relationships (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
relationship_id VARCHAR(255) NOT NULL,
analysis_id VARCHAR(255) NOT NULL,
source_entity_id VARCHAR(255) NOT NULL,
target_entity_id VARCHAR(255) NOT NULL,
source_entity VARCHAR(1000) NOT NULL,
target_entity VARCHAR(1000) NOT NULL,
relationship_type VARCHAR(200) NOT NULL,
confidence FLOAT DEFAULT 0,
strength FLOAT DEFAULT 0,
context TEXT DEFAULT '',
evidence_count INTEGER DEFAULT 1,
bidirectional BOOLEAN DEFAULT FALSE,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (analysis_id) REFERENCES ner_analyses(analysis_id) ON DELETE CASCADE
);
""")
# Create indexes
try:
await conn.execute("""
CREATE INDEX IF NOT EXISTS idx_analysis_id ON ner_analyses(analysis_id);
CREATE INDEX IF NOT EXISTS idx_entities_analysis ON entities(analysis_id);
CREATE INDEX IF NOT EXISTS idx_relationships_analysis ON relationships(analysis_id);
""")
    except Exception:
pass
# Text Extraction
def extract_text_from_file(file_content: bytes, filename: str) -> str:
file_ext = Path(filename).suffix.lower()
if file_ext == '.txt':
return file_content.decode('utf-8', errors='ignore')
elif file_ext == '.docx':
doc = docx.Document(io.BytesIO(file_content))
return '\n'.join([p.text for p in doc.paragraphs])
else:
return file_content.decode('utf-8', errors='ignore')
async def get_text_from_ocr(file_content: bytes, filename: str) -> str:
try:
async with httpx.AsyncClient(timeout=300) as client:
files = {'file': (filename, file_content)}
response = await client.post(f"{config.OCR_SERVICE_URL}/ocr/upload", files=files)
if response.status_code == 200:
return response.json().get('content', '')
    except Exception as e:
        logger.error(f"OCR service error: {e}")
    raise HTTPException(status_code=500, detail="OCR processing failed")
async def get_text_from_url(url: str) -> str:
try:
async with httpx.AsyncClient(timeout=300) as client:
response = await client.post(f"{config.OCR_SERVICE_URL}/ocr/url",
json={"url": str(url), "extract_images": True})
if response.status_code == 200:
return response.json().get('content', '')
    except Exception as e:
        logger.error(f"URL processing error: {e}")
    raise HTTPException(status_code=500, detail="URL processing failed")
# Enhanced NER and Relationship Analysis
async def analyze_with_deepseek(text: str, language: Optional[str] = None) -> Dict[str, Any]:
"""Enhanced analysis with improved relationship extraction"""
deepseek_client = get_deepseek_client()
if not deepseek_client:
logger.warning("DeepSeek not configured, using manual extraction")
return extract_manual_entities_and_relationships(text, language)
try:
if not language:
language = detect_language(text)
if language == "th":
system_prompt = """คุณเป็นผู้เชี่ยวชาญในการจดจำนามเอกลักษณ์และการสกัดความสัมพันธ์สำหรับภาษาไทย
วิเคราะห์ข้อความและสกัดข้อมูลดังนี้:
1. นามเอกลักษณ์ทุกประเภท (บุคคล องค์กร สถานที่ วันที่ เวลา เงิน ฯลฯ)
2. ความสัมพันธ์ระหว่างนามเอกลักษณ์ - ต้องสกัดทุกความสัมพันธ์ที่พบ
3. คำหลักสำคัญจากข้อความ
4. สรุปที่ครอบคลุม
ให้ผลลัพธ์เป็น JSON:
{
"entities": [{"text": "ข้อความ", "label": "ประเภท", "confidence": 0.95, "start_pos": 0, "end_pos": 10}],
"keywords": ["คำหลัก1", "คำหลัก2"],
"relationships": [{"source_entity": "A", "target_entity": "B", "relationship_type": "ประเภท", "confidence": 0.9, "context": "บริบท"}],
"summary": "สรุป"
}"""
else:
system_prompt = """You are an expert in Named Entity Recognition and relationship extraction.
Analyze the text and extract:
1. All named entities (people, organizations, locations, dates, money, etc.)
2. ALL relationships between entities - extract every relationship found
3. Important keywords from the text
4. Comprehensive summary
Return ONLY valid JSON:
{
"entities": [{"text": "entity text", "label": "TYPE", "confidence": 0.95, "start_pos": 0, "end_pos": 10}],
"keywords": ["keyword1", "keyword2"],
"relationships": [{"source_entity": "Entity A", "target_entity": "Entity B", "relationship_type": "relationship_type", "confidence": 0.9, "context": "context"}],
"summary": "Comprehensive summary"
}"""
user_prompt = f"วิเคราะห์ข้อความนี้:\n\n{text[:8000]}" if language == "th" else f"Analyze this text:\n\n{text[:8000]}"
response = deepseek_client.complete(
messages=[
SystemMessage(content=system_prompt),
UserMessage(content=user_prompt)
],
max_tokens=6000,
model=config.DEEPSEEK_MODEL,
temperature=0.1
)
result_text = response.choices[0].message.content.strip()
# Extract JSON from response
start_idx = result_text.find('{')
end_idx = result_text.rfind('}') + 1
if start_idx != -1 and end_idx > start_idx:
json_text = result_text[start_idx:end_idx]
try:
json_result = json.loads(json_text)
logger.info("✅ Successfully parsed JSON from DeepSeek")
            except Exception:
try:
fixed_json = json_text.replace("'", '"').replace('True', 'true').replace('False', 'false')
json_result = json.loads(fixed_json)
logger.info("✅ Successfully parsed fixed JSON")
                except Exception:
json_result = None
else:
json_result = None
if json_result:
entities = deduplicate_entities(json_result.get('entities', []))
keywords = json_result.get('keywords', [])
relationships = json_result.get('relationships', [])
summary = json_result.get('summary', '')
# Ensure relationships are extracted
if len(relationships) == 0 and len(entities) >= 2:
logger.warning("No relationships found by DeepSeek, applying rule-based extraction")
rule_based_relationships = extract_rule_based_relationships(entities, text, language)
relationships.extend(rule_based_relationships)
# Enhance relationships with IDs
for rel in relationships:
if 'id' not in rel:
rel['id'] = generate_unique_id('rel')
if 'strength' not in rel:
rel['strength'] = rel.get('confidence', 0.8)
if 'evidence_count' not in rel:
rel['evidence_count'] = 1
if 'bidirectional' not in rel:
rel['bidirectional'] = False
return {
"entities": entities,
"keywords": keywords[:20],
"relationships": relationships,
"summary": summary or f"Analysis of {len(text)} characters"
}
logger.warning("JSON parsing failed, using manual extraction")
return extract_manual_entities_and_relationships(text, language)
except Exception as e:
logger.error(f"DeepSeek analysis error: {e}")
return extract_manual_entities_and_relationships(text, language)
def extract_rule_based_relationships(entities: List[Dict], text: str, language: str) -> List[Dict]:
"""Extract relationships using rule-based approach"""
relationships = []
if len(entities) < 2:
return relationships
# Define relationship patterns
if language == "th":
patterns = [
(r'(.+?)\s*ทำงาน(?:ที่|ใน|กับ)\s*(.+)', 'ทำงานที่'),
(r'(.+?)\s*เป็น(?:เจ้าของ|ของ)\s*(.+)', 'เป็นเจ้าของ'),
(r'(.+?)\s*ตั้งอยู่(?:ที่|ใน)\s*(.+)', 'ตั้งอยู่ที่'),
(r'(.+?)\s*(?:จับกุม|จับ)\s*(.+)', 'จับกุมโดย'),
]
else:
patterns = [
(r'(.+?)\s*(?:works?\s+(?:for|at|in)|employed\s+by)\s*(.+)', 'works_for'),
(r'(.+?)\s*(?:owns?|possesses?)\s*(.+)', 'owns'),
(r'(.+?)\s*(?:located\s+(?:in|at)|based\s+in)\s*(.+)', 'located_in'),
(r'(.+?)\s*(?:arrested\s+by|detained\s+by)\s*(.+)', 'arrested_by'),
]
for pattern, rel_type in patterns:
for match in re.finditer(pattern, text, re.IGNORECASE | re.UNICODE):
source_text = match.group(1).strip()
target_text = match.group(2).strip()
source_entity = find_best_entity_match(source_text, entities)
target_entity = find_best_entity_match(target_text, entities)
if source_entity and target_entity and source_entity != target_entity:
relationship = {
'id': generate_unique_id('rel'),
'source_entity': source_entity['text'],
'target_entity': target_entity['text'],
'relationship_type': rel_type,
'confidence': 0.7,
'strength': 0.7,
'context': match.group(0),
'evidence_count': 1,
'bidirectional': False,
'metadata': {'extraction_method': 'rule_based'}
}
relationships.append(relationship)
return relationships
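# Worked example for extract_rule_based_relationships (illustrative): given the
# entities "Somchai" (PERSON) and "Acme Corp" (ORGANIZATION), the sentence
# "Somchai works for Acme Corp" matches the works_for pattern and produces a
# relationship with confidence/strength 0.7 and extraction_method "rule_based".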
def find_best_entity_match(text: str, entities: List[Dict]) -> Optional[Dict]:
"""Find the best matching entity for given text"""
text_norm = normalize_text(text)
for entity in entities:
if normalize_text(entity['text']) == text_norm:
return entity
best_match = None
best_score = 0
for entity in entities:
score = calculate_text_similarity(text, entity['text'])
if score > best_score and score > 0.6:
best_score = score
best_match = entity
return best_match
def extract_manual_entities_and_relationships(text: str, language: Optional[str] = None) -> Dict[str, Any]:
"""Enhanced manual extraction with relationship detection"""
if not language:
language = detect_language(text)
entities = []
keywords = []
# Enhanced patterns for different languages
if language == "th":
patterns = {
'PERSON': [r'(?:คุณ|นาย|นาง|นางสาว|ดร\.?)\s*[ก-๙\w\s]+'],
'ORGANIZATION': [r'บริษัท\s+[ก-๙\w\s]+(?:จำกัด|มหาชน)', r'สถานีตำรวจ[ก-๙\w\s]+'],
'LOCATION': [r'จังหวัด[ก-๙\w\s]+', r'กรุงเทพมหานคร|กรุงเทพฯ?'],
'MONEY': [r'\d+(?:,\d{3})*\s*(?:บาท|ล้านบาท|พันบาท)'],
'DATE': [r'\d{1,2}\/\d{1,2}\/\d{4}'],
}
words = re.findall(r'[ก-๙]+', text)
thai_stop_words = {'และ', 'หรือ', 'แต่', 'ใน', 'ที่', 'เพื่อ', 'กับ', 'จาก', 'โดย', 'ของ'}
keywords = [word for word in words if word not in thai_stop_words and len(word) > 2]
else:
patterns = {
'PERSON': [r'\b(?:Mr|Mrs|Ms|Dr|Prof)\.\s+[A-Z][a-zA-Z]+(?:\s+[A-Z][a-zA-Z]+)*'],
'ORGANIZATION': [r'\b[A-Z][a-zA-Z]+\s+(?:Inc|Corp|Company|Ltd|Co|LLC|Corporation|Limited|University)\b'],
'LOCATION': [r'\b(?:New York|Los Angeles|Chicago|Bangkok|London|Paris|Berlin)\b'],
'MONEY': [r'\$[\d,]+\.?\d*', r'\b\d+(?:,\d{3})*\s*(?:dollars?|USD|million|billion)\b'],
'DATE': [r'\b\d{1,2}\/\d{1,2}\/\d{4}\b'],
}
words = re.findall(r'\b[a-zA-Z]{3,}\b', text)
english_stop_words = {'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
keywords = [word.lower() for word in words if word.lower() not in english_stop_words]
# Extract entities
for label, pattern_list in patterns.items():
for pattern in pattern_list:
for match in re.finditer(pattern, text, re.UNICODE | re.IGNORECASE):
entity_text = match.group().strip()
if len(entity_text) > 1:
entities.append({
"id": generate_unique_id('ent'),
"text": entity_text,
"label": label,
"confidence": 0.8,
"start_pos": match.start(),
"end_pos": match.end(),
"frequency": 1,
"importance_score": 0.7,
"metadata": {"source": "manual_extraction"}
})
# Deduplicate
entities = deduplicate_entities(entities)
keywords = list(set(keywords))[:20]
# Extract relationships
relationships = []
if len(entities) >= 2:
relationships = extract_rule_based_relationships(entities, text, language)
summary = f"Analysis of {len(text)} characters found {len(entities)} entities and {len(relationships)} relationships"
return {
"entities": entities,
"keywords": keywords,
"relationships": relationships,
"summary": summary
}
async def generate_embeddings(text: str) -> List[float]:
openai_client = get_openai_client()
if not openai_client:
return []
try:
response = openai_client.embeddings.create(
input=[text[:8000]],
model=config.EMBEDDING_MODEL,
dimensions=1536
)
return response.data[0].embedding
except Exception as e:
logger.error(f"Embedding failed: {e}")
return []
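# Note on generate_embeddings: the input is truncated to the first 8,000
# characters and the request asks for dimensions=1536, so a successful call
# returns a list of 1536 floats; an empty list means the OpenAI client is not
# configured or the call failed.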
def create_enhanced_graph_data(entities: List[Dict], relationships: List[Dict]) -> GraphData:
"""Create enhanced graph data with comprehensive ER model"""
nodes = []
links = []
entity_map = {}
# Create nodes
for entity in entities:
node_id = entity.get('id', generate_unique_id('ent'))
entity_map[entity['text']] = node_id
node_properties = {
"original_text": entity['text'],
"entity_type": entity['label'],
"confidence": entity.get('confidence', 0.0),
"start_position": entity.get('start_pos', 0),
"end_position": entity.get('end_pos', 0),
"frequency": entity.get('frequency', 1),
"importance_score": entity.get('importance_score', 0.0),
"metadata": entity.get('metadata', {})
}
nodes.append(NodeResult(
id=node_id,
label=entity['text'],
type=entity['label'],
confidence=entity.get('confidence', 0.0),
frequency=entity.get('frequency', 1),
importance_score=entity.get('importance_score', 0.0),
properties=node_properties
))
# Create links
for rel in relationships:
source_id = entity_map.get(rel['source_entity'])
target_id = entity_map.get(rel['target_entity'])
if source_id and target_id:
link_id = rel.get('id', generate_unique_id('link'))
link_properties = {
"relationship_type": rel['relationship_type'],
"confidence": rel.get('confidence', 0.0),
"strength": rel.get('strength', rel.get('confidence', 0.0)),
"context": rel.get('context', ''),
"evidence_count": rel.get('evidence_count', 1),
"bidirectional": rel.get('bidirectional', False),
"metadata": rel.get('metadata', {})
}
links.append(LinkResult(
id=link_id,
source=source_id,
target=target_id,
relationship=rel['relationship_type'],
confidence=rel.get('confidence', 0.0),
strength=rel.get('strength', rel.get('confidence', 0.0)),
evidence_count=rel.get('evidence_count', 1),
properties=link_properties
))
# Calculate metadata
entity_types = defaultdict(int)
relationship_types = defaultdict(int)
for entity in entities:
entity_types[entity['label']] += 1
for rel in relationships:
relationship_types[rel['relationship_type']] += 1
metadata = {
"total_entities": len(entities),
"total_relationships": len(relationships),
"entity_type_distribution": dict(entity_types),
"relationship_type_distribution": dict(relationship_types),
"graph_density": len(relationships) / (len(entities) * (len(entities) - 1) / 2) if len(entities) > 1 else 0,
"average_entity_confidence": sum(entity.get('confidence', 0) for entity in entities) / len(entities) if entities else 0,
"average_relationship_confidence": sum(rel.get('confidence', 0) for rel in relationships) / len(relationships) if relationships else 0,
"unique_entity_types": len(entity_types),
"unique_relationship_types": len(relationship_types)
}
return GraphData(
nodes=nodes,
links=links,
metadata=metadata
)
# Export Functions (simplified)
async def generate_export_files(analysis_id: str, entities: List[Dict], relationships: List[Dict],
graph_data: GraphData, formats: List[str]) -> ExportFiles:
"""Generate export files for various formats"""
export_files = ExportFiles()
analysis_dir = EXPORT_DIR / analysis_id
analysis_dir.mkdir(exist_ok=True)
try:
if "neo4j" in formats:
nodes_file, rels_file = await generate_neo4j_csv(analysis_dir, entities, relationships)
export_files.neo4j_nodes = str(nodes_file)
export_files.neo4j_relationships = str(rels_file)
if "json" in formats:
json_file = await generate_json_export(analysis_dir, entities, relationships, graph_data)
export_files.json_export = str(json_file)
if "graphml" in formats:
graphml_file = await generate_graphml_export(analysis_dir, entities, relationships)
export_files.graphml_export = str(graphml_file)
logger.info(f"✅ Generated export files for analysis {analysis_id}")
except Exception as e:
logger.error(f"❌ Export file generation failed: {e}")
return export_files
async def generate_neo4j_csv(export_dir: Path, entities: List[Dict], relationships: List[Dict]) -> Tuple[Path, Path]:
"""Generate Neo4j compatible CSV files"""
nodes_file = export_dir / "neo4j_nodes.csv"
with open(nodes_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow([
'nodeId:ID', 'text', 'label:LABEL', 'confidence:float',
'frequency:int', 'importance:float'
])
for entity in entities:
writer.writerow([
entity.get('id', generate_unique_id('ent')),
entity['text'],
entity['label'],
entity.get('confidence', 0.0),
entity.get('frequency', 1),
entity.get('importance_score', 0.0)
])
rels_file = export_dir / "neo4j_relationships.csv"
entity_map = {entity['text']: entity.get('id', generate_unique_id('ent')) for entity in entities}
with open(rels_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow([
':START_ID', ':END_ID', ':TYPE', 'confidence:float',
'strength:float', 'context'
])
for rel in relationships:
source_id = entity_map.get(rel['source_entity'])
target_id = entity_map.get(rel['target_entity'])
if source_id and target_id:
writer.writerow([
source_id,
target_id,
rel['relationship_type'].upper().replace(' ', '_'),
rel.get('confidence', 0.0),
rel.get('strength', rel.get('confidence', 0.0)),
rel.get('context', '')
])
return nodes_file, rels_file
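# Note on generate_neo4j_csv: the CSV headers follow the Neo4j bulk-import
# conventions (nodeId:ID / :LABEL for nodes, :START_ID / :END_ID / :TYPE for
# relationships); consult the import documentation for your Neo4j version for
# the exact loader command, which varies between releases.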
async def generate_json_export(export_dir: Path, entities: List[Dict], relationships: List[Dict], graph_data: GraphData) -> Path:
"""Generate comprehensive JSON export"""
json_file = export_dir / "analysis_export.json"
export_data = {
"metadata": {
"export_timestamp": datetime.utcnow().isoformat(),
"format_version": "1.0",
"total_entities": len(entities),
"total_relationships": len(relationships)
},
"entities": entities,
"relationships": relationships,
"graph_data": graph_data.dict(),
"statistics": {
"entity_types": list(set(e['label'] for e in entities)),
"relationship_types": list(set(r['relationship_type'] for r in relationships)),
"average_confidence": sum(e.get('confidence', 0) for e in entities) / len(entities) if entities else 0
}
}
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(export_data, f, indent=2, ensure_ascii=False)
return json_file
async def generate_graphml_export(export_dir: Path, entities: List[Dict], relationships: List[Dict]) -> Path:
"""Generate GraphML format"""
graphml_file = export_dir / "graph_export.graphml"
# Create GraphML structure
root = ET.Element('graphml')
root.set('xmlns', 'http://graphml.graphdrawing.org/xmlns')
# Define attributes
ET.SubElement(root, 'key', id='label', **{'for': 'node', 'attr.name': 'label', 'attr.type': 'string'})
ET.SubElement(root, 'key', id='type', **{'for': 'node', 'attr.name': 'type', 'attr.type': 'string'})
ET.SubElement(root, 'key', id='rel_type', **{'for': 'edge', 'attr.name': 'relationship', 'attr.type': 'string'})
graph = ET.SubElement(root, 'graph', id='G', edgedefault='directed')
# Add nodes
entity_map = {}
for entity in entities:
node_id = entity.get('id', generate_unique_id('ent'))
entity_map[entity['text']] = node_id
node = ET.SubElement(graph, 'node', id=node_id)
label_data = ET.SubElement(node, 'data', key='label')
label_data.text = entity['text']
type_data = ET.SubElement(node, 'data', key='type')
type_data.text = entity['label']
# Add edges
for i, rel in enumerate(relationships):
source_id = entity_map.get(rel['source_entity'])
target_id = entity_map.get(rel['target_entity'])
if source_id and target_id:
edge = ET.SubElement(graph, 'edge', id=f"e{i}", source=source_id, target=target_id)
rel_data = ET.SubElement(edge, 'data', key='rel_type')
rel_data.text = rel['relationship_type']
# Write to file
tree = ET.ElementTree(root)
tree.write(graphml_file, encoding='utf-8', xml_declaration=True)
return graphml_file
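# Note on generate_graphml_export: the output is plain GraphML with string
# attributes for node label/type and edge relationship, so it should open in
# common graph tools that read GraphML (e.g. Gephi or yEd).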
def calculate_er_stats(entities: List[Dict], relationships: List[Dict]) -> Dict[str, Any]:
"""Calculate Entity-Relationship statistics"""
if not entities:
return {}
entity_types = defaultdict(int)
relationship_types = defaultdict(int)
for entity in entities:
entity_types[entity['label']] += 1
for rel in relationships:
relationship_types[rel['relationship_type']] += 1
return {
"total_entities": len(entities),
"total_relationships": len(relationships),
"entity_type_distribution": dict(entity_types),
"relationship_type_distribution": dict(relationship_types),
"graph_density": len(relationships) / (len(entities) * (len(entities) - 1) / 2) if len(entities) > 1 else 0,
"unique_entity_types": len(entity_types),
"unique_relationship_types": len(relationship_types)
}
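# Worked example for calculate_er_stats (illustrative): 4 entities connected by
# 3 relationships give graph_density = 3 / (4 * 3 / 2) = 0.5; an empty entity
# list returns {} and a single entity gives a density of 0.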
async def save_to_database(data: Dict[str, Any]) -> bool:
if not pg_pool:
logger.error("No database pool available")
return False
try:
async with pg_pool.acquire() as conn:
await conn.execute("""
INSERT INTO ner_analyses (
analysis_id, source_text, source_type, language, entities, keywords,
relationships, summary, embeddings, graph_data, export_files, text_stats,
er_stats, processing_time, entity_types, relationship_types
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT (analysis_id) DO UPDATE SET
entities = EXCLUDED.entities,
relationships = EXCLUDED.relationships,
summary = EXCLUDED.summary
""",
data['analysis_id'],
data['source_text'][:10000],
data['source_type'],
data['language'],
json.dumps(data['entities'], ensure_ascii=False),
json.dumps(data['keywords'], ensure_ascii=False),
json.dumps(data['relationships'], ensure_ascii=False),
data['summary'],
json.dumps(data.get('embeddings', [])),
json.dumps(data.get('graph_data', {}), ensure_ascii=False, default=str),
json.dumps(data.get('export_files', {}), ensure_ascii=False, default=str),
json.dumps(data.get('text_stats', {})),
json.dumps(data.get('er_stats', {})),
float(data.get('processing_time', 0)),
json.dumps(list(set(entity.get('label', '') for entity in data.get('entities', [])))),
json.dumps(list(set(rel.get('relationship_type', '') for rel in data.get('relationships', []))))
)
logger.info(f"✅ Analysis {data['analysis_id']} saved to database")
return True
except Exception as e:
logger.error(f"❌ DB save failed for {data.get('analysis_id', 'unknown')}: {e}")
return False
async def save_to_blob(analysis_id: str, data: Dict[str, Any]) -> bool:
blob_client = get_blob_client()
if not blob_client:
return False
try:
blob_name = f"ner_analysis/{analysis_id}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
blob_client_obj = blob_client.get_blob_client(container=config.BLOB_CONTAINER, blob=blob_name)
blob_client_obj.upload_blob(json.dumps(data, indent=2, ensure_ascii=False, default=str), overwrite=True)
return True
except Exception as e:
logger.error(f"Blob save failed: {e}")
return False
# App Lifecycle
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("🚀 Starting Enhanced NER Analysis Service...")
logger.info("🔄 Database initialization...")
db_ok = await init_database()
if not db_ok:
logger.error("❌ Database initialization failed!")
raise RuntimeError("Database initialization failed")
logger.info("🔄 Initializing API clients...")
get_deepseek_client()
get_openai_client()
get_blob_client()
logger.info("🔄 Creating export directories...")
EXPORT_DIR.mkdir(exist_ok=True)
logger.info("🎉 Enhanced NER Analysis Service is ready!")
logger.info(f"📡 Server running on http://{config.HOST}:{config.PORT}")
yield
logger.info("🛑 Shutting down...")
if pg_pool:
await pg_pool.close()
logger.info("✅ Database connections closed")
# FastAPI App
app = FastAPI(
title="Enhanced NER Analysis Service",
description="Advanced Named Entity Recognition with relationship extraction and graph exports",
version="2.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# API Endpoints
@app.get("/")
async def root():
deepseek_available = bool(config.DEEPSEEK_ENDPOINT and config.DEEPSEEK_API_KEY)
openai_available = bool(config.AZURE_OPENAI_ENDPOINT and config.AZURE_OPENAI_API_KEY)
blob_available = bool(config.AZURE_STORAGE_ACCOUNT_URL and config.AZURE_BLOB_SAS_TOKEN)
return {
"message": "Enhanced NER Analysis Service",
"version": "2.0.0",
"status": "operational",
"supported_entities": config.ENTITY_TYPES,
"supported_relationships": config.RELATIONSHIP_TYPES[:10],
"export_formats": ["neo4j", "json", "graphml"],
"features": {
"ner_analysis": True,
"relationship_extraction": True,
"thai_language_support": True,
"graph_database_export": True,
"embedding_generation": openai_available,
"deepseek_analysis": deepseek_available,
"blob_storage": blob_available
}
}
@app.get("/health")
async def health():
deepseek_available = bool(config.DEEPSEEK_ENDPOINT and config.DEEPSEEK_API_KEY)
openai_available = bool(config.AZURE_OPENAI_ENDPOINT and config.AZURE_OPENAI_API_KEY)
blob_available = bool(config.AZURE_STORAGE_ACCOUNT_URL and config.AZURE_BLOB_SAS_TOKEN)
return {
"status": "healthy",
"service": "NER Analysis Service",
"version": "2.0.0",
"database": pg_pool is not None,
"vector_extension": vector_available,
"deepseek": deepseek_available,
"openai": openai_available,
"blob_storage": blob_available,
"supported_entity_count": len(config.ENTITY_TYPES),
"supported_relationship_count": len(config.RELATIONSHIP_TYPES),
"export_formats": ["neo4j", "json", "graphml"]
}
@app.post("/analyze/text", response_model=NERResponse)
async def analyze_text(request: NERRequest, background_tasks: BackgroundTasks):
"""Analyze text for entities and relationships"""
start_time = datetime.utcnow()
analysis_id = f"text_{int(start_time.timestamp())}"
if not request.text or not request.text.strip():
raise HTTPException(status_code=400, detail="Text is required")
try:
language = detect_language(request.text)
text_stats = get_text_stats(request.text)
# Enhanced analysis
analysis_result = await analyze_with_deepseek(request.text, language)
# Generate embeddings if requested
embeddings = []
if request.include_embeddings:
embeddings = await generate_embeddings(request.text)
# Create enhanced graph
graph_data = create_enhanced_graph_data(
analysis_result.get('entities', []),
analysis_result.get('relationships', [])
)
# Calculate ER statistics
er_stats = calculate_er_stats(
analysis_result.get('entities', []),
analysis_result.get('relationships', [])
)
# Generate export files if requested
export_files = ExportFiles()
if request.generate_graph_files:
export_files = await generate_export_files(
analysis_id,
analysis_result.get('entities', []),
analysis_result.get('relationships', []),
graph_data,
request.export_formats
)
processing_time = (datetime.utcnow() - start_time).total_seconds()
response_data = {
"analysis_id": analysis_id,
"source_text": request.text,
"source_type": "text_input",
"language": language,
"entities": analysis_result.get('entities', []),
"keywords": analysis_result.get('keywords', []),
"relationships": analysis_result.get('relationships', []),
"summary": analysis_result.get('summary', ''),
"embeddings": embeddings,
"graph_data": graph_data,
"export_files": export_files,
"text_stats": text_stats,
"er_stats": er_stats,
"processing_time": processing_time,
"character_count": text_stats["character_count"],
"word_count": text_stats["word_count"],
"sentence_count": text_stats["sentence_count"]
}
# Save to database in background
background_tasks.add_task(save_to_database, response_data)
background_tasks.add_task(save_to_blob, analysis_id, response_data)
return NERResponse(
success=True,
entity_relationship_stats=er_stats,
**response_data
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Text analysis failed: {e}")
return NERResponse(
success=False,
analysis_id=analysis_id,
source_text=request.text[:1000],
source_type="text_input",
language="unknown",
entities=[],
keywords=[],
relationships=[],
summary="",
graph_data=GraphData(nodes=[], links=[], metadata={}),
export_files=ExportFiles(),
processing_time=(datetime.utcnow() - start_time).total_seconds(),
character_count=0,
word_count=0,
sentence_count=0,
entity_relationship_stats={},
error=str(e)
)
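# Example request for the file-analysis endpoint below (illustrative; adjust the
# host/port and filename to your deployment):
#   curl -X POST http://localhost:8500/analyze/file \
#        -F "file=@report.docx" \
#        -F "export_formats=neo4j,json"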
@app.post("/analyze/file", response_model=NERResponse)
async def analyze_file(
file: UploadFile = File(...),
extract_relationships: bool = Form(True),
include_embeddings: bool = Form(True),
include_summary: bool = Form(True),
generate_graph_files: bool = Form(True),
export_formats: str = Form("neo4j,json"),
background_tasks: BackgroundTasks = None
):
"""Analyze uploaded file for entities and relationships"""
start_time = datetime.utcnow()
analysis_id = f"file_{int(start_time.timestamp())}"
if not file.filename:
raise HTTPException(status_code=400, detail="No filename")
try:
file_content = await file.read()
if len(file_content) > config.MAX_FILE_SIZE:
raise HTTPException(status_code=400, detail="File too large")
file_ext = Path(file.filename).suffix.lower()
export_format_list = export_formats.split(',') if export_formats else ["json"]
if file_ext in config.SUPPORTED_TEXT_FORMATS:
text = extract_text_from_file(file_content, file.filename)
source_type = "text_file"
elif file_ext in config.SUPPORTED_OCR_FORMATS:
text = await get_text_from_ocr(file_content, file.filename)
source_type = "ocr_file"
else:
raise HTTPException(status_code=400, detail=f"Unsupported format: {file_ext}")
if not text.strip():
raise HTTPException(status_code=400, detail="No text extracted")
language = detect_language(text)
text_stats = get_text_stats(text)
# Enhanced analysis
analysis_result = await analyze_with_deepseek(text, language)
# Generate embeddings
embeddings = []
if include_embeddings:
embeddings = await generate_embeddings(text)
# Create enhanced graph
graph_data = create_enhanced_graph_data(
analysis_result.get('entities', []),
analysis_result.get('relationships', [])
)
# Calculate ER statistics
er_stats = calculate_er_stats(
analysis_result.get('entities', []),
analysis_result.get('relationships', [])
)
# Generate export files
export_files = ExportFiles()
if generate_graph_files:
export_files = await generate_export_files(
analysis_id,
analysis_result.get('entities', []),
analysis_result.get('relationships', []),
graph_data,
export_format_list
)
processing_time = (datetime.utcnow() - start_time).total_seconds()
response_data = {
"analysis_id": analysis_id,
"source_text": text,
"source_type": source_type,
"language": language,
"entities": analysis_result.get('entities', []),
"keywords": analysis_result.get('keywords', []),
"relationships": analysis_result.get('relationships', []),
"summary": analysis_result.get('summary', ''),
"embeddings": embeddings,
"graph_data": graph_data,
"export_files": export_files,
"text_stats": text_stats,
"er_stats": er_stats,
"processing_time": processing_time,
"character_count": text_stats["character_count"],
"word_count": text_stats["word_count"],
"sentence_count": text_stats["sentence_count"]
}
# Save in background
if background_tasks:
background_tasks.add_task(save_to_database, response_data)
background_tasks.add_task(save_to_blob, analysis_id, response_data)
return NERResponse(
success=True,
entity_relationship_stats=er_stats,
**response_data
)
except HTTPException:
raise
except Exception as e:
logger.error(f"File analysis failed: {e}")
return NERResponse(
success=False,
analysis_id=analysis_id,
source_text="",
source_type="file_input",
language="unknown",
entities=[],
keywords=[],
relationships=[],
summary="",
graph_data=GraphData(nodes=[], links=[], metadata={}),
export_files=ExportFiles(),
processing_time=(datetime.utcnow() - start_time).total_seconds(),
character_count=0,
word_count=0,
sentence_count=0,
entity_relationship_stats={},
error=str(e)
)
@app.post("/analyze/url", response_model=NERResponse)
async def analyze_url(request: NERRequest, background_tasks: BackgroundTasks):
"""Analyze URL content for entities and relationships"""
start_time = datetime.utcnow()
analysis_id = f"url_{int(start_time.timestamp())}"
if not request.url:
raise HTTPException(status_code=400, detail="URL is required")
try:
text = await get_text_from_url(str(request.url))
if not text.strip():
raise HTTPException(status_code=400, detail="No text extracted from URL")
language = detect_language(text)
text_stats = get_text_stats(text)
# Enhanced analysis
analysis_result = await analyze_with_deepseek(text, language)
# Generate embeddings
embeddings = []
if request.include_embeddings:
embeddings = await generate_embeddings(text)
# Create enhanced graph
graph_data = create_enhanced_graph_data(
analysis_result.get('entities', []),
analysis_result.get('relationships', [])
)
# Calculate ER statistics
er_stats = calculate_er_stats(
analysis_result.get('entities', []),
analysis_result.get('relationships', [])
)
# Generate export files
export_files = ExportFiles()
if request.generate_graph_files:
export_files = await generate_export_files(
analysis_id,
analysis_result.get('entities', []),
analysis_result.get('relationships', []),
graph_data,
request.export_formats
)
processing_time = (datetime.utcnow() - start_time).total_seconds()
response_data = {
"analysis_id": analysis_id,
"source_text": text,
"source_type": "url_content",
"language": language,
"entities": analysis_result.get('entities', []),
"keywords": analysis_result.get('keywords', []),
"relationships": analysis_result.get('relationships', []),
"summary": analysis_result.get('summary', ''),
"embeddings": embeddings,
"graph_data": graph_data,
"export_files": export_files,
"text_stats": text_stats,
"er_stats": er_stats,
"processing_time": processing_time,
"character_count": text_stats["character_count"],
"word_count": text_stats["word_count"],
"sentence_count": text_stats["sentence_count"]
}
# Save in background
background_tasks.add_task(save_to_database, response_data)
background_tasks.add_task(save_to_blob, analysis_id, response_data)
return NERResponse(
success=True,
entity_relationship_stats=er_stats,
**response_data
)
except HTTPException:
raise
except Exception as e:
logger.error(f"URL analysis failed: {e}")
return NERResponse(
success=False,
analysis_id=analysis_id,
source_text="",
source_type="url_content",
language="unknown",
entities=[],
keywords=[],
relationships=[],
summary="",
graph_data=GraphData(nodes=[], links=[], metadata={}),
export_files=ExportFiles(),
processing_time=(datetime.utcnow() - start_time).total_seconds(),
character_count=0,
word_count=0,
sentence_count=0,
entity_relationship_stats={},
error=str(e)
)
@app.get("/download/{analysis_id}/{file_type}")
async def download_export_file(analysis_id: str, file_type: str):
"""Download specific export file for an analysis"""
try:
analysis_dir = EXPORT_DIR / analysis_id
if not analysis_dir.exists():
raise HTTPException(status_code=404, detail=f"Analysis {analysis_id} not found")
file_mapping = {
"neo4j_nodes": "neo4j_nodes.csv",
"neo4j_relationships": "neo4j_relationships.csv",
"json": "analysis_export.json",
"graphml": "graph_export.graphml"
}
if file_type not in file_mapping:
raise HTTPException(status_code=400, detail=f"Invalid file type: {file_type}")
file_path = analysis_dir / file_mapping[file_type]
if not file_path.exists():
raise HTTPException(status_code=404, detail=f"File {file_type} not found")
return FileResponse(path=file_path, filename=file_mapping[file_type])
except HTTPException:
raise
except Exception as e:
logger.error(f"Download failed for {analysis_id}/{file_type}: {e}")
raise HTTPException(status_code=500, detail=f"Download failed: {str(e)}")
@app.get("/entity-types")
async def get_entity_types():
"""Get all supported entity types"""
return {
"success": True,
"entity_types": config.ENTITY_TYPES,
"total_count": len(config.ENTITY_TYPES)
}
@app.get("/relationship-types")
async def get_relationship_types():
"""Get all supported relationship types"""
return {
"success": True,
"relationship_types": config.RELATIONSHIP_TYPES,
"total_count": len(config.RELATIONSHIP_TYPES)
}
if __name__ == "__main__":
print("🔧 Loading enhanced NER configuration...")
print(f"🌐 Will start server on {config.HOST}:{config.PORT}")
print(f"🏷️ Enhanced with {len(config.ENTITY_TYPES)} entity types")
print(f"🔗 Enhanced with {len(config.RELATIONSHIP_TYPES)} relationship types")
uvicorn.run(
"ner_service:app",
host=config.HOST,
port=config.PORT,
reload=config.DEBUG,
log_level="info"
)