Spaces:
Runtime error
Runtime error
import logging | |
from abc import ABC, abstractmethod | |
from itertools import islice | |
from typing import Any, Dict, Iterable, List, Optional | |
from langchain_core.language_models import BaseLanguageModel | |
from langchain_core.messages import BaseMessage, get_buffer_string | |
from langchain_core.prompts import BasePromptTemplate | |
from langchain_core.pydantic_v1 import BaseModel, Field | |
from langchain.chains.llm import LLMChain | |
from langchain.memory.chat_memory import BaseChatMemory | |
from langchain.memory.prompt import ( | |
ENTITY_EXTRACTION_PROMPT, | |
ENTITY_SUMMARIZATION_PROMPT, | |
) | |
from langchain.memory.utils import get_prompt_input_key | |
from langchain.utilities.redis import get_client | |
# Module-level logger named after this module; the entity stores below use it
# for debug traces of reads/writes and for connection-setup errors.
logger: logging.Logger = logging.getLogger(name=__name__)
class BaseEntityStore(BaseModel, ABC):
    """Abstract base class for Entity store.

    An entity store maps entity names (``str`` keys) to optional text values
    (the entity summaries). Every method is declared abstract so that a
    subclass that forgets to implement one fails loudly at instantiation
    instead of inheriting a silent no-op that returns ``None``.
    """

    @abstractmethod
    def get(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """Get entity value from store.

        Args:
            key: Entity name.
            default: Fallback when ``key`` has no stored value (the exact
                fallback behavior is defined by each implementation).
        """

    @abstractmethod
    def set(self, key: str, value: Optional[str]) -> None:
        """Set entity value in store.

        Implementations may treat a falsy ``value`` as a delete.
        """

    @abstractmethod
    def delete(self, key: str) -> None:
        """Delete entity value from store."""

    @abstractmethod
    def exists(self, key: str) -> bool:
        """Check if entity exists in store."""

    @abstractmethod
    def clear(self) -> None:
        """Delete all entities from store."""
class InMemoryEntityStore(BaseEntityStore):
    """Entity store backed by a plain Python dict held on the instance.

    Nothing is persisted; unlike the Redis/SQLite stores, ``set`` keeps falsy
    values rather than deleting the key.
    """

    # Entity name -> summary.
    # NOTE(review): the ``{}`` default relies on pydantic copying field
    # defaults per instance so stores do not share one dict — confirm for the
    # pydantic version in use.
    store: Dict[str, Optional[str]] = {}

    def get(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """Return the value stored under ``key``, or ``default`` if absent."""
        if key in self.store:
            return self.store[key]
        return default

    def set(self, key: str, value: Optional[str]) -> None:
        """Store ``value`` under ``key`` (overwriting any previous value)."""
        self.store.update({key: value})

    def delete(self, key: str) -> None:
        """Remove ``key``; raises ``KeyError`` if it is not present."""
        self.store.pop(key)

    def exists(self, key: str) -> bool:
        """Return whether ``key`` has an entry (even a falsy one)."""
        found = key in self.store
        return found

    def clear(self) -> None:
        """Remove every entry from the store."""
        self.store.clear()
class UpstashRedisEntityStore(BaseEntityStore):
    """Upstash Redis backed Entity store.

    Entities are written with an expiry of ``ttl`` seconds (1 day by default).
    Every read resets the expiry to ``recall_ttl`` seconds (3 days by default;
    ``ttl`` is used instead when ``recall_ttl`` is falsy).
    All keys live under ``<key_prefix>:<session_id>:``.
    """

    # Declared as fields so the assignments in ``__init__`` are accepted by
    # the pydantic model (mirrors ``RedisEntityStore``).
    redis_client: Any
    session_id: str = "default"
    key_prefix: str = "memory_store"
    ttl: Optional[int] = 60 * 60 * 24
    recall_ttl: Optional[int] = 60 * 60 * 24 * 3

    def __init__(
        self,
        session_id: str = "default",
        url: str = "",
        token: str = "",
        key_prefix: str = "memory_store",
        ttl: Optional[int] = 60 * 60 * 24,
        recall_ttl: Optional[int] = 60 * 60 * 24 * 3,
        *args: Any,
        **kwargs: Any,
    ):
        try:
            from upstash_redis import Redis
        except ImportError:
            raise ImportError(
                "Could not import upstash_redis python package. "
                "Please install it with `pip install upstash_redis`."
            )

        super().__init__(*args, **kwargs)

        try:
            self.redis_client = Redis(url=url, token=token)
        except Exception:
            # Best-effort construction is kept, but record *why* the client is
            # missing (with traceback) instead of a bare message.
            logger.exception("Upstash Redis instance could not be initiated.")

        self.session_id = session_id
        self.key_prefix = key_prefix
        self.ttl = ttl
        self.recall_ttl = recall_ttl or ttl

    @property
    def full_key_prefix(self) -> str:
        """Namespace for this session's keys: ``<key_prefix>:<session_id>``."""
        return f"{self.key_prefix}:{self.session_id}"

    def _full_key(self, key: str) -> str:
        """Return the namespaced Redis key for entity ``key``."""
        return f"{self.full_key_prefix}:{key}"

    def get(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """Return the value for ``key``, refreshing its expiry to ``recall_ttl``.

        Falls back to ``default`` and then to ``""`` when the stored value is
        missing or empty.
        """
        full_key = self._full_key(key)
        res = self.redis_client.getex(full_key, ex=self.recall_ttl) or default or ""
        logger.debug("Upstash Redis MEM get '%s': '%s'", full_key, res)
        return res

    def set(self, key: str, value: Optional[str]) -> None:
        """Store ``value`` with expiry ``ttl``; a falsy ``value`` deletes ``key``."""
        if not value:
            return self.delete(key)
        full_key = self._full_key(key)
        self.redis_client.set(full_key, value, ex=self.ttl)
        logger.debug(
            "Upstash Redis MEM set '%s': '%s' EX %s", full_key, value, self.ttl
        )

    def delete(self, key: str) -> None:
        """Delete ``key`` from this session's namespace."""
        self.redis_client.delete(self._full_key(key))

    def exists(self, key: str) -> bool:
        """Return whether ``key`` exists in this session's namespace."""
        return self.redis_client.exists(self._full_key(key)) == 1

    def clear(self) -> None:
        """Delete every key under ``full_key_prefix``, one SCAN page at a time."""
        pattern = f"{self.full_key_prefix}:*"
        cursor = 0
        while True:
            # Feed the returned cursor back in; a cursor of 0 ends the scan.
            cursor, keys_to_delete = self.redis_client.scan(cursor, pattern)
            # A SCAN page can be empty, and Redis rejects DEL with no keys.
            if keys_to_delete:
                self.redis_client.delete(*keys_to_delete)
            if cursor == 0:
                break
class RedisEntityStore(BaseEntityStore):
    """Redis-backed Entity store.

    Entities are written with an expiry of ``ttl`` seconds (1 day by default).
    Every read resets the expiry to ``recall_ttl`` seconds (3 days by default;
    ``ttl`` is used instead when ``recall_ttl`` is falsy).
    All keys live under ``<key_prefix>:<session_id>:``.
    """

    redis_client: Any
    session_id: str = "default"
    key_prefix: str = "memory_store"
    ttl: Optional[int] = 60 * 60 * 24
    recall_ttl: Optional[int] = 60 * 60 * 24 * 3

    def __init__(
        self,
        session_id: str = "default",
        url: str = "redis://localhost:6379/0",
        key_prefix: str = "memory_store",
        ttl: Optional[int] = 60 * 60 * 24,
        recall_ttl: Optional[int] = 60 * 60 * 24 * 3,
        *args: Any,
        **kwargs: Any,
    ):
        try:
            import redis
        except ImportError:
            raise ImportError(
                "Could not import redis python package. "
                "Please install it with `pip install redis`."
            )

        super().__init__(*args, **kwargs)

        try:
            self.redis_client = get_client(redis_url=url, decode_responses=True)
        except redis.exceptions.ConnectionError as error:
            # Best-effort: the store is still constructed; the error is logged.
            logger.error(error)

        self.session_id = session_id
        self.key_prefix = key_prefix
        self.ttl = ttl
        self.recall_ttl = recall_ttl or ttl

    @property
    def full_key_prefix(self) -> str:
        """Namespace for this session's keys: ``<key_prefix>:<session_id>``."""
        return f"{self.key_prefix}:{self.session_id}"

    def _full_key(self, key: str) -> str:
        """Return the namespaced Redis key for entity ``key``."""
        return f"{self.full_key_prefix}:{key}"

    def get(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """Return the value for ``key``, refreshing its expiry to ``recall_ttl``.

        Falls back to ``default`` and then to ``""`` when the stored value is
        missing or empty.
        """
        full_key = self._full_key(key)
        res = self.redis_client.getex(full_key, ex=self.recall_ttl) or default or ""
        logger.debug("REDIS MEM get '%s': '%s'", full_key, res)
        return res

    def set(self, key: str, value: Optional[str]) -> None:
        """Store ``value`` with expiry ``ttl``; a falsy ``value`` deletes ``key``."""
        if not value:
            return self.delete(key)
        full_key = self._full_key(key)
        self.redis_client.set(full_key, value, ex=self.ttl)
        logger.debug("REDIS MEM set '%s': '%s' EX %s", full_key, value, self.ttl)

    def delete(self, key: str) -> None:
        """Delete ``key`` from this session's namespace."""
        self.redis_client.delete(self._full_key(key))

    def exists(self, key: str) -> bool:
        """Return whether ``key`` exists in this session's namespace."""
        return self.redis_client.exists(self._full_key(key)) == 1

    def clear(self) -> None:
        """Delete every key under ``full_key_prefix`` in batches of 500."""

        def batched(iterable: Iterable[Any], batch_size: int) -> Iterable[Any]:
            # Yield successive non-empty lists of up to ``batch_size`` items,
            # so DEL is never called with zero keys.
            iterator = iter(iterable)
            while batch := list(islice(iterator, batch_size)):
                yield batch

        for keybatch in batched(
            self.redis_client.scan_iter(f"{self.full_key_prefix}:*"), 500
        ):
            self.redis_client.delete(*keybatch)
class SQLiteEntityStore(BaseEntityStore):
    """SQLite-backed Entity store.

    Each session uses its own table, ``<table_name>_<session_id>``, with a
    ``key TEXT PRIMARY KEY`` / ``value TEXT`` schema. The table is created on
    construction if it does not already exist.
    """

    session_id: str = "default"
    table_name: str = "memory_store"
    # Declared so the pydantic model accepts the ``self.conn = ...``
    # assignment in ``__init__``; holds the ``sqlite3`` connection.
    conn: Any = None

    def __init__(
        self,
        session_id: str = "default",
        db_file: str = "entities.db",
        table_name: str = "memory_store",
        *args: Any,
        **kwargs: Any,
    ):
        try:
            import sqlite3
        except ImportError:
            raise ImportError(
                "Could not import sqlite3 python package. "
                "Please install it with `pip install sqlite3`."
            )
        super().__init__(*args, **kwargs)

        self.conn = sqlite3.connect(db_file)
        self.session_id = session_id
        self.table_name = table_name
        self._create_table_if_not_exists()

    @property
    def full_table_name(self) -> str:
        """Unquoted per-session table name: ``<table_name>_<session_id>``."""
        return f"{self.table_name}_{self.session_id}"

    def _quoted_table_name(self) -> str:
        """Return ``full_table_name`` as a safely quoted SQL identifier.

        Identifiers cannot be bound as ``?`` parameters, and ``session_id``
        may originate from user input, so the name is wrapped in double quotes
        with embedded quotes doubled. This also allows characters such as
        ``-`` (e.g. UUID session ids) that are invalid in bare identifiers.
        """
        return '"' + self.full_table_name.replace('"', '""') + '"'

    def _create_table_if_not_exists(self) -> None:
        create_table_query = f"""
            CREATE TABLE IF NOT EXISTS {self._quoted_table_name()} (
                key TEXT PRIMARY KEY,
                value TEXT
            )
        """
        with self.conn:
            self.conn.execute(create_table_query)

    def get(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """Return the stored value for ``key``, or ``default`` if there is no row."""
        query = f"""
            SELECT value
            FROM {self._quoted_table_name()}
            WHERE key = ?
        """
        cursor = self.conn.execute(query, (key,))
        result = cursor.fetchone()
        if result is not None:
            return result[0]
        return default

    def set(self, key: str, value: Optional[str]) -> None:
        """Upsert ``value`` for ``key``; a falsy ``value`` deletes the row."""
        if not value:
            return self.delete(key)
        query = f"""
            INSERT OR REPLACE INTO {self._quoted_table_name()} (key, value)
            VALUES (?, ?)
        """
        with self.conn:
            self.conn.execute(query, (key, value))

    def delete(self, key: str) -> None:
        """Delete the row for ``key`` (no-op if absent)."""
        query = f"""
            DELETE FROM {self._quoted_table_name()}
            WHERE key = ?
        """
        with self.conn:
            self.conn.execute(query, (key,))

    def exists(self, key: str) -> bool:
        """Return whether a row exists for ``key``."""
        query = f"""
            SELECT 1
            FROM {self._quoted_table_name()}
            WHERE key = ?
            LIMIT 1
        """
        cursor = self.conn.execute(query, (key,))
        result = cursor.fetchone()
        return result is not None

    def clear(self) -> None:
        """Delete all rows from this session's table."""
        query = f"""
            DELETE FROM {self._quoted_table_name()}
        """
        with self.conn:
            self.conn.execute(query)
class ConversationEntityMemory(BaseChatMemory):
    """Entity extractor & summarizer memory.

    Extracts named entities from the recent chat history and generates
    summaries for them. Summaries live in a swappable entity store, so they can
    persist across conversations. Defaults to an in-memory entity store, which
    can be swapped out for a Redis, SQLite, or other entity store.
    """

    human_prefix: str = "Human"
    ai_prefix: str = "AI"
    llm: BaseLanguageModel
    entity_extraction_prompt: BasePromptTemplate = ENTITY_EXTRACTION_PROMPT
    entity_summarization_prompt: BasePromptTemplate = ENTITY_SUMMARIZATION_PROMPT

    # Cache of recently detected entity names, if any
    # It is updated when load_memory_variables is called:
    entity_cache: List[str] = []

    # Number of recent message pairs to consider when updating entities:
    k: int = 3

    chat_history_key: str = "history"

    # Store to manage entity-related data:
    entity_store: BaseEntityStore = Field(default_factory=InMemoryEntityStore)

    @property
    def buffer(self) -> List[BaseMessage]:
        """Access chat memory messages."""
        return self.chat_memory.messages

    @property
    def memory_variables(self) -> List[str]:
        """Will always return list of memory variables.

        :meta private:
        """
        return ["entities", self.chat_history_key]

    def _resolve_prompt_input_key(self, inputs: Dict[str, Any]) -> str:
        """Return ``input_key`` if configured, otherwise infer it from ``inputs``."""
        if self.input_key is None:
            return get_prompt_input_key(inputs, self.memory_variables)
        return self.input_key

    def _recent_messages(self) -> List[BaseMessage]:
        """Return the last ``k`` message pairs (``2 * k`` messages) of the history."""
        if self.k <= 0:
            # ``buffer[-0:]`` would return the *entire* history, and a negative
            # ``k`` would drop leading messages instead; an empty window is meant.
            return []
        return self.buffer[-self.k * 2 :]

    def _format_history(self, messages: List[BaseMessage]) -> str:
        """Render ``messages`` as a transcript using the configured prefixes."""
        return get_buffer_string(
            messages,
            human_prefix=self.human_prefix,
            ai_prefix=self.ai_prefix,
        )

    @staticmethod
    def _parse_entities(output: str) -> List[str]:
        """Parse the extraction LLM's comma-separated output into entity names.

        ``"NONE"`` (the extraction prompt's "no entities" answer) yields ``[]``.
        Blank items, e.g. from a trailing comma, are dropped.
        """
        if output.strip() == "NONE":
            return []
        return [name for name in (part.strip() for part in output.split(",")) if name]

    def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Return recent chat history and summaries for the entities just detected.

        The LLM extracts entity names from the current input plus the last
        ``k`` message pairs. Each extracted name is mapped to its stored
        summary (``""`` if none has been generated yet), and the entity cache
        is replaced with these names (cleared if none were found) so that
        ``save_context`` knows which entities to summarize.
        """
        # Create an LLMChain for predicting entity names from the recent chat history:
        chain = LLMChain(llm=self.llm, prompt=self.entity_extraction_prompt)
        prompt_input_key = self._resolve_prompt_input_key(inputs)

        recent_messages = self._recent_messages()
        buffer_string = self._format_history(recent_messages)

        # Generates a comma-separated list of named entities,
        # e.g. "Jane, White House, UFO", or "NONE" if none are found:
        output = chain.predict(
            history=buffer_string,
            input=inputs[prompt_input_key],
        )
        entities = self._parse_entities(output)

        # Make a dictionary of entities with summary if exists:
        entity_summaries = {
            entity: self.entity_store.get(entity, "") for entity in entities
        }

        # Replaces the entity name cache with the most recently discussed entities,
        # or if no entities were extracted, clears the cache:
        self.entity_cache = entities

        # Return history as message objects or as the transcript string:
        buffer: Any = recent_messages if self.return_messages else buffer_string

        return {
            self.chat_history_key: buffer,
            "entities": entity_summaries,
        }

    def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
        """Save this turn to chat history, then refresh cached entities' summaries.

        For each entity in the entity cache, the model is prompted with the
        existing summary, the last ``k`` message pairs and the current input;
        the stripped response is written back to the entity store (stores may
        treat an empty response as a delete).
        """
        super().save_context(inputs, outputs)

        prompt_input_key = self._resolve_prompt_input_key(inputs)
        buffer_string = self._format_history(self._recent_messages())
        input_data = inputs[prompt_input_key]

        # Create an LLMChain for predicting entity summarization from the context
        chain = LLMChain(llm=self.llm, prompt=self.entity_summarization_prompt)

        # Generate new summaries for entities and save them in the entity store
        for entity in self.entity_cache:
            existing_summary = self.entity_store.get(entity, "")
            output = chain.predict(
                summary=existing_summary,
                entity=entity,
                history=buffer_string,
                input=input_data,
            )
            self.entity_store.set(entity, output.strip())

    def clear(self) -> None:
        """Clear memory contents."""
        self.chat_memory.clear()
        self.entity_cache.clear()
        self.entity_store.clear()