GraphGen / graphgen /models /storage /base_storage.py
chenzihong-gavin
init
acd7cf4
from dataclasses import dataclass
from typing import Union, Generic, TypeVar
from graphgen.models.embed.embedding import EmbeddingFunc
T = TypeVar("T")
@dataclass
class StorageNameSpace:
working_dir: str = None
namespace: str = None
async def index_done_callback(self):
"""commit the storage operations after indexing"""
async def query_done_callback(self):
"""commit the storage operations after querying"""
@dataclass
class BaseKVStorage(Generic[T], StorageNameSpace):
embedding_func: EmbeddingFunc = None
async def all_keys(self) -> list[str]:
raise NotImplementedError
async def get_by_id(self, id: str) -> Union[T, None]:
raise NotImplementedError
async def get_by_ids(
self, ids: list[str], fields: Union[set[str], None] = None
) -> list[Union[T, None]]:
raise NotImplementedError
async def filter_keys(self, data: list[str]) -> set[str]:
"""return un-exist keys"""
raise NotImplementedError
async def upsert(self, data: dict[str, T]):
raise NotImplementedError
async def drop(self):
raise NotImplementedError
@dataclass
class BaseGraphStorage(StorageNameSpace):
embedding_func: EmbeddingFunc = None
async def has_node(self, node_id: str) -> bool:
raise NotImplementedError
async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
raise NotImplementedError
async def node_degree(self, node_id: str) -> int:
raise NotImplementedError
async def edge_degree(self, src_id: str, tgt_id: str) -> int:
raise NotImplementedError
async def get_node(self, node_id: str) -> Union[dict, None]:
raise NotImplementedError
async def update_node(self, node_id: str, node_data: dict[str, str]):
raise NotImplementedError
async def get_all_nodes(self) -> Union[list[dict], None]:
raise NotImplementedError
async def get_edge(
self, source_node_id: str, target_node_id: str
) -> Union[dict, None]:
raise NotImplementedError
async def update_edge(self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]):
raise NotImplementedError
async def get_all_edges(self) -> Union[list[dict], None]:
raise NotImplementedError
async def get_node_edges(
self, source_node_id: str
) -> Union[list[tuple[str, str]], None]:
raise NotImplementedError
async def upsert_node(self, node_id: str, node_data: dict[str, str]):
raise NotImplementedError
async def upsert_edge(
self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]
):
raise NotImplementedError
async def delete_node(self, node_id: str):
raise NotImplementedError