Spaces:
Runtime error
Runtime error
File size: 5,221 Bytes
129cd69 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
from __future__ import annotations
import uuid
from abc import ABC, abstractmethod
from typing import List, Optional, Sequence
NAMESPACE_UUID = uuid.UUID(int=1984)
class RecordManager(ABC):
"""An abstract base class representing the interface for a record manager."""
def __init__(
self,
namespace: str,
) -> None:
"""Initialize the record manager.
Args:
namespace (str): The namespace for the record manager.
"""
self.namespace = namespace
@abstractmethod
def create_schema(self) -> None:
"""Create the database schema for the record manager."""
@abstractmethod
async def acreate_schema(self) -> None:
"""Create the database schema for the record manager."""
@abstractmethod
def get_time(self) -> float:
"""Get the current server time as a high resolution timestamp!
It's important to get this from the server to ensure a monotonic clock,
otherwise there may be data loss when cleaning up old documents!
Returns:
The current server time as a float timestamp.
"""
@abstractmethod
async def aget_time(self) -> float:
"""Get the current server time as a high resolution timestamp!
It's important to get this from the server to ensure a monotonic clock,
otherwise there may be data loss when cleaning up old documents!
Returns:
The current server time as a float timestamp.
"""
@abstractmethod
def update(
self,
keys: Sequence[str],
*,
group_ids: Optional[Sequence[Optional[str]]] = None,
time_at_least: Optional[float] = None,
) -> None:
"""Upsert records into the database.
Args:
keys: A list of record keys to upsert.
group_ids: A list of group IDs corresponding to the keys.
time_at_least: if provided, updates should only happen if the
updated_at field is at least this time.
Raises:
ValueError: If the length of keys doesn't match the length of group_ids.
"""
@abstractmethod
async def aupdate(
self,
keys: Sequence[str],
*,
group_ids: Optional[Sequence[Optional[str]]] = None,
time_at_least: Optional[float] = None,
) -> None:
"""Upsert records into the database.
Args:
keys: A list of record keys to upsert.
group_ids: A list of group IDs corresponding to the keys.
time_at_least: if provided, updates should only happen if the
updated_at field is at least this time.
Raises:
ValueError: If the length of keys doesn't match the length of group_ids.
"""
@abstractmethod
def exists(self, keys: Sequence[str]) -> List[bool]:
"""Check if the provided keys exist in the database.
Args:
keys: A list of keys to check.
Returns:
A list of boolean values indicating the existence of each key.
"""
@abstractmethod
async def aexists(self, keys: Sequence[str]) -> List[bool]:
"""Check if the provided keys exist in the database.
Args:
keys: A list of keys to check.
Returns:
A list of boolean values indicating the existence of each key.
"""
@abstractmethod
def list_keys(
self,
*,
before: Optional[float] = None,
after: Optional[float] = None,
group_ids: Optional[Sequence[str]] = None,
limit: Optional[int] = None,
) -> List[str]:
"""List records in the database based on the provided filters.
Args:
before: Filter to list records updated before this time.
after: Filter to list records updated after this time.
group_ids: Filter to list records with specific group IDs.
limit: optional limit on the number of records to return.
Returns:
A list of keys for the matching records.
"""
@abstractmethod
async def alist_keys(
self,
*,
before: Optional[float] = None,
after: Optional[float] = None,
group_ids: Optional[Sequence[str]] = None,
limit: Optional[int] = None,
) -> List[str]:
"""List records in the database based on the provided filters.
Args:
before: Filter to list records updated before this time.
after: Filter to list records updated after this time.
group_ids: Filter to list records with specific group IDs.
limit: optional limit on the number of records to return.
Returns:
A list of keys for the matching records.
"""
@abstractmethod
def delete_keys(self, keys: Sequence[str]) -> None:
"""Delete specified records from the database.
Args:
keys: A list of keys to delete.
"""
@abstractmethod
async def adelete_keys(self, keys: Sequence[str]) -> None:
"""Delete specified records from the database.
Args:
keys: A list of keys to delete.
"""
|