# NOTE(review): removed non-Python artifacts that preceded this line (a web
# blob-viewer scrape: "Spaces:"/"Running" labels, a file-size row, a row of
# commit hashes, and a line-number gutter). They were not part of the source
# and made the file fail to parse.
#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import json # Import json module to work with JSON objects for request and response handling
import uuid # Import uuid module to generate unique identifiers if needed for tracking or sessions
from typing import List, Dict # Import type hinting for function parameters to improve code clarity and checking
from config import * # Import all configuration variables such as server lists and tokens from config files
from src.utils.session_mapping import get_host # Import helper function to map session ID to appropriate server host
from src.utils.ip_generator import generate_ip # Import utility function to generate random IP addresses for headers
from src.utils.helper import mark # Import function to mark servers as failed for retry or logging purposes
import asyncio # Import asyncio module to enable asynchronous programming constructs in the function
from src.utils.time import get_time # Import function to get current date and time in required format
from src.utils.reasoning import reasoning_tag_open, reasoning_tag_close # Import functions to wrap reasoning text with tags
from src.utils.instruction import set_instructions # Import function to generate system instructions based on mode and time
from src.core.transport.httpx import httpx_transport # Import primary HTTP transport method using httpx for streaming
from src.core.transport.aiohttp import aiohttp_transport # Import fallback HTTP transport method using aiohttp
import httpx # Import httpx module for exception handling of HTTP status error
import aiohttp # Import aiohttp module for exception handling of client response error
# Define the main asynchronous function to communicate with AI server and stream responses
async def jarvis(
    session_id: str, # Session identifier used to route the request to the right backend
    model: str, # Name/identifier of the AI model to generate with
    history: List[Dict[str, str]], # Prior conversation turns, preserved for context
    user_message: str, # The newest user input to forward to the model
    mode: str, # Behavior switch (e.g. whether reasoning output is enabled)
    files=None, # Optional user-attached files to ship with the message
    temperature: float = 0.6, # Sampling temperature (higher = more random output)
    top_k: int = 20, # Restrict sampling to the k most probable tokens
    min_p: float = 0, # Drop tokens below this probability floor
    top_p: float = 0.95, # Nucleus-sampling cumulative probability cutoff
    repetition_penalty: float = 1, # Penalty factor applied to repeated tokens
):
    """
    Stream an AI completion from the configured server pool.

    Each configured server is tried at most once. For the server mapped to
    this session, the primary httpx transport is attempted first and the
    aiohttp transport is used as a fallback. Response chunks (reasoning and
    content) are yielded to the caller as they arrive; if every server fails,
    a single human-readable busy message is yielded instead.
    """
    # Servers already attempted this call — guarantees each is tried once at most.
    attempted = set()
    # Keep trying until every configured server has been attempted.
    # NOTE(review): if get_host() keeps returning an already-attempted server,
    # this loop would not terminate — presumably mark()/get_host() rotate; verify.
    while len(attempted) < len(auth):
        # Resolve the backend assigned to this session: name, URL, credentials,
        # and the HTTP status code that signals "retry on another server".
        route = get_host(session_id)
        server_name = route["jarvis"]
        endpoint_url = route["endpoint"]
        auth_token = route["token"]
        retry_status = route["error"]
        attempted.add(server_name)
        # Build the system instruction from the mode and the current timestamp.
        system_prompt = set_instructions(mode, get_time())
        # Fresh message list (shallow copy — original history is never mutated),
        # with the system instruction placed first.
        convo = [{"role": "system", "content": system_prompt}] + list(history)
        # Append the latest user turn, attaching files only when present.
        user_entry = {"role": "user", "content": user_message}
        if files:
            user_entry["files"] = files
        convo.append(user_entry)
        # Auth + content-type headers; the forwarded-for IP is randomized per request.
        request_headers = {
            "Authorization": f"Bearer {auth_token}",
            "Content-Type": "application/json",
            "X-Forwarded-For": generate_ip(),
        }
        # Streaming generation request: model, full conversation, and sampling knobs.
        request_body = {
            "model": model,
            "messages": convo,
            "stream": True,
            "temperature": temperature,
            "top_k": top_k,
            "min_p": min_p,
            "top_p": top_p,
            "repetition_penalty": repetition_penalty,
        }
        # --- Primary transport: httpx ---
        try:
            # Relay chunks to the caller as they stream in; a clean finish ends the call.
            async for piece in httpx_transport(endpoint_url, request_headers, request_body, mode):
                yield piece
            return
        except httpx.HTTPStatusError as exc:
            # A status matching the retryable code means: move on to the next server.
            if exc.response.status_code != retry_status:
                # Anything else is a hard failure for this server — record it,
                # then fall through to the aiohttp fallback below.
                mark(server_name)
            else:
                continue
        except Exception:
            # Unexpected failure in the primary transport: record and try the fallback.
            mark(server_name)
        # --- Fallback transport: aiohttp (same server, same payload) ---
        try:
            async for piece in aiohttp_transport(endpoint_url, request_headers, request_body, mode):
                yield piece
            return
        except aiohttp.ClientResponseError as exc:
            # Same retry-vs-mark disposition as the primary transport.
            if exc.status != retry_status:
                mark(server_name)
            else:
                continue
        except Exception:
            # Unexpected fallback failure: record it and loop to the next server.
            mark(server_name)
    # Every configured server was attempted and none succeeded.
    yield "The server is currently busy. Please wait a moment or try again later"
    return