|
|
""" |
|
|
src/storage/neo4j_graph.py |
|
|
Knowledge graph for event relationships and entity tracking |
|
|
""" |
|
|
|
|
|
import logging |
|
|
from typing import Dict, Any, List, Optional |
|
|
|
|
|
logger = logging.getLogger("neo4j_graph") |
|
|
|
|
|
try: |
|
|
from neo4j import GraphDatabase |
|
|
|
|
|
NEO4J_AVAILABLE = True |
|
|
except ImportError: |
|
|
NEO4J_AVAILABLE = False |
|
|
logger.warning("[Neo4j] Package not installed. Knowledge graph disabled.") |
|
|
|
|
|
from .config import config |
|
|
|
|
|
|
|
|
class Neo4jGraph: |
|
|
""" |
|
|
Knowledge graph for tracking: |
|
|
- Event nodes with properties |
|
|
- Entity nodes (companies, politicians, locations) |
|
|
- Relationships (SIMILAR_TO, FOLLOWS, MENTIONS) |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
self.driver = None |
|
|
|
|
|
if not NEO4J_AVAILABLE or not config.NEO4J_ENABLED: |
|
|
logger.info("[Neo4j] Disabled (set NEO4J_ENABLED=true to enable)") |
|
|
return |
|
|
|
|
|
try: |
|
|
self._init_driver() |
|
|
self._create_indexes() |
|
|
logger.info(f"[Neo4j] Connected to {config.NEO4J_URI}") |
|
|
except Exception as e: |
|
|
logger.error(f"[Neo4j] Connection failed: {e}") |
|
|
self.driver = None |
|
|
|
|
|
def _init_driver(self): |
|
|
"""Initialize Neo4j driver""" |
|
|
self.driver = GraphDatabase.driver( |
|
|
config.NEO4J_URI, auth=(config.NEO4J_USER, config.NEO4J_PASSWORD) |
|
|
) |
|
|
|
|
|
|
|
|
self.driver.verify_connectivity() |
|
|
|
|
|
def _create_indexes(self): |
|
|
"""Create indexes for faster queries""" |
|
|
if not self.driver: |
|
|
return |
|
|
|
|
|
with self.driver.session() as session: |
|
|
|
|
|
session.run( |
|
|
"CREATE INDEX event_id_index IF NOT EXISTS FOR (e:Event) ON (e.event_id)" |
|
|
) |
|
|
|
|
|
|
|
|
session.run( |
|
|
"CREATE INDEX entity_name_index IF NOT EXISTS FOR (ent:Entity) ON (ent.name)" |
|
|
) |
|
|
|
|
|
|
|
|
session.run( |
|
|
"CREATE INDEX domain_index IF NOT EXISTS FOR (d:Domain) ON (d.name)" |
|
|
) |
|
|
|
|
|
def add_event( |
|
|
self, |
|
|
event_id: str, |
|
|
domain: str, |
|
|
summary: str, |
|
|
severity: str, |
|
|
impact_type: str, |
|
|
confidence_score: float, |
|
|
timestamp: str, |
|
|
metadata: Optional[Dict[str, Any]] = None, |
|
|
): |
|
|
"""Add event node to knowledge graph""" |
|
|
if not self.driver: |
|
|
return |
|
|
|
|
|
with self.driver.session() as session: |
|
|
query = """ |
|
|
MERGE (e:Event {event_id: $event_id}) |
|
|
SET e.domain = $domain, |
|
|
e.summary = $summary, |
|
|
e.severity = $severity, |
|
|
e.impact_type = $impact_type, |
|
|
e.confidence_score = $confidence_score, |
|
|
e.timestamp = $timestamp, |
|
|
e.created_at = datetime() |
|
|
|
|
|
MERGE (d:Domain {name: $domain}) |
|
|
MERGE (e)-[:BELONGS_TO]->(d) |
|
|
|
|
|
RETURN e.event_id as created_id |
|
|
""" |
|
|
|
|
|
result = session.run( |
|
|
query, |
|
|
event_id=event_id, |
|
|
domain=domain, |
|
|
summary=summary[:2000], |
|
|
severity=severity, |
|
|
impact_type=impact_type, |
|
|
confidence_score=confidence_score, |
|
|
timestamp=timestamp, |
|
|
) |
|
|
|
|
|
created = result.single() |
|
|
if created: |
|
|
logger.debug(f"[Neo4j] Created event: {event_id[:8]}...") |
|
|
|
|
|
def link_similar_events(self, event_id_1: str, event_id_2: str, similarity: float): |
|
|
"""Create SIMILAR_TO relationship between events""" |
|
|
if not self.driver: |
|
|
return |
|
|
|
|
|
with self.driver.session() as session: |
|
|
query = """ |
|
|
MATCH (e1:Event {event_id: $id1}) |
|
|
MATCH (e2:Event {event_id: $id2}) |
|
|
MERGE (e1)-[r:SIMILAR_TO]-(e2) |
|
|
SET r.similarity = $similarity, |
|
|
r.created_at = datetime() |
|
|
""" |
|
|
|
|
|
session.run(query, id1=event_id_1, id2=event_id_2, similarity=similarity) |
|
|
logger.debug( |
|
|
f"[Neo4j] Linked similar events: {event_id_1[:8]}... <-> {event_id_2[:8]}..." |
|
|
) |
|
|
|
|
|
def link_temporal_sequence(self, earlier_event_id: str, later_event_id: str): |
|
|
"""Create FOLLOWS relationship for temporal sequence""" |
|
|
if not self.driver: |
|
|
return |
|
|
|
|
|
with self.driver.session() as session: |
|
|
query = """ |
|
|
MATCH (e1:Event {event_id: $earlier_id}) |
|
|
MATCH (e2:Event {event_id: $later_id}) |
|
|
WHERE datetime(e2.timestamp) > datetime(e1.timestamp) |
|
|
MERGE (e1)-[r:FOLLOWS]->(e2) |
|
|
SET r.created_at = datetime() |
|
|
""" |
|
|
|
|
|
session.run(query, earlier_id=earlier_event_id, later_id=later_event_id) |
|
|
|
|
|
def get_event_clusters(self, min_cluster_size: int = 2) -> List[Dict[str, Any]]: |
|
|
"""Find clusters of similar events""" |
|
|
if not self.driver: |
|
|
return [] |
|
|
|
|
|
with self.driver.session() as session: |
|
|
query = """ |
|
|
MATCH (e1:Event)-[:SIMILAR_TO]-(e2:Event) |
|
|
WITH e1, COLLECT(e2) as similar_events |
|
|
WHERE SIZE(similar_events) >= $min_size |
|
|
RETURN e1.event_id as event_id, |
|
|
e1.summary as summary, |
|
|
SIZE(similar_events) as cluster_size |
|
|
ORDER BY cluster_size DESC |
|
|
LIMIT 10 |
|
|
""" |
|
|
|
|
|
results = session.run(query, min_size=min_cluster_size) |
|
|
|
|
|
clusters = [] |
|
|
for record in results: |
|
|
clusters.append( |
|
|
{ |
|
|
"event_id": record["event_id"], |
|
|
"summary": record["summary"], |
|
|
"cluster_size": record["cluster_size"], |
|
|
} |
|
|
) |
|
|
|
|
|
return clusters |
|
|
|
|
|
def get_domain_stats(self) -> List[Dict[str, Any]]: |
|
|
"""Get event count by domain""" |
|
|
if not self.driver: |
|
|
return [] |
|
|
|
|
|
with self.driver.session() as session: |
|
|
query = """ |
|
|
MATCH (e:Event)-[:BELONGS_TO]->(d:Domain) |
|
|
RETURN d.name as domain, |
|
|
COUNT(e) as event_count |
|
|
ORDER BY event_count DESC |
|
|
""" |
|
|
|
|
|
results = session.run(query) |
|
|
|
|
|
stats = [] |
|
|
for record in results: |
|
|
stats.append( |
|
|
{"domain": record["domain"], "event_count": record["event_count"]} |
|
|
) |
|
|
|
|
|
return stats |
|
|
|
|
|
def get_stats(self) -> Dict[str, Any]: |
|
|
"""Get graph statistics""" |
|
|
if not self.driver: |
|
|
return {"status": "disabled"} |
|
|
|
|
|
try: |
|
|
with self.driver.session() as session: |
|
|
|
|
|
event_count = session.run( |
|
|
"MATCH (e:Event) RETURN COUNT(e) as count" |
|
|
).single()["count"] |
|
|
domain_count = session.run( |
|
|
"MATCH (d:Domain) RETURN COUNT(d) as count" |
|
|
).single()["count"] |
|
|
|
|
|
|
|
|
similar_count = session.run( |
|
|
"MATCH ()-[r:SIMILAR_TO]-() RETURN COUNT(r) as count" |
|
|
).single()["count"] |
|
|
|
|
|
return { |
|
|
"status": "active", |
|
|
"total_events": event_count, |
|
|
"total_domains": domain_count, |
|
|
"similarity_links": similar_count, |
|
|
"uri": config.NEO4J_URI, |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"[Neo4j] Stats error: {e}") |
|
|
return {"status": "error", "error": str(e)} |
|
|
|
|
|
def close(self): |
|
|
"""Close Neo4j driver connection""" |
|
|
if self.driver: |
|
|
self.driver.close() |
|
|
logger.info("[Neo4j] Connection closed") |
|
|
|