Spaces:
Runtime error
Runtime error
| # | |
| # SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org> | |
| # SPDX-License-Identifier: Apache-2.0 | |
| # | |
| import asyncio | |
| import httpx | |
| import json | |
| import random | |
| import uuid | |
| from src.config import * | |
| from src.cores.server import fetch_response_stream_async | |
| from src.cores.session import ensure_stop_event, get_model_key | |
async def chat_with_model_async(history, user_input, model_display, sess, custom_prompt, deep_search):
    """
    Core async generator that streams a response from an AI model.

    Builds the message list (system instructions, optional deep-search
    results, prior conversation turns in order, and the current user
    input), then tries each backend host/key pair in random order until
    one yields a streamed response.

    Parameters
    ----------
    history : list[tuple[str, str]]
        Prior (user, assistant) turns; the assistant part may be empty.
    user_input : str
        The current user message.
    model_display : str
        Display name of the selected model; compared against MODEL_CHOICES[0]
        to detect the primary model.
    sess : object
        Session carrying ``stop_event``, ``cancel_token`` and ``session_id``.
    custom_prompt : str | None
        Optional system-prompt override for non-primary models.
    deep_search : bool
        When True (primary model only), prepend deep-search instructions
        and results fetched from DEEP_SEARCH_PROVIDER_HOST.

    Yields
    ------
    tuple
        ("content", text) chunks suitable for UI streaming updates.
    """
    ensure_stop_event(sess)
    sess.stop_event.clear()
    sess.cancel_token["cancelled"] = False
    # Bail out early when no backend providers are configured.
    if not LINUX_SERVER_PROVIDER_KEYS or not LINUX_SERVER_HOSTS:
        yield ("content", RESPONSES["RESPONSE_3"])  # No providers available
        return
    # Lazily assign a session id on first use.
    if not hasattr(sess, "session_id") or not sess.session_id:
        sess.session_id = str(uuid.uuid4())
    model_key = get_model_key(model_display, MODEL_MAPPING, DEFAULT_MODEL_KEY)
    cfg = MODEL_CONFIG.get(model_key, DEFAULT_CONFIG)
    msgs = []
    # If deep search is enabled and the primary model is selected, prepend
    # deep-search instructions and (best-effort) search results.
    if deep_search and model_display == MODEL_CHOICES[0]:
        msgs.append({"role": "system", "content": DEEP_SEARCH_INSTRUCTIONS})
        try:
            async with httpx.AsyncClient() as client:
                payload = {
                    "query": user_input,
                    "topic": "general",
                    "search_depth": "basic",
                    "chunks_per_source": 5,
                    "max_results": 5,
                    "time_range": None,
                    "days": 7,
                    "include_answer": True,
                    "include_raw_content": False,
                    "include_images": False,
                    "include_image_descriptions": False,
                    "include_domains": [],
                    "exclude_domains": []
                }
                r = await client.post(DEEP_SEARCH_PROVIDER_HOST, headers={"Authorization": f"Bearer {DEEP_SEARCH_PROVIDER_KEY}"}, json=payload)
                sr_json = r.json()
                msgs.append({"role": "system", "content": json.dumps(sr_json)})
        except Exception:
            # Deep search is best-effort: fail silently and continue
            # without search results rather than aborting the chat.
            pass
        msgs.append({"role": "system", "content": INTERNAL_AI_INSTRUCTIONS})
    elif model_display == MODEL_CHOICES[0]:
        # For primary model without deep search, use internal instructions.
        msgs.append({"role": "system", "content": INTERNAL_AI_INSTRUCTIONS})
    else:
        # For other models, use the custom prompt or the mapped default.
        msgs.append({"role": "system", "content": custom_prompt or SYSTEM_PROMPT_MAPPING.get(model_key, SYSTEM_PROMPT_DEFAULT)})
    # BUG FIX: the original appended ALL user turns followed by ALL assistant
    # turns, scrambling multi-turn conversational order. Interleave each
    # (user, assistant) pair so the model sees the dialogue in sequence.
    for user_turn, assistant_turn in history:
        msgs.append({"role": "user", "content": user_turn})
        if assistant_turn:
            msgs.append({"role": "assistant", "content": assistant_turn})
    # Append current user input.
    msgs.append({"role": "user", "content": user_input})
    # Shuffle provider host/key pairs for load balancing and fallback.
    candidates = [(h, k) for h in LINUX_SERVER_HOSTS for k in LINUX_SERVER_PROVIDER_KEYS]
    random.shuffle(candidates)
    # Try each host-key pair until a successful response is received.
    for h, k in candidates:
        stream_gen = fetch_response_stream_async(h, k, model_key, msgs, cfg, sess.session_id, sess.stop_event, sess.cancel_token)
        got_responses = False
        async for chunk in stream_gen:
            # Stop streaming immediately if the user cancelled.
            if sess.stop_event.is_set() or sess.cancel_token["cancelled"]:
                return
            got_responses = True
            yield chunk
        # A provider that yielded anything counts as success; stop here.
        if got_responses:
            return
    # If no response from any provider, yield fallback message.
    yield ("content", RESPONSES["RESPONSE_2"])