# SPDX-FileCopyrightText: Hadad
# SPDX-License-Identifier: Apache-2.0

import asyncio
import json
import random
import uuid

import httpx

from src.config import *
from src.cores.server import fetch_response_stream_async
from src.cores.session import ensure_stop_event, get_model_key


async def chat_with_model_async(history, user_input, model_display, sess, custom_prompt, deep_search):
    """
    Core async generator that talks to the AI model backend.

    Builds the system-instruction and message list (optionally augmented with
    deep-search results for the primary model), then tries every shuffled
    host/key pair until one streams a response. Yields ("content", text)
    chunks for UI updates.

    Args:
        history: Sequence of (user_message, assistant_reply) pairs; a reply
            may be falsy for turns that have not been answered yet.
        user_input: The current user message.
        model_display: Display name of the selected model.
        sess: Session object carrying stop_event, cancel_token and session_id.
        custom_prompt: Optional system-prompt override for non-primary models.
        deep_search: Whether to augment the primary model with search results.
    """
    ensure_stop_event(sess)
    sess.stop_event.clear()
    sess.cancel_token["cancelled"] = False
    # No providers configured at all -> report unavailability immediately.
    if not LINUX_SERVER_PROVIDER_KEYS or not LINUX_SERVER_HOSTS:
        yield ("content", RESPONSES["RESPONSE_3"])  # No providers available
        return
    # Lazily assign a session id the first time this session talks to a model.
    if not hasattr(sess, "session_id") or not sess.session_id:
        sess.session_id = str(uuid.uuid4())
    model_key = get_model_key(model_display, MODEL_MAPPING, DEFAULT_MODEL_KEY)
    cfg = MODEL_CONFIG.get(model_key, DEFAULT_CONFIG)
    msgs = []
    # If deep search is enabled on the primary model, prepend the deep-search
    # instructions and the JSON-serialized search results as system messages.
    if deep_search and model_display == MODEL_CHOICES[0]:
        msgs.append({"role": "system", "content": DEEP_SEARCH_INSTRUCTIONS})
        try:
            async with httpx.AsyncClient() as client:
                payload = {
                    "query": user_input,
                    "topic": "general",
                    "search_depth": "basic",
                    "chunks_per_source": 5,
                    "max_results": 5,
                    "time_range": None,
                    "days": 7,
                    "include_answer": True,
                    "include_raw_content": False,
                    "include_images": False,
                    "include_image_descriptions": False,
                    "include_domains": [],
                    "exclude_domains": [],
                }
                r = await client.post(
                    DEEP_SEARCH_PROVIDER_HOST,
                    headers={"Authorization": f"Bearer {DEEP_SEARCH_PROVIDER_KEY}"},
                    json=payload,
                )
                sr_json = r.json()
                msgs.append({"role": "system", "content": json.dumps(sr_json)})
        except Exception:
            # Deep search is best-effort: fail silently and continue without it.
            pass
        msgs.append({"role": "system", "content": INTERNAL_AI_INSTRUCTIONS})
    elif model_display == MODEL_CHOICES[0]:
        # Primary model without deep search: use internal instructions.
        msgs.append({"role": "system", "content": INTERNAL_AI_INSTRUCTIONS})
    else:
        # Other models: use the custom prompt or the per-model default.
        msgs.append({"role": "system", "content": custom_prompt or SYSTEM_PROMPT_MAPPING.get(model_key, SYSTEM_PROMPT_DEFAULT)})
    # BUG FIX: the original appended every user message first and every
    # assistant message afterwards, destroying conversational order (despite
    # the comment claiming alternation). Interleave the turns so the model
    # sees the dialogue in the order it actually happened.
    for user_msg, assistant_msg in history:
        msgs.append({"role": "user", "content": user_msg})
        if assistant_msg:
            msgs.append({"role": "assistant", "content": assistant_msg})
    # Append the current user input as the final message.
    msgs.append({"role": "user", "content": user_input})
    # Shuffle host/key pairs for load balancing and fallback.
    candidates = [(h, k) for h in LINUX_SERVER_HOSTS for k in LINUX_SERVER_PROVIDER_KEYS]
    random.shuffle(candidates)
    # Try each host/key pair until one yields a successful streamed response.
    for h, k in candidates:
        stream_gen = fetch_response_stream_async(h, k, model_key, msgs, cfg, sess.session_id, sess.stop_event, sess.cancel_token)
        got_responses = False
        async for chunk in stream_gen:
            # Honor user-initiated cancellation mid-stream.
            if sess.stop_event.is_set() or sess.cancel_token["cancelled"]:
                return
            got_responses = True
            yield chunk
        if got_responses:
            return
    # Every provider failed to produce output -> yield the fallback message.
    yield ("content", RESPONSES["RESPONSE_2"])