#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import json # Import json module to work with JSON objects for request and response handling
import uuid # Import uuid module to generate unique identifiers if needed for tracking or sessions
import asyncio # Import asyncio module to enable asynchronous programming constructs in the function
from typing import List, Dict # Import type hinting for function parameters to improve code clarity and checking
import httpx # Import httpx so its HTTPStatusError can be caught when the primary transport fails
import aiohttp # Import aiohttp so its ClientResponseError can be caught when the fallback transport fails
from config import * # Import all configuration variables such as server lists and tokens from config files
from src.utils.session_mapping import get_host # Import helper function to map session ID to appropriate server host
from src.utils.ip_generator import generate_ip # Import utility function to generate random IP addresses for headers
from src.utils.helper import mark # Import function to mark servers as failed for retry or logging purposes
from src.utils.time import get_time # Import function to get current date and time in required format
from src.utils.reasoning import reasoning_tag_open, reasoning_tag_close # Import functions to wrap reasoning text with tags
from src.utils.instruction import set_instructions # Import function to generate system instructions based on mode and time
from src.core.transport.httpx import httpx_transport # Import primary HTTP transport method using httpx for streaming
from src.core.transport.aiohttp import aiohttp_transport # Import fallback HTTP transport method using aiohttp
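# The transport modules themselves are not shown in this file. As a rough,
# hypothetical illustration only, a streaming transport in the style of
# httpx_transport might look like the sketch below, assuming an OpenAI-compatible
# server-sent-events endpoint; _sketch_httpx_transport is not part of this
# module's API, and the real implementation lives under src/core/transport.
async def _sketch_httpx_transport(host, headers, payload, mode):
    # mode is accepted for signature parity with httpx_transport; this sketch ignores it
    async with httpx.AsyncClient(timeout=None) as client: # Disable read timeout while streaming
        async with client.stream("POST", host, headers=headers, json=payload) as response:
            response.raise_for_status() # Raises httpx.HTTPStatusError on 4xx/5xx responses
            async for line in response.aiter_lines(): # Iterate over SSE lines as they arrive
                if not line.startswith("data: "): # Skip keep-alives and blank lines
                    continue
                data = line[len("data: "):]
                if data == "[DONE]": # OpenAI-style end-of-stream sentinel
                    break
                delta = json.loads(data)["choices"][0]["delta"] # Incremental token delta
                if delta.get("content"): # Yield only non-empty content fragments
                    yield delta["content"]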
# Define the main asynchronous function to communicate with the AI server and stream responses
async def jarvis(
    session_id: str, # Unique identifier for the user session to route requests correctly
    model: str, # AI model name or identifier to specify which model to use for generation
    history: List[Dict[str, str]], # List of previous conversation messages to maintain context
    user_message: str, # The latest message input from the user to send to the AI model
    mode: str, # Mode string controlling behavior such as enabling or disabling reasoning output
    files=None, # Optional parameter for any files attached by the user to include in the request
    temperature: float = 0.6, # Sampling temperature controlling randomness of AI responses
    top_k: int = 20, # Limits token selection to top-k probable tokens for response generation
    min_p: float = 0.0, # Minimum probability threshold for token sampling to filter unlikely tokens
    top_p: float = 0.95, # Cumulative probability cutoff for nucleus sampling of tokens
    repetition_penalty: float = 1.0, # Parameter to penalize repeated tokens to reduce repetition in output
):
    """
    Stream an AI response from multiple configured servers using asynchronous HTTP requests.
    Yields chunks of the response, including reasoning and content parts, as they arrive.
    """
    # Initialize a set to keep track of servers that have already been attempted
    tried = set() # Prevents retrying the same server multiple times to avoid redundant requests
    # Loop until a server successfully returns a response or all servers have been exhausted
    while len(tried) < len(auth): # Continue trying servers until all configured servers are tried
        # Retrieve server configuration details mapped to the current session
        setup = get_host(session_id) # Get server host, token, and error codes for the session
        server = setup["jarvis"] # Extract server name identifier for logging and marking
        host = setup["endpoint"] # Extract server endpoint URL for sending requests
        token = setup["token"] # Extract authentication token for authorized access
        error = setup["error"] # Extract HTTP status code that indicates a retryable error
        tried.add(server) # Add current server to tried set to avoid retrying it again
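        # For illustration only, get_host is assumed to return a mapping shaped
        # roughly like this (all values below are placeholders, not real config):
        # {"jarvis": "server-1", "endpoint": "https://example.invalid/v1/chat/completions",
        #  "token": "<secret>", "error": 429}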
        # Get the current date and time for the system instruction
        date = get_time() # Retrieve current timestamp to include in system instructions
        # Generate system instructions
        instructions = set_instructions(mode, date) # Create system instructions guiding AI behavior
        # Make a shallow copy of the conversation history to avoid mutating the original list
        messages = history.copy() # Duplicate previous messages to safely modify for this request
        # Insert the system instruction message at the beginning of the message list
        messages.insert(0, {"role": "system", "content": instructions}) # Add system instructions as first message
        # Construct the user message dictionary with role and content
        msg = {"role": "user", "content": user_message} # Prepare user's latest input for the request
        if files: # Check if any files are attached to include in the message payload
            msg["files"] = files # Attach files to the user message to send alongside text input
        messages.append(msg) # Append the user message (with optional files) to the message history
        # Prepare HTTP headers including authorization and content type for the request
        headers = {
            "Authorization": f"Bearer {token}", # Bearer token for authenticating with the AI server
            "Content-Type": "application/json", # Specify that the request body is JSON formatted
            "X-Forwarded-For": generate_ip(), # Randomly generated IP address to simulate client origin
        }
        # Build the JSON payload containing model, messages, and generation parameters
        payload = {
            "model": model, # Specify which AI model to use for generating responses
            "messages": messages, # Provide the full message history including system and user inputs
            "stream": True, # Enable streaming mode to receive partial response chunks progressively
            "temperature": temperature, # Control randomness in token sampling for response diversity
            "top_k": top_k, # Restrict token selection to top-k most probable tokens
            "min_p": min_p, # Set minimum probability threshold to filter out unlikely tokens
            "top_p": top_p, # Use nucleus sampling with cumulative probability cutoff
            "repetition_penalty": repetition_penalty, # Penalize repeated tokens to reduce redundancy
        }
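        # For reference, the serialized request body then resembles the following
        # (values are the illustrative defaults above, not output from a real request):
        # {"model": "...", "stream": true, "temperature": 0.6, "top_k": 20,
        #  "min_p": 0.0, "top_p": 0.95, "repetition_penalty": 1.0,
        #  "messages": [{"role": "system", "content": "..."},
        #               {"role": "user", "content": "..."}]}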
        # Attempt to stream the response using the primary HTTP transport method (httpx)
        try:
            async for chunk in httpx_transport(host, headers, payload, mode): # Stream response chunks asynchronously
                yield chunk # Yield each chunk to the caller as it arrives for real-time processing
            return # Exit the generator if streaming completes successfully without errors
        # Handle HTTP errors with status codes that indicate retryable failures
        except httpx.HTTPStatusError as e: # Catch HTTP errors specific to the httpx transport
            if e.response.status_code == error: # If the error code matches the retryable error, try the next server
                continue # Skip the current server and proceed to the next iteration to retry
            else:
                mark(server) # Mark the current server as failed for non-retryable errors
        # Handle any other unexpected exceptions during httpx transport
        except Exception: # Catch all other exceptions to prevent crashing
            mark(server) # Mark server as failed due to unexpected error
        # If the primary transport fails, attempt to stream the response using the fallback transport (aiohttp)
        try:
            async for chunk in aiohttp_transport(host, headers, payload, mode): # Use fallback streaming method
                yield chunk # Yield streamed chunks to the caller as they arrive
            return # Exit if the fallback transport succeeds
        # Handle aiohttp-specific response errors with retryable status codes
        except aiohttp.ClientResponseError as e: # Catch HTTP response errors from the aiohttp transport
            if e.status == error: # Retry on a matching error code by trying the next server
                continue # Continue to the next server attempt
            else:
                mark(server) # Mark server as failed for non-retryable errors
        # Handle any other exceptions during aiohttp transport
        except Exception: # Catch generic exceptions to avoid crashing
            mark(server) # Mark the fallback attempt as failed for this server
yield "The server is currently busy. Please wait a moment or try again later" # Inform user of service unavailability | |
return # End the function after exhausting all servers |
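# A minimal, hypothetical usage sketch: jarvis is an async generator, so a caller
# drains it with "async for". The session ID, model name, mode, and message below
# are placeholders for illustration and are not values used anywhere in this project.
async def _example() -> None:
    async for chunk in jarvis(
        session_id=str(uuid.uuid4()), # Any opaque session identifier routed by get_host
        model="example-model", # Placeholder model identifier
        history=[], # No prior conversation context in this sketch
        user_message="Hello!", # The user's latest input
        mode="default", # Placeholder mode string; real values depend on set_instructions
    ):
        print(chunk, end="", flush=True) # Print streamed fragments as they arrive

if __name__ == "__main__":
    asyncio.run(_example()) # Drive the example with a fresh event loop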