File size: 8,880 Bytes
b5e7375
 
 
 
 
5d9ca4f
 
 
 
 
 
 
 
 
 
 
 
 
5b754a3
 
5d9ca4f
 
b5e7375
5d9ca4f
 
 
 
 
 
 
 
 
 
 
b5e7375
 
5d9ca4f
 
 
b5e7375
5d9ca4f
 
b5e7375
5d9ca4f
 
b5e7375
5d9ca4f
 
 
 
 
 
 
b5e7375
5d9ca4f
 
b5e7375
5d9ca4f
 
b5e7375
5d9ca4f
 
e96f98e
5d9ca4f
 
b5e7375
5d9ca4f
 
 
 
 
b5e7375
5d9ca4f
b5e7375
5d9ca4f
 
 
b5e7375
 
5d9ca4f
b5e7375
5d9ca4f
 
 
 
 
 
 
 
b5e7375
 
5d9ca4f
b5e7375
5d9ca4f
 
 
 
 
 
 
 
b5e7375
5d9ca4f
e96f98e
5d9ca4f
 
 
e96f98e
5d9ca4f
 
 
 
 
 
 
 
 
 
e96f98e
5d9ca4f
 
 
 
 
b5e7375
5d9ca4f
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#

import json  # Import json module to work with JSON objects for request and response handling
import uuid  # Import uuid module to generate unique identifiers if needed for tracking or sessions
from typing import List, Dict  # Import type hinting for function parameters to improve code clarity and checking
from config import *  # Import all configuration variables such as server lists and tokens from config files
from src.utils.session_mapping import get_host  # Import helper function to map session ID to appropriate server host
from src.utils.ip_generator import generate_ip  # Import utility function to generate random IP addresses for headers
from src.utils.helper import mark  # Import function to mark servers as failed for retry or logging purposes
import asyncio  # Import asyncio module to enable asynchronous programming constructs in the function
from src.utils.time import get_time  # Import function to get current date and time in required format
from src.utils.reasoning import reasoning_tag_open, reasoning_tag_close  # Import functions to wrap reasoning text with tags
from src.utils.instruction import set_instructions  # Import function to generate system instructions based on mode and time
from src.core.transport.httpx import httpx_transport  # Import primary HTTP transport method using httpx for streaming
from src.core.transport.aiohttp import aiohttp_transport  # Import fallback HTTP transport method using aiohttp
import httpx  # Import httpx module for exception handling of HTTP status error
import aiohttp  # Import aiohttp module for exception handling of client response error

# Define the main asynchronous function to communicate with AI server and stream responses
async def jarvis(
    session_id: str,  # Session key used to pick the backend assigned to this user
    model: str,  # Identifier of the AI model the backend should run
    history: List[Dict[str, str]],  # Prior conversation turns, oldest first
    user_message: str,  # Newest user input to append to the conversation
    mode: str,  # Behavior switch (e.g. whether reasoning output is enabled)
    files=None,  # Optional attachments to ship with the user message
    temperature: float = 0.6,  # Sampling temperature (higher = more random)
    top_k: int = 20,  # Keep only the k most probable tokens per step
    min_p: float = 0,  # Drop tokens below this probability floor
    top_p: float = 0.95,  # Nucleus-sampling cumulative probability cutoff
    repetition_penalty: float = 1,  # >1 discourages repeated tokens
):
    """
    Relay a chat request to the configured AI backends and stream the reply.

    Servers are attempted one at a time, each at most once per call. For the
    selected server the primary httpx transport is tried first; if it fails,
    the aiohttp transport is used as a fallback against the same host. Chunks
    are yielded to the caller as soon as they arrive. If the server answers
    with its configured retryable status code, the next server is tried;
    other failures mark the server via ``mark``. Once every server in the
    ``auth`` pool has been exhausted, a human-readable busy message is
    yielded instead.
    """

    # Servers already attempted during this call — guards against re-use
    attempted = set()

    # Keep going until every entry in the configured auth pool has been tried
    while len(attempted) < len(auth):

        # Resolve the backend currently mapped to this session
        setup = get_host(session_id)
        server, host = setup["jarvis"], setup["endpoint"]
        token, error = setup["token"], setup["error"]
        attempted.add(server)  # Never hit the same server twice per call

        # Build the system instruction from the mode and the current time
        instructions = set_instructions(mode, get_time())

        # Assemble the outgoing message list: system prompt, prior turns,
        # then the fresh user input (leaves the caller's history untouched)
        messages = [{"role": "system", "content": instructions}, *history]
        latest = {"role": "user", "content": user_message}
        if files:
            latest["files"] = files  # Attachments ride along with the text
        messages.append(latest)

        # Request headers: bearer auth, JSON body, randomized client origin
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "X-Forwarded-For": generate_ip(),
        }

        # JSON body: model selection, conversation, and sampling parameters
        payload = {
            "model": model,
            "messages": messages,
            "stream": True,  # Ask the backend for incremental chunks
            "temperature": temperature,
            "top_k": top_k,
            "min_p": min_p,
            "top_p": top_p,
            "repetition_penalty": repetition_penalty,
        }

        # First attempt: stream through the primary httpx transport
        try:
            async for piece in httpx_transport(host, headers, payload, mode):
                yield piece  # Forward each chunk to the caller immediately
            return  # Stream finished cleanly — nothing left to do
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code == error:
                continue  # Retryable status: rotate to the next server
            mark(server)  # Any other HTTP error counts as a hard failure
        except Exception:
            mark(server)  # Unexpected failure: flag the server as bad

        # Second attempt: same request through the aiohttp fallback transport
        try:
            async for piece in aiohttp_transport(host, headers, payload, mode):
                yield piece  # Forward fallback chunks as they arrive
            return  # Fallback succeeded — done
        except aiohttp.ClientResponseError as exc:
            if exc.status == error:
                continue  # Retryable status: try the next server
            mark(server)  # Non-retryable failure on the fallback path
        except Exception:
            mark(server)  # Unexpected fallback failure: flag the server

    # Every configured server failed — report the outage to the caller
    yield "The server is currently busy. Please wait a moment or try again later"
    return