File size: 4,284 Bytes
129cd69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from typing import List, Optional, Union

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage, messages_from_dict

from langchain.utils import get_from_env


class Neo4jChatMessageHistory(BaseChatMessageHistory):
    """Chat message history stored in a Neo4j database."""

    def __init__(
        self,
        session_id: Union[str, int],
        url: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        database: str = "neo4j",
        node_label: str = "Session",
        window: int = 3,
    ):
        try:
            import neo4j
        except ImportError:
            raise ValueError(
                "Could not import neo4j python package. "
                "Please install it with `pip install neo4j`."
            )

        # Make sure session id is not null
        if not session_id:
            raise ValueError("Please ensure that the session_id parameter is provided")

        url = get_from_env("url", "NEO4J_URI", url)
        username = get_from_env("username", "NEO4J_USERNAME", username)
        password = get_from_env("password", "NEO4J_PASSWORD", password)
        database = get_from_env("database", "NEO4J_DATABASE", database)

        self._driver = neo4j.GraphDatabase.driver(url, auth=(username, password))
        self._database = database
        self._session_id = session_id
        self._node_label = node_label
        self._window = window

        # Verify connection
        try:
            self._driver.verify_connectivity()
        except neo4j.exceptions.ServiceUnavailable:
            raise ValueError(
                "Could not connect to Neo4j database. "
                "Please ensure that the url is correct"
            )
        except neo4j.exceptions.AuthError:
            raise ValueError(
                "Could not connect to Neo4j database. "
                "Please ensure that the username and password are correct"
            )
        # Create session node
        self._driver.execute_query(
            f"MERGE (s:`{self._node_label}` {{id:$session_id}})",
            {"session_id": self._session_id},
        ).summary

    @property
    def messages(self) -> List[BaseMessage]:  # type: ignore
        """Retrieve the messages from Neo4j"""
        query = (
            f"MATCH (s:`{self._node_label}`)-[:LAST_MESSAGE]->(last_message) "
            "WHERE s.id = $session_id MATCH p=(last_message)<-[:NEXT*0.."
            f"{self._window*2}]-() WITH p, length(p) AS length "
            "ORDER BY length DESC LIMIT 1 UNWIND reverse(nodes(p)) AS node "
            "RETURN {data:{content: node.content}, type:node.type} AS result"
        )
        records, _, _ = self._driver.execute_query(
            query, {"session_id": self._session_id}
        )

        messages = messages_from_dict([el["result"] for el in records])
        return messages

    def add_message(self, message: BaseMessage) -> None:
        """Append the message to the record in Neo4j"""
        query = (
            f"MATCH (s:`{self._node_label}`) WHERE s.id = $session_id "
            "OPTIONAL MATCH (s)-[lm:LAST_MESSAGE]->(last_message) "
            "CREATE (s)-[:LAST_MESSAGE]->(new:Message) "
            "SET new += {type:$type, content:$content} "
            "WITH new, lm, last_message WHERE last_message IS NOT NULL "
            "CREATE (last_message)-[:NEXT]->(new) "
            "DELETE lm"
        )
        self._driver.execute_query(
            query,
            {
                "type": message.type,
                "content": message.content,
                "session_id": self._session_id,
            },
        ).summary

    def clear(self) -> None:
        """Clear session memory from Neo4j"""
        query = (
            f"MATCH (s:`{self._node_label}`)-[:LAST_MESSAGE]->(last_message) "
            "WHERE s.id = $session_id MATCH p=(last_message)<-[:NEXT]-() "
            "WITH p, length(p) AS length ORDER BY length DESC LIMIT 1 "
            "UNWIND nodes(p) as node DETACH DELETE node;"
        )
        self._driver.execute_query(query, {"session_id": self._session_id}).summary

    def __del__(self) -> None:
        if self._driver:
            self._driver.close()