#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import asyncio
import json
import random
import uuid

import httpx

from src.config import *
from src.cores.server import fetch_response_stream_async
from src.cores.session import ensure_stop_event, get_model_key
async def chat_with_model_async(history, user_input, model_display, sess, custom_prompt, deep_search):
    """
    Core async generator that streams a chat completion from the backend AI model.

    Builds the message list (system instructions, optional deep-search context,
    prior conversation turns, and the new user input), then tries every
    host/key pair in random order until one yields a streamed response.

    Args:
        history: List of (user_message, assistant_reply) tuples; the reply may
            be empty for turns that have not been answered yet.
        user_input: The new user message to send.
        model_display: Display name of the selected model (mapped to a backend key).
        sess: Session object carrying stop_event, cancel_token and session_id.
        custom_prompt: Optional system prompt overriding the per-model default
            (only used for non-primary models).
        deep_search: When True and the primary model is selected, web search
            results are injected as extra system context.

    Yields:
        (kind, payload) tuples produced by the backend stream, or a single
        ("content", <fallback message>) when no provider is configured or
        every provider fails.
    """
    ensure_stop_event(sess)
    sess.stop_event.clear()
    sess.cancel_token["cancelled"] = False
    # Bail out early when no backend is configured at all.
    if not LINUX_SERVER_PROVIDER_KEYS or not LINUX_SERVER_HOSTS:
        yield ("content", RESPONSES["RESPONSE_3"])  # No providers available
        return
    # Lazily assign a session id the first time this session talks to a model.
    if not hasattr(sess, "session_id") or not sess.session_id:
        sess.session_id = str(uuid.uuid4())
    model_key = get_model_key(model_display, MODEL_MAPPING, DEFAULT_MODEL_KEY)
    cfg = MODEL_CONFIG.get(model_key, DEFAULT_CONFIG)
    msgs = []
    # If deep search is enabled and the primary model is in use, prepend deep
    # search instructions and (best-effort) live search results.
    if deep_search and model_display == MODEL_CHOICES[0]:
        msgs.append({"role": "system", "content": DEEP_SEARCH_INSTRUCTIONS})
        try:
            async with httpx.AsyncClient() as client:
                payload = {
                    "query": user_input,
                    "topic": "general",
                    "search_depth": "basic",
                    "chunks_per_source": 5,
                    "max_results": 5,
                    "time_range": None,
                    "days": 7,
                    "include_answer": True,
                    "include_raw_content": False,
                    "include_images": False,
                    "include_image_descriptions": False,
                    "include_domains": [],
                    "exclude_domains": []
                }
                r = await client.post(DEEP_SEARCH_PROVIDER_HOST, headers={"Authorization": f"Bearer {DEEP_SEARCH_PROVIDER_KEY}"}, json=payload)
                # Don't inject an HTTP error body as search context: a failed
                # request raises here and falls through to the except below.
                r.raise_for_status()
                msgs.append({"role": "system", "content": json.dumps(r.json())})
        except Exception:
            # Deep search is best-effort: fail silently and answer without it.
            pass
        msgs.append({"role": "system", "content": INTERNAL_AI_INSTRUCTIONS})
    elif model_display == MODEL_CHOICES[0]:
        # Primary model without deep search: internal instructions only.
        msgs.append({"role": "system", "content": INTERNAL_AI_INSTRUCTIONS})
    else:
        # Other models: caller-supplied prompt, else the per-model default.
        msgs.append({"role": "system", "content": custom_prompt or SYSTEM_PROMPT_MAPPING.get(model_key, SYSTEM_PROMPT_DEFAULT)})
    # Append the conversation history as properly interleaved turns.
    # BUGFIX: the previous version appended ALL user messages followed by ALL
    # assistant messages, destroying the chronological turn order the model
    # relies on; interleave each (user, assistant) pair instead.
    for u, a in history:
        msgs.append({"role": "user", "content": u})
        if a:
            msgs.append({"role": "assistant", "content": a})
    # Append the current user input as the final message.
    msgs.append({"role": "user", "content": user_input})
    # Shuffle provider host/key pairs for load balancing and fallback.
    candidates = [(h, k) for h in LINUX_SERVER_HOSTS for k in LINUX_SERVER_PROVIDER_KEYS]
    random.shuffle(candidates)
    # Try each host-key pair until one produces at least one streamed chunk.
    for h, k in candidates:
        stream_gen = fetch_response_stream_async(h, k, model_key, msgs, cfg, sess.session_id, sess.stop_event, sess.cancel_token)
        got_responses = False
        async for chunk in stream_gen:
            # Stop immediately when the user cancels or a stop is requested;
            # close the upstream generator so its connection is released now
            # rather than at garbage collection.
            if sess.stop_event.is_set() or sess.cancel_token["cancelled"]:
                await stream_gen.aclose()
                return
            got_responses = True
            yield chunk
        if got_responses:
            return
    # No provider produced any output: yield the fallback message.
    yield ("content", RESPONSES["RESPONSE_2"])