Spaces:
Sleeping
Sleeping
File size: 4,377 Bytes
92c34be |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
from neo4j import GraphDatabase
class ChainGuardGraphDB:
def __init__(self, uri, user, password):
self.driver = GraphDatabase.driver(uri, auth=(user, password))
def close(self):
"""Close the database connection."""
self.driver.close()
def create_node(self, label, properties):
"""Create a node with the specified label and properties."""
with self.driver.session() as session:
session.write_transaction(self._create_and_return_node, label, properties)
@staticmethod
def _create_and_return_node(tx, label, properties):
query = (
f"CREATE (n:{label} {{"
+ ", ".join([f"{k}: ${k}" for k in properties.keys()])
+ "}}) RETURN n"
)
result = tx.run(query, **properties)
return result.single()[0]
def create_relationship(self, node1_label, node1_property, node2_label, node2_property, relationship_type):
"""Create a relationship between two nodes."""
with self.driver.session() as session:
session.write_transaction(
self._create_and_return_relationship,
node1_label, node1_property, node2_label, node2_property, relationship_type
)
@staticmethod
def _create_and_return_relationship(tx, node1_label, node1_property, node2_label, node2_property, relationship_type):
query = (
f"MATCH (a:{node1_label} {{name: $node1_property}}), (b:{node2_label} {{name: $node2_property}}) "
f"CREATE (a)-[r:{relationship_type}]->(b) "
"RETURN r"
)
result = tx.run(query, node1_property=node1_property, node2_property=node2_property)
return result.single()[0]
def find_related_anomalies(self, transaction_name):
"""Find all anomalies related to a specific blockchain transaction."""
with self.driver.session() as session:
result = session.run(
"MATCH (t:BlockchainTransaction {name: $transaction_name})-[:DETECTED_IN]->(a:Anomaly) "
"RETURN a.name, a.type, a.severity",
transaction_name=transaction_name
)
return [record for record in result]
def find_blockchain_transactions(self, anomaly_name):
"""Find all blockchain transactions related to a specific anomaly."""
with self.driver.session() as session:
result = session.run(
"MATCH (a:Anomaly {name: $anomaly_name})<-[:DETECTED_IN]-(t:BlockchainTransaction) "
"RETURN t.name, t.amount, t.timestamp",
anomaly_name=anomaly_name
)
return [record for record in result]
def validate_blockchain(self):
"""Validate the blockchain (this is a simplified example)."""
with self.driver.session() as session:
result = session.run(
"MATCH (b:BlockchainTransaction) "
"RETURN COUNT(b) as transaction_count"
)
count = result.single()["transaction_count"]
# Simplified validation: checks if there are transactions in the blockchain
return count > 0
# Example usage
if __name__ == "__main__":
# Connect to the database
db = ChainGuardGraphDB(uri="bolt://localhost:7687", user="neo4j", password="your_password")
# Create nodes
db.create_node("BlockchainTransaction", {"name": "Tx1", "amount": 100, "timestamp": "2024-09-01"})
db.create_node("Anomaly", {"name": "Anomaly1", "type": "Network", "severity": "High"})
# Create relationships
db.create_relationship("BlockchainTransaction", "Tx1", "Anomaly", "Anomaly1", "DETECTED_IN")
# Query related anomalies for a transaction
related_anomalies = db.find_related_anomalies("Tx1")
for anomaly in related_anomalies:
print(f"Related Anomaly: {anomaly['a.name']}, Type: {anomaly['a.type']}, Severity: {anomaly['a.severity']}")
# Query related transactions for an anomaly
related_transactions = db.find_blockchain_transactions("Anomaly1")
for tx in related_transactions:
print(f"Related Transaction: {tx['t.name']}, Amount: {tx['t.amount']}, Timestamp: {tx['t.timestamp']}")
# Validate the blockchain
is_valid = db.validate_blockchain()
print(f"Blockchain valid: {is_valid}")
# Close the connection
db.close()
|