ai / src /core /server.py
hadadrjt's picture
ai: Enable API for Next-Gen!
5d9ca4f
#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import json # Import json module to work with JSON objects for request and response handling
import uuid # Import uuid module to generate unique identifiers if needed for tracking or sessions
from typing import List, Dict # Import type hinting for function parameters to improve code clarity and checking
from config import * # Import all configuration variables such as server lists and tokens from config files
from src.utils.session_mapping import get_host # Import helper function to map session ID to appropriate server host
from src.utils.ip_generator import generate_ip # Import utility function to generate random IP addresses for headers
from src.utils.helper import mark # Import function to mark servers as failed for retry or logging purposes
import asyncio # Import asyncio module to enable asynchronous programming constructs in the function
from src.utils.time import get_time # Import function to get current date and time in required format
from src.utils.reasoning import reasoning_tag_open, reasoning_tag_close # Import functions to wrap reasoning text with tags
from src.utils.instruction import set_instructions # Import function to generate system instructions based on mode and time
from src.core.transport.httpx import httpx_transport # Import primary HTTP transport method using httpx for streaming
from src.core.transport.aiohttp import aiohttp_transport # Import fallback HTTP transport method using aiohttp
# Define the main asynchronous function to communicate with AI server and stream responses
async def jarvis(
session_id: str, # Unique identifier for the user session to route requests correctly
model: str, # AI model name or identifier to specify which model to use for generation
history: List[Dict[str, str]], # List of previous conversation messages to maintain context
user_message: str, # The latest message input from the user to send to the AI model
mode: str, # Mode string controlling behavior such as enabling or disabling reasoning output
files=None, # Optional parameter for any files attached by the user to include in the request
temperature: float = 0.6, # Sampling temperature controlling randomness of AI responses
top_k: int = 20, # Limits token selection to top-k probable tokens for response generation
min_p: float = 0, # Minimum probability threshold for token sampling to filter unlikely tokens
top_p: float = 0.95, # Cumulative probability cutoff for nucleus sampling of tokens
repetition_penalty: float = 1, # Parameter to penalize repeated tokens to reduce repetition in output
):
"""
Stream AI response from multiple configured servers using asynchronous HTTP requests
Yields chunks of response that include reasoning and content parts as they arrive
"""
# Initialize a set to keep track of servers that have already been attempted
tried = set() # Prevents retrying the same server multiple times to avoid redundant requests
# Loop until a server successfully returns a response or all servers have been exhausted
while len(tried) < len(auth): # Continue trying servers until all configured servers are tried
# Retrieve server configuration details mapped to the current session
setup = get_host(session_id) # Get server host, token, and error codes for the session
server = setup["jarvis"] # Extract server name identifier for logging and marking
host = setup["endpoint"] # Extract server endpoint URL for sending requests
token = setup["token"] # Extract authentication token for authorized access
error = setup["error"] # Extract HTTP status code that indicates retryable error
tried.add(server) # Add current server to tried set to avoid retrying it again
# Get the current date and time for system instruction
date = get_time() # Retrieve current timestamp to include in system instructions
# Generate system instructions
instructions = set_instructions(mode, date) # Create system instructions guiding AI behavior
# Make a shallow copy of the conversation history to avoid mutating original list
messages = history.copy() # Duplicate previous messages to safely modify for this request
# Insert the system instruction message at the beginning of the message list
messages.insert(0, {"role": "system", "content": instructions}) # Add system instructions as first message
# Construct the user message dictionary with role and content
msg = {"role": "user", "content": user_message} # Prepare user's latest input for the request
if files: # Check if any files are attached to include in the message payload
msg["files"] = files # Attach files to the user message to send alongside text input
messages.append(msg) # Append the user message (with optional files) to the message history
# Prepare HTTP headers including authorization and content type for the request
headers = {
"Authorization": f"Bearer {token}", # Bearer token for authenticating with the AI server
"Content-Type": "application/json", # Specify that the request body is JSON formatted
"X-Forwarded-For": generate_ip(), # Randomly generated IP address to simulate client origin
}
# Build the JSON payload containing model, messages, and generation parameters
payload = {
"model": model, # Specify which AI model to use for generating responses
"messages": messages, # Provide the full message history including system and user inputs
"stream": True, # Enable streaming mode to receive partial response chunks progressively
"temperature": temperature, # Control randomness in token sampling for response diversity
"top_k": top_k, # Restrict token selection to top-k most probable tokens
"min_p": min_p, # Set minimum probability threshold to filter out unlikely tokens
"top_p": top_p, # Use nucleus sampling with cumulative probability cutoff
"repetition_penalty": repetition_penalty, # Penalize repeated tokens to reduce redundancy
}
# Attempt to stream the response using the primary HTTP transport method (httpx)
try:
async for chunk in httpx_transport(host, headers, payload, mode): # Stream response chunks asynchronously
yield chunk # Yield each chunk to the caller as it arrives for real-time processing
return # Exit the function if streaming completes successfully without errors
# Handle HTTP errors with status codes that indicate retryable failures
except httpx.HTTPStatusError as e: # Catch HTTP errors specific to httpx transport
if e.response.status_code == error: # If error code matches retryable error, try next server
continue # Skip current server and proceed to next iteration to retry
else:
mark(server) # Mark the current server as failed for non-retryable errors
# Handle any other unexpected exceptions during httpx transport
except Exception: # Catch all other exceptions to prevent crashing
mark(server) # Mark server as failed due to unexpected error
# If the primary transport fails, attempt to stream response using fallback transport (aiohttp)
try:
async for chunk in aiohttp_transport(host, headers, payload, mode): # Use fallback streaming method
yield chunk # Yield streamed chunks to caller as they arrive
return # Exit if fallback transport succeeds
# Handle aiohttp-specific response errors with retryable status codes
except aiohttp.ClientResponseError as e: # Catch HTTP response errors from aiohttp transport
if e.status == error: # Retry on matching error code by trying next server
continue # Continue to next server attempt
else:
mark(server) # Mark server as failed for non-retryable errors
# Handle any other exceptions during aiohttp transport
except Exception: # Catch generic exceptions to avoid crashing
mark(server) # Mark fallback server as failed
# If all servers have been tried and failed, yield a user-friendly error message
yield "The server is currently busy. Please wait a moment or try again later" # Inform user of service unavailability
return # End the function after exhausting all servers