# Standard library
import base64
import gc
import logging
import os
import platform
import re
import sys
from time import time

# Third-party libraries
import httpx
import psutil
import requests
import torch
import uvicorn

# FastAPI
from fastapi import FastAPI, HTTPException, Request

# Pydantic
from pydantic import BaseModel

# Typing
from typing import Optional, Dict, Union

# Transformers
from transformers import AutoTokenizer, AutoModelForCausalLM

os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# ✅ Logging setup
LOG_FILE_PATH = "/tmp/server.log"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE_PATH),
        logging.StreamHandler(sys.stdout)
    ]
)

def log_info(msg):
    logging.info(msg)

def log_error(msg):
    logging.error(msg)

# 🔴🟢🔵 NEW LOGGING GROUP 1: Search Pipeline Tracking
def search_logger(stage: str, data: dict):
    """Unified logging for search pipeline steps."""
    msg = f"🔍 [SEARCH] {stage.upper()}:"
    if data.get("pre"):
        msg += f" | Before: {str(data['pre'])[:50]}..."
    if data.get("post"):
        msg += f" | After: {str(data['post'])[:50]}..."
    log_info(msg)

# 🔴🟢🔵 NEW LOGGING GROUP 2: Image Pipeline Tracking
def image_logger(stage: str, data: dict):
    """Unified logging for image pipeline steps."""
    msg = f"🖼️ [IMAGE] {stage.upper()}:"
    if data.get("input"):
        msg += f" | Input: {str(data['input'])[:30]}..."
    if data.get("output"):
        if "b64" in str(data["output"]):
            msg += " | Output: [IMAGE_DATA]"
        else:
            msg += f" | Output: {str(data['output'])[:30]}..."
    log_info(msg)

# 🔴🟢🔵 NEW LOGGING GROUP 3: Voice Pipeline Tracking
def voice_logger(stage: str, data: dict):
    """Unified logging for voice pipeline steps."""
    msg = f"🔊 [VOICE] {stage.upper()}:"
    if data.get("text"):
        msg += f" | Text: {str(data['text'])[:30]}..."
    if data.get("audio"):
        msg += " | Audio: [AUDIO_DATA]"
    if data.get("metrics"):
        msg += f" | RAM: {data['metrics']['ram']}MB"
    log_info(msg)

# ✅ App initialization
app = FastAPI()

# ✅ Model loading
log_info("🔵 جاري تحميل الموديل...")
try:
    model_path = "mohammedfouly/SaraAssistant"
    tokenizer = AutoTokenizer.from_pretrained(
        model_path,
        use_fast=False,  # important for Llama/Mistral tokenizers
        trust_remote_code=True
    )
    model = AutoModelForCausalLM.from_pretrained(
        model_path,
        torch_dtype=torch.float16,
        device_map="auto",
        trust_remote_code=True
    )
    log_info("✅ تم تحميل الموديل بنجاح.")
except Exception as e:
    log_error(f"❌ خطأ أثناء تحميل الموديل: {str(e)}")
    raise RuntimeError("فشل تحميل الموديل، تأكد من الملفات.") from e
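# A quick smoke test for the loaded model, kept as a comment so nothing runs at
# import time (the sample prompt is illustrative only):
#
#   sample = tokenizer("مرحبا", return_tensors="pt").to(model.device)
#   with torch.no_grad():
#       out = model.generate(**sample, max_new_tokens=10)
#   print(tokenizer.decode(out[0], skip_special_tokens=True))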
# ✅ Request models
class GenerateRequest(BaseModel):
    system_prompt: Optional[str] = "✨ تعريف الشخصية الافتراضي لسارة الطائعة."
    user_prompt: str
    temperature: Optional[float] = 0.7
    max_tokens: Optional[int] = 300
    content_length: Optional[int] = None

class SearchRequest(BaseModel):
    query: str
    num_results: Optional[int] = 5

class ImageRequest(BaseModel):
    description: str

class VoiceRequest(BaseModel):
    text: str

def trace_tool_usage(tool_name: str, status: str, prompt: str = "", result: str = "", error: str = ""):
    log_info(f"🛠️ أداة [{tool_name}] - الحالة: {status}")
    if prompt:
        log_info(f"🔹 برومبت الأداة: {prompt}")
    if result:
        log_info(f"🔸 ناتج الأداة: {result}")
    if error:
        log_error(f"❌ خطأ في [{tool_name}]: {error}")
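# The /smart-generate/ endpoint below relies on a lightweight tool-tag protocol:
# the model's first-pass output may embed directives such as [SEARCH:query],
# [IMAGE:description], or [VOICE:text]. A minimal sketch of the parsing step,
# kept as a comment (the sample text is illustrative, not real model output):
#
#   sample = "حاضر! [SEARCH: latest AI news]"
#   match = re.search(r"\[SEARCH:(.*?)\]", sample)
#   if match:
#       query = match.group(1).strip()   # -> "latest AI news"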
@app.post("/smart-generate/")
def smart_generate(request: GenerateRequest):
    try:
        log_info(f"🤖 استلام طلب ذكي: {request.user_prompt}")

        # Initialize response components
        initial_response = ""
        final_response = ""
        audio_base64 = None
        search_results = None
        search_used = None
        base64_image = None
        image_prompt = None

        # First model generation to handle the request
        full_prompt = f"{request.system_prompt}\n\n🟢 طلب المستخدم:\n{request.user_prompt}\n\n📝 رد سارة:"
        inputs = tokenizer(full_prompt, return_tensors="pt").to(model.device)
        with torch.no_grad(), torch.cuda.amp.autocast():
            outputs = model.generate(**inputs, max_new_tokens=request.max_tokens, temperature=request.temperature)
        initial_response = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
        log_info(f"🌐 Initial model response: {initial_response}")

        # Match tool commands within the initial response
        search_match = re.search(r"\[SEARCH:(.*?)\]", initial_response)
        image_match = re.search(r"\[IMAGE:(.*?)\]", initial_response)
        voice_match = re.search(r"\[VOICE:(.*?)\]", initial_response)

        # Handle based on detected tags
        if search_match:
            log_info("🔄 Handling SEARCH command...")
            original_query = search_match.group(1).strip()

            # 1. Rewrite the query into a search-friendly form
            optimized_query = rewrite_prompt_for_search(original_query)
            log_info(f"Optimized query for search: {optimized_query}")

            # 2. Search & prepare context (use .get() so error entries don't raise KeyError)
            search_results = google_search(optimized_query, num_results=5)
            context = "\n".join([f"- {res.get('title')}: {res.get('snippet')}" for res in search_results])

            # 3. Regenerate the response with search context
            augmented_prompt = f"{request.system_prompt}\n\nSearch Results:\n{context}\nUser Query:\n{request.user_prompt}"
            inputs = tokenizer(augmented_prompt, return_tensors="pt").to(model.device)
            with torch.no_grad(), torch.cuda.amp.autocast():
                outputs = model.generate(**inputs, max_new_tokens=request.max_tokens, temperature=request.temperature)
            final_response = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
            search_used = optimized_query

        elif image_match:
            log_info("🖼️ Handling IMAGE command...")
            image_prompt = rewrite_prompt_for_image(image_match.group(1))
            image_result = generate_image_router(image_prompt)
            if image_result and image_result.get("b64_json"):
                base64_image = image_result["b64_json"]
                # Append the image reference to the response
                final_response = initial_response + f"\n![Generated Image](data:image/png;base64,{base64_image})"
            else:
                final_response = initial_response + "\n❌ فشل توليد الصورة."

        elif voice_match:
            log_info("🔊 Handling VOICE command...")
            # Generate audio from the initial text
            audio_result = generate_voice_response(initial_response)
            if "error" not in audio_result and audio_result.get("audio_base64"):
                audio_base64 = audio_result.get("audio_base64")
                final_response = initial_response + "\n🔊 **تم توليد صوت للرد:**\n"
            else:
                final_response = initial_response + "\n❌ توليد الصوت فشل."

        else:
            # No tool commands present
            final_response = initial_response

        # Clean the final response
        def clean_model_response(raw_response: str) -> str:
            clean_portions = raw_response.split("📝 رد سارة:")[-1].split("\n\n")
            return clean_portions[-1].strip()

        final_response = clean_model_response(final_response)
        log_info("🔍🖼️🔊 Final response ready.")

        # Final response payload
        return {
            "response": final_response,
            "search_used": search_used,
            "search_results": search_results[:3] if search_results else None,
            "image_prompt": image_prompt,
            "base64_image": base64_image,
            "voice_prompt": initial_response if voice_match else None,
            "base64_audio": audio_base64
        }
    except Exception as e:
        log_error(f"❌ خطأ في smart-generate: {str(e)}")
        raise HTTPException(status_code=500, detail="حدث خطأ أثناء المعالجة الذكية.")
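# Example client call for the endpoint above (illustrative; assumes the server
# is reachable at http://localhost:8000):
#
#   import httpx
#   r = httpx.post(
#       "http://localhost:8000/smart-generate/",
#       json={"user_prompt": "ابحثى فى الانترنت عن أخبار الذكاء الاصطناعي"},
#       timeout=120,
#   )
#   print(r.json()["response"])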
@app.post("/generate/")
def generate_text(request: GenerateRequest, http_request: Request):
    try:
        start_time = time()
        client_ip = http_request.client.host
        log_info("🟢 بدء معالجة /generate/")
        log_info(f"🌐 الطلب وارد من IP: {client_ip}")
        log_info(f"📥 البرومبت:\nSystem:\n{request.system_prompt}\nUser:\n{request.user_prompt}")
        log_info(f"🖥️ النظام: {platform.system()} {platform.release()}")
        log_info(f"🧠 عدد الأنوية: {psutil.cpu_count()} | الضغط: {psutil.getloadavg()}")

        gpu_mem_alloc_before = gpu_mem_reserved_before = "N/A"
        if torch.cuda.is_available():
            gpu_mem_alloc_before = torch.cuda.memory_allocated() / 1024**2
            gpu_mem_reserved_before = torch.cuda.memory_reserved() / 1024**2
            log_info(f"🔍 GPU قبل التوليد: مخصص = {gpu_mem_alloc_before:.2f} MB، محفوظ = {gpu_mem_reserved_before:.2f} MB")

        ram_usage_before = psutil.Process(os.getpid()).memory_info().rss / 1024**2
        log_info(f"🔍 RAM قبل التوليد: {ram_usage_before:.2f} MB")

        full_prompt = f"""{request.system_prompt}

---
🔽 المهمة: اكتبي ردًا واقعيًا، حسّيًا، وجسديًا بناءً على ما يلي.

🟢 طلب المستخدم:
{request.user_prompt}

📝 رد سارة:"""

        inputs = tokenizer(full_prompt, return_tensors="pt").to(model.device)
        input_tokens = len(inputs["input_ids"][0])
        log_info(f"🔡 عدد التوكنات (مدخلات): {input_tokens}")

        # ✅ Generate with memory-saving techniques (no grad tracking + fp16 autocast)
        with torch.no_grad(), torch.cuda.amp.autocast():
            outputs = model.generate(
                **inputs,
                max_new_tokens=request.max_tokens,
                temperature=request.temperature,
                pad_token_id=tokenizer.eos_token_id
            )

        output_tokens = len(outputs[0])
        log_info(f"🧾 عدد التوكنات (مخرجات): {output_tokens}")

        raw_response = tokenizer.decode(outputs[0], skip_special_tokens=True)
        log_info(f"📤 الاستجابة الكاملة من النموذج:\n{raw_response}")
        if torch.cuda.is_available():
            log_info("📦 تقرير استخدام الذاكرة بعد التوليد:\n" + torch.cuda.memory_summary(device=0, abbreviated=False))

        # The model echoes the prompt, so strip everything up to the user prompt
        if request.user_prompt in raw_response:
            response = raw_response.split(request.user_prompt, 1)[-1].strip()
        elif request.system_prompt in raw_response:
            response = raw_response.split(request.system_prompt, 1)[-1].strip()
        else:
            response = raw_response.strip()

        if request.content_length:
            response = response[:request.content_length]

        if response.strip() == "":
            log_info("⚠️ الرد النهائي فارغ.")
        elif response.strip() == request.user_prompt.strip():
            log_info("⚠️ النموذج أعاد نفس مدخل المستخدم.")

        gpu_mem_alloc_after = gpu_mem_reserved_after = "N/A"
        if torch.cuda.is_available():
            gpu_mem_alloc_after = torch.cuda.memory_allocated() / 1024**2
            gpu_mem_reserved_after = torch.cuda.memory_reserved() / 1024**2
            log_info(f"✅ GPU بعد التوليد: مخصص = {gpu_mem_alloc_after:.2f} MB، محفوظ = {gpu_mem_reserved_after:.2f} MB")

        ram_usage_after = psutil.Process(os.getpid()).memory_info().rss / 1024**2
        log_info(f"✅ RAM بعد التوليد: {ram_usage_after:.2f} MB")

        duration = time() - start_time
        log_info(f"⏱️ زمن التوليد: {duration:.2f} ثانية")
        log_info("✅ التوليد تم بنجاح")

        return {
            "response": response,
            "raw_model_output": raw_response,
            "tokens": {"input": input_tokens, "output": output_tokens},
            "memory": {
                "gpu_before": f"{gpu_mem_alloc_before:.2f} MB" if isinstance(gpu_mem_alloc_before, float) else gpu_mem_alloc_before,
                "gpu_after": f"{gpu_mem_alloc_after:.2f} MB" if isinstance(gpu_mem_alloc_after, float) else gpu_mem_alloc_after,
                "ram_before": f"{ram_usage_before:.2f} MB",
                "ram_after": f"{ram_usage_after:.2f} MB"
            },
            "duration_sec": round(duration, 2),
            "client_ip": client_ip
        }
    except Exception as e:
        log_error(f"❌ خطأ في التوليد: {str(e)}")
        cause = "⚠️ قد يكون السبب استجابة فارغة، مدخل خاطئ، أو نفاد الذاكرة."
        if torch.cuda.is_available():
            mem_total = torch.cuda.get_device_properties(0).total_memory / 1024**2
            mem_alloc = torch.cuda.memory_allocated() / 1024**2
            mem_reserved = torch.cuda.memory_reserved() / 1024**2
            log_error(f"📉 GPU حالة الطوارئ: مخصص = {mem_alloc:.2f} MB / {mem_total:.2f} MB، محفوظ = {mem_reserved:.2f} MB")
        return {"error": "فشل التوليد.", "details": str(e), "cause": cause}
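# Example request for /generate/ (illustrative; assumes a local server on port 8000):
#
#   curl -X POST http://localhost:8000/generate/ \
#        -H "Content-Type: application/json" \
#        -d '{"user_prompt": "اكتبي وصفًا قصيرًا للبحر", "max_tokens": 120}'
#
# The JSON response contains the cleaned "response", the full "raw_model_output",
# token counts, before/after GPU/RAM usage, and the generation duration.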
@app.post("/create-image/")
def create_image(request: ImageRequest):
    tool = "ImageRouter"
    prompt = request.description

    # 🔴🟢🔵 IMAGE LOG 1: Request received
    image_logger("start", {"input": prompt})
    trace_tool_usage(tool, "بدأ التوليد", prompt)
    try:
        rewritten_prompt = rewrite_prompt_for_image(prompt)
        # 🔴🟢🔵 IMAGE LOG 2: Prompt optimized
        image_logger("prompt_optimized", {"output": rewritten_prompt})

        result = generate_image_router(rewritten_prompt)
        # Log only the result keys to keep raw image data out of the log file
        trace_tool_usage(tool, "تم بنجاح", prompt, result=f"keys={list(result.keys())}")
        # 🔴🟢🔵 IMAGE LOG 3: Image generation result
        image_logger("generation_complete", {"output": "b64_json" if result.get("b64_json") else str(result)})
        return result
    except Exception as e:
        trace_tool_usage(tool, "فشل", prompt, error=str(e))
        log_error(f"❌ Image Generation Error: {str(e)}")
        raise HTTPException(status_code=500, detail="فشل توليد الصورة")

@app.post("/create-voice/")
def create_voice(request: VoiceRequest):
    tool = "ElevenLabs TTS"
    prompt = request.text

    # 🔴🟢🔵 VOICE LOG 1: Request received
    voice_logger("start", {"text": prompt})
    trace_tool_usage(tool, "بدأ التوليد", prompt)
    try:
        result = generate_voice_response(prompt)
        # 🔴🟢🔵 VOICE LOG 2: Voice generation result
        voice_logger("generation_complete", {"audio": result.get("audio_base64", "")})
        # Log only the result keys to keep raw audio data out of the log file
        trace_tool_usage(tool, "تم بنجاح", prompt, result=f"keys={list(result.keys())}")
        return result
    except Exception as e:
        trace_tool_usage(tool, "فشل", prompt, error=str(e))
        log_error(f"❌ Voice Generation Error: {str(e)}")
        raise HTTPException(status_code=500, detail="فشل توليد الصوت")

@app.post("/search/")
def search_internet(request: SearchRequest):
    log_info(f"🔍 بحث: {request.query}")
    return {"results": google_search(request.query, request.num_results)}

@app.get("/healthcheck/")
def health_check():
    return {"status": "✅ Server is running."}

@app.get("/logs/")
def get_logs():
    if os.path.exists(LOG_FILE_PATH):
        with open(LOG_FILE_PATH, "r", encoding="utf-8") as f:
            return {"logs": f.readlines()[-500:]}
    return {"logs": ["⚠️ لا يوجد ملف لوج."]}

def google_search(query: str, num_results: int = 5):
    try:
        API_KEY = "AIzaSyCH205hSkM05n7udAFeQTETCoWXwV12S4M"
        SEARCH_ENGINE_ID = "2042dab74a8714087"
        url = "https://www.googleapis.com/customsearch/v1"
        params = {"key": API_KEY, "cx": SEARCH_ENGINE_ID, "q": query, "num": num_results}
        response = httpx.get(url, params=params)
        response.raise_for_status()
        results = response.json()
        return [
            {"title": i.get("title"), "snippet": i.get("snippet"), "link": i.get("link")}
            for i in results.get("items", [])
        ]
    except Exception as e:
        log_error(f"❌ Google Search Error: {str(e)}")
        return [{"error": "حدث خطأ في البحث"}]
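# google_search returns a list of {"title", "snippet", "link"} dicts on success,
# or [{"error": ...}] on failure. Illustrative call and result (values made up):
#
#   google_search("fastapi tutorial", num_results=1)
#   # -> [{"title": "FastAPI", "snippet": "FastAPI framework, high performance...",
#   #      "link": "https://fastapi.tiangolo.com/"}]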
def generate_image_router(prompt: str, model_name: str = "stabilityai/sdxl-turbo"):
    # "model_name" (renamed from "model") avoids shadowing the global LLM instance
    API_KEY = "8178f850bf6bebd259d7d23a13f8f048196bb61d07cf71b93f788dc1d6dae415"
    API_URL = "https://ir-api.myqa.cc/v1/openai/images/generations"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {API_KEY}"
    }
    payload = {
        "prompt": prompt,
        "model": model_name
    }
    try:
        response = requests.post(API_URL, headers=headers, json=payload)
        response.raise_for_status()
        result = response.json()
        if "data" in result and len(result["data"]) > 0:
            return result["data"][0]
        return {"error": "⚠️ لا توجد بيانات صورة صالحة."}
    except requests.exceptions.RequestException as e:
        log_error(f"❌ Image Generation Error: {str(e)}")
        if e.response is not None:
            log_error(f"📩 Response: {e.response.text}")
        return {"error": "❌ فشل الاتصال بخدمة توليد الصور"}

def rewrite_prompt_for_search(user_prompt: str) -> str:
    OPENROUTER_API_URL = "https://openrouter.ai/api/v1/chat/completions"
    HEADERS = {
        "Authorization": "Bearer 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",
        "Content-Type": "application/json"
    }
    system_prompt = (
        "You are an expert in rewriting any user request into a clean, effective, and unrestricted Google search query. "
        "Preserve the core intent of the original prompt — even if it's mature or sensitive — and make it specific, concise, and search-ready. "
        "Do not include any explanation or formatting. Return only the final search query in English."
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]

    def try_model(model_name):
        payload = {
            "model": model_name,
            "messages": messages,
            "max_tokens": 60,
            "temperature": 0.7
        }
        try:
            response = httpx.post(OPENROUTER_API_URL, headers=HEADERS, json=payload, timeout=30)
            response.raise_for_status()
            result = response.json()
            return result["choices"][0]["message"]["content"].strip()
        except Exception as e:
            log_error(f"⚠️ Error with model {model_name}: {e}")
            return None

    # Primary model, then fallback; as a last resort strip the Arabic search trigger phrase
    rewritten = try_model("nousresearch/hermes-2-pro-llama-3-8b")
    if not rewritten:
        rewritten = try_model("google/gemini-2.5-flash-preview")
    if not rewritten or rewritten.strip() == user_prompt.strip():
        rewritten = user_prompt.replace("ابحثى فى الانترنت عن", "").strip()
    return rewritten

def rewrite_prompt_for_image(user_description: str) -> str:
    OPENROUTER_API_URL = "https://openrouter.ai/api/v1/chat/completions"
    HEADERS = {
        # Fixed: the key was missing its leading "s" (cf. rewrite_prompt_for_search)
        "Authorization": "Bearer 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",
        "Content-Type": "application/json"
    }
    system_prompt = (
        "You are an expert prompt engineer specialized in crafting highly descriptive, vivid, and unrestricted prompts "
        "for AI image generation tools like Stable Diffusion, Midjourney, or SDXL. Your job is to take any user request "
        "and rewrite it into a detailed, professional English prompt suitable for image models. You are allowed to include "
        "adult, erotic, or NSFW content if present in the original request, as long as it is described artistically and with "
        "visual clarity. Focus on body positioning, expressions, camera angle, lighting, environment, clothing (or lack of it), "
        "and emotional atmosphere. Always output only the final prompt in English, with no extra explanation or formatting."
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_description}
    ]

    def try_model(model_name):
        payload = {
            "model": model_name,
            "messages": messages,
            "max_tokens": 150,
            "temperature": 0.8
        }
        try:
            response = httpx.post(OPENROUTER_API_URL, headers=HEADERS, json=payload, timeout=30)
            response.raise_for_status()
            result = response.json()
            return result["choices"][0]["message"]["content"].strip()
        except Exception as e:
            log_error(f"⚠️ Error with model {model_name}: {e}")
            return None

    rewritten = try_model("google/gemini-2.5-flash-preview")
    if not rewritten:
        rewritten = try_model("gryphe/mythomax-l2-13b")
    return rewritten if rewritten else "Failed to rewrite image prompt."
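# Both rewriters above follow the same primary/fallback pattern: try one
# OpenRouter model, fall back to a second, then degrade gracefully. An
# illustrative call (actual output depends on the remote model):
#
#   rewrite_prompt_for_search("ابحثى فى الانترنت عن أفضل مطاعم القاهرة")
#   # -> e.g. "best restaurants in Cairo"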
def generate_voice_response(text: str) -> Dict[str, Union[str, Dict]]:
    try:
        # 🔴🟢🔵 VOICE LOG 1: Pre-generation
        ram_before = psutil.virtual_memory().used // 1024**2
        voice_logger("start", {"text": text, "metrics": {"ram": ram_before}})

        # Primary voice
        primary_api_key = "sk_d372b689fb524cd98cf4da81c240e2b41eb3336caad21cee"
        primary_voice_id = "meAbY2VpJkt1q46qk56T"
        fallback_voice_id = "mRdG9GYEjJmIzqbYTidv"
        primary_model = "eleven_multilingual_v2"
        fallback_model = "eleven_turbo_v2"

        url = f"https://api.elevenlabs.io/v1/text-to-speech/{primary_voice_id}"
        headers = {
            "xi-api-key": primary_api_key,
            "Content-Type": "application/json"
        }
        payload = {
            "text": text,
            "model_id": primary_model,
            "voice_settings": {"stability": 0.5, "similarity_boost": 0.8}
        }

        response = httpx.post(url, headers=headers, json=payload)
        if response.status_code == 200:
            audio_base64 = base64.b64encode(response.content).decode("utf-8")
        else:
            # Fall back to the secondary voice/model
            fallback_url = f"https://api.elevenlabs.io/v1/text-to-speech/{fallback_voice_id}"
            payload["model_id"] = fallback_model
            fallback_response = httpx.post(fallback_url, headers=headers, json=payload)
            if fallback_response.status_code == 200:
                audio_base64 = base64.b64encode(fallback_response.content).decode("utf-8")
            else:
                audio_base64 = None

        # 🔴🟢🔵 VOICE LOG 2: Post-generation (voice_logger only emits an [AUDIO_DATA] marker)
        ram_after = psutil.virtual_memory().used // 1024**2
        voice_logger("complete", {"audio": audio_base64, "metrics": {"ram": ram_after}})

        if audio_base64:
            return {"audio_base64": audio_base64}
        return {"error": "❌ فشل توليد الصوت في ElevenLabs"}
    except Exception as e:
        log_error(f"❌ Voice Generation Error: {str(e)}")
        return {"error": f"❌ Exception: {str(e)}"}

@app.post("/clear-cache/")
def clear_gpu_cache():
    try:
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()
        log_info("🧹 تم مسح كاش GPU وذاكرة النظام.")
        if torch.cuda.is_available():
            summary = torch.cuda.memory_summary(device=0, abbreviated=False)
            log_info("📦 تقرير الذاكرة بعد التفريغ:\n" + summary)
        return {"status": "تم مسح الكاش بنجاح."}
    except Exception as e:
        log_error(f"❌ خطأ أثناء مسح الكاش: {str(e)}")
        raise HTTPException(status_code=500, detail="حدث خطأ أثناء مسح كاش الذاكرة.")
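# uvicorn is imported above but the module never starts itself. A minimal entry
# point; the host/port values here are assumptions, adjust to your deployment:
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)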