Spaces:
Sleeping
Sleeping
| """Neo4j AuraDB Knowledge Graph Client""" | |
| import os | |
| from neo4j import GraphDatabase | |
| from typing import List, Dict, Any, Optional | |
| import logging | |
| from datetime import datetime | |
# Load variables from a .env file as soon as this module is imported.
# Some callers import this module before calling load_dotenv() themselves; loading
# here makes the client robust to that ordering. If python-dotenv isn't available
# (or loading fails for any other reason), rely on the variables that are already
# present in the process environment.
try:
    from dotenv import load_dotenv
    load_dotenv()
except Exception:
    # Deliberate best-effort: dotenv not installed or failed to load; continue
    # silently. Neo4jClient.__init__ raises later if the credentials are missing.
    pass

# NOTE: this configures the root logger at INFO level as an import-time side effect.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by every Neo4jClient method below.
logger = logging.getLogger(__name__)
class Neo4jClient:
    """Client for interacting with a Neo4j AuraDB knowledge graph.

    Credentials are read from the ``NEO4J_URI``, ``NEO4J_USER`` (default
    ``'neo4j'``) and ``NEO4J_PASSWORD`` environment variables. Call
    :meth:`connect` before running queries and :meth:`close` when done.

    Query helpers never raise on database errors: failures are logged and
    reported through a falsy return value (``None``, ``False`` or ``[]``).
    """

    # NOTE(review): the leading "β" in several log messages looks like a
    # mis-decoded emoji. The literals are kept as found -- confirm the
    # intended glyph before changing them.

    def __init__(self):
        """Read connection settings from the environment.

        Raises:
            ValueError: if ``NEO4J_URI`` or ``NEO4J_PASSWORD`` is unset or empty.
        """
        self.uri = os.getenv('NEO4J_URI')
        self.user = os.getenv('NEO4J_USER', 'neo4j')
        self.password = os.getenv('NEO4J_PASSWORD')
        # Stays None until connect() succeeds; reset to None by close().
        self.driver = None

        if not self.uri or not self.password:
            raise ValueError(
                "Neo4j credentials missing! Set NEO4J_URI and NEO4J_PASSWORD "
                "in .env file"
            )

    def connect(self):
        """Open a driver and verify it with a trivial query.

        Returns:
            True when the test query succeeds, False otherwise. On failure the
            half-open driver is closed and ``self.driver`` is left as None, so
            later calls to :meth:`execute_query` report "not connected"
            instead of using a broken driver.
        """
        # Release any driver from an earlier connect() so a reconnect never leaks it.
        self.close()

        driver = None
        try:
            driver = GraphDatabase.driver(
                self.uri,
                auth=(self.user, self.password),
                max_connection_lifetime=3600
            )
            # Test connection
            with driver.session() as session:
                session.run("RETURN 1 AS test").single()
        except Exception as e:
            logger.error(f"β Neo4j AuraDB connection failed: {e}")
            logger.error(f"URI: {self.uri}")
            logger.error(f"User: {self.user}")
            if driver is not None:
                try:
                    driver.close()
                except Exception as close_error:
                    # Cleanup is best-effort; the connection failure was already logged.
                    logger.debug(f"Ignoring error while closing failed driver: {close_error}")
            return False

        # Publish the driver only after it has been proven to work.
        self.driver = driver
        logger.info("β Connected to Neo4j AuraDB")
        return True

    def close(self):
        """Close the driver, if any, and mark the client as disconnected."""
        if self.driver:
            # Clear the attribute first so the client is never left pointing at
            # a closed driver, even if close() itself raises.
            driver, self.driver = self.driver, None
            driver.close()
            logger.info("Neo4j connection closed")

    def execute_query(self, query: str,
                      parameters: Optional[Dict] = None) -> Optional[List[Dict[str, Any]]]:
        """Execute a Cypher query and return its records as dictionaries.

        Args:
            query: Cypher text; use ``$name`` placeholders for values.
            parameters: values for the placeholders (defaults to none).

        Returns:
            A list with one ``record.data()`` dict per row (possibly empty), or
            None when the client is not connected or the query fails.
        """
        if not self.driver:
            logger.error("Not connected to Neo4j")
            return None
        try:
            with self.driver.session() as session:
                result = session.run(query, parameters or {})
                # Materialize inside the session; the result is consumed lazily.
                return [record.data() for record in result]
        except Exception as e:
            logger.error(f"Query execution failed: {e}")
            logger.error(f"Query: {query}")
            return None

    def create_bug(self, bug_data: Dict[str, Any]) -> Optional[Dict]:
        """Create a new bug node in the knowledge graph.

        Args:
            bug_data: must provide every parameter the query references:
                ``id``, ``title``, ``description``, ``priority``, ``severity``,
                ``category``, ``component``, ``reporter``, ``estimated_hours``.

        Returns:
            The first result row (``{'b': ...}``), or None if creation failed.
            The node is always created with status ``'Open'``.
        """
        query = """
        CREATE (b:Bug {
            id: $id,
            title: $title,
            description: $description,
            priority: $priority,
            severity: $severity,
            category: $category,
            status: 'Open',
            component: $component,
            reporter: $reporter,
            created_at: datetime(),
            estimated_hours: $estimated_hours
        })
        RETURN b
        """
        result = self.execute_query(query, bug_data)
        if result:
            logger.info(f"β Created bug: {bug_data['id']}")
            return result[0]
        return None

    def link_bug_to_component(self, bug_id: str, component_name: str):
        """Link a bug to its affected component.

        The relationship is written with MERGE so repeated calls do not pile
        up duplicate AFFECTS edges, which would otherwise duplicate rows in
        get_all_open_bugs().

        Returns:
            True if both nodes were found and are now linked, False otherwise.
        """
        query = """
        MATCH (b:Bug {id: $bug_id}), (c:Component {name: $component_name})
        MERGE (b)-[:AFFECTS]->(c)
        RETURN b, c
        """
        result = self.execute_query(query, {
            'bug_id': bug_id,
            'component_name': component_name
        })
        if result:
            logger.info(f"β Linked {bug_id} to {component_name}")
            return True
        return False

    def get_developers_by_component(self, component_name: str) -> List[Dict]:
        """Find developers with expertise in a specific component.

        Only developers whose ``current_workload`` is below ``max_capacity``
        are returned: at most 5, highest proficiency first, ties broken by
        lowest workload.

        Returns:
            A list of row dicts, or an empty list when nothing matches or the
            query fails.
        """
        query = """
        MATCH (d:Developer)-[e:EXPERT_IN]->(c:Component {name: $component_name})
        WHERE d.current_workload < d.max_capacity
        RETURN d.id AS dev_id,
               d.name AS name,
               d.email AS email,
               d.expertise AS expertise,
               d.skill_level AS skill_level,
               d.current_workload AS current_workload,
               d.max_capacity AS max_capacity,
               d.avg_resolution_time_hours AS avg_resolution_time,
               d.success_rate AS success_rate,
               e.proficiency AS proficiency,
               e.years_experience AS years_experience
        ORDER BY e.proficiency DESC, d.current_workload ASC
        LIMIT 5
        """
        result = self.execute_query(query, {'component_name': component_name})
        if result:
            logger.info(f"β Found {len(result)} developers for {component_name}")
            return result
        return []

    def get_developer_workload(self, dev_id: str) -> Optional[Dict]:
        """Get current workload information for a developer.

        ``active_bugs`` counts bugs linked via ASSIGNED_TO whose status is
        'Open' or 'Assigned'. assign_bug_to_developer() moves a bug to
        'Assigned', so counting only 'Open' bugs would miss every bug that
        was assigned through this client.

        Returns:
            A row dict, or None if the developer is unknown or the query fails.
        """
        query = """
        MATCH (d:Developer {id: $dev_id})
        OPTIONAL MATCH (d)-[:ASSIGNED_TO]->(b:Bug)
        WHERE b.status IN ['Open', 'Assigned']
        WITH d, COUNT(b) AS active_bugs
        RETURN d.id AS dev_id,
               d.name AS name,
               d.current_workload AS current_workload,
               d.max_capacity AS max_capacity,
               active_bugs,
               (d.max_capacity - d.current_workload) AS available_capacity
        """
        result = self.execute_query(query, {'dev_id': dev_id})
        if result:
            return result[0]
        return None

    def assign_bug_to_developer(self, bug_id: str, dev_id: str,
                                assignment_reason: str) -> bool:
        """Assign a bug to a developer.

        Creates an ASSIGNED_TO relationship (with timestamp, reason and
        status 'Active'), increments the developer's ``current_workload`` and
        marks the bug as 'Assigned'.

        NOTE(review): capacity is not checked here and calling this twice for
        the same pair increments the workload twice -- callers are presumably
        expected to guard against both; confirm.

        Returns:
            True if both nodes were found and updated, False otherwise.
        """
        query = """
        MATCH (b:Bug {id: $bug_id}), (d:Developer {id: $dev_id})
        CREATE (d)-[:ASSIGNED_TO {
            assigned_at: datetime(),
            reason: $reason,
            status: 'Active'
        }]->(b)
        SET d.current_workload = d.current_workload + 1,
            b.status = 'Assigned',
            b.assigned_at = datetime()
        RETURN b, d
        """
        result = self.execute_query(query, {
            'bug_id': bug_id,
            'dev_id': dev_id,
            'reason': assignment_reason
        })
        if result:
            logger.info(f"β Assigned {bug_id} to {dev_id}")
            return True
        return False

    def get_all_open_bugs(self) -> List[Dict]:
        """Get all open bugs that need assignment, most urgent first.

        Bugs are ordered P0, P1, P2, P3, then any other priority value.

        Returns:
            A list of row dicts, or an empty list when there are no open bugs
            or the query fails.
        """
        query = """
        MATCH (b:Bug {status: 'Open'})
        OPTIONAL MATCH (b)-[:AFFECTS]->(c:Component)
        RETURN b.id AS bug_id,
               b.title AS title,
               b.priority AS priority,
               b.category AS category,
               b.component AS component,
               c.name AS component_name,
               b.estimated_hours AS estimated_hours
        ORDER BY
            CASE b.priority
                WHEN 'P0' THEN 0
                WHEN 'P1' THEN 1
                WHEN 'P2' THEN 2
                WHEN 'P3' THEN 3
                ELSE 4
            END
        """
        result = self.execute_query(query)
        if result:
            logger.info(f"β Found {len(result)} open bugs")
            return result
        return []

    def get_graph_statistics(self) -> Dict:
        """Get knowledge graph statistics.

        Returns:
            A dict of counts keyed by ``total_bugs``, ``open_bugs``,
            ``total_developers``, ``available_developers``,
            ``total_components`` and ``total_teams``. A count is reported as 0
            when its query fails or returns no rows.
        """
        queries = {
            'total_bugs': "MATCH (b:Bug) RETURN COUNT(b) AS count",
            'open_bugs': "MATCH (b:Bug {status: 'Open'}) RETURN COUNT(b) AS count",
            'total_developers': "MATCH (d:Developer) RETURN COUNT(d) AS count",
            'available_developers': """
                MATCH (d:Developer)
                WHERE d.current_workload < d.max_capacity
                RETURN COUNT(d) AS count
            """,
            'total_components': "MATCH (c:Component) RETURN COUNT(c) AS count",
            'total_teams': "MATCH (t:Team) RETURN COUNT(t) AS count"
        }
        stats = {}
        for key, query in queries.items():
            result = self.execute_query(query)
            stats[key] = result[0]['count'] if result else 0
        return stats
if __name__ == "__main__":
    # Manual smoke test: connect, print a few read-only query results, disconnect.
    # The .env file has already been loaded (best-effort) by the module-level
    # load_dotenv() call above, so no second, unguarded dotenv import is needed.
    client = Neo4jClient()
    try:
        if client.connect():
            print("\nπ Neo4j AuraDB connection successful!")

            # Test: Get graph statistics
            print("\nπ Knowledge Graph Statistics:")
            stats = client.get_graph_statistics()
            for key, value in stats.items():
                print(f" {key}: {value}")

            # Test: Find developers for a component
            print("\nπ₯ Developers for 'Authentication' component:")
            devs = client.get_developers_by_component('Authentication')
            for dev in devs:
                # proficiency comes from a relationship property and may be null.
                proficiency = dev['proficiency']
                shown = f"{proficiency:.2f}" if proficiency is not None else "n/a"
                print(f" - {dev['name']} (proficiency: {shown})")

            # Test: Get open bugs
            print("\nπ Open Bugs:")
            bugs = client.get_all_open_bugs()
            for bug in bugs[:3]:
                print(f" - {bug['bug_id']}: {bug['title']}")
    finally:
        # Always release the driver, even if one of the checks above raised.
        client.close()