Spaces:
Runtime error
Runtime error
from __future__ import annotations | |
import logging | |
import re | |
from typing import TYPE_CHECKING, Any, List, Optional, Pattern | |
from urllib.parse import urlparse | |
import numpy as np | |
logger = logging.getLogger(__name__) | |
if TYPE_CHECKING: | |
from redis.client import Redis as RedisType | |
def _array_to_buffer(array: List[float], dtype: Any = np.float32) -> bytes: | |
return np.array(array).astype(dtype).tobytes() | |
def _buffer_to_array(buffer: bytes, dtype: Any = np.float32) -> List[float]: | |
return np.frombuffer(buffer, dtype=dtype).tolist() | |
class TokenEscaper:
    """
    Escape punctuation within an input string.
    """

    # Characters that RediSearch requires us to escape during queries.
    # Source: https://redis.io/docs/stack/search/reference/escaping/#the-rules-of-text-field-tokenization
    DEFAULT_ESCAPED_CHARS = r"[,.<>{}\[\]\\\"\':;!@#$%^&*()\-+=~\/ ]"

    def __init__(self, escape_chars_re: Optional[Pattern] = None):
        """Store the pattern of characters to escape.

        Falls back to a pattern compiled from ``DEFAULT_ESCAPED_CHARS`` when no
        pattern is supplied.
        """
        self.escaped_chars_re = escape_chars_re or re.compile(
            self.DEFAULT_ESCAPED_CHARS
        )

    def escape(self, value: str) -> str:
        """Return ``value`` with a backslash placed before every matched character.

        Raises:
            TypeError: If ``value`` is not a ``str``.
        """
        if isinstance(value, str):
            # Template: "\\" emits one literal backslash, "\g<0>" re-emits the
            # text that was matched.
            return self.escaped_chars_re.sub(r"\\\g<0>", value)

        raise TypeError(
            "Value must be a string object for token escaping."
            f"Got type {type(value)}"
        )
def check_redis_module_exist(client: RedisType, required_modules: List[dict]) -> None:
    """Check that at least one of the required Redis modules is installed.

    Args:
        client: Redis client whose ``module_list()`` reply is inspected.
        required_modules: Candidate modules, each a dict holding a ``"name"``
            and a minimum ``"ver"``. A single installed module that satisfies
            its minimum version is enough for the check to pass.

    Raises:
        ValueError: If none of the required modules is installed in a
            sufficient version. The message is also logged as an error.
    """

    def _field(info: dict, key: str) -> Any:
        # The reply may be keyed by bytes or by str (e.g. when the client is
        # configured to decode responses), so accept either form.
        raw_key = key.encode("utf-8")
        return info[raw_key] if raw_key in info else info[key]

    installed_modules = {}
    for info in client.module_list():
        name = _field(info, "name")
        if isinstance(name, bytes):
            name = name.decode("utf-8")
        installed_modules[name] = info

    for required in required_modules:
        info = installed_modules.get(required["name"])
        if info is not None and int(_field(info, "ver")) >= int(required["ver"]):
            return

    # No required module matched: report it. The fragments carry their own
    # separators so the sentences and the URL do not run into each other.
    error_message = (
        "Redis cannot be used as a vector database without RediSearch >=2.4. "
        "Please head to https://redis.io/docs/stack/search/quick_start/ "
        "to know more about installing the RediSearch module within Redis Stack."
    )
    logger.error(error_message)
    raise ValueError(error_message)
def get_client(redis_url: str, **kwargs: Any) -> RedisType:
    """Get a redis client from the connection url given.

    This helper accepts urls for a Redis server (TCP with/without TLS or
    UnixSocket) as well as Redis Sentinel connections. For a plain server url
    the server is additionally asked whether cluster mode is enabled; if it is,
    the first connection is closed and a cluster client is returned instead.

    To use, you should have the ``redis`` python package installed.

    Example:
        .. code-block:: python

            from langchain.utilities.redis import get_client
            redis_client = get_client(
                redis_url="redis://username:password@localhost:6379"
            )

    To use a redis replication setup with multiple redis servers and redis
    sentinels set "redis_url" to the "redis+sentinel://" scheme
    ("rediss+sentinel://" additionally sets ``ssl=True``). With this url format
    a path is needed holding the name of the redis service within the sentinels
    to get the correct redis server connection. The default service name is
    "mymaster". The optional second part of the path is the redis db number to
    connect to.

    An optional username or password is used for both connections, to the redis
    server and to the sentinel; different passwords for server and sentinel are
    not supported. As another constraint only one sentinel instance can be
    given.

    Example:
        .. code-block:: python

            from langchain.utilities.redis import get_client
            redis_client = get_client(
                redis_url="redis+sentinel://username:password@sentinelhost:26379/mymaster/0"
            )

    Args:
        redis_url: Connection url; its scheme selects the kind of client.
        **kwargs: Extra keyword arguments forwarded unchanged to the underlying
            redis client constructor.

    Returns:
        The connected redis client.

    Raises:
        ImportError: If the ``redis`` python package is not installed.
    """
    try:
        import redis
    except ImportError as import_error:
        # Chain the original failure so the underlying reason stays visible.
        raise ImportError(
            "Could not import redis python package. "
            "Please install it with `pip install redis>=4.1.0`."
        ) from import_error

    # redis+sentinel:// url without TLS
    if redis_url.startswith("redis+sentinel"):
        return _redis_sentinel_client(redis_url, **kwargs)

    # rediss+sentinel:// url: sentinel with TLS enabled
    if redis_url.startswith("rediss+sentinel"):
        kwargs["ssl"] = True
        # NOTE: unless the caller passes "ssl_cert_reqs", it defaults to "none".
        kwargs.setdefault("ssl_cert_reqs", "none")
        return _redis_sentinel_client(redis_url, **kwargs)

    # connect to redis server from url, reconnect with cluster client if needed
    redis_client = redis.from_url(redis_url, **kwargs)
    if _check_for_cluster(redis_client):
        redis_client.close()
        redis_client = _redis_cluster_client(redis_url, **kwargs)
    return redis_client
def _redis_sentinel_client(redis_url: str, **kwargs: Any) -> RedisType:
    """Parse an (un-official) redis+sentinel url and return a master client.

    A Sentinel connection is created from the url and used to fetch the final
    redis client connection to the master for read-write operations.

    If username and/or password for authentication is given, the same
    credentials are used for the Redis Sentinel as well as the Redis server.
    With this implementation, using a redis url only, it is not possible to use
    different credentials for authentication on both systems.

    Args:
        redis_url: Url of the form ``scheme://[user:pass@]host[:port][/service[/db]]``.
        **kwargs: Connection keyword arguments for the redis server connection.
            Keys starting with ``ssl`` and ``client_name`` are also applied to
            the sentinel connection.

    Returns:
        The client returned by ``Sentinel.master_for`` for the service name.
    """
    import redis

    parsed_url = urlparse(redis_url)
    # sentinel needs list with (host, port) tuple, use default port if none available
    sentinel_list = [(parsed_url.hostname or "localhost", parsed_url.port or 26379)]

    # "/mymaster/0": first part is service name, optional second part is db number
    service_name = "mymaster"
    if parsed_url.path:
        path_parts = parsed_url.path.split("/")
        service_name = path_parts[1] or "mymaster"
        # Skip an empty db segment (trailing slash, e.g. "/mymaster/") so it
        # does not override a "db" passed by the caller with an empty string.
        if len(path_parts) > 2 and path_parts[2]:
            kwargs["db"] = path_parts[2]

    # sentinel user/pass is part of sentinel_kwargs, user/pass for the redis
    # server connection is passed as direct parameter in kwargs
    sentinel_args = {}
    if parsed_url.password:
        sentinel_args["password"] = parsed_url.password
        kwargs["password"] = parsed_url.password
    if parsed_url.username:
        sentinel_args["username"] = parsed_url.username
        kwargs["username"] = parsed_url.username

    # copy all SSL related properties into the sentinel kwargs too, and
    # client_name as well
    sentinel_args.update(
        {
            key: value
            for key, value in kwargs.items()
            if key.startswith("ssl") or key == "client_name"
        }
    )

    sentinel_client = redis.sentinel.Sentinel(
        sentinel_list, sentinel_kwargs=sentinel_args, **kwargs
    )

    # redis server might have password but not sentinel - catch this error and
    # try again without pass, everything else cannot be handled here -> user needed
    try:
        sentinel_client.execute_command("ping")
    except redis.exceptions.AuthenticationError as ae:
        # str() works whatever the exception's args look like.
        if "no password is set" not in str(ae):
            raise
        logger.warning(
            "Redis sentinel connection configured with password but Sentinel "
            "answered NO PASSWORD NEEDED - Please check Sentinel configuration"
        )
        # NOTE(review): the retry passes no sentinel_kwargs at all, so the
        # ssl*/client_name values copied above are dropped as well - confirm
        # this is intended.
        sentinel_client = redis.sentinel.Sentinel(sentinel_list, **kwargs)

    return sentinel_client.master_for(service_name)
def _check_for_cluster(redis_client: RedisType) -> bool:
    """Return True when the server's "cluster" info reports cluster_enabled == 1.

    Any ``RedisError`` raised while querying the server yields False.
    """
    import redis

    try:
        enabled_flag = redis_client.info("cluster")["cluster_enabled"]
    except redis.exceptions.RedisError:
        return False
    return enabled_flag == 1
def _redis_cluster_client(redis_url: str, **kwargs: Any) -> RedisType:
    """Build a ``RedisCluster`` client from the url, forwarding ``kwargs``."""
    from redis.cluster import RedisCluster

    cluster_client = RedisCluster.from_url(redis_url, **kwargs)
    return cluster_client