import os
import re
import json
import time
import sqlite3
import requests
import torch
import numpy as np
from PIL import Image
import gradio as gr
from huggingface_hub import HfApi, snapshot_download
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
BitsAndBytesConfig,
pipeline
)
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from github import Github
import duckduckgo_search
import psutil
import gc
import logging
import threading
from datetime import datetime
# ----- Configuration -----
CONFIG = {
"MAX_CONTEXT_TOKENS": 12000,
"MAX_AUDIT_ATTEMPTS": 5,
"CODER_ENDPOINTS": [
"https://colab1-xyz.run.app/process",
"https://colab2-abc.run.app/process"
],
"FALLBACK_MODEL": "huggingface.co/spaces/your_space/coder-mini",
"GOOGLE_DRIVE_FOLDER_ID": os.getenv("GOOGLE_DRIVE_FOLDER_ID", "your_folder_id"),
"GITHUB_REPO": os.getenv("GITHUB_REPO", "username/repo"),
"MODEL_CONFIG": {
"chat": "AhmadA82/Model",
"coder": "deepseek-ai/deepseek-coder-6.7b",
"vision": "your_vision_model"
},
"SAVE_INTERVAL": 300,
"HF_TOKEN": os.getenv("HF_TOKEN"),
"MAX_IMAGE_SIZE": 3e6, # 3MP
"MINI_MODEL_PATH": "/data/mini_model", # تغيير المسار إلى /data
"HF_CACHE_DIR": "/data/huggingface_cache", # تغيير المسار إلى /data
"SYSTEM_PROMPT": """You are an AI development assistant. Follow these rules:
1. If request is simple (single file, <50 lines), handle it directly
2. For complex requests (multiple files, >50 lines), send to coder
3. Always check code for errors before sending
4. Never execute unsafe code"""
}
# Create cache directory
os.makedirs(CONFIG["HF_CACHE_DIR"], exist_ok=True)
# ----- System Initialization -----
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize database with lock for thread safety
db_lock = threading.Lock() # lock to serialize concurrent database access
def init_db():
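"""Open the SQLite database at /data/sessions.db and create the sessions and backups tables if they do not exist."""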
conn = sqlite3.connect('/data/sessions.db', check_same_thread=False) # path changed to /data
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS sessions
(id TEXT PRIMARY KEY, context TEXT, last_updated REAL)''')
c.execute('''CREATE TABLE IF NOT EXISTS backups
(id INTEGER PRIMARY KEY, session_id TEXT, diff TEXT,
prompt TEXT, model TEXT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
conn.commit()
return conn
db_conn = init_db()
# ----- Hugging Face Integration -----
class HFIntegration:
@staticmethod
def download_mini_model():
"""Download fallback model from Hugging Face Hub"""
if not os.path.exists(CONFIG["MINI_MODEL_PATH"]):
try:
snapshot_download(
repo_id="AhmadA82/Model",
cache_dir=CONFIG["HF_CACHE_DIR"],
local_dir=CONFIG["MINI_MODEL_PATH"],
token=CONFIG["HF_TOKEN"]
)
logger.info("Fallback model downloaded successfully")
except Exception as e:
logger.error(f"Failed to download fallback model: {e}")
@staticmethod
def health_check(endpoints=None):
"""Check health of all endpoints"""
endpoints = endpoints or CONFIG["CODER_ENDPOINTS"]
results = {}
for url in endpoints:
try:
start_time = time.time()
response = requests.get(f"{url}/health", timeout=5, verify=False) # skip TLS certificate verification
latency = time.time() - start_time
if response.status_code == 200:
results[url] = {
"status": "healthy",
"latency": round(latency, 2),
"load": response.json().get("load", "N/A")
}
else:
results[url] = {"status": "unhealthy", "error": f"Status {response.status_code}"}
except Exception as e:
results[url] = {"status": "error", "error": str(e)}
return results
@staticmethod
def save_to_hub(file_path, repo_id):
"""Save file to Hugging Face Hub"""
try:
api = HfApi(token=CONFIG["HF_TOKEN"])
api.upload_file(
path_or_fileobj=file_path,
path_in_repo=os.path.basename(file_path),
repo_id=repo_id,
repo_type="space"
)
return f"Saved to Hugging Face Hub: {repo_id}/{os.path.basename(file_path)}"
except Exception as e:
logger.error(f"Hugging Face Hub upload failed: {e}")
return None
# Download fallback model on startup
HFIntegration.download_mini_model()
# ----- Model Loading with Resource Optimization -----
def load_model(model_name, model_type="chat"):
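"""Load a model and tokenizer from Hugging Face, applying 4-bit quantization for coder models and automatic device placement for vision models. Returns (None, None) on failure."""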
quant_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_quant_type="nf4",
bnb_4bit_compute_dtype=torch.bfloat16
)
model_config = {
"cache_dir": CONFIG["HF_CACHE_DIR"],
"attn_implementation": "flash_attention_2",
"torch_dtype": torch.bfloat16
}
if model_type == "coder":
model_config["quantization_config"] = quant_config
elif model_type == "vision":
model_config["device_map"] = "auto"
try:
model = AutoModelForCausalLM.from_pretrained(
model_name,
**model_config
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
return model, tokenizer
except Exception as e:
logger.error(f"Model loading failed: {e}")
return None, None
# Load main chat model
chat_model, chat_tokenizer = load_model(CONFIG["MODEL_CONFIG"]["chat"])
# ----- Memory Management -----
class MemoryManager:
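"""Keeps loaded models in memory, tracks their last-use time, and evicts the least recently used model when RAM pressure is high."""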
def __init__(self):
self.active_models = {}
self.model_usage = {}
def load_model(self, model_name, model_type):
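"""Return a cached (model, tokenizer) pair, loading it on demand and falling back to the local mini model if the requested model cannot be loaded."""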
if model_name in self.active_models:
self.model_usage[model_name] = time.time()
return self.active_models[model_name]
# Try to load from Hugging Face
model, tokenizer = load_model(model_name, model_type)
if model and tokenizer:
self.active_models[model_name] = (model, tokenizer)
self.model_usage[model_name] = time.time()
# Unload least recently used model if memory is low
if len(self.active_models) > 2 and psutil.virtual_memory().percent > 85:
oldest_model = min(self.model_usage, key=self.model_usage.get)
self.unload_model(oldest_model)
return model, tokenizer
# Fallback to mini model if available
if os.path.exists(CONFIG["MINI_MODEL_PATH"]):
try:
model = AutoModelForCausalLM.from_pretrained(
CONFIG["MINI_MODEL_PATH"],
device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained(
CONFIG["MINI_MODEL_PATH"]
)
self.active_models[model_name] = (model, tokenizer)
self.model_usage[model_name] = time.time()
return model, tokenizer
except Exception as e:
logger.error(f"Mini model loading failed: {e}")
return None, None
def unload_model(self, model_name):
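"""Release a model and its tokenizer, then force garbage collection and clear the CUDA cache."""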
if model_name in self.active_models:
model, tokenizer = self.active_models.pop(model_name)
del model
del tokenizer
gc.collect()
torch.cuda.empty_cache()
self.model_usage.pop(model_name, None)
logger.info(f"Unloaded model: {model_name}")
memory_manager = MemoryManager()
# ----- External Service Integration -----
class ExternalServices:
@staticmethod
def google_drive_upload(file_path):
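"""Upload a file to the configured Google Drive folder using a service account and return a shareable link, or None on failure."""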
try:
creds = service_account.Credentials.from_service_account_file(
'/private/credentials.json',
scopes=['https://www.googleapis.com/auth/drive']
)
service = build('drive', 'v3', credentials=creds)
file_metadata = {
'name': os.path.basename(file_path),
'parents': [CONFIG["GOOGLE_DRIVE_FOLDER_ID"]]
}
media = MediaFileUpload(file_path)
file = service.files().create(
body=file_metadata,
media_body=media,
fields='id'
).execute()
return f'https://drive.google.com/file/d/{file.get("id")}'
except Exception as e:
logger.error(f"Google Drive upload failed: {e}")
return None
@staticmethod
def github_commit(file_path, message):
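"""Create a new file in the configured GitHub repository with the given commit message. Returns True on success."""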
try:
g = Github(os.getenv('GITHUB_TOKEN'))
repo = g.get_repo(CONFIG["GITHUB_REPO"])
with open(file_path, 'r') as file:
content = file.read()
repo.create_file(
path=os.path.basename(file_path),
message=message,
content=content
)
return True
except Exception as e:
logger.error(f"GitHub commit failed: {e}")
return False
@staticmethod
def web_search(query):
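"""Run a DuckDuckGo search and return the top results as 'title: url' lines, or an empty string on failure."""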
try:
results = duckduckgo_search.DDGS().text(query, max_results=3) # the ddg() helper was removed from newer duckduckgo_search releases; DDGS().text() is the current interface
return "\n".join([f"{r['title']}: {r['href']}" for r in results])
except Exception as e:
logger.error(f"Web search failed: {e}")
return ""
# ----- Code Auditor -----
class CodeAuditor:
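"""Static checks for generated code: blocks known-dangerous calls, verifies the code compiles, and asks the chat model to review it against the session context."""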
SECURITY_PATTERNS = [
r"exec\(",
r"eval\(",
r"subprocess\.run\(",
r"os\.system\(",
r"pickle\.load\(",
r"__import__\("
]
@staticmethod
def audit_code(code, context):
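"""Return (True, message) if the code passes security, syntax, and context-consistency checks, otherwise (False, reason)."""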
# Security checks
for pattern in CodeAuditor.SECURITY_PATTERNS:
if re.search(pattern, code):
return False, f"Security risk detected: {pattern}"
# Syntax check
try:
compile(code, '<string>', 'exec')
except SyntaxError as e:
return False, f"Syntax error: {str(e)}"
# Context consistency check
if context:
prompt = f"Review this code for logical errors and consistency with the context:\n\nContext: {context}\n\nCode:\n```python\n{code}\n```"
response = generate_chat_response(prompt, max_tokens=500)
if "error" in response.lower() or "issue" in response.lower():
return False, response
return True, "Code passed all audits"
# ----- Response Generation -----
def generate_chat_response(prompt, max_tokens=1024):
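"""Generate a reply with the main chat model, truncating the prompt to the configured context window and stripping the prompt from the output."""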
try:
inputs = chat_tokenizer(
prompt,
return_tensors="pt",
truncation=True,
max_length=CONFIG["MAX_CONTEXT_TOKENS"]
).to(chat_model.device)
outputs = chat_model.generate(
**inputs,
max_new_tokens=max_tokens,
temperature=0.7,
top_p=0.9,
pad_token_id=chat_tokenizer.eos_token_id
)
response = chat_tokenizer.decode(
outputs[0],
skip_special_tokens=True
)
return response.replace(prompt, "").strip()
except Exception as e:
logger.error(f"Chat generation failed: {e}")
return "I encountered an error processing your request."
def generate_code_response(prompt, context):
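"""Generate code by trying the external coder endpoints first, then the local coder model, and finally the mini fallback model."""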
# Try external endpoints first
for endpoint in CONFIG["CODER_ENDPOINTS"]:
try:
response = requests.post(
endpoint,
json={"prompt": prompt, "context": context},
timeout=15,
verify=False # skip TLS certificate verification
)
if response.status_code == 200:
return response.json()["code"]
except Exception: # endpoint unreachable or returned malformed data; try the next one
continue
# Fallback to local model
try:
coder_model, coder_tokenizer = memory_manager.load_model(
CONFIG["MODEL_CONFIG"]["coder"],
"coder"
)
if coder_model is None:
raise Exception("Coder model not available")
full_prompt = f"Context: {context}\n\nTask: {prompt}\n\nCode:"
inputs = coder_tokenizer(
full_prompt,
return_tensors="pt",
truncation=True
).to(coder_model.device)
outputs = coder_model.generate(
**inputs,
max_new_tokens=1024,
temperature=0.5
)
return coder_tokenizer.decode(
outputs[0],
skip_special_tokens=True
).replace(full_prompt, "").strip()
except Exception as e:
logger.error(f"Code generation failed: {e}")
# Final fallback to mini model
return generate_mini_model_response(prompt)
def generate_mini_model_response(prompt):
"""Generate response using fallback mini model"""
try:
# Check if mini model is loaded
if "mini_model" not in memory_manager.active_models:
# Try to load mini model
mini_model = AutoModelForCausalLM.from_pretrained(
CONFIG["MINI_MODEL_PATH"],
device_map="auto"
)
mini_tokenizer = AutoTokenizer.from_pretrained(
CONFIG["MINI_MODEL_PATH"]
)
memory_manager.active_models["mini_model"] = (mini_model, mini_tokenizer)
model, tokenizer = memory_manager.active_models["mini_model"]
inputs = tokenizer(
prompt,
return_tensors="pt",
truncation=True,
max_length=4096
).to(model.device)
outputs = model.generate(
**inputs,
max_new_tokens=512,
temperature=0.7
)
response = tokenizer.decode(
outputs[0],
skip_special_tokens=True
)
return response.replace(prompt, "").strip() + "\n\n⚠️ Using fallback mini model due to system limitations"
except Exception as e:
logger.error(f"Mini model generation failed: {e}")
return "I'm unable to process your request at this time. Please try again later."
# ----- Session Management -----
class SessionManager:
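"""In-memory session store backed by SQLite, with periodic saving, context summarization, and Hugging Face Hub backups."""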
def __init__(self):
self.sessions = {}
self.last_save = time.time()
def get_session(self, session_id):
with db_lock: # serialize database access across threads
if session_id not in self.sessions:
c = db_conn.cursor()
c.execute("SELECT context FROM sessions WHERE id=?", (session_id,))
row = c.fetchone()
if row:
self.sessions[session_id] = json.loads(row[0])
else:
self.sessions[session_id] = {
"history": [],
"context": "",
"files": {},
"created_at": time.time()
}
return self.sessions[session_id]
def update_session(self, session_id, user_input, response, files=None):
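"""Append an exchange to the session history, merge any new files, summarize the context when the history grows long, and persist sessions periodically."""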
session = self.get_session(session_id)
session["history"].append({
"user": user_input,
"ai": response,
"timestamp": time.time()
})
# Context summarization when needed
if len(session["history"]) > 10:
self.summarize_context(session_id)
# File management
if files:
session["files"].update(files)
# Periodic saving
if time.time() - self.last_save > CONFIG["SAVE_INTERVAL"]:
self.save_sessions()
self.last_save = time.time()
def summarize_context(self, session_id):
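"""Condense the conversation history into a short context summary and keep only the last five exchanges."""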
session = self.get_session(session_id)
history_text = "\n".join([
f"[{datetime.fromtimestamp(h['timestamp']).strftime('%H:%M')}] User: {h['user']}\nAI: {h['ai']}"
for h in session["history"]
])
prompt = f"Summarize this conversation concisely while preserving all technical details and requirements:\n\n{history_text}"
summary = generate_chat_response(prompt, max_tokens=500)
session["context"] = summary
session["history"] = session["history"][-5:] # Keep last 5 exchanges
logger.info(f"Context summarized for session {session_id}")
def save_sessions(self):
with db_lock: # serialize database access across threads
try:
c = db_conn.cursor()
for session_id, data in self.sessions.items():
serialized = json.dumps({
"history": data["history"],
"context": data["context"],
"files": data["files"],
"created_at": data.get("created_at", time.time())
})
c.execute(
"REPLACE INTO sessions (id, context, last_updated) VALUES (?, ?, ?)",
(session_id, serialized, time.time())
)
db_conn.commit()
logger.info("Sessions saved to database")
# Backup to Hugging Face Hub
with open("/data/sessions_backup.json", "w") as f: # تغيير المسار إلى /data
json.dump(self.sessions, f)
HFIntegration.save_to_hub("/data/sessions_backup.json", "your_hf_username/sessions_backup")
except Exception as e:
logger.error(f"Session save failed: {e}")
session_manager = SessionManager()
# ----- Image Processing -----
def process_image(image, prompt, session_id):
try:
# Check image size
img = Image.fromarray(image.astype('uint8'))
img_size = img.size[0] * img.size[1]
if img_size > CONFIG["MAX_IMAGE_SIZE"]:
return f"Image is too large ({img_size/1e6:.1f}MP). Please resize to under {CONFIG['MAX_IMAGE_SIZE']/1e6}MP."
# Simulate VL model processing
vision_prompt = f"Describe this image in detail for coding purposes: {prompt}"
# In production, use:
# vision_model, vision_processor = memory_manager.load_model(CONFIG["MODEL_CONFIG"]["vision"], "vision")
# inputs = vision_processor(images=img, return_tensors="pt").to(vision_model.device)
# outputs = vision_model.generate(**inputs)
# description = vision_processor.decode(outputs[0], skip_special_tokens=True)
# Simulated description
description = "Image shows a web page layout with header, main content area, and footer."
# Update session with image description
session = session_manager.get_session(session_id)
session["files"]["image_description"] = description
return description
except Exception as e:
logger.error(f"Image processing failed: {e}")
return "Could not process the image."
# ----- Main Processing Flow -----
def handle_request(user_input, session_id, image=None):
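"""Main entry point for a user request: optionally describe an attached image, decide between chat and coder handling, and return the response."""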
# Check if we should wait due to high load
if psutil.cpu_percent() > 90 or psutil.virtual_memory().percent > 90:
return "Systems busy. Your request is queued. Processing may take longer than usual."
session = session_manager.get_session(session_id)
context = session["context"]
# Process image if provided
image_description = ""
if image is not None:
image_description = process_image(image, user_input, session_id)
user_input = f"{user_input}\n\nImage description: {image_description}"
# Decision making: Should we use coder or chat?
decision_prompt = f"""Decide if this request requires coding assistance:
Context: {context}
Request: {user_input}
Respond ONLY with 'CODER' if it requires:
- Complex programming
- File operations
- Code modification
- Technical implementation
Otherwise respond with 'CHAT'"""
decision = generate_chat_response(decision_prompt, max_tokens=10)
use_coder = "CODER" in decision
logger.info(f"Decision: {decision}")
# Handle request
if use_coder:
return handle_coder_request(user_input, context, session_id)
else:
response = generate_chat_response(
f"Context: {context}\n\nUser: {user_input}",
max_tokens=1024
)
session_manager.update_session(session_id, user_input, response)
return response
def handle_coder_request(prompt, context, session_id):
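"""Generate code for the request, run it through the audit loop (up to MAX_AUDIT_ATTEMPTS), back it up to GitHub and the Hub, and return it to the user."""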
# Prepare context with file information
session = session_manager.get_session(session_id)
file_context = "\n".join([f"{name}: {content[:500]}" for name, content in session["files"].items()])
full_context = f"{context}\n\nFiles:\n{file_context}"
# Generate initial code
code = generate_code_response(prompt, full_context)
# Code auditing loop
for attempt in range(CONFIG["MAX_AUDIT_ATTEMPTS"]):
valid, audit_result = CodeAuditor.audit_code(code, full_context)
if valid:
# Save code to session
session["files"]["generated_code.py"] = code
session_manager.update_session(session_id, prompt, f"Generated code:\n```python\n{code}\n```")
# Create backup
try:
with open("/data/temp_code.py", "w") as f: # تغيير المسار إلى /data
f.write(code)
ExternalServices.github_commit("/data/temp_code.py", f"Code generated for: {prompt}")
HFIntegration.save_to_hub("/data/temp_code.py", "your_hf_username/code_backups")
except Exception as e:
logger.error(f"Backup failed: {e}")
return f"Here's the generated code:\n```python\n{code}\n```"
else:
# Improve code based on audit feedback
improvement_prompt = f"""Improve this code based on the audit feedback:
Original Request: {prompt}
Audit Feedback: {audit_result}
Code to improve:
```python
{code}
```
Revised code:"""
code = generate_code_response(improvement_prompt, full_context)
logger.info(f"Code improvement attempt {attempt+1}")
return "Unable to generate valid code after multiple attempts. Please try a different approach."
# ----- Health Check Endpoint -----
def health_check():
return {
"status": "ok",
"timestamp": time.time(),
"resources": {
"cpu": psutil.cpu_percent(),
"memory": psutil.virtual_memory().percent,
"gpu": "N/A"
},
"sessions": len(session_manager.sessions),
"models_loaded": len(memory_manager.active_models)
}
# ----- Gradio Interface -----
with gr.Blocks(title="AI Development Assistant", theme=gr.themes.Soft()) as demo:
session_id = gr.State(str(time.time()))
with gr.Row():
with gr.Column(scale=3):
chatbot = gr.Chatbot(
height=500,
bubble_full_width=False,
avatar_images=(
"https://cdn-icons-png.flaticon.com/512/4712/4712035.png",
"https://cdn-icons-png.flaticon.com/512/4712/4712134.png"
)
)
msg = gr.Textbox(label="Your Message", placeholder="Type your request here...")
with gr.Row():
submit_btn = gr.Button("Submit", variant="primary")
clear_btn = gr.Button("Clear")
with gr.Accordion("Additional Tools", open=False):
with gr.Row():
image_input = gr.Image(label="Upload Image", type="numpy")
with gr.Column():
drive_btn = gr.Button("Save to Google Drive")
github_btn = gr.Button("Commit to GitHub")
search_btn = gr.Button("Web Search")
hf_btn = gr.Button("Save to Hugging Face")
with gr.Column(scale=2):
gr.Markdown("### System Status")
sys_status = gr.Textbox(label="Resources", interactive=False)
gr.Markdown("### Service Health")
health_status = gr.JSON(label="Endpoints", value={})
gr.Markdown("### Current Files")
file_display = gr.JSON(label="Session Files")
gr.Markdown("### Context Summary")
context_display = gr.Textbox(label="Current Context", interactive=False, lines=5)
def respond(user_input, image, session_id, chat_history):
# Update health status
health_data = HFIntegration.health_check()
# System status monitoring
status = f"CPU: {psutil.cpu_percent()}% | RAM: {psutil.virtual_memory().percent}% | Models: {len(memory_manager.active_models)}"
# Process request
response = handle_request(user_input, session_id, image)
# Update context display
session = session_manager.get_session(session_id)
context = session.get("context", "No context summary yet")
# Format response
chat_history.append((user_input, response))
return "", chat_history, status, context, health_data
def save_to_drive(session_id):
session = session_manager.get_session(session_id)
try:
with open("/data/session_backup.json", "w") as f: # تغيير المسار إلى /data
json.dump(session, f)
link = ExternalServices.google_drive_upload("/data/session_backup.json")
return f"Saved to Google Drive: {link}" if link else "Save failed"
except Exception as e:
return f"Error: {str(e)}"
def save_to_hf(session_id):
session = session_manager.get_session(session_id)
try:
with open("/data/session_backup.json", "w") as f: # تغيير المسار إلى /data
json.dump(session, f)
result = HFIntegration.save_to_hub("/data/session_backup.json", "your_hf_username/session_backup")
return result if result else "Save to Hugging Face failed"
except Exception as e:
return f"Error: {str(e)}"
def web_search(query):
return ExternalServices.web_search(query)
def update_context_display(session_id):
session = session_manager.get_session(session_id)
return session.get("context", "No context summary yet")
msg.submit(
respond,
[msg, image_input, session_id, chatbot],
[msg, chatbot, sys_status, context_display, health_status]
)
submit_btn.click(
respond,
[msg, image_input, session_id, chatbot],
[msg, chatbot, sys_status, context_display, health_status]
)
drive_btn.click(
save_to_drive,
[session_id],
context_display
)
hf_btn.click(
save_to_hf,
[session_id],
context_display
)
search_btn.click(
web_search,
[msg],
context_display
)
clear_btn.click(
lambda: ([], "", {}, ""),
[],
[chatbot, msg, file_display, context_display],
queue=False
)
demo.load(
lambda sid: update_context_display(sid),
[session_id],
[context_display]
)
# ----- System Monitoring -----
def system_monitor():
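"""Background loop: log resource usage, save sessions, check endpoint health, and expire sessions older than 24 hours."""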
while True:
try:
# Log system status
status = {
"timestamp": time.time(),
"cpu": psutil.cpu_percent(),
"memory": psutil.virtual_memory().percent,
"sessions": len(session_manager.sessions),
"models_loaded": len(memory_manager.active_models)
}
logger.info(f"System Status: {status}")
# Save sessions periodically
session_manager.save_sessions()
# Check service health
health = HFIntegration.health_check()
logger.info(f"Service Health: {json.dumps(health, indent=2)}")
# Cleanup old sessions (older than 24 hours)
current_time = time.time()
for sid, session in list(session_manager.sessions.items()):
if current_time - session.get("created_at", current_time) > 86400:
del session_manager.sessions[sid]
logger.info(f"Cleaned up old session: {sid}")
time.sleep(60)
except Exception as e:
logger.error(f"Monitor thread error: {e}")
time.sleep(10)
# Start monitoring in background thread
monitor_thread = threading.Thread(target=system_monitor, daemon=True)
monitor_thread.start()
# ----- Hugging Face Spaces Entry Point -----
if __name__ == "__main__":
# For Hugging Face Spaces
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False
# favicon_path omitted: Gradio expects a local file path here, not a remote URL
)