Spaces:
Runtime error
Runtime error
| """Wrapper around Redis vector database.""" | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import uuid | |
| from typing import Any, Callable, Iterable, List, Mapping, Optional | |
| import numpy as np | |
| from redis.client import Redis as RedisType | |
| from langchain.docstore.document import Document | |
| from langchain.embeddings.base import Embeddings | |
| from langchain.utils import get_from_dict_or_env | |
| from langchain.vectorstores.base import VectorStore | |
# Use a module-scoped logger rather than the root logger so that applications
# embedding this module can filter or silence its messages independently.
logger = logging.getLogger(__name__)
def _check_redis_module_exist(client: RedisType, module: str) -> bool:
    """Return whether the Redis server reports a loaded module called ``module``.

    Args:
        client: Object exposing ``info()``, whose result is a mapping that may
            contain a ``"modules"`` entry (a list of dicts with a ``"name"``).
        module: Module name to look for, e.g. ``"search"``.

    Returns:
        True if a loaded module has exactly that name, False otherwise. A
        server that reports no ``"modules"`` entry at all yields False.
    """
    # Default to an empty list: the entries are indexed with ["name"], so a
    # dict default would be iterated by key and fail on string indexing.
    installed_modules = client.info().get("modules", [])
    return any(entry["name"] == module for entry in installed_modules)
class Redis(VectorStore):
    """Vector store that keeps documents in Redis hashes searched via RediSearch.

    Each document is written as a hash with three fields: ``content`` (the
    text), ``content_vector`` (the embedding serialized as float32 bytes) and
    ``metadata`` (a JSON-encoded dict). Keys are placed under the
    ``doc:{index_name}`` prefix, which is the prefix the index created by
    ``from_texts`` is defined over.
    """

    def __init__(
        self,
        redis_url: str,
        index_name: str,
        embedding_function: Callable,
        **kwargs: Any,
    ):
        """Connect to Redis and verify that the search module is loaded.

        Args:
            redis_url: Connection URL handed to ``redis.from_url``.
            index_name: Name of the search index this store queries.
            embedding_function: Callable mapping a text to its embedding vector.
            **kwargs: Extra keyword arguments forwarded to ``redis.from_url``.

        Raises:
            ValueError: If the ``redis`` package cannot be imported, if
                ``redis.from_url`` rejects its arguments, or if the server
                does not report the ``search`` module.
        """
        try:
            import redis
        except ImportError as e:
            raise ValueError(
                "Could not import redis python package. "
                "Please install it with `pip install redis`."
            ) from e

        self.embedding_function = embedding_function
        self.index_name = index_name
        try:
            redis_client = redis.from_url(redis_url, **kwargs)
        except ValueError as e:
            raise ValueError(f"Your redis connected error: {e}") from e

        # Vector search needs the RediSearch module; fail early if it is absent.
        if not _check_redis_module_exist(redis_client, "search"):
            raise ValueError(
                "Could not use redis directly, you need to add search module. "
                "Please refer [RediSearch](https://redis.io/docs/stack/search/quick_start/)"  # noqa
            )
        self.client = redis_client

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Embed ``texts`` and store them as hashes under this index's prefix.

        Args:
            texts: Texts to embed and store.
            metadatas: Optional per-text metadata dicts, aligned with ``texts``.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            The Redis keys of the stored documents, in input order.
        """
        # Must match the prefix the index is defined over in `from_texts`,
        # otherwise the new hashes are stored but never indexed.
        prefix = f"doc:{self.index_name}"
        ids: List[str] = []
        # Queue all writes and send them together, as `from_texts` does.
        pipeline = self.client.pipeline(transaction=False)
        for i, text in enumerate(texts):
            key = f"{prefix}:{uuid.uuid4().hex}"
            doc_metadata = metadatas[i] if metadatas else {}
            pipeline.hset(
                key,
                mapping={
                    "content": text,
                    "content_vector": np.array(
                        self.embedding_function(text), dtype=np.float32
                    ).tobytes(),
                    "metadata": json.dumps(doc_metadata),
                },
            )
            ids.append(key)
        pipeline.execute()
        return ids

    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Return the ``k`` stored documents closest to ``query``.

        Args:
            query: Text to embed and search with.
            k: Number of nearest neighbours to request.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            Documents built from the ``content`` and ``metadata`` fields of the
            hits, ordered by ascending ``vector_score``.

        Raises:
            ValueError: If the ``redis`` search query class cannot be imported.
        """
        try:
            from redis.commands.search.query import Query
        except ImportError as e:
            raise ValueError(
                "Could not import redis python package. "
                "Please install it with `pip install redis`."
            ) from e

        # Creates embedding vector from user query
        embedding = self.embedding_function(query)

        # Prepare the KNN query; "*" means no pre-filter on other fields.
        return_fields = ["metadata", "content", "vector_score"]
        vector_field = "content_vector"
        hybrid_fields = "*"
        base_query = (
            f"{hybrid_fields}=>[KNN {k} @{vector_field} $vector AS vector_score]"
        )
        redis_query = (
            Query(base_query)
            .return_fields(*return_fields)
            .sort_by("vector_score")
            .paging(0, k)
            .dialect(2)
        )
        # The query vector is passed as raw float32 bytes, the same encoding
        # used when the documents were stored.
        params_dict: Mapping[str, str] = {
            "vector": np.array(embedding)  # type: ignore
            .astype(dtype=np.float32)
            .tobytes()
        }

        # perform vector search
        results = self.client.ft(self.index_name).search(redis_query, params_dict)

        return [
            Document(page_content=result.content, metadata=json.loads(result.metadata))
            for result in results.docs
        ]

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        index_name: Optional[str] = None,
        **kwargs: Any,
    ) -> Redis:
        """Construct a Redis vector store from raw texts.

        This is a user-friendly interface that:
            1. Embeds documents.
            2. Creates a new index for the embeddings in the RediSearch
               instance (unless an index with that name already exists).
            3. Adds the documents to that index.

        This is intended to be a quick way to get started.

        Args:
            texts: Texts to embed and store; must not be empty because the
                vector dimension is taken from the first embedding.
            embedding: Embeddings object providing ``embed_documents`` and
                ``embed_query``.
            metadatas: Optional per-text metadata dicts, aligned with ``texts``.
            index_name: Name of the index; a random hex name is generated when
                omitted.
            **kwargs: May carry ``redis_url``; all remaining entries are
                forwarded to ``redis.from_url``.

        Returns:
            A store bound to the index, using ``embedding.embed_query`` for
            queries and later additions.

        Raises:
            ValueError: If ``texts`` is empty, if the ``redis`` package cannot
                be imported, or if the connection / module check fails.

        Example:
            .. code-block:: python

                from langchain.embeddings import OpenAIEmbeddings
                embeddings = OpenAIEmbeddings()
                redisearch = Redis.from_texts(
                    texts,
                    embeddings,
                    redis_url="redis://username:password@localhost:6379"
                )
        """
        redis_url = get_from_dict_or_env(kwargs, "redis_url", "REDIS_URL")
        try:
            from redis.commands.search.field import TextField, VectorField
            from redis.commands.search.indexDefinition import IndexDefinition, IndexType
        except ImportError as e:
            raise ValueError(
                "Could not import redis python package. "
                "Please install it with `pip install redis`."
            ) from e

        if not texts:
            raise ValueError("Cannot build a Redis index from an empty list of texts.")

        # `redis_url` is passed positionally below, so it must not also be
        # forwarded as a keyword. It may be absent from kwargs when the URL
        # was resolved from another source, hence no unconditional pop.
        connection_kwargs = {k: v for k, v in kwargs.items() if k != "redis_url"}

        # name of the search index if not given
        if not index_name:
            index_name = uuid.uuid4().hex

        # Building the instance connects and checks the search module, with
        # the same connection options the caller supplied; reuse its client.
        instance = cls(
            redis_url, index_name, embedding.embed_query, **connection_kwargs
        )
        client = instance.client

        embeddings = embedding.embed_documents(texts)
        dim = len(embeddings[0])
        vector_number = len(embeddings)  # initial number of vectors
        prefix = f"doc:{index_name}"  # prefix for the document keys
        distance_metric = (
            "COSINE"  # distance metric for the vectors (ex. COSINE, IP, L2)
        )
        content_field = TextField(name="content")
        metadata_field = TextField(name="metadata")
        content_embedding = VectorField(
            "content_vector",
            "FLAT",
            {
                "TYPE": "FLOAT32",
                "DIM": dim,
                "DISTANCE_METRIC": distance_metric,
                "INITIAL_CAP": vector_number,
            },
        )
        fields = [content_field, metadata_field, content_embedding]

        # Check if index exists; `info()` raising is treated as "missing".
        try:
            client.ft(index_name).info()
        except Exception:
            # Create Redis Index
            client.ft(index_name).create_index(
                fields=fields,
                definition=IndexDefinition(prefix=[prefix], index_type=IndexType.HASH),
            )
        else:
            logger.info("Index already exists")

        # NOTE(review): keys are positional (`{prefix}:{i}`), so calling this
        # again with the same index name overwrites documents 0..n-1.
        pipeline = client.pipeline()
        for i, text in enumerate(texts):
            key = f"{prefix}:{i}"
            doc_metadata = metadatas[i] if metadatas else {}
            pipeline.hset(
                key,
                mapping={
                    "content": text,
                    "content_vector": np.array(
                        embeddings[i], dtype=np.float32
                    ).tobytes(),
                    "metadata": json.dumps(doc_metadata),
                },
            )
        pipeline.execute()
        return instance

    @staticmethod
    def drop_index(
        index_name: str,
        delete_documents: bool,
        **kwargs: Any,
    ) -> bool:
        """Drop the search index ``index_name``.

        Args:
            index_name: Name of the index to drop.
            delete_documents: Passed to ``dropindex``; whether the indexed
                documents are deleted along with the index.
            **kwargs: May carry ``redis_url``; all remaining entries are
                forwarded to ``redis.from_url``.

        Returns:
            True if the drop command succeeded, False if it raised (for
            example because the index does not exist).

        Raises:
            ValueError: If the ``redis`` package cannot be imported or
                ``redis.from_url`` rejects its arguments.
        """
        redis_url = get_from_dict_or_env(kwargs, "redis_url", "REDIS_URL")
        try:
            import redis
        except ImportError as e:
            raise ValueError(
                "Could not import redis python package. "
                "Please install it with `pip install redis`."
            ) from e

        # We need to first remove redis_url from kwargs, otherwise passing it
        # to Redis will result in an error. It may legitimately be absent.
        kwargs.pop("redis_url", None)
        try:
            client = redis.from_url(url=redis_url, **kwargs)
        except ValueError as e:
            raise ValueError(f"Your redis connected error: {e}") from e

        try:
            client.ft(index_name).dropindex(delete_documents)
        except Exception:
            # Best effort: any failure is reported as "nothing dropped".
            return False
        logger.info("Drop index")
        return True

    @classmethod
    def from_existing_index(
        cls,
        embedding: Embeddings,
        index_name: str,
        **kwargs: Any,
    ) -> Redis:
        """Bind a store to an index that already exists in Redis.

        Args:
            embedding: Embeddings object; its ``embed_query`` is used to embed
                queries and added texts.
            index_name: Name of the existing search index.
            **kwargs: May carry ``redis_url``; all remaining entries are
                forwarded to ``redis.from_url``.

        Returns:
            A store connected to Redis and bound to ``index_name``. The
            existence of the index itself is not verified here.

        Raises:
            ValueError: If the ``redis`` package cannot be imported, the
                connection arguments are rejected, or the server lacks the
                ``search`` module (all raised by the constructor).
        """
        redis_url = get_from_dict_or_env(kwargs, "redis_url", "REDIS_URL")
        # `redis_url` is passed positionally, so drop it from the forwarded
        # keywords; it may be absent when resolved from another source.
        connection_kwargs = {k: v for k, v in kwargs.items() if k != "redis_url"}
        # The constructor performs the import, connection and module checks,
        # so a separate throwaway client is unnecessary.
        return cls(redis_url, index_name, embedding.embed_query, **connection_kwargs)