Spandan Roy
Phase 2 Complete: Multi-Agent System with Neo4j and LangChain
a09cc94
"""Neo4j AuraDB Knowledge Graph Client"""
import os
from neo4j import GraphDatabase
from typing import List, Dict, Any, Optional
import logging
from datetime import datetime
# Ensure environment variables from a .env file are loaded when this module is imported.
# Some callers import this module before calling load_dotenv(); loading here makes the
# client robust to that ordering. If python-dotenv isn't available, just rely on the
# existing environment variables.
try:
from dotenv import load_dotenv
load_dotenv()
except Exception:
# dotenv not installed or failed to load; continue silently
pass
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Neo4jClient:
"""Client for interacting with Neo4j AuraDB Knowledge Graph"""
def __init__(self):
"""Initialize Neo4j connection with environment variables"""
self.uri = os.getenv('NEO4J_URI')
self.user = os.getenv('NEO4J_USER', 'neo4j')
self.password = os.getenv('NEO4J_PASSWORD')
self.driver = None
if not self.uri or not self.password:
raise ValueError(
"Neo4j credentials missing! Set NEO4J_URI and NEO4J_PASSWORD "
"in .env file"
)
def connect(self):
"""Establish connection to Neo4j AuraDB"""
try:
self.driver = GraphDatabase.driver(
self.uri,
auth=(self.user, self.password),
max_connection_lifetime=3600
)
# Test connection
with self.driver.session() as session:
result = session.run("RETURN 1 AS test")
result.single()
logger.info("βœ… Connected to Neo4j AuraDB")
return True
except Exception as e:
logger.error(f"❌ Neo4j AuraDB connection failed: {e}")
logger.error(f"URI: {self.uri}")
logger.error(f"User: {self.user}")
return False
def close(self):
"""Close Neo4j connection"""
if self.driver:
self.driver.close()
logger.info("Neo4j connection closed")
def execute_query(self, query: str, parameters: Dict = None):
"""Execute a Cypher query and return results"""
if not self.driver:
logger.error("Not connected to Neo4j")
return None
try:
with self.driver.session() as session:
result = session.run(query, parameters or {})
return [record.data() for record in result]
except Exception as e:
logger.error(f"Query execution failed: {e}")
logger.error(f"Query: {query}")
return None
def create_bug(self, bug_data: Dict[str, Any]) -> Optional[Dict]:
"""Create a new bug node in the knowledge graph"""
query = """
CREATE (b:Bug {
id: $id,
title: $title,
description: $description,
priority: $priority,
severity: $severity,
category: $category,
status: 'Open',
component: $component,
reporter: $reporter,
created_at: datetime(),
estimated_hours: $estimated_hours
})
RETURN b
"""
result = self.execute_query(query, bug_data)
if result:
logger.info(f"βœ… Created bug: {bug_data['id']}")
return result[0]
return None
def link_bug_to_component(self, bug_id: str, component_name: str):
"""Link a bug to its affected component"""
query = """
MATCH (b:Bug {id: $bug_id}), (c:Component {name: $component_name})
CREATE (b)-[:AFFECTS]->(c)
RETURN b, c
"""
result = self.execute_query(query, {
'bug_id': bug_id,
'component_name': component_name
})
if result:
logger.info(f"βœ… Linked {bug_id} to {component_name}")
return True
return False
def get_developers_by_component(self, component_name: str) -> List[Dict]:
"""Find developers with expertise in a specific component"""
query = """
MATCH (d:Developer)-[e:EXPERT_IN]->(c:Component {name: $component_name})
WHERE d.current_workload < d.max_capacity
RETURN d.id AS dev_id,
d.name AS name,
d.email AS email,
d.expertise AS expertise,
d.skill_level AS skill_level,
d.current_workload AS current_workload,
d.max_capacity AS max_capacity,
d.avg_resolution_time_hours AS avg_resolution_time,
d.success_rate AS success_rate,
e.proficiency AS proficiency,
e.years_experience AS years_experience
ORDER BY e.proficiency DESC, d.current_workload ASC
LIMIT 5
"""
result = self.execute_query(query, {'component_name': component_name})
if result:
logger.info(f"βœ… Found {len(result)} developers for {component_name}")
return result
return []
def get_developer_workload(self, dev_id: str) -> Optional[Dict]:
"""Get current workload information for a developer"""
query = """
MATCH (d:Developer {id: $dev_id})
OPTIONAL MATCH (d)-[:ASSIGNED_TO]->(b:Bug {status: 'Open'})
WITH d, COUNT(b) AS active_bugs
RETURN d.id AS dev_id,
d.name AS name,
d.current_workload AS current_workload,
d.max_capacity AS max_capacity,
active_bugs,
(d.max_capacity - d.current_workload) AS available_capacity
"""
result = self.execute_query(query, {'dev_id': dev_id})
if result:
return result[0]
return None
def assign_bug_to_developer(self, bug_id: str, dev_id: str,
assignment_reason: str) -> bool:
"""Assign a bug to a developer"""
query = """
MATCH (b:Bug {id: $bug_id}), (d:Developer {id: $dev_id})
CREATE (d)-[:ASSIGNED_TO {
assigned_at: datetime(),
reason: $reason,
status: 'Active'
}]->(b)
SET d.current_workload = d.current_workload + 1,
b.status = 'Assigned',
b.assigned_at = datetime()
RETURN b, d
"""
result = self.execute_query(query, {
'bug_id': bug_id,
'dev_id': dev_id,
'reason': assignment_reason
})
if result:
logger.info(f"βœ… Assigned {bug_id} to {dev_id}")
return True
return False
def get_all_open_bugs(self) -> List[Dict]:
"""Get all open bugs that need assignment"""
query = """
MATCH (b:Bug {status: 'Open'})
OPTIONAL MATCH (b)-[:AFFECTS]->(c:Component)
RETURN b.id AS bug_id,
b.title AS title,
b.priority AS priority,
b.category AS category,
b.component AS component,
c.name AS component_name,
b.estimated_hours AS estimated_hours
ORDER BY
CASE b.priority
WHEN 'P0' THEN 0
WHEN 'P1' THEN 1
WHEN 'P2' THEN 2
WHEN 'P3' THEN 3
ELSE 4
END
"""
result = self.execute_query(query)
if result:
logger.info(f"βœ… Found {len(result)} open bugs")
return result
return []
def get_graph_statistics(self) -> Dict:
"""Get knowledge graph statistics"""
queries = {
'total_bugs': "MATCH (b:Bug) RETURN COUNT(b) AS count",
'open_bugs': "MATCH (b:Bug {status: 'Open'}) RETURN COUNT(b) AS count",
'total_developers': "MATCH (d:Developer) RETURN COUNT(d) AS count",
'available_developers': """
MATCH (d:Developer)
WHERE d.current_workload < d.max_capacity
RETURN COUNT(d) AS count
""",
'total_components': "MATCH (c:Component) RETURN COUNT(c) AS count",
'total_teams': "MATCH (t:Team) RETURN COUNT(t) AS count"
}
stats = {}
for key, query in queries.items():
result = self.execute_query(query)
if result:
stats[key] = result[0]['count']
else:
stats[key] = 0
return stats
if __name__ == "__main__":
from dotenv import load_dotenv
load_dotenv()
client = Neo4jClient()
if client.connect():
print("\nπŸŽ‰ Neo4j AuraDB connection successful!")
# Test: Get graph statistics
print("\nπŸ“Š Knowledge Graph Statistics:")
stats = client.get_graph_statistics()
for key, value in stats.items():
print(f" {key}: {value}")
# Test: Find developers for a component
print("\nπŸ‘₯ Developers for 'Authentication' component:")
devs = client.get_developers_by_component('Authentication')
for dev in devs:
print(f" - {dev['name']} (proficiency: {dev['proficiency']:.2f})")
# Test: Get open bugs
print("\nπŸ› Open Bugs:")
bugs = client.get_all_open_bugs()
for bug in bugs[:3]:
print(f" - {bug['bug_id']}: {bug['title']}")
client.close()