# hadadrjt's picture
# ai: Restructured repo for production.
# f99ad65
#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import asyncio
import httpx
import json
import random
import uuid
from src.config import *
from src.cores.server import fetch_response_stream_async
from src.cores.session import ensure_stop_event, get_model_key
async def chat_with_model_async(history, user_input, model_display, sess, custom_prompt, deep_search):
    """
    Stream a chat completion from the configured AI backend providers.

    Builds the message list (system instructions, optional deep-search
    results, prior conversation turns, and the new user input), then tries
    shuffled (host, key) provider pairs until one streams a response.

    Args:
        history: List of (user_text, assistant_text) turn pairs; assistant
            text may be empty/None for an unanswered turn.
        user_input: The new user message to send.
        model_display: Display name chosen in the UI; mapped to a model key.
        sess: Session object carrying stop_event, cancel_token and session_id.
        custom_prompt: Optional system prompt overriding the mapped default
            (only used for non-primary models).
        deep_search: When truthy and the primary model is selected, prepend
            deep-search instructions and web results to the system context.

    Yields:
        ("content", text) tuples streamed from the backend, or a single
        fallback tuple when no provider is configured / none responded.
    """
    ensure_stop_event(sess)
    sess.stop_event.clear()
    sess.cancel_token["cancelled"] = False
    # Bail out early when no backend hosts or API keys are configured.
    if not LINUX_SERVER_PROVIDER_KEYS or not LINUX_SERVER_HOSTS:
        yield ("content", RESPONSES["RESPONSE_3"])  # No providers available
        return
    # Lazily assign a session id the first time this session chats.
    if not hasattr(sess, "session_id") or not sess.session_id:
        sess.session_id = str(uuid.uuid4())
    model_key = get_model_key(model_display, MODEL_MAPPING, DEFAULT_MODEL_KEY)
    cfg = MODEL_CONFIG.get(model_key, DEFAULT_CONFIG)
    msgs = []
    # If deep search is enabled and the primary model is in use, prepend
    # deep-search instructions and the (best-effort) search results.
    if deep_search and model_display == MODEL_CHOICES[0]:
        msgs.append({"role": "system", "content": DEEP_SEARCH_INSTRUCTIONS})
        try:
            async with httpx.AsyncClient() as client:
                payload = {
                    "query": user_input,
                    "topic": "general",
                    "search_depth": "basic",
                    "chunks_per_source": 5,
                    "max_results": 5,
                    "time_range": None,
                    "days": 7,
                    "include_answer": True,
                    "include_raw_content": False,
                    "include_images": False,
                    "include_image_descriptions": False,
                    "include_domains": [],
                    "exclude_domains": []
                }
                r = await client.post(DEEP_SEARCH_PROVIDER_HOST, headers={"Authorization": f"Bearer {DEEP_SEARCH_PROVIDER_KEY}"}, json=payload)
                # Don't inject an HTTP error body as context; raising here
                # routes non-2xx responses to the best-effort handler below.
                r.raise_for_status()
                sr_json = r.json()
                msgs.append({"role": "system", "content": json.dumps(sr_json)})
        except Exception:
            # Deep search is best-effort: on any failure, continue without results.
            pass
        msgs.append({"role": "system", "content": INTERNAL_AI_INSTRUCTIONS})
    elif model_display == MODEL_CHOICES[0]:
        # Primary model without deep search: internal instructions only.
        msgs.append({"role": "system", "content": INTERNAL_AI_INSTRUCTIONS})
    else:
        # Other models: custom prompt, else the mapped/default system prompt.
        msgs.append({"role": "system", "content": custom_prompt or SYSTEM_PROMPT_MAPPING.get(model_key, SYSTEM_PROMPT_DEFAULT)})
    # BUG FIX: interleave history turns in chronological order (user then
    # assistant per turn). The previous code appended ALL user messages
    # followed by ALL assistant messages, scrambling the conversation the
    # model receives, contrary to the stated "alternating" intent.
    for u, a in history:
        msgs.append({"role": "user", "content": u})
        if a:
            msgs.append({"role": "assistant", "content": a})
    # Append the current user input last.
    msgs.append({"role": "user", "content": user_input})
    # Shuffle provider (host, key) pairs for load balancing and fallback.
    candidates = [(h, k) for h in LINUX_SERVER_HOSTS for k in LINUX_SERVER_PROVIDER_KEYS]
    random.shuffle(candidates)
    # Try each host-key pair until one yields at least one chunk.
    for h, k in candidates:
        stream_gen = fetch_response_stream_async(h, k, model_key, msgs, cfg, sess.session_id, sess.stop_event, sess.cancel_token)
        got_responses = False
        async for chunk in stream_gen:
            # Stop immediately if the UI cancelled or stopped the stream.
            if sess.stop_event.is_set() or sess.cancel_token["cancelled"]:
                return
            got_responses = True
            yield chunk
        if got_responses:
            return
    # All providers exhausted without a response: yield the fallback message.
    yield ("content", RESPONSES["RESPONSE_2"])