|
import logging
from typing import Any, Dict, Optional, Union

from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult
|
|
|
class MyCustomSyncHandler(BaseCallbackHandler):
    """Callback handler that forwards LLM output to a Redis pub/sub channel.

    Every new token is appended to a running buffer and the *whole buffer so
    far* is published, so each message a subscriber receives is the full text
    generated up to that point. The literal string ``'end'`` is published
    when the LLM finishes, the LLM fails, or the chain finishes.

    The channel name is the first entry of the ``tags`` keyword argument
    passed to each callback. When no tag is available the event is logged and
    dropped instead of raising from inside the callback.
    """

    # Sentinel published to tell subscribers that no more text will follow.
    # NOTE(review): a generated message that is exactly 'end' would look the
    # same to a subscriber -- confirm that consumers can tolerate this.
    END_MARKER = 'end'

    def __init__(self, redisClient):
        """Store the Redis client and start with an empty message buffer.

        Args:
            redisClient: Object exposing ``publish(channel, message)``.
        """
        super().__init__()
        # NOTE(review): the buffer is never cleared, so reusing one handler
        # instance across several LLM calls publishes their concatenated
        # output -- confirm that a fresh handler is created per request.
        self.message = ''
        self.redisClient = redisClient

    @staticmethod
    def _channel(kwargs: Dict[str, Any]) -> Optional[str]:
        """Return the channel name taken from ``kwargs['tags'][0]``.

        Returns ``None`` (after logging a warning) when ``tags`` is missing,
        ``None`` or empty, so callers can skip publishing.
        """
        tags = kwargs.get("tags")
        if not tags:
            logging.getLogger(__name__).warning(
                "No tags supplied to callback; skipping Redis publish"
            )
            return None
        return f'{tags[0]}'

    def _publish(self, kwargs: Dict[str, Any], payload: str) -> None:
        """Publish ``payload`` on the channel derived from ``kwargs``, if any."""
        channel = self._channel(kwargs)
        if channel is not None:
            self.redisClient.publish(channel, payload)

    def on_llm_new_token(self, token: str, **kwargs) -> Any:
        """Append ``token`` to the buffer and publish the accumulated text."""
        self.message += token
        self._publish(kwargs, self.message)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
        """Publish the end marker once the LLM call has completed."""
        self._publish(kwargs, self.END_MARKER)

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Publish the end marker when the LLM call fails.

        The error itself is not forwarded; subscribers only learn that the
        stream is over.
        """
        self._publish(kwargs, self.END_MARKER)

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> Any:
        """Publish the end marker when the enclosing chain finishes."""
        self._publish(kwargs, self.END_MARKER)
|
|