import os import re import json import time import sqlite3 import requests import torch import numpy as np from PIL import Image import gradio as gr from huggingface_hub import HfApi, snapshot_download from transformers import ( AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, pipeline ) from google.oauth2 import service_account from googleapiclient.discovery import build from googleapiclient.http import MediaFileUpload from github import Github import duckduckgo_search import psutil import gc import logging import threading from datetime import datetime # ----- Configuration ----- CONFIG = { "MAX_CONTEXT_TOKENS": 12000, "MAX_AUDIT_ATTEMPTS": 5, "CODER_ENDPOINTS": [ "https://colab1-xyz.run.app/process", "https://colab2-abc.run.app/process" ], "FALLBACK_MODEL": "huggingface.co/spaces/your_space/coder-mini", "GOOGLE_DRIVE_FOLDER_ID": os.getenv("GOOGLE_DRIVE_FOLDER_ID", "your_folder_id"), "GITHUB_REPO": os.getenv("GITHUB_REPO", "username/repo"), "MODEL_CONFIG": { "chat": "AhmadA82/Model", "coder": "deepseek-ai/deepseek-coder-6.7b", "vision": "your_vision_model" }, "SAVE_INTERVAL": 300, "HF_TOKEN": os.getenv("HF_TOKEN"), "MAX_IMAGE_SIZE": 3e6, # 3MP "MINI_MODEL_PATH": "/data/mini_model", # تغيير المسار إلى /data "HF_CACHE_DIR": "/data/huggingface_cache", # تغيير المسار إلى /data "SYSTEM_PROMPT": """You are an AI development assistant. Follow these rules: 1. If request is simple (single file, <50 lines), handle it directly 2. For complex requests (multiple files, >50 lines), send to coder 3. Always check code for errors before sending 4. Never execute unsafe code""" } # Create cache directory os.makedirs(CONFIG["HF_CACHE_DIR"], exist_ok=True) # ----- System Initialization ----- logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize database with lock for thread safety db_lock = threading.Lock() # إضافة قفل لإدارة الوصول المتزامن def init_db(): conn = sqlite3.connect('/data/sessions.db', check_same_thread=False) # تغيير المسار إلى /data c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS sessions (id TEXT PRIMARY KEY, context TEXT, last_updated REAL)''') c.execute('''CREATE TABLE IF NOT EXISTS backups (id INTEGER PRIMARY KEY, session_id TEXT, diff TEXT, prompt TEXT, model TEXT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''') conn.commit() return conn db_conn = init_db() # ----- Hugging Face Integration ----- class HFIntegration: @staticmethod def download_mini_model(): """Download fallback model from Hugging Face Hub""" if not os.path.exists(CONFIG["MINI_MODEL_PATH"]): try: snapshot_download( repo_id="AhmadA82/Model", cache_dir=CONFIG["HF_CACHE_DIR"], local_dir=CONFIG["MINI_MODEL_PATH"], token=CONFIG["HF_TOKEN"] ) logger.info("Fallback model downloaded successfully") except Exception as e: logger.error(f"Failed to download fallback model: {e}") @staticmethod def health_check(endpoints=None): """Check health of all endpoints""" endpoints = endpoints or CONFIG["CODER_ENDPOINTS"] results = {} for url in endpoints: try: start_time = time.time() response = requests.get(f"{url}/health", timeout=5, verify=False) # تجاهل التحقق من الشهادات latency = time.time() - start_time if response.status_code == 200: results[url] = { "status": "healthy", "latency": round(latency, 2), "load": response.json().get("load", "N/A") } else: results[url] = {"status": "unhealthy", "error": f"Status {response.status_code}"} except Exception as e: results[url] = {"status": "error", "error": str(e)} return results @staticmethod def save_to_hub(file_path, repo_id): """Save file to Hugging Face Hub""" try: api = HfApi(token=CONFIG["HF_TOKEN"]) api.upload_file( path_or_fileobj=file_path, path_in_repo=os.path.basename(file_path), repo_id=repo_id, repo_type="space" ) return f"Saved to Hugging Face Hub: {repo_id}/{os.path.basename(file_path)}" except: # noqa logger.error(f"Hugging Face Hub upload failed") return None # Download fallback model on startup HFIntegration.download_mini_model() # ----- Model Loading with Resource Optimization ----- def load_model(model_name, model_type="chat"): quant_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16 ) model_config = { "cache_dir": CONFIG["HF_CACHE_DIR"], "attn_implementation": "flash_attention_2", "torch_dtype": torch.bfloat16 } if model_type == "coder": model_config["quantization_config"] = quant_config elif model_type == "vision": model_config["device_map"] = "auto" try: model = AutoModelForCausalLM.from_pretrained( model_name, **model_config ) tokenizer = AutoTokenizer.from_pretrained(model_name) return model, tokenizer except Exception as e: logger.error(f"Model loading failed: {e}") return None, None # Load main chat model chat_model, chat_tokenizer = load_model(CONFIG["MODEL_CONFIG"]["chat"]) # ----- Memory Management ----- class MemoryManager: def __init__(self): self.active_models = {} self.model_usage = {} def load_model(self, model_name, model_type): if model_name in self.active_models: self.model_usage[model_name] = time.time() return self.active_models[model_name] # Try to load from Hugging Face model, tokenizer = load_model(model_name, model_type) if model and tokenizer: self.active_models[model_name] = (model, tokenizer) self.model_usage[model_name] = time.time() # Unload least recently used model if memory is low if len(self.active_models) > 2 and psutil.virtual_memory().percent > 85: oldest_model = min(self.model_usage, key=self.model_usage.get) self.unload_model(oldest_model) return model, tokenizer # Fallback to mini model if available if os.path.exists(CONFIG["MINI_MODEL_PATH"]): try: model = AutoModelForCausalLM.from_pretrained( CONFIG["MINI_MODEL_PATH"], device_map="auto" ) tokenizer = AutoTokenizer.from_pretrained( CONFIG["MINI_MODEL_PATH"] ) self.active_models[model_name] = (model, tokenizer) self.model_usage[model_name] = time.time() return model, tokenizer except Exception as e: logger.error(f"Mini model loading failed: {e}") return None, None def unload_model(self, model_name): if model_name in self.active_models: model, tokenizer = self.active_models.pop(model_name) del model del tokenizer gc.collect() torch.cuda.empty_cache() self.model_usage.pop(model_name, None) logger.info(f"Unloaded model: {model_name}") memory_manager = MemoryManager() # ----- External Service Integration ----- class ExternalServices: @staticmethod def google_drive_upload(file_path): try: creds = service_account.Credentials.from_service_account_file( '/private/credentials.json', scopes=['https://www.googleapis.com/auth/drive'] ) service = build('drive', 'v3', credentials=creds) file_metadata = { 'name': os.path.basename(file_path), 'parents': [CONFIG["GOOGLE_DRIVE_FOLDER_ID"]] } media = MediaFileUpload(file_path) file = service.files().create( body=file_metadata, media_body=media, fields='id' ).execute() return f'https://drive.google.com/file/d/{file.get("id")}' except Exception as e: logger.error(f"Google Drive upload failed: {e}") return None @staticmethod def github_commit(file_path, message): try: g = Github(os.getenv('GITHUB_TOKEN')) repo = g.get_repo(CONFIG["GITHUB_REPO"]) with open(file_path, 'r') as file: content = file.read() repo.create_file( path=os.path.basename(file_path), message=message, content=content ) return True except Exception as e: logger.error(f"GitHub commit failed: {e}") return False @staticmethod def web_search(query): try: results = duckduckgo_search.ddg(query, max_results=3) return "\n".join([f"{r['title']}: {r['href']}" for r in results]) except Exception as e: logger.error(f"Web search failed: {e}") return "" # ----- Code Auditor ----- class CodeAuditor: SECURITY_PATTERNS = [ r"exec\(", r"eval\(", r"subprocess\.run\(", r"os\.system\(", r"pickle\.load\(", r"__import__\(" ] @staticmethod def audit_code(code, context): # Security checks for pattern in CodeAuditor.SECURITY_PATTERNS: if re.search(pattern, code): return False, f"Security risk detected: {pattern}" # Syntax check try: compile(code, '', 'exec') except SyntaxError as e: return False, f"Syntax error: {str(e)}" # Context consistency check if context: prompt = f"Review this code for logical errors and consistency with the context:\n\nContext: {context}\n\nCode:\n```python\n{code}\n```" response = generate_chat_response(prompt, max_tokens=500) if "error" in response.lower() or "issue" in response.lower(): return False, response return True, "Code passed all audits" # ----- Response Generation ----- def generate_chat_response(prompt, max_tokens=1024): try: inputs = chat_tokenizer( prompt, return_tensors="pt", truncation=True, max_length=CONFIG["MAX_CONTEXT_TOKENS"] ).to(chat_model.device) outputs = chat_model.generate( **inputs, max_new_tokens=max_tokens, temperature=0.7, top_p=0.9, pad_token_id=chat_tokenizer.eos_token_id ) response = chat_tokenizer.decode( outputs[0], skip_special_tokens=True ) return response.replace(prompt, "").strip() except Exception as e: logger.error(f"Chat generation failed: {e}") return "I encountered an error processing your request." def generate_code_response(prompt, context): # Try external endpoints first for endpoint in CONFIG["CODER_ENDPOINTS"]: try: response = requests.post( endpoint, json={"prompt": prompt, "context": context}, timeout=15, verify=False # تجاهل التحقق من الشهادات ) if response.status_code == 200: return response.json()["code"] except: # noqa continue # Fallback to local model try: coder_model, coder_tokenizer = memory_manager.load_model( CONFIG["MODEL_CONFIG"]["coder"], "coder" ) if coder_model is None: raise Exception("Coder model not available") full_prompt = f"Context: {context}\n\nTask: {prompt}\n\nCode:" inputs = coder_tokenizer( full_prompt, return_tensors="pt", truncation=True ).to(coder_model.device) outputs = coder_model.generate( **inputs, max_new_tokens=1024, temperature=0.5 ) return coder_tokenizer.decode( outputs[0], skip_special_tokens=True ).replace(full_prompt, "").strip() except Exception as e: logger.error(f"Code generation failed: {e}") # Final fallback to mini model return generate_mini_model_response(prompt) def generate_mini_model_response(prompt): """Generate response using fallback mini model""" try: # Check if mini model is loaded if "mini_model" not in memory_manager.active_models: # Try to load mini model mini_model = AutoModelForCausalLM.from_pretrained( CONFIG["MINI_MODEL_PATH"], device_map="auto" ) mini_tokenizer = AutoTokenizer.from_pretrained( CONFIG["MINI_MODEL_PATH"] ) memory_manager.active_models["mini_model"] = (mini_model, mini_tokenizer) model, tokenizer = memory_manager.active_models["mini_model"] inputs = tokenizer( prompt, return_tensors="pt", truncation=True, max_length=4096 ).to(model.device) outputs = model.generate( **inputs, max_new_tokens=512, temperature=0.7 ) response = tokenizer.decode( outputs[0], skip_special_tokens=True ) return response.replace(prompt, "").strip() + "\n\n⚠️ Using fallback mini model due to system limitations" except Exception as e: logger.error(f"Mini model generation failed: {e}") return "I'm unable to process your request at this time. Please try again later." # ----- Session Management ----- class SessionManager: def __init__(self): self.sessions = {} self.last_save = time.time() def get_session(self, session_id): with db_lock: # استخدام القفل لإدارة الوصول if session_id not in self.sessions: c = db_conn.cursor() c.execute("SELECT context FROM sessions WHERE id=?", (session_id,)) row = c.fetchone() if row: self.sessions[session_id] = json.loads(row[0]) else: self.sessions[session_id] = { "history": [], "context": "", "files": {}, "created_at": time.time() } return self.sessions[session_id] def update_session(self, session_id, user_input, response, files=None): session = self.get_session(session_id) session["history"].append({ "user": user_input, "ai": response, "timestamp": time.time() }) # Context summarization when needed if len(session["history"]) > 10: self.summarize_context(session_id) # File management if files: session["files"].update(files) # Periodic saving if time.time() - self.last_save > CONFIG["SAVE_INTERVAL"]: self.save_sessions() self.last_save = time.time() def summarize_context(self, session_id): session = self.get_session(session_id) history_text = "\n".join([ f"[{datetime.fromtimestamp(h['timestamp']).strftime('%H:%M')}] User: {h['user']}\nAI: {h['ai']}" for h in session["history"] ]) prompt = f"Summarize this conversation concisely while preserving all technical details and requirements:\n\n{history_text}" summary = generate_chat_response(prompt, max_tokens=500) session["context"] = summary session["history"] = session["history"][-5:] # Keep last 5 exchanges logger.info(f"Context summarized for session {session_id}") def save_sessions(self): with db_lock: # استخدام القفل لإدارة الوصول try: c = db_conn.cursor() for session_id, data in self.sessions.items(): serialized = json.dumps({ "history": data["history"], "context": data["context"], "files": data["files"], "created_at": data.get("created_at", time.time()) }) c.execute( "REPLACE INTO sessions (id, context, last_updated) VALUES (?, ?, ?)", (session_id, serialized, time.time()) ) db_conn.commit() logger.info("Sessions saved to database") # Backup to Hugging Face Hub with open("/data/sessions_backup.json", "w") as f: # تغيير المسار إلى /data json.dump(self.sessions, f) HFIntegration.save_to_hub("/data/sessions_backup.json", "your_hf_username/sessions_backup") except Exception as e: logger.error(f"Session save failed: {e}") session_manager = SessionManager() # ----- Image Processing ----- def process_image(image, prompt, session_id): try: # Check image size img = Image.fromarray(image.astype('uint8')) img_size = img.size[0] * img.size[1] if img_size > CONFIG["MAX_IMAGE_SIZE"]: return f"Image is too large ({img_size/1e6:.1f}MP). Please resize to under {CONFIG['MAX_IMAGE_SIZE']/1e6}MP." # Simulate VL model processing vision_prompt = f"Describe this image in detail for coding purposes: {prompt}" # In production, use: # vision_model, vision_processor = memory_manager.load_model(CONFIG["MODEL_CONFIG"]["vision"], "vision") # inputs = vision_processor(images=img, return_tensors="pt").to(vision_model.device) # outputs = vision_model.generate(**inputs) # description = vision_processor.decode(outputs[0], skip_special_tokens=True) # Simulated description description = "Image shows a web page layout with header, main content area, and footer." # Update session with image description session = session_manager.get_session(session_id) session["files"]["image_description"] = description return description except Exception as e: logger.error(f"Image processing failed: {e}") return "Could not process the image." # ----- Main Processing Flow ----- def handle_request(user_input, session_id, image=None): # Check if we should wait due to high load if psutil.cpu_percent() > 90 or psutil.virtual_memory().percent > 90: return "Systems busy. Your request is queued. Processing may take longer than usual." session = session_manager.get_session(session_id) context = session["context"] # Process image if provided image_description = "" if image is not None: image_description = process_image(image, user_input, session_id) user_input = f"{user_input}\n\nImage description: {image_description}" # Decision making: Should we use coder or chat? decision_prompt = f"""Decide if this request requires coding assistance: Context: {context} Request: {user_input} Respond ONLY with 'CODER' if it requires: - Complex programming - File operations - Code modification - Technical implementation Otherwise respond with 'CHAT'""" decision = generate_chat_response(decision_prompt, max_tokens=10) use_coder = "CODER" in decision logger.info(f"Decision: {decision}") # Handle request if use_coder: return handle_coder_request(user_input, context, session_id) else: response = generate_chat_response( f"Context: {context}\n\nUser: {user_input}", max_tokens=1024 ) session_manager.update_session(session_id, user_input, response) return response def handle_coder_request(prompt, context, session_id): # Prepare context with file information session = session_manager.get_session(session_id) file_context = "\n".join([f"{name}: {content[:500]}" for name, content in session["files"].items()]) full_context = f"{context}\n\nFiles:\n{file_context}" # Generate initial code code = generate_code_response(prompt, full_context) # Code auditing loop for attempt in range(CONFIG["MAX_AUDIT_ATTEMPTS"]): valid, audit_result = CodeAuditor.audit_code(code, full_context) if valid: # Save code to session session["files"]["generated_code.py"] = code session_manager.update_session(session_id, prompt, f"Generated code:\n```python\n{code}\n```") # Create backup try: with open("/data/temp_code.py", "w") as f: # تغيير المسار إلى /data f.write(code) ExternalServices.github_commit("/data/temp_code.py", f"Code generated for: {prompt}") HFIntegration.save_to_hub("/data/temp_code.py", "your_hf_username/code_backups") except Exception as e: logger.error(f"Backup failed: {e}") return f"Here's the generated code:\n```python\n{code}\n```" else: # Improve code based on audit feedback improvement_prompt = f"""Improve this code based on the audit feedback: Original Request: {prompt} Audit Feedback: {audit_result} Code to improve: ```python {code} ``` Revised code:""" code = generate_code_response(improvement_prompt, full_context) logger.info(f"Code improvement attempt {attempt+1}") return "Unable to generate valid code after multiple attempts. Please try a different approach." # ----- Health Check Endpoint ----- def health_check(): return { "status": "ok", "timestamp": time.time(), "resources": { "cpu": psutil.cpu_percent(), "memory": psutil.virtual_memory().percent, "gpu": "N/A" }, "sessions": len(session_manager.sessions), "models_loaded": len(memory_manager.active_models) } # ----- Gradio Interface ----- with gr.Blocks(title="AI Development Assistant", theme=gr.themes.Soft()) as demo: session_id = gr.State(str(time.time())) with gr.Row(): with gr.Column(scale=3): chatbot = gr.Chatbot( height=500, bubble_full_width=False, avatar_images=( "https://cdn-icons-png.flaticon.com/512/4712/4712035.png", "https://cdn-icons-png.flaticon.com/512/4712/4712134.png" ) ) msg = gr.Textbox(label="Your Message", placeholder="Type your request here...") with gr.Row(): submit_btn = gr.Button("Submit", variant="primary") clear_btn = gr.Button("Clear") with gr.Accordion("Additional Tools", open=False): with gr.Row(): image_input = gr.Image(label="Upload Image", type="numpy") with gr.Column(): drive_btn = gr.Button("Save to Google Drive") github_btn = gr.Button("Commit to GitHub") search_btn = gr.Button("Web Search") hf_btn = gr.Button("Save to Hugging Face") with gr.Column(scale=2): gr.Markdown("### System Status") sys_status = gr.Textbox(label="Resources", interactive=False) gr.Markdown("### Service Health") health_status = gr.JSON(label="Endpoints", value={}) gr.Markdown("### Current Files") file_display = gr.JSON(label="Session Files") gr.Markdown("### Context Summary") context_display = gr.Textbox(label="Current Context", interactive=False, lines=5) def respond(user_input, image, session_id, chat_history): # Update health status health_data = HFIntegration.health_check() # System status monitoring status = f"CPU: {psutil.cpu_percent()}% | RAM: {psutil.virtual_memory().percent}% | Models: {len(memory_manager.active_models)}" # Process request response = handle_request(user_input, session_id, image) # Update context display session = session_manager.get_session(session_id) context = session.get("context", "No context summary yet") # Format response chat_history.append((user_input, response)) return "", chat_history, status, context, health_data def save_to_drive(session_id): session = session_manager.get_session(session_id) try: with open("/data/session_backup.json", "w") as f: # تغيير المسار إلى /data json.dump(session, f) link = ExternalServices.google_drive_upload("/data/session_backup.json") return f"Saved to Google Drive: {link}" if link else "Save failed" except Exception as e: return f"Error: {str(e)}" def save_to_hf(session_id): session = session_manager.get_session(session_id) try: with open("/data/session_backup.json", "w") as f: # تغيير المسار إلى /data json.dump(session, f) result = HFIntegration.save_to_hub("/data/session_backup.json", "your_hf_username/session_backup") return result if result else "Save to Hugging Face failed" except Exception as e: return f"Error: {str(e)}" def web_search(query): return ExternalServices.web_search(query) def update_context_display(session_id): session = session_manager.get_session(session_id) return session.get("context", "No context summary yet") msg.submit( respond, [msg, image_input, session_id, chatbot], [msg, chatbot, sys_status, context_display, health_status] ) submit_btn.click( respond, [msg, image_input, session_id, chatbot], [msg, chatbot, sys_status, context_display, health_status] ) drive_btn.click( save_to_drive, [session_id], context_display ) hf_btn.click( save_to_hf, [session_id], context_display ) search_btn.click( web_search, [msg], context_display ) clear_btn.click( lambda: ([], "", {}, ""), [], [chatbot, msg, file_display, context_display], queue=False ) demo.load( lambda sid: update_context_display(sid), [session_id], [context_display] ) # ----- System Monitoring ----- def system_monitor(): while True: try: # Log system status status = { "timestamp": time.time(), "cpu": psutil.cpu_percent(), "memory": psutil.virtual_memory().percent, "sessions": len(session_manager.sessions), "models_loaded": len(memory_manager.active_models) } logger.info(f"System Status: {status}") # Save sessions periodically session_manager.save_sessions() # Check service health health = HFIntegration.health_check() logger.info(f"Service Health: {json.dumps(health, indent=2)}") # Cleanup old sessions (older than 24 hours) current_time = time.time() for sid, session in list(session_manager.sessions.items()): if current_time - session.get("created_at", current_time) > 86400: del session_manager.sessions[sid] logger.info(f"Cleaned up old session: {sid}") time.sleep(60) except Exception as e: logger.error(f"Monitor thread error: {e}") time.sleep(10) # Start monitoring in background thread monitor_thread = threading.Thread(target=system_monitor, daemon=True) monitor_thread.start() # ----- Hugging Face Spaces Entry Point ----- if __name__ == "__main__": # For Hugging Face Spaces demo.launch( server_name="0.0.0.0", server_port=7860, share=False, favicon_path="https://cdn-icons-png.flaticon.com/512/4712/4712035.png" )