# GitRecap: services/llm_service.py
import asyncio
import json
import os
import random
import uuid
from typing import Dict, List, Optional

from fastapi import HTTPException

from aicore.config import Config
from aicore.llm import Llm
from aicore.llm.config import LlmConfig
from aicore.logger import _logger

from services.prompts import SELECT_QUIRKY_REMARK_SYSTEM, SYSTEM, quirky_remarks

def get_random_quirky_remarks(remarks_list: List[str], n: int = 5) -> List[str]:
"""
Returns a list of n randomly selected quirky remarks.
Args:
remarks_list (list): The full list of quirky remarks.
n (int): Number of remarks to select (default is 5).
Returns:
list: Randomly selected quirky remarks.
"""
return random.sample(remarks_list, min(n, len(remarks_list)))
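
# Illustrative call (not from the repo): min() caps the sample size, so asking
# for more remarks than exist never raises ValueError.
#
#   get_random_quirky_remarks(["a", "b", "c"], n=5)
#   # -> all three remarks, in random order
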
# LLM session storage
llm_sessions: Dict[str, Llm] = {}

# Strong references to fire-and-forget tasks; the event loop keeps only weak
# references, so unreferenced tasks can be garbage-collected before they run.
_background_tasks: set = set()
async def initialize_llm_session(session_id: str, config: Optional[LlmConfig] = None) -> Llm:
"""
Initialize or retrieve an LLM session.
Args:
session_id: The session identifier.
config: Optional custom LLM configuration.
Returns:
An initialized LLM instance.
"""
if session_id in llm_sessions:
return llm_sessions[session_id]
# Initialize LLM based on whether custom config is provided.
if config:
# Convert Pydantic model to dict and use for LLM initialization.
config_dict = config.dict(exclude_none=True)
llm = Llm.from_config(config_dict)
    else:
        # Fall back to the environment-driven application config.
        app_config = Config.from_environment()
        llm = Llm.from_config(app_config.llm)
llm.session_id = session_id
llm_sessions[session_id] = llm
return llm
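
# Illustrative usage; the LlmConfig field names below are assumptions for the
# sketch, not the library's documented schema:
#
#   custom = LlmConfig(provider="openai", model="gpt-4o-mini")  # hypothetical fields
#   llm = await initialize_llm_session(str(uuid.uuid4()), custom)
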
async def set_llm(config: Optional[LlmConfig] = None) -> str:
"""
Set a custom LLM configuration and return a new session ID.
Args:
config: The LLM configuration to use.
Returns:
A new session ID linked to the configured LLM.
"""
try:
# Generate a unique session ID.
session_id = str(uuid.uuid4())
# Initialize the LLM with the provided configuration.
await initialize_llm_session(session_id, config)
        # Schedule session expiration 5 minutes after creation, keeping a
        # strong reference so the task is not garbage-collected before it runs.
        expiration_task = asyncio.create_task(schedule_session_expiration(session_id))
        _background_tasks.add(expiration_task)
        expiration_task.add_done_callback(_background_tasks.discard)
return session_id
except Exception as e:
print(f"Error setting custom LLM: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to set custom LLM: {str(e)}")
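
# A FastAPI route might wire this up roughly as follows (endpoint path and
# response shape are illustrative, not taken from this repo):
#
#   @app.post("/llm/config")
#   async def configure_llm(config: LlmConfig):
#       return {"session_id": await set_llm(config)}
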
def get_llm(session_id: str) -> Llm:
"""
Retrieve the LLM instance associated with the given session_id.
Args:
session_id: The session identifier.
Returns:
The LLM instance if found.
Raises:
HTTPException: If the session is not found.
"""
    if session_id not in llm_sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    return llm_sessions[session_id]
def trim_messages(messages, tokenizer_fn, max_tokens: Optional[int] = None):
"""
Trim messages to ensure that the total token count does not exceed max_tokens.
Args:
messages: List of messages.
tokenizer_fn: Function to tokenize messages.
max_tokens: Maximum allowed tokens.
Returns:
Trimmed list of messages.
"""
max_tokens = max_tokens or int(os.environ.get("MAX_HISTORY_TOKENS", 16000))
while messages and sum(len(tokenizer_fn(str(msg))) for msg in messages) > max_tokens:
messages.pop(0) # Remove from the beginning
return messages
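
# Worked example, with str.split standing in for a real tokenizer:
#
#   msgs = ["oldest message", "newer message", "newest message"]
#   trim_messages(msgs, tokenizer_fn=str.split, max_tokens=4)
#   # -> ["newer message", "newest message"]  (oldest dropped first)
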
async def run_concurrent_tasks(llm, message):
"""
Run concurrent tasks for the LLM and logger.
Args:
llm: The LLM instance.
message: Message to process.
Yields:
Chunks of logs from the logger.
"""
QUIRKY_SYSTEM = SELECT_QUIRKY_REMARK_SYSTEM.format(
examples=json.dumps(get_random_quirky_remarks(quirky_remarks), indent=4)
)
    # Kick off the completion and the log distributor concurrently. Keep
    # references: the event loop holds only weak references to tasks, so
    # unreferenced tasks can be garbage-collected mid-flight.
    completion_task = asyncio.create_task(llm.acomplete(message, system_prompt=[SYSTEM, QUIRKY_SYSTEM]))
    distribute_task = asyncio.create_task(_logger.distribute())
    # Stream logger output while the LLM is running; re-enter the log stream
    # if it closes so the caller keeps receiving chunks until it disconnects.
    while True:
        async for chunk in _logger.get_session_logs(llm.session_id):
            yield chunk  # Yield each chunk directly to the caller.
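
# Consuming the stream (illustrative; a websocket handler would look similar):
#
#   async for chunk in run_concurrent_tasks(llm, "Summarize my last commits"):
#       await websocket.send_text(chunk)
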
def simulate_llm_response(message: str) -> List[str]:
"""
Simulate LLM response by breaking a dummy response into chunks.
Args:
message: Input message.
Returns:
List of response chunks.
"""
response = (
f"This is a simulated response to: '{message}'. In a real implementation, this would be the actual output "
"from your LLM model. The response would be generated in chunks and streamed back to the client as they become available."
)
    # Break into 10-character chunks (the final chunk may be shorter).
    return [response[i:i + 10] for i in range(0, len(response), 10)]
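
# Example: simulate_llm_response("hi") returns 10-character slices such as
# ["This is a ", "simulated "] followed by the rest of the canned response.
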
def cleanup_llm_sessions():
"""Clean up all LLM sessions."""
llm_sessions.clear()
async def schedule_session_expiration(session_id: str):
"""
    Schedule the expiration of a session 5 minutes (300 seconds) after its creation.
Args:
session_id: The session identifier.
"""
# Wait for 5 minutes (300 seconds) before expiring the session.
await asyncio.sleep(300)
await expire_session(session_id)
async def expire_session(session_id: str):
"""
Expire a session by removing it from storage and cleaning up associated resources.
Args:
session_id: The session identifier.
"""
# Remove the expired session from storage.
llm_sessions.pop(session_id, None)
    # Expire any associated fetcher in fetcher_service. The import is deferred
    # to call time, which sidesteps a circular import between service modules.
    from services.fetcher_service import expire_fetcher
    expire_fetcher(session_id)
    # Close any active websocket connections associated with this session;
    # the deferred import mirrors the one above.
    from server.websockets import close_websocket_connection
    close_websocket_connection(session_id)