import os
import re
import json
import time
import sqlite3
import requests
import torch
import numpy as np
from PIL import Image
import gradio as gr
from huggingface_hub import HfApi, snapshot_download
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    pipeline
)
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from github import Github
import duckduckgo_search
import psutil
import gc
import logging
import threading
from datetime import datetime
# ----- Configuration -----
CONFIG = {
    "MAX_CONTEXT_TOKENS": 12000,
    "MAX_AUDIT_ATTEMPTS": 5,
    "CODER_ENDPOINTS": [
        "https://colab1-xyz.run.app/process",
        "https://colab2-abc.run.app/process"
    ],
    "FALLBACK_MODEL": "huggingface.co/spaces/your_space/coder-mini",
    "GOOGLE_DRIVE_FOLDER_ID": os.getenv("GOOGLE_DRIVE_FOLDER_ID", "your_folder_id"),
    "GITHUB_REPO": os.getenv("GITHUB_REPO", "username/repo"),
    "MODEL_CONFIG": {
        "chat": "AhmadA82/Model",
        "coder": "deepseek-ai/deepseek-coder-6.7b",
        "vision": "your_vision_model"
    },
    "SAVE_INTERVAL": 300,
    "HF_TOKEN": os.getenv("HF_TOKEN"),
    "MAX_IMAGE_SIZE": 3e6,  # 3MP
    "MINI_MODEL_PATH": "/data/mini_model",  # path moved to /data
    "HF_CACHE_DIR": "/data/huggingface_cache",  # path moved to /data
    "SYSTEM_PROMPT": """You are an AI development assistant. Follow these rules:
1. If the request is simple (single file, <50 lines), handle it directly
2. For complex requests (multiple files, >50 lines), send to the coder
3. Always check code for errors before sending
4. Never execute unsafe code"""
}
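# Assumed coder-endpoint contract (inferred from the calls further below, not verified
# against the actual Colab services): each CODER_ENDPOINTS URL accepts
# POST {"prompt": ..., "context": ...} and returns JSON {"code": ...}, and exposes
# GET <url>/health returning JSON with a "load" field.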
# Create cache directory
os.makedirs(CONFIG["HF_CACHE_DIR"], exist_ok=True)

# ----- System Initialization -----
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize database with lock for thread safety
db_lock = threading.Lock()  # lock to manage concurrent access

def init_db():
    conn = sqlite3.connect('/data/sessions.db', check_same_thread=False)  # path moved to /data
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS sessions
                 (id TEXT PRIMARY KEY, context TEXT, last_updated REAL)''')
    c.execute('''CREATE TABLE IF NOT EXISTS backups
                 (id INTEGER PRIMARY KEY, session_id TEXT, diff TEXT,
                  prompt TEXT, model TEXT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
    conn.commit()
    return conn

db_conn = init_db()
# ----- Hugging Face Integration -----
class HFIntegration:
    @staticmethod
    def download_mini_model():
        """Download fallback model from Hugging Face Hub"""
        if not os.path.exists(CONFIG["MINI_MODEL_PATH"]):
            try:
                snapshot_download(
                    repo_id="AhmadA82/Model",
                    cache_dir=CONFIG["HF_CACHE_DIR"],
                    local_dir=CONFIG["MINI_MODEL_PATH"],
                    token=CONFIG["HF_TOKEN"]
                )
                logger.info("Fallback model downloaded successfully")
            except Exception as e:
                logger.error(f"Failed to download fallback model: {e}")

    @staticmethod
    def health_check(endpoints=None):
        """Check health of all endpoints"""
        endpoints = endpoints or CONFIG["CODER_ENDPOINTS"]
        results = {}
        for url in endpoints:
            try:
                start_time = time.time()
                response = requests.get(f"{url}/health", timeout=5, verify=False)  # skip certificate verification
                latency = time.time() - start_time
                if response.status_code == 200:
                    results[url] = {
                        "status": "healthy",
                        "latency": round(latency, 2),
                        "load": response.json().get("load", "N/A")
                    }
                else:
                    results[url] = {"status": "unhealthy", "error": f"Status {response.status_code}"}
            except Exception as e:
                results[url] = {"status": "error", "error": str(e)}
        return results

    @staticmethod
    def save_to_hub(file_path, repo_id):
        """Save file to Hugging Face Hub"""
        try:
            api = HfApi(token=CONFIG["HF_TOKEN"])
            api.upload_file(
                path_or_fileobj=file_path,
                path_in_repo=os.path.basename(file_path),
                repo_id=repo_id,
                repo_type="space"
            )
            return f"Saved to Hugging Face Hub: {repo_id}/{os.path.basename(file_path)}"
        except Exception as e:
            logger.error(f"Hugging Face Hub upload failed: {e}")
            return None

# Download fallback model on startup
HFIntegration.download_mini_model()
# ----- Model Loading with Resource Optimization -----
def load_model(model_name, model_type="chat"):
    quant_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16
    )
    model_config = {
        "cache_dir": CONFIG["HF_CACHE_DIR"],
        "attn_implementation": "flash_attention_2",
        "torch_dtype": torch.bfloat16
    }
    if model_type == "coder":
        model_config["quantization_config"] = quant_config
    elif model_type == "vision":
        model_config["device_map"] = "auto"
    try:
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            **model_config
        )
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        return model, tokenizer
    except Exception as e:
        logger.error(f"Model loading failed: {e}")
        return None, None

# Load main chat model
chat_model, chat_tokenizer = load_model(CONFIG["MODEL_CONFIG"]["chat"])
# ----- Memory Management -----
class MemoryManager:
    def __init__(self):
        self.active_models = {}
        self.model_usage = {}

    def load_model(self, model_name, model_type):
        if model_name in self.active_models:
            self.model_usage[model_name] = time.time()
            return self.active_models[model_name]
        # Try to load from Hugging Face
        model, tokenizer = load_model(model_name, model_type)
        if model and tokenizer:
            self.active_models[model_name] = (model, tokenizer)
            self.model_usage[model_name] = time.time()
            # Unload least recently used model if memory is low
            if len(self.active_models) > 2 and psutil.virtual_memory().percent > 85:
                oldest_model = min(self.model_usage, key=self.model_usage.get)
                self.unload_model(oldest_model)
            return model, tokenizer
        # Fallback to mini model if available
        if os.path.exists(CONFIG["MINI_MODEL_PATH"]):
            try:
                model = AutoModelForCausalLM.from_pretrained(
                    CONFIG["MINI_MODEL_PATH"],
                    device_map="auto"
                )
                tokenizer = AutoTokenizer.from_pretrained(
                    CONFIG["MINI_MODEL_PATH"]
                )
                self.active_models[model_name] = (model, tokenizer)
                self.model_usage[model_name] = time.time()
                return model, tokenizer
            except Exception as e:
                logger.error(f"Mini model loading failed: {e}")
        return None, None

    def unload_model(self, model_name):
        if model_name in self.active_models:
            model, tokenizer = self.active_models.pop(model_name)
            del model
            del tokenizer
            gc.collect()
            torch.cuda.empty_cache()
            self.model_usage.pop(model_name, None)
            logger.info(f"Unloaded model: {model_name}")

memory_manager = MemoryManager()
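# Note: MemoryManager.load_model keeps models resident and evicts the least recently used
# one when more than two are loaded and RAM usage exceeds 85%; if the requested model cannot
# be loaded at all, it falls back to the local mini model at CONFIG["MINI_MODEL_PATH"].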
# ----- External Service Integration -----
class ExternalServices:
    @staticmethod
    def google_drive_upload(file_path):
        try:
            creds = service_account.Credentials.from_service_account_file(
                '/private/credentials.json',
                scopes=['https://www.googleapis.com/auth/drive']
            )
            service = build('drive', 'v3', credentials=creds)
            file_metadata = {
                'name': os.path.basename(file_path),
                'parents': [CONFIG["GOOGLE_DRIVE_FOLDER_ID"]]
            }
            media = MediaFileUpload(file_path)
            file = service.files().create(
                body=file_metadata,
                media_body=media,
                fields='id'
            ).execute()
            return f'https://drive.google.com/file/d/{file.get("id")}'
        except Exception as e:
            logger.error(f"Google Drive upload failed: {e}")
            return None

    @staticmethod
    def github_commit(file_path, message):
        try:
            g = Github(os.getenv('GITHUB_TOKEN'))
            repo = g.get_repo(CONFIG["GITHUB_REPO"])
            with open(file_path, 'r') as file:
                content = file.read()
            repo.create_file(
                path=os.path.basename(file_path),
                message=message,
                content=content
            )
            return True
        except Exception as e:
            logger.error(f"GitHub commit failed: {e}")
            return False

    @staticmethod
    def web_search(query):
        try:
            results = duckduckgo_search.ddg(query, max_results=3)
            return "\n".join([f"{r['title']}: {r['href']}" for r in results])
        except Exception as e:
            logger.error(f"Web search failed: {e}")
            return ""
# ----- Code Auditor -----
class CodeAuditor:
    SECURITY_PATTERNS = [
        r"exec\(",
        r"eval\(",
        r"subprocess\.run\(",
        r"os\.system\(",
        r"pickle\.load\(",
        r"__import__\("
    ]

    @staticmethod
    def audit_code(code, context):
        # Security checks
        for pattern in CodeAuditor.SECURITY_PATTERNS:
            if re.search(pattern, code):
                return False, f"Security risk detected: {pattern}"
        # Syntax check
        try:
            compile(code, '<string>', 'exec')
        except SyntaxError as e:
            return False, f"Syntax error: {str(e)}"
        # Context consistency check
        if context:
            prompt = f"Review this code for logical errors and consistency with the context:\n\nContext: {context}\n\nCode:\n```python\n{code}\n```"
            response = generate_chat_response(prompt, max_tokens=500)
            if "error" in response.lower() or "issue" in response.lower():
                return False, response
        return True, "Code passed all audits"
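# Hypothetical usage sketch: the auditor returns a (passed, message) pair, e.g.
#   ok, msg = CodeAuditor.audit_code("print('hello')", context="")
# A failed security or syntax check short-circuits before the model-based review.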
# ----- Response Generation -----
def generate_chat_response(prompt, max_tokens=1024):
    try:
        inputs = chat_tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=CONFIG["MAX_CONTEXT_TOKENS"]
        ).to(chat_model.device)
        outputs = chat_model.generate(
            **inputs,
            max_new_tokens=max_tokens,
            temperature=0.7,
            top_p=0.9,
            pad_token_id=chat_tokenizer.eos_token_id
        )
        response = chat_tokenizer.decode(
            outputs[0],
            skip_special_tokens=True
        )
        return response.replace(prompt, "").strip()
    except Exception as e:
        logger.error(f"Chat generation failed: {e}")
        return "I encountered an error processing your request."
def generate_code_response(prompt, context):
    # Try external endpoints first
    for endpoint in CONFIG["CODER_ENDPOINTS"]:
        try:
            response = requests.post(
                endpoint,
                json={"prompt": prompt, "context": context},
                timeout=15,
                verify=False  # skip certificate verification
            )
            if response.status_code == 200:
                return response.json()["code"]
        except Exception:
            continue
    # Fallback to local model
    try:
        coder_model, coder_tokenizer = memory_manager.load_model(
            CONFIG["MODEL_CONFIG"]["coder"],
            "coder"
        )
        if coder_model is None:
            raise Exception("Coder model not available")
        full_prompt = f"Context: {context}\n\nTask: {prompt}\n\nCode:"
        inputs = coder_tokenizer(
            full_prompt,
            return_tensors="pt",
            truncation=True
        ).to(coder_model.device)
        outputs = coder_model.generate(
            **inputs,
            max_new_tokens=1024,
            temperature=0.5
        )
        return coder_tokenizer.decode(
            outputs[0],
            skip_special_tokens=True
        ).replace(full_prompt, "").strip()
    except Exception as e:
        logger.error(f"Code generation failed: {e}")
        # Final fallback to mini model
        return generate_mini_model_response(prompt)
def generate_mini_model_response(prompt):
    """Generate response using fallback mini model"""
    try:
        # Check if mini model is loaded
        if "mini_model" not in memory_manager.active_models:
            # Try to load mini model
            mini_model = AutoModelForCausalLM.from_pretrained(
                CONFIG["MINI_MODEL_PATH"],
                device_map="auto"
            )
            mini_tokenizer = AutoTokenizer.from_pretrained(
                CONFIG["MINI_MODEL_PATH"]
            )
            memory_manager.active_models["mini_model"] = (mini_model, mini_tokenizer)
        model, tokenizer = memory_manager.active_models["mini_model"]
        inputs = tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=4096
        ).to(model.device)
        outputs = model.generate(
            **inputs,
            max_new_tokens=512,
            temperature=0.7
        )
        response = tokenizer.decode(
            outputs[0],
            skip_special_tokens=True
        )
        return response.replace(prompt, "").strip() + "\n\n⚠️ Using fallback mini model due to system limitations"
    except Exception as e:
        logger.error(f"Mini model generation failed: {e}")
        return "I'm unable to process your request at this time. Please try again later."
# ----- Session Management -----
class SessionManager:
    def __init__(self):
        self.sessions = {}
        self.last_save = time.time()

    def get_session(self, session_id):
        with db_lock:  # use the lock to manage concurrent access
            if session_id not in self.sessions:
                c = db_conn.cursor()
                c.execute("SELECT context FROM sessions WHERE id=?", (session_id,))
                row = c.fetchone()
                if row:
                    self.sessions[session_id] = json.loads(row[0])
                else:
                    self.sessions[session_id] = {
                        "history": [],
                        "context": "",
                        "files": {},
                        "created_at": time.time()
                    }
            return self.sessions[session_id]

    def update_session(self, session_id, user_input, response, files=None):
        session = self.get_session(session_id)
        session["history"].append({
            "user": user_input,
            "ai": response,
            "timestamp": time.time()
        })
        # Context summarization when needed
        if len(session["history"]) > 10:
            self.summarize_context(session_id)
        # File management
        if files:
            session["files"].update(files)
        # Periodic saving
        if time.time() - self.last_save > CONFIG["SAVE_INTERVAL"]:
            self.save_sessions()
            self.last_save = time.time()

    def summarize_context(self, session_id):
        session = self.get_session(session_id)
        history_text = "\n".join([
            f"[{datetime.fromtimestamp(h['timestamp']).strftime('%H:%M')}] User: {h['user']}\nAI: {h['ai']}"
            for h in session["history"]
        ])
        prompt = f"Summarize this conversation concisely while preserving all technical details and requirements:\n\n{history_text}"
        summary = generate_chat_response(prompt, max_tokens=500)
        session["context"] = summary
        session["history"] = session["history"][-5:]  # Keep last 5 exchanges
        logger.info(f"Context summarized for session {session_id}")

    def save_sessions(self):
        with db_lock:  # use the lock to manage concurrent access
            try:
                c = db_conn.cursor()
                for session_id, data in self.sessions.items():
                    serialized = json.dumps({
                        "history": data["history"],
                        "context": data["context"],
                        "files": data["files"],
                        "created_at": data.get("created_at", time.time())
                    })
                    c.execute(
                        "REPLACE INTO sessions (id, context, last_updated) VALUES (?, ?, ?)",
                        (session_id, serialized, time.time())
                    )
                db_conn.commit()
                logger.info("Sessions saved to database")
                # Backup to Hugging Face Hub
                with open("/data/sessions_backup.json", "w") as f:  # path moved to /data
                    json.dump(self.sessions, f)
                HFIntegration.save_to_hub("/data/sessions_backup.json", "your_hf_username/sessions_backup")
            except Exception as e:
                logger.error(f"Session save failed: {e}")

session_manager = SessionManager()
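# Sessions are held in memory, written to /data/sessions.db once SAVE_INTERVAL has elapsed
# (see update_session) and on every pass of the system_monitor loop below, with a JSON
# backup mirrored to the Hub via HFIntegration.save_to_hub.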
# ----- Image Processing -----
def process_image(image, prompt, session_id):
    try:
        # Check image size
        img = Image.fromarray(image.astype('uint8'))
        img_size = img.size[0] * img.size[1]
        if img_size > CONFIG["MAX_IMAGE_SIZE"]:
            return f"Image is too large ({img_size/1e6:.1f}MP). Please resize to under {CONFIG['MAX_IMAGE_SIZE']/1e6}MP."
        # Simulate VL model processing
        vision_prompt = f"Describe this image in detail for coding purposes: {prompt}"
        # In production, use:
        # vision_model, vision_processor = memory_manager.load_model(CONFIG["MODEL_CONFIG"]["vision"], "vision")
        # inputs = vision_processor(images=img, return_tensors="pt").to(vision_model.device)
        # outputs = vision_model.generate(**inputs)
        # description = vision_processor.decode(outputs[0], skip_special_tokens=True)
        # Simulated description
        description = "Image shows a web page layout with header, main content area, and footer."
        # Update session with image description
        session = session_manager.get_session(session_id)
        session["files"]["image_description"] = description
        return description
    except Exception as e:
        logger.error(f"Image processing failed: {e}")
        return "Could not process the image."
# ----- Main Processing Flow -----
def handle_request(user_input, session_id, image=None):
    # Check if we should wait due to high load
    if psutil.cpu_percent() > 90 or psutil.virtual_memory().percent > 90:
        return "Systems busy. Your request is queued. Processing may take longer than usual."
    session = session_manager.get_session(session_id)
    context = session["context"]
    # Process image if provided
    image_description = ""
    if image is not None:
        image_description = process_image(image, user_input, session_id)
        user_input = f"{user_input}\n\nImage description: {image_description}"
    # Decision making: should we use coder or chat?
    decision_prompt = f"""Decide if this request requires coding assistance:
Context: {context}
Request: {user_input}
Respond ONLY with 'CODER' if it requires:
- Complex programming
- File operations
- Code modification
- Technical implementation
Otherwise respond with 'CHAT'"""
    decision = generate_chat_response(decision_prompt, max_tokens=10)
    use_coder = "CODER" in decision
    logger.info(f"Decision: {decision}")
    # Handle request
    if use_coder:
        return handle_coder_request(user_input, context, session_id)
    else:
        response = generate_chat_response(
            f"Context: {context}\n\nUser: {user_input}",
            max_tokens=1024
        )
        session_manager.update_session(session_id, user_input, response)
        return response
def handle_coder_request(prompt, context, session_id):
    # Prepare context with file information
    session = session_manager.get_session(session_id)
    file_context = "\n".join([f"{name}: {content[:500]}" for name, content in session["files"].items()])
    full_context = f"{context}\n\nFiles:\n{file_context}"
    # Generate initial code
    code = generate_code_response(prompt, full_context)
    # Code auditing loop
    for attempt in range(CONFIG["MAX_AUDIT_ATTEMPTS"]):
        valid, audit_result = CodeAuditor.audit_code(code, full_context)
        if valid:
            # Save code to session
            session["files"]["generated_code.py"] = code
            session_manager.update_session(session_id, prompt, f"Generated code:\n```python\n{code}\n```")
            # Create backup
            try:
                with open("/data/temp_code.py", "w") as f:  # path moved to /data
                    f.write(code)
                ExternalServices.github_commit("/data/temp_code.py", f"Code generated for: {prompt}")
                HFIntegration.save_to_hub("/data/temp_code.py", "your_hf_username/code_backups")
            except Exception as e:
                logger.error(f"Backup failed: {e}")
            return f"Here's the generated code:\n```python\n{code}\n```"
        else:
            # Improve code based on audit feedback
            improvement_prompt = f"""Improve this code based on the audit feedback:
Original Request: {prompt}
Audit Feedback: {audit_result}
Code to improve:
```python
{code}
```
Revised code:"""
            code = generate_code_response(improvement_prompt, full_context)
            logger.info(f"Code improvement attempt {attempt+1}")
    return "Unable to generate valid code after multiple attempts. Please try a different approach."
# ----- Health Check Endpoint -----
def health_check():
    return {
        "status": "ok",
        "timestamp": time.time(),
        "resources": {
            "cpu": psutil.cpu_percent(),
            "memory": psutil.virtual_memory().percent,
            "gpu": "N/A"
        },
        "sessions": len(session_manager.sessions),
        "models_loaded": len(memory_manager.active_models)
    }
# ----- Gradio Interface -----
with gr.Blocks(title="AI Development Assistant", theme=gr.themes.Soft()) as demo:
    session_id = gr.State(str(time.time()))
    with gr.Row():
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(
                height=500,
                bubble_full_width=False,
                avatar_images=(
                    "https://cdn-icons-png.flaticon.com/512/4712/4712035.png",
                    "https://cdn-icons-png.flaticon.com/512/4712/4712134.png"
                )
            )
            msg = gr.Textbox(label="Your Message", placeholder="Type your request here...")
            with gr.Row():
                submit_btn = gr.Button("Submit", variant="primary")
                clear_btn = gr.Button("Clear")
            with gr.Accordion("Additional Tools", open=False):
                with gr.Row():
                    image_input = gr.Image(label="Upload Image", type="numpy")
                    with gr.Column():
                        drive_btn = gr.Button("Save to Google Drive")
                        github_btn = gr.Button("Commit to GitHub")
                        search_btn = gr.Button("Web Search")
                        hf_btn = gr.Button("Save to Hugging Face")
        with gr.Column(scale=2):
            gr.Markdown("### System Status")
            sys_status = gr.Textbox(label="Resources", interactive=False)
            gr.Markdown("### Service Health")
            health_status = gr.JSON(label="Endpoints", value={})
            gr.Markdown("### Current Files")
            file_display = gr.JSON(label="Session Files")
            gr.Markdown("### Context Summary")
            context_display = gr.Textbox(label="Current Context", interactive=False, lines=5)

    def respond(user_input, image, session_id, chat_history):
        # Update health status
        health_data = HFIntegration.health_check()
        # System status monitoring
        status = f"CPU: {psutil.cpu_percent()}% | RAM: {psutil.virtual_memory().percent}% | Models: {len(memory_manager.active_models)}"
        # Process request
        response = handle_request(user_input, session_id, image)
        # Update context display
        session = session_manager.get_session(session_id)
        context = session.get("context", "No context summary yet")
        # Format response
        chat_history.append((user_input, response))
        return "", chat_history, status, context, health_data

    def save_to_drive(session_id):
        session = session_manager.get_session(session_id)
        try:
            with open("/data/session_backup.json", "w") as f:  # path moved to /data
                json.dump(session, f)
            link = ExternalServices.google_drive_upload("/data/session_backup.json")
            return f"Saved to Google Drive: {link}" if link else "Save failed"
        except Exception as e:
            return f"Error: {str(e)}"

    def save_to_hf(session_id):
        session = session_manager.get_session(session_id)
        try:
            with open("/data/session_backup.json", "w") as f:  # path moved to /data
                json.dump(session, f)
            result = HFIntegration.save_to_hub("/data/session_backup.json", "your_hf_username/session_backup")
            return result if result else "Save to Hugging Face failed"
        except Exception as e:
            return f"Error: {str(e)}"

    def web_search(query):
        return ExternalServices.web_search(query)

    def update_context_display(session_id):
        session = session_manager.get_session(session_id)
        return session.get("context", "No context summary yet")

    msg.submit(
        respond,
        [msg, image_input, session_id, chatbot],
        [msg, chatbot, sys_status, context_display, health_status]
    )
    submit_btn.click(
        respond,
        [msg, image_input, session_id, chatbot],
        [msg, chatbot, sys_status, context_display, health_status]
    )
    drive_btn.click(
        save_to_drive,
        [session_id],
        context_display
    )
    hf_btn.click(
        save_to_hf,
        [session_id],
        context_display
    )
    search_btn.click(
        web_search,
        [msg],
        context_display
    )
    clear_btn.click(
        lambda: ([], "", {}, ""),
        [],
        [chatbot, msg, file_display, context_display],
        queue=False
    )
    demo.load(
        lambda sid: update_context_display(sid),
        [session_id],
        [context_display]
    )
# ----- System Monitoring -----
def system_monitor():
    while True:
        try:
            # Log system status
            status = {
                "timestamp": time.time(),
                "cpu": psutil.cpu_percent(),
                "memory": psutil.virtual_memory().percent,
                "sessions": len(session_manager.sessions),
                "models_loaded": len(memory_manager.active_models)
            }
            logger.info(f"System Status: {status}")
            # Save sessions periodically
            session_manager.save_sessions()
            # Check service health
            health = HFIntegration.health_check()
            logger.info(f"Service Health: {json.dumps(health, indent=2)}")
            # Cleanup old sessions (older than 24 hours)
            current_time = time.time()
            for sid, session in list(session_manager.sessions.items()):
                if current_time - session.get("created_at", current_time) > 86400:
                    del session_manager.sessions[sid]
                    logger.info(f"Cleaned up old session: {sid}")
            time.sleep(60)
        except Exception as e:
            logger.error(f"Monitor thread error: {e}")
            time.sleep(10)

# Start monitoring in background thread
monitor_thread = threading.Thread(target=system_monitor, daemon=True)
monitor_thread.start()
# ----- Hugging Face Spaces Entry Point -----
if __name__ == "__main__":
    # For Hugging Face Spaces
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        favicon_path="https://cdn-icons-png.flaticon.com/512/4712/4712035.png"  # note: Gradio expects a local file path here; a URL may not load
    )