"""
Chat service for handling different types of chat interactions
"""
import logging
import uuid
from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union
from lpm_kernel.api.services.user_llm_config_service import UserLLMConfigService
from lpm_kernel.api.domains.kernel2.dto.chat_dto import ChatRequest
from lpm_kernel.api.services.local_llm_service import local_llm_service
from lpm_kernel.api.domains.kernel2.services.message_builder import MultiTurnMessageBuilder
from lpm_kernel.api.domains.kernel2.services.prompt_builder import (
SystemPromptStrategy,
BasePromptStrategy,
RoleBasedStrategy,
KnowledgeEnhancedStrategy,
)
logger = logging.getLogger(__name__)
class ChatService:
"""Chat service for handling different types of chat interactions"""
def __init__(self):
"""Initialize chat service"""
# Base strategy chain, must contain at least one base strategy
self.default_strategy_chain = [BasePromptStrategy, RoleBasedStrategy]
def _get_strategy_chain(
self,
request: ChatRequest,
strategy_chain: Optional[List[Type[SystemPromptStrategy]]] = None,
) -> List[Type[SystemPromptStrategy]]:
"""
Get the strategy chain to use for message building
Args:
request: Chat request containing message and other parameters
strategy_chain: Optional list of strategy classes to use
Returns:
List of strategy classes to use
Raises:
            ValueError: If strategy_chain is provided but empty, or contains no
                BasePromptStrategy subclass
"""
# If custom strategy chain is provided, validate and return
if strategy_chain is not None:
if not strategy_chain:
raise ValueError("Strategy chain cannot be empty")
if not any(issubclass(s, BasePromptStrategy) for s in strategy_chain):
raise ValueError("Strategy chain must contain at least one base strategy")
return strategy_chain
# Use default strategy chain
result_chain = self.default_strategy_chain.copy()
# Add knowledge enhancement strategy based on request parameters
if request.enable_l0_retrieval or request.enable_l1_retrieval:
result_chain.append(KnowledgeEnhancedStrategy)
return result_chain
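    # Illustrative resolution: with no custom chain and enable_l0_retrieval=True,
    # the result is [BasePromptStrategy, RoleBasedStrategy, KnowledgeEnhancedStrategy].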
    def _build_messages(
        self,
        request: ChatRequest,
        strategy_chain: Optional[List[Type[SystemPromptStrategy]]] = None,
        context: Optional[Any] = None,
    ) -> List[Dict[str, str]]:
        """
        Build messages using the specified strategy chain

        Args:
            request: Chat request containing message and other parameters
            strategy_chain: Optional list of strategy classes to use. If None, uses the default chain
            context: Optional context forwarded to the strategies

        Returns:
            List of message dictionaries
        """
        # Get and validate strategy chain
        final_strategy_chain = self._get_strategy_chain(request, strategy_chain)

        # Build messages
        message_builder = MultiTurnMessageBuilder(request, strategy_chain=final_strategy_chain)
        messages = message_builder.build_messages(context)

        # Log debug information (full message content only at DEBUG level)
        logger.info("Using strategy chain: %s", [s.__name__ for s in final_strategy_chain])
        logger.debug("Final messages for LLM:")
        for msg in messages:
            logger.debug("Role: %s, Content: %s", msg["role"], msg["content"])

        return messages
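    # The returned messages follow the OpenAI chat format, e.g. (illustrative):
    #   [{"role": "system", "content": "..."}, {"role": "user", "content": "Hi"}]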
def _process_chat_response(self, chunk, full_response: Optional[Any], full_content: str) -> Tuple[Any, str, Optional[str]]:
"""
Process custom chat_response format data
Args:
chunk: Response data chunk
full_response: Current complete response object
full_content: Current accumulated content
Returns:
Tuple[Any, str, Optional[str]]: (Updated response object, Updated content, Finish reason)
"""
        finish_reason = None
        logger.debug("Processing custom format response: %s", chunk)

        # Extract content and completion flag from either dict or object chunks
        if isinstance(chunk, dict):
            content = chunk.get("content", "")
            is_done = chunk.get("done", False)
        else:
            content = getattr(chunk, "content", "")
            is_done = getattr(chunk, "done", False)

        if content:
            full_content += content
            logger.debug("Added content from custom format, current length: %d", len(full_content))
# Initialize response object (if needed)
        if full_response is None:
full_response = {
"id": str(uuid.uuid4()),
"object": "chat.completion.chunk",
"created": int(datetime.now().timestamp()),
"model": "models/lpm",
"system_fingerprint": None,
"choices": [
{
"index": 0,
"delta": {
"content": ""
},
"finish_reason": None
}
]
}
# Check if completed
if is_done:
finish_reason = 'stop'
logger.info("Got finish_reason from custom format: stop")
return full_response, full_content, finish_reason
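    # Illustrative custom chunk shape, inferred from the accessors above (not a
    # published schema):
    #   {"type": "chat_response", "content": "partial text", "done": False}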
def _process_openai_response(self, chunk, full_response: Optional[Any], full_content: str) -> Tuple[Any, str, Optional[str]]:
"""
Process OpenAI format response data
Args:
chunk: Response data chunk
full_response: Current complete response object
full_content: Current accumulated content
Returns:
Tuple[Any, str, Optional[str]]: (Updated response object, Updated content, Finish reason)
"""
finish_reason = None
        choices = getattr(chunk, "choices", None)
        if not choices:
            logger.warning("Chunk has no or empty choices: %s", chunk)
            return full_response, full_content, finish_reason
# Save basic information of the first response
if full_response is None:
full_response = {
"id": getattr(chunk, 'id', str(uuid.uuid4())),
"object": "chat.completion.chunk",
"created": int(datetime.now().timestamp()),
"model": "models/lpm",
"system_fingerprint": getattr(chunk, 'system_fingerprint', None),
"choices": [
{
"index": 0,
"delta": {
"content": ""
},
"finish_reason": None
}
]
}
        # Collect content and finish reason from the first choice
        choice = choices[0]
        delta = getattr(choice, "delta", None)
        if delta is not None and getattr(delta, "content", None) is not None:
            full_content += delta.content
        if getattr(choice, "finish_reason", None):
            finish_reason = choice.finish_reason
            logger.info("Got finish_reason: %s", finish_reason)
return full_response, full_content, finish_reason
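    # OpenAI-style streaming chunks carry incremental text in choices[0].delta.content
    # and signal completion via choices[0].finish_reason (e.g. "stop").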
    def collect_stream_response(self, response_iterator: Iterator[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""
Collect streaming response into a complete response
Args:
response_iterator: Streaming response iterator
Returns:
            Complete response dictionary, or None if no valid content was collected
"""
logger.info("Starting to collect stream response")
full_response = None
full_content = ""
finish_reason = None
chunk_count = 0
try:
for chunk in response_iterator:
if chunk is None:
logger.warning("Received None chunk, skipping")
continue
                chunk_count += 1
                logger.debug("Processing chunk #%d: %s", chunk_count, chunk)
# Check if it's a custom format response
is_chat_response = (
(hasattr(chunk, 'type') and chunk.type == 'chat_response') or
(isinstance(chunk, dict) and chunk.get("type") == "chat_response")
)
if is_chat_response:
full_response, full_content, chunk_finish_reason = self._process_chat_response(
chunk, full_response, full_content
)
else:
full_response, full_content, chunk_finish_reason = self._process_openai_response(
chunk, full_response, full_content
)
if chunk_finish_reason:
finish_reason = chunk_finish_reason
# logger.info(f"Finished processing all chunks. Total chunks: {chunk_count}")
# logger.info(f"Final content length: {len(full_content)}")
# logger.info(f"Final finish_reason: {finish_reason}")
if not full_response:
logger.error("No valid response collected")
return None
if not full_content:
logger.error("No content collected")
return None
# Update response with complete content
full_response["choices"][0]["delta"]["content"] = full_content
if finish_reason:
full_response["choices"][0]["finish_reason"] = finish_reason
# logger.info(f"Final response object: {full_response}")
# logger.info(f"Final response content: {full_content}")
return full_response
except Exception as e:
logger.error(f"Error collecting stream response: {str(e)}", exc_info=True)
return None
def chat(
self,
request: ChatRequest,
strategy_chain: Optional[List[Type[SystemPromptStrategy]]] = None,
stream: bool = True,
json_response: bool = False,
client: Optional[Any] = None,
model_params: Optional[Dict[str, Any]] = None,
context: Optional[Any] = None,
) -> Union[Dict[str, Any], Iterator[Dict[str, Any]]]:
"""
Main chat method supporting both streaming and non-streaming responses
Args:
request: Chat request containing message and other parameters
strategy_chain: Optional list of strategy classes to use
stream: Whether to return a streaming response
json_response: Whether to request JSON formatted response from LLM
client: Optional OpenAI client to use. If None, uses local_llm_service.client
model_params: Optional model specific parameters to override defaults
context: Optional context to pass to strategies
Returns:
Either an iterator for streaming responses or a single response dictionary
"""
logger.info(f"Chat request: {request}")
        # Build messages through the shared helper so the strategy chain is validated
        messages = self._build_messages(request, strategy_chain, context)
# Use provided client or default local_llm_service.client
current_client = client if client is not None else local_llm_service.client
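        # Refresh the user's LLM configuration; note these attributes are set on the
        # shared service instance and are not read again within this method.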
self.user_llm_config_service = UserLLMConfigService()
self.user_llm_config = self.user_llm_config_service.get_available_llm()
# Prepare API call parameters
api_params = {
"messages": messages,
"temperature": request.temperature,
"response_format": {"type": "text"},
"seed": 42, # Optional: Fixed random seed to get consistent responses
"tools": None, # Optional: If function calling or similar features are needed
"tool_choice": None, # Optional: If function calling or similar features are needed
"max_tokens": request.max_tokens,
"stream": stream,
"model": request.model or "models/lpm",
"metadata": request.metadata
}
# Add JSON format requirement (if needed)
if json_response:
api_params["response_format"] = {"type": "json_object"}
# Update custom model parameters (if provided)
if model_params:
api_params.update(model_params)
logger.info(f"Current client base URL: {current_client.base_url}")
# logger.info(f"Using model parameters: {api_params}")
# Call LLM API
try:
response = current_client.chat.completions.create(**api_params)
if not stream:
logger.info(f"Response: {response.json() if hasattr(response, 'json') else response}")
return response
except Exception as e:
logger.error(f"Chat failed: {str(e)}", exc_info=True)
raise
# Global chat service instance
chat_service = ChatService()
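# Usage sketch (hypothetical ChatRequest fields; check the actual DTO in chat_dto.py):
#
#     request = ChatRequest(message="Hello", temperature=0.7, max_tokens=512)
#
#     # Streaming: forward chunks to the caller as they arrive
#     for chunk in chat_service.chat(request, stream=True):
#         ...
#
#     # Or collapse the stream into a single OpenAI-style response dict
#     response = chat_service.collect_stream_response(chat_service.chat(request))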