File size: 4,057 Bytes
5876f53
d19a515
 
1d75bf0
 
 
 
 
 
 
5876f53
1d75bf0
5876f53
 
056dbb4
 
 
5876f53
d19a515
 
 
 
 
5876f53
 
 
 
d19a515
5876f53
 
d19a515
5876f53
 
 
d6836a4
 
 
d19a515
5876f53
 
d19a515
5876f53
d19a515
 
 
5876f53
 
 
 
 
 
 
 
d19a515
5876f53
 
 
d19a515
 
5876f53
 
 
 
 
 
d19a515
5876f53
 
 
 
 
 
f353888
5876f53
 
 
d19a515
5876f53
 
 
056dbb4
5876f53
 
 
 
f353888
5876f53
f353888
 
 
 
5876f53
d19a515
5876f53
 
 
056dbb4
5876f53
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d19a515
5876f53
 
 
 
 
 
 
 
d19a515
 
 
 
 
 
 
 
 
5876f53
d19a515
 
5876f53
 
d19a515
5876f53
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import asyncio
import sys
import os
import pipmaster as pm

if not pm.is_installed("psycopg-pool"):
    pm.install("psycopg-pool")
    pm.install("psycopg[binary,pool]")
if not pm.is_installed("asyncpg"):
    pm.install("asyncpg")

import asyncpg
import psycopg
from psycopg_pool import AsyncConnectionPool

from ..kg.postgres_impl import PostgreSQLDB, PGGraphStorage
from ..namespace import NameSpace

DB = "rag"
USER = "rag"
PASSWORD = "rag"
HOST = "localhost"
PORT = "15432"
os.environ["AGE_GRAPH_NAME"] = "dickens"

if sys.platform.startswith("win"):
    import asyncio.windows_events

    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())


async def get_pool():
    return await asyncpg.create_pool(
        f"postgres://{USER}:{PASSWORD}@{HOST}:{PORT}/{DB}",
        min_size=10,
        max_size=10,
        max_queries=5000,
        max_inactive_connection_lifetime=300.0,
    )


async def main1():
    connection_string = (
        f"dbname='{DB}' user='{USER}' password='{PASSWORD}' host='{HOST}' port={PORT}"
    )
    pool = AsyncConnectionPool(connection_string, open=False)
    await pool.open()

    try:
        conn = await pool.getconn(timeout=10)
        async with conn.cursor() as curs:
            try:
                await curs.execute('SET search_path = ag_catalog, "$user", public')
                await curs.execute("SELECT create_graph('dickens-2')")
                await conn.commit()
                print("create_graph success")
            except (
                psycopg.errors.InvalidSchemaName,
                psycopg.errors.UniqueViolation,
            ):
                print("create_graph already exists")
                await conn.rollback()
    finally:
        pass


db = PostgreSQLDB(
    config={
        "host": "localhost",
        "port": 15432,
        "user": "rag",
        "password": "rag",
        "database": "r1",
    }
)


async def query_with_age():
    await db.initdb()
    graph = PGGraphStorage(
        namespace=NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION,
        global_config={},
        embedding_func=None,
    )
    graph.db = db
    res = await graph.get_node('"A CHRISTMAS CAROL"')
    print("Node is: ", res)
    res = await graph.get_edge('"A CHRISTMAS CAROL"', "PROJECT GUTENBERG")
    print("Edge is: ", res)
    res = await graph.get_node_edges('"SCROOGE"')
    print("Node Edges are: ", res)


async def create_edge_with_age():
    await db.initdb()
    graph = PGGraphStorage(
        namespace=NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION,
        global_config={},
        embedding_func=None,
    )
    graph.db = db
    await graph.upsert_node('"THE CRATCHITS"', {"hello": "world"})
    await graph.upsert_node('"THE GIRLS"', {"world": "hello"})
    await graph.upsert_edge(
        '"THE CRATCHITS"',
        '"THE GIRLS"',
        edge_data={
            "weight": 7.0,
            "description": '"The girls are part of the Cratchit family, contributing to their collective efforts and shared experiences.',
            "keywords": '"family, collective effort"',
            "source_id": "chunk-1d4b58de5429cd1261370c231c8673e8",
        },
    )
    res = await graph.get_edge("THE CRATCHITS", '"THE GIRLS"')
    print("Edge is: ", res)


async def main():
    pool = await get_pool()
    sql = r"SELECT * FROM ag_catalog.cypher('dickens', $$ MATCH (n:帅哥) RETURN n $$) AS (n ag_catalog.agtype)"
    # cypher = "MATCH (n:how_are_you_doing) RETURN n"
    async with pool.acquire() as conn:
        try:
            await conn.execute(
                """SET search_path = ag_catalog, "$user", public;select create_graph('dickens')"""
            )
        except asyncpg.exceptions.InvalidSchemaNameError:
            print("create_graph already exists")
        # stmt = await conn.prepare(sql)
        row = await conn.fetch(sql)
        print("row is: ", row)

        row = await conn.fetchrow("select '100'::int + 200 as result")
        print(row)  # <Record result=300>


if __name__ == "__main__":
    asyncio.run(query_with_age())