#
# SPDX-FileCopyrightText: Hadad
# SPDX-License-Identifier: Apache-2.0
#

import json  # JSON helpers for request and response handling
import uuid  # Unique identifier generation for tracking or sessions
from typing import List, Dict  # Type hints used in the function signature
from config import *  # Configuration values such as server lists and tokens
from src.utils.session_mapping import get_host  # Maps a session ID to its server configuration
from src.utils.ip_generator import generate_ip  # Produces a random IP address for request headers
from src.utils.helper import mark  # Flags a server as failed
import asyncio  # Asynchronous programming support
from src.utils.time import get_time  # Current date and time in the required format
from src.utils.reasoning import reasoning_tag_open, reasoning_tag_close  # Wrap reasoning text with tags
from src.utils.instruction import set_instructions  # Builds system instructions from mode and time
from src.core.transport.httpx import httpx_transport  # Primary streaming transport (httpx)
from src.core.transport.aiohttp import aiohttp_transport  # Fallback streaming transport (aiohttp)
import httpx  # Needed to catch httpx.HTTPStatusError
import aiohttp  # Needed to catch aiohttp.ClientResponseError


async def jarvis(
    session_id: str,
    model: str,
    history: List[Dict[str, str]],
    user_message: str,
    mode: str,
    files=None,
    temperature: float = 0.6,
    top_k: int = 20,
    min_p: float = 0,
    top_p: float = 0.95,
    repetition_penalty: float = 1,
):
    """
    Stream a model reply for one user turn, failing over between servers.

    Each attempt asks ``get_host`` for the server assigned to the session,
    builds a streaming chat request, and relays every chunk produced by the
    httpx transport. If that transport fails, the same request is replayed
    through the aiohttp transport. The generator ends as soon as one transport
    finishes streaming; once as many distinct servers as ``len(auth)`` have
    been attempted, a single "server busy" message is yielded instead.

    Args:
        session_id: Session identifier passed to ``get_host``.
        model: Model name sent as the ``model`` field of the payload.
        history: Earlier conversation messages; not mutated by this function.
        user_message: Latest user input, appended as the final message.
        mode: Passed to ``set_instructions`` and to both transports.
        files: Optional attachments; added to the user message under the
            ``files`` key only when truthy.
        temperature: Forwarded unchanged as ``temperature``.
        top_k: Forwarded unchanged as ``top_k``.
        min_p: Forwarded unchanged as ``min_p``.
        top_p: Forwarded unchanged as ``top_p``.
        repetition_penalty: Forwarded unchanged as ``repetition_penalty``.

    Yields:
        Chunks exactly as produced by the transports, or the fallback
        "server busy" string when no attempt succeeded.
    """
    # Generation settings do not depend on the chosen server, so collect them once.
    sampling = {
        "temperature": temperature,
        "top_k": top_k,
        "min_p": min_p,
        "top_p": top_p,
        "repetition_penalty": repetition_penalty,
    }

    # Names of the servers that have been attempted so far.
    attempted = set()

    # `auth` is expected to be provided by `from config import *`.
    # NOTE(review): progress relies on get_host eventually returning a server
    # that is not yet in `attempted`; confirm that it does so after a
    # retryable status, otherwise this loop may not terminate.
    while len(attempted) < len(auth):
        # Resolve the server assigned to this session.
        route = get_host(session_id)
        server = route["jarvis"]  # Server name, used for bookkeeping and mark()
        host = route["endpoint"]  # URL the request is sent to
        token = route["token"]  # Bearer token for this server
        retry_status = route["error"]  # Status code that means "move on to the next attempt"
        attempted.add(server)

        # System instructions are rebuilt per attempt from the mode and current time.
        system_prompt = set_instructions(mode, get_time())

        # Latest user turn, carrying attachments only when some were supplied.
        user_entry = {"role": "user", "content": user_message}
        if files:
            user_entry["files"] = files

        # New list: system prompt, then prior history, then the new user turn.
        messages = [
            {"role": "system", "content": system_prompt},
            *history,
            user_entry,
        ]

        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "X-Forwarded-For": generate_ip(),  # Fresh generated address on every attempt
        }

        payload = {
            "model": model,
            "messages": messages,
            "stream": True,
            **sampling,
        }

        # Primary transport: httpx.
        try:
            async for piece in httpx_transport(host, headers, payload, mode):
                yield piece
        except httpx.HTTPStatusError as exc:
            # A retryable status skips the fallback and does not mark the server.
            if exc.response.status_code == retry_status:
                continue
            mark(server)
        except Exception:
            # Any other failure marks the server, then falls through to the fallback.
            mark(server)
        else:
            # Stream completed without error; nothing more to do.
            return

        # Fallback transport: aiohttp, replaying the same request from the start.
        # NOTE(review): chunks already yielded by the primary transport before a
        # mid-stream failure are not tracked here, so output may be repeated.
        try:
            async for piece in aiohttp_transport(host, headers, payload, mode):
                yield piece
        except aiohttp.ClientResponseError as exc:
            # Same retry rule as above, using aiohttp's status attribute.
            if exc.status == retry_status:
                continue
            mark(server)
        except Exception:
            mark(server)
        else:
            return

    # Every configured server has been attempted without a completed stream.
    yield "The server is currently busy. Please wait a moment or try again later"