from types import GeneratorType
from typing import Any, Dict, Iterator, List, Optional, Union

from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler
from langchain.schema import AgentAction, AgentFinish, LLMResult


class StreamingLLMCallbackHandler(AsyncCallbackHandler):
    """Callback handler for streaming LLM responses to a queue."""

    def __init__(self, q):
        # `q` only needs a `put(item)` method; presumably a `queue.Queue`
        # shared with the consumer of the stream -- confirm against callers.
        self.q = q

    # NOTE(review): this is a plain `def` even though the base class is the
    # async handler; confirm the callback manager in use accepts a
    # non-coroutine override before converting it to `async def`.
    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Push each newly generated token onto the queue."""
        self.q.put(token)


class SyncStreamingLLMCallbackHandler(BaseCallbackHandler):
    """Callback handler for streaming LLM responses to a queue.

    Only ``on_llm_new_token`` does any work; every other callback is an
    explicit no-op.
    """

    def __init__(self, q):
        # `q` only needs a `put(item)` method; presumably a `queue.Queue`
        # shared with the consumer of the stream -- confirm against callers.
        self.q = q

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Do nothing."""

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Push each newly generated token onto the queue."""
        self.q.put(token)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Do nothing."""

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        """Do nothing."""

    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> None:
        """Do nothing."""

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """Do nothing."""

    def on_chain_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        """Do nothing."""

    def on_tool_start(
        self,
        serialized: Dict[str, Any],
        input_str: str,
        **kwargs: Any,
    ) -> None:
        """Do nothing."""

    def on_tool_end(
        self,
        output: str,
        color: Optional[str] = None,
        observation_prefix: Optional[str] = None,
        llm_prefix: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Do nothing."""

    def on_tool_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        """Do nothing."""

    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        """Do nothing on agent action."""

    def on_agent_finish(
        self, finish: AgentFinish, color: Optional[str] = None, **kwargs: Any
    ) -> None:
        """Do nothing on agent end."""


def concatenate_generators(*args: Union[str, GeneratorType]) -> Iterator[str]:
    """Yield the running concatenation of several string segments.

    Each argument is one segment. A plain string is yielded once, prefixed
    by the text of all earlier segments. A generator is drained, and every
    value it produces is yielded with the same prefix; only the *last*
    value it produced is kept as that segment's text for later prefixes.

    Args:
        *args: Strings, or generator objects that yield strings. Anything
            that is not a ``GeneratorType`` instance is treated as a single
            string segment.

    Yields:
        The text of all completed segments followed by the current value
        of the segment being processed.
    """
    final_outputs = ""
    for g in args:
        # Reset per segment: a generator that yields nothing must contribute
        # nothing. Without this, an empty generator either raised
        # UnboundLocalError (first argument) or silently re-appended the
        # previous segment's text.
        result = ""
        if isinstance(g, GeneratorType):
            for v in g:
                yield final_outputs + v
                result = v
        else:
            yield final_outputs + g
            result = g
        final_outputs += result