Spaces:
Running
Running
from dataclasses import dataclass | |
from typing import Union, Generic, TypeVar | |
from graphgen.models.embed.embedding import EmbeddingFunc | |
T = TypeVar("T") | |
class StorageNameSpace: | |
working_dir: str = None | |
namespace: str = None | |
async def index_done_callback(self): | |
"""commit the storage operations after indexing""" | |
async def query_done_callback(self): | |
"""commit the storage operations after querying""" | |
class BaseKVStorage(Generic[T], StorageNameSpace): | |
embedding_func: EmbeddingFunc = None | |
async def all_keys(self) -> list[str]: | |
raise NotImplementedError | |
async def get_by_id(self, id: str) -> Union[T, None]: | |
raise NotImplementedError | |
async def get_by_ids( | |
self, ids: list[str], fields: Union[set[str], None] = None | |
) -> list[Union[T, None]]: | |
raise NotImplementedError | |
async def filter_keys(self, data: list[str]) -> set[str]: | |
"""return un-exist keys""" | |
raise NotImplementedError | |
async def upsert(self, data: dict[str, T]): | |
raise NotImplementedError | |
async def drop(self): | |
raise NotImplementedError | |
class BaseGraphStorage(StorageNameSpace): | |
embedding_func: EmbeddingFunc = None | |
async def has_node(self, node_id: str) -> bool: | |
raise NotImplementedError | |
async def has_edge(self, source_node_id: str, target_node_id: str) -> bool: | |
raise NotImplementedError | |
async def node_degree(self, node_id: str) -> int: | |
raise NotImplementedError | |
async def edge_degree(self, src_id: str, tgt_id: str) -> int: | |
raise NotImplementedError | |
async def get_node(self, node_id: str) -> Union[dict, None]: | |
raise NotImplementedError | |
async def update_node(self, node_id: str, node_data: dict[str, str]): | |
raise NotImplementedError | |
async def get_all_nodes(self) -> Union[list[dict], None]: | |
raise NotImplementedError | |
async def get_edge( | |
self, source_node_id: str, target_node_id: str | |
) -> Union[dict, None]: | |
raise NotImplementedError | |
async def update_edge(self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]): | |
raise NotImplementedError | |
async def get_all_edges(self) -> Union[list[dict], None]: | |
raise NotImplementedError | |
async def get_node_edges( | |
self, source_node_id: str | |
) -> Union[list[tuple[str, str]], None]: | |
raise NotImplementedError | |
async def upsert_node(self, node_id: str, node_data: dict[str, str]): | |
raise NotImplementedError | |
async def upsert_edge( | |
self, source_node_id: str, target_node_id: str, edge_data: dict[str, str] | |
): | |
raise NotImplementedError | |
async def delete_node(self, node_id: str): | |
raise NotImplementedError | |