import gradio as gr
import json
import requests
import random
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
import threading
from dataclasses import dataclass, field
import hashlib
import sqlite3
import base64
from io import BytesIO
import os
from urllib.parse import quote

# Hugging Face imports
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from diffusers import StableDiffusionPipeline
import torch
from huggingface_hub import InferenceClient  # replaces the deprecated InferenceApi
# Model configurations
TEXT_GENERATION_MODELS = {
    "Qwen2.5-3B-Instruct": {
        "model_id": "Qwen/Qwen2.5-3B-Instruct",
        "description": "Efficient instruction-following model",
        "max_tokens": 512,
        "temperature": 0.7
    },
    "Mistral-7B-Instruct": {
        "model_id": "mistralai/Mistral-7B-Instruct-v0.2",
        "description": "Popular conversational model",
        "max_tokens": 1024,
        "temperature": 0.8
    },
    "Llama-3.2-1B-Instruct": {
        "model_id": "meta-llama/Llama-3.2-1B-Instruct",
        "description": "Meta's small efficient model",
        "max_tokens": 512,
        "temperature": 0.6
    },
    "GPT2": {
        "model_id": "openai-community/gpt2",
        "description": "Classic text generation",
        "max_tokens": 256,
        "temperature": 0.9
    }
}

IMAGE_GENERATION_MODELS = {
    "FLUX.1-dev": {
        "model_id": "black-forest-labs/FLUX.1-dev",
        "description": "High quality image generation"
    },
    "Stable-Diffusion-XL": {
        "model_id": "stabilityai/stable-diffusion-xl-base-1.0",
        "description": "Reliable stable diffusion"
    },
    "Z-Image-Turbo": {
        "model_id": "Tongyi-MAI/Z-Image-Turbo",
        "description": "Fast image generation"
    }
}
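
# Note: some of the models above are gated on the Hugging Face Hub
# (e.g. meta-llama/Llama-3.2-1B-Instruct and black-forest-labs/FLUX.1-dev).
# Using them requires accepting the model license and providing an access
# token, typically via the Space's HF_TOKEN secret, for example:
#   from huggingface_hub import login
#   login(token=os.getenv("HF_TOKEN"))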
# Database setup
class DatabaseManager:
    def __init__(self, db_path="cult_simulator.db"):
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Initialize the SQLite database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Create tables
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS personalities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                personality_type TEXT,
                avatar_prompt TEXT,
                avatar_image BLOB,
                traits TEXT,  -- JSON
                background_story TEXT,
                system_prompt TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS conversations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                personality_id INTEGER,
                message_content TEXT,
                context TEXT,  -- JSON
                response_model TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (personality_id) REFERENCES personalities (id)
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS webhooks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                personality_id INTEGER,
                webhook_url TEXT,
                discord_channel_id TEXT,
                is_active BOOLEAN DEFAULT 1,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (personality_id) REFERENCES personalities (id)
            )
        ''')
        conn.commit()
        conn.close()

    def save_personality(self, name, personality_type, avatar_prompt, avatar_blob, traits, background_story, system_prompt):
        """Save a personality to the database and return its row id"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO personalities (name, personality_type, avatar_prompt, avatar_image, traits, background_story, system_prompt)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (name, personality_type, avatar_prompt, avatar_blob, json.dumps(traits), background_story, system_prompt))
        personality_id = cursor.lastrowid
        conn.commit()
        conn.close()
        return personality_id

    def save_conversation(self, personality_id, message_content, context, response_model):
        """Save a conversation entry to the database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO conversations (personality_id, message_content, context, response_model)
            VALUES (?, ?, ?, ?)
        ''', (personality_id, message_content, json.dumps(context), response_model))
        conn.commit()
        conn.close()

    def get_personalities(self):
        """Get all personalities"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM personalities ORDER BY created_at DESC')
        rows = cursor.fetchall()
        conn.close()
        return rows

    def get_personalities_with_webhooks(self):
        """Get personalities joined with their webhooks"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            SELECT p.*, w.webhook_url, w.discord_channel_id, w.is_active
            FROM personalities p
            LEFT JOIN webhooks w ON p.id = w.personality_id
            ORDER BY p.created_at DESC
        ''')
        rows = cursor.fetchall()
        conn.close()
        return rows
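
# Illustrative usage of DatabaseManager (not executed here; the values are
# examples only, using the default db_path):
#   db = DatabaseManager()
#   pid = db.save_personality("Nova", "Qwen2.5-3B-Instruct", "a friendly guide",
#                             b"", {"welcoming": 0.9, "mysterious": 0.2},
#                             "A warm host who greets newcomers.", "You are Nova...")
#   rows = db.get_personalities()   # most recently created personality first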
class HuggingFaceModelManager:
    """Manage Hugging Face models for text and image generation"""

    def __init__(self):
        self.text_models = {}
        self.image_models = {}
        self.loaded_models = {}

    def load_text_model(self, model_key):
        """Load a text generation model"""
        if model_key not in self.loaded_models:
            config = TEXT_GENERATION_MODELS[model_key]
            try:
                self.loaded_models[model_key] = pipeline(
                    "text-generation",
                    model=config["model_id"],
                    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                    device_map="auto" if torch.cuda.is_available() else None
                )
                print(f"✅ Loaded {model_key}")
            except Exception as e:
                print(f"❌ Error loading {model_key}: {e}")
                return None
        return self.loaded_models[model_key]

    def generate_text(self, model_key, prompt, max_length=200):
        """Generate text using the specified model"""
        model = self.load_text_model(model_key)
        if not model:
            return f"Error: Could not load model {model_key}"
        try:
            config = TEXT_GENERATION_MODELS[model_key]
            result = model(
                prompt,
                max_new_tokens=max_length,
                temperature=config["temperature"],
                do_sample=True,
                pad_token_id=model.tokenizer.eos_token_id
            )
            generated_text = result[0]['generated_text']
            # Remove the prompt from the response
            if generated_text.startswith(prompt):
                generated_text = generated_text[len(prompt):].strip()
            return generated_text
        except Exception as e:
            return f"Error generating text: {e}"
    def generate_avatar(self, model_key, personality_description):
        """Generate an avatar image URL for a personality"""
        config = IMAGE_GENERATION_MODELS[model_key]
        # Create the avatar prompt as a single line so it can be URL-encoded safely
        avatar_prompt = (
            f"Portrait of {personality_description}, professional headshot, "
            "realistic style, soft lighting, detailed facial features, "
            "professional attire, clean background, high quality"
        )
        try:
            # Use the Hugging Face Inference API for image generation
            # (simpler than loading local diffusion models).
            client = InferenceClient(model=config["model_id"], token=os.getenv("HF_TOKEN"))
            # For demonstration we return a placeholder URL instead of calling
            # the client; in production you would request the actual image here.
            avatar_url = f"https://image.pollinations.ai/prompt/{quote(avatar_prompt)}"
            return avatar_url
        except Exception:
            # Fall back to a deterministic placeholder avatar
            return f"https://api.dicebear.com/7.x/avataaars/svg?seed={hash(personality_description)}"
class PersonalityGenerator:
    """Generate AI personalities using language models"""

    def __init__(self, model_manager):
        self.model_manager = model_manager

    def generate_personality_traits(self, model_key, context=""):
        """Generate personality traits using AI"""
        prompt = f"""
        Generate a detailed personality profile for a fictional character.
        Include specific personality traits, communication style, background story.
        {context}
        Format as JSON:
        {{
            "name": "Character name",
            "traits": {{
                "welcoming": 0.8,
                "empathetic": 0.7,
                "cautious": 0.3,
                "enthusiastic": 0.6,
                "mysterious": 0.4
            }},
            "background_story": "Brief background",
            "communication_style": "How they talk",
            "description": "Physical appearance description"
        }}
        """
        response = self.model_manager.generate_text(model_key, prompt, max_length=300)
        try:
            # Extract the first JSON object from the response
            import re
            json_match = re.search(r'\{.*\}', response, re.DOTALL)
            if json_match:
                return json.loads(json_match.group())
        except Exception:
            pass
        # Fallback to a basic randomized personality
        return {
            "name": f"AI_Personality_{random.randint(1000, 9999)}",
            "traits": {
                "welcoming": random.uniform(0.5, 1.0),
                "empathetic": random.uniform(0.3, 0.9),
                "cautious": random.uniform(0.2, 0.8),
                "enthusiastic": random.uniform(0.4, 1.0),
                "mysterious": random.uniform(0.1, 0.7)
            },
            "background_story": "An AI-generated personality created for social simulation.",
            "communication_style": "Friendly and engaging",
            "description": "A unique AI-generated character"
        }
    def generate_system_prompt(self, personality_data, context=""):
        """Generate a dynamic system prompt for an AI personality"""
        traits_desc = []
        for trait, value in personality_data["traits"].items():
            if value > 0.7:
                traits_desc.append(f"very {trait}")
            elif value > 0.4:
                traits_desc.append(f"somewhat {trait}")
        prompt = f"""
        You are {personality_data["name"]}, a character in a social simulation.
        Your personality: {', '.join(traits_desc)}.
        Background: {personality_data["background_story"]}
        Communication style: {personality_data["communication_style"]}
        {context}
        Respond naturally as this character would, maintaining your personality traits.
        Be engaging but authentic to your character.
        """
        return prompt.strip()
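
# Illustrative behavior of generate_system_prompt (values are examples):
# for traits like {"welcoming": 0.9, "cautious": 0.5, "mysterious": 0.2},
# scores above 0.7 render as "very welcoming", scores between 0.4 and 0.7 as
# "somewhat cautious", and lower scores are omitted from the prompt entirely.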
class CultSimulatorApp:
    """Main application class"""

    def __init__(self):
        self.db = DatabaseManager()
        self.model_manager = HuggingFaceModelManager()
        self.personality_generator = PersonalityGenerator(self.model_manager)
        self.active_personalities = []
        self.simulation_running = False

    def create_personality(self, name, model_key, context="", avatar_model_key="FLUX.1-dev"):
        """Create a new AI personality"""
        if not name:
            name = f"AI_Character_{random.randint(1000, 9999)}"
        # Generate personality traits
        personality_data = self.personality_generator.generate_personality_traits(model_key, context)
        personality_data["name"] = name
        # Generate system prompt
        system_prompt = self.personality_generator.generate_system_prompt(personality_data)
        # Generate avatar with the selected image model
        avatar_url = self.model_manager.generate_avatar(avatar_model_key, personality_data["description"])
        # Save to database
        personality_id = self.db.save_personality(
            name=name,
            personality_type=model_key,
            avatar_prompt=personality_data["description"],
            avatar_blob=avatar_url.encode(),  # Store the URL as bytes for the demo
            traits=personality_data["traits"],
            background_story=personality_data["background_story"],
            system_prompt=system_prompt
        )
        personality_data["id"] = personality_id
        personality_data["avatar_url"] = avatar_url
        personality_data["system_prompt"] = system_prompt
        self.active_personalities.append(personality_data)
        return personality_data

    def generate_response(self, personality_id, message, model_key, context=""):
        """Generate a response from an AI personality"""
        # Get personality data
        personality = next((p for p in self.active_personalities if p.get("id") == personality_id), None)
        if not personality:
            return "Personality not found"
        # Create the full prompt
        full_prompt = f"{personality['system_prompt']}\n\nUser message: {message}\n\nResponse as {personality['name']}:"
        # Generate the response
        response = self.model_manager.generate_text(model_key, full_prompt, max_length=150)
        # Save the conversation
        self.db.save_conversation(personality_id, message, {"context": context}, model_key)
        return response

    def send_webhook_message(self, webhook_url, content, username, avatar_url):
        """Send a message via Discord webhook"""
        try:
            data = {
                "content": content,
                "username": username,
                "avatar_url": avatar_url
            }
            response = requests.post(webhook_url, json=data, timeout=10)
            return response.status_code == 204
        except Exception as e:
            print(f"Webhook error: {e}")
            return False
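
    # Note on the status check above: Discord webhooks respond with 204 No
    # Content to a plain POST; only when the webhook URL includes ?wait=true
    # does Discord return 200 with the created message object, so a 200 here
    # would indicate the wait parameter is in use.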
    def simulate_conversation(self, trigger_message="", model_key="Qwen2.5-3B-Instruct", participants=None):
        """Simulate a conversation between AI personalities"""
        if not participants:
            participants = random.sample(self.active_personalities, min(3, len(self.active_personalities)))
        if len(participants) < 2:
            return ["Need at least 2 personalities for a conversation"]
        conversation_log = []
        # Start the conversation
        starter = random.choice(participants)
        starter_response = self.generate_response(starter["id"], trigger_message or "Start a conversation", model_key)
        conversation_log.append(f"**{starter['name']}**: {starter_response}")
        # Send webhook if one is configured (personalities are dicts, so check the key)
        if starter.get('webhook_url'):
            self.send_webhook_message(starter['webhook_url'], starter_response, starter['name'], starter['avatar_url'])
        time.sleep(1)
        # Other participants respond
        for participant in participants:
            if participant != starter and random.random() > 0.3:
                context = f"Responding to: {starter_response}"
                response = self.generate_response(participant["id"], "What do you think about that?", model_key, context)
                conversation_log.append(f"**{participant['name']}**: {response}")
                # Send webhook if one is configured
                if participant.get('webhook_url'):
                    self.send_webhook_message(participant['webhook_url'], response, participant['name'], participant['avatar_url'])
                time.sleep(random.uniform(0.5, 2))
        return conversation_log

    def get_personalities_display(self):
        """Get a formatted display of all personalities"""
        personalities = self.db.get_personalities()
        if not personalities:
            return "No personalities created yet"
        display = "## AI Personalities Database\n\n"
        for personality in personalities:
            pid, name, personality_type, avatar_prompt, _, traits_json, background, system_prompt, created_at = personality
            try:
                traits = json.loads(traits_json) if traits_json else {}
                display += f"### {name}\n"
                display += f"- **Type**: {personality_type}\n"
                display += f"- **Created**: {created_at}\n"
                display += f"- **Traits**: {', '.join([f'{k}: {v:.2f}' for k, v in traits.items()])}\n"
                display += f"- **Background**: {background[:100]}...\n\n"
            except Exception:
                display += f"### {name}\nError loading personality data\n\n"
        return display
# Initialize the application
app = CultSimulatorApp()


def create_gradio_interface():
    """Create the Gradio interface"""
    with gr.Blocks(title="🤖 Hugging Face Cult Simulator", theme=gr.themes.Soft()) as interface:
        gr.HTML("""
        <div style="text-align: center; padding: 20px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; border-radius: 10px; margin-bottom: 20px;">
            <h1>🤖 Hugging Face Cult Simulator</h1>
            <h2>AI-Powered Personality Generation & Social Simulation</h2>
            <p>Generate unique AI personalities using state-of-the-art models and watch them interact</p>
        </div>
        """)
        with gr.Tabs():
            # Tab 1: Personality Generation
            with gr.Tab("🧠 AI Personality Generation"):
                gr.Markdown("### Generate AI Personalities Using Language Models")
                with gr.Row():
                    name_input = gr.Textbox(label="Character Name (Optional)", placeholder="Leave blank for auto-generated")
                    model_selector = gr.Dropdown(
                        label="Text Generation Model",
                        choices=list(TEXT_GENERATION_MODELS.keys()),
                        value="Qwen2.5-3B-Instruct"
                    )
                    avatar_model_selector = gr.Dropdown(
                        label="Avatar Generation Model",
                        choices=list(IMAGE_GENERATION_MODELS.keys()),
                        value="FLUX.1-dev"
                    )
                context_input = gr.Textbox(
                    label="Generation Context (Optional)",
                    placeholder="E.g., 'Create a friendly but mysterious character who welcomes newcomers'",
                    lines=2
                )
                generate_btn = gr.Button("🎭 Generate AI Personality", variant="primary")
                personality_output = gr.JSON(label="Generated Personality")
                avatar_output = gr.Image(label="Generated Avatar")
                # Personality display
                personality_display = gr.Markdown(app.get_personalities_display())
            # Tab 2: Conversation Simulator
            with gr.Tab("💬 Conversation Simulator"):
                gr.Markdown("### Simulate Conversations Between AI Personalities")
                with gr.Row():
                    conversation_model = gr.Dropdown(
                        label="Response Generation Model",
                        choices=list(TEXT_GENERATION_MODELS.keys()),
                        value="Qwen2.5-3B-Instruct"
                    )
                    trigger_message = gr.Textbox(
                        label="Conversation Starter",
                        placeholder="What should they talk about?",
                        lines=2
                    )
                with gr.Row():
                    simulate_btn = gr.Button("💬 Start Conversation", variant="primary")
                    auto_simulate_btn = gr.Button("🔄 Auto-Simulate", variant="secondary")
                    stop_btn = gr.Button("⏹️ Stop", variant="stop")
                conversation_output = gr.Textbox(
                    label="Conversation Log",
                    lines=15,
                    interactive=False
                )
            # Tab 3: Webhook Integration
            with gr.Tab("📡 Webhook Integration"):
                gr.Markdown("### Connect AI Personalities to Discord Webhooks")
                with gr.Row():
                    personality_selector = gr.Dropdown(
                        label="Select Personality",
                        choices=[],
                        value=None
                    )
                    webhook_url = gr.Textbox(
                        label="Discord Webhook URL",
                        placeholder="https://discord.com/api/webhooks/...",
                        type="text"
                    )
                    channel_id = gr.Textbox(
                        label="Discord Channel ID",
                        placeholder="123456789012345678"
                    )
                with gr.Row():
                    connect_btn = gr.Button("🔗 Connect Webhook", variant="primary")
                    test_btn = gr.Button("🧪 Test Webhook", variant="secondary")
                webhook_status = gr.Textbox(
                    label="Webhook Status",
                    lines=5,
                    interactive=False
                )
| # Tab 4: Database & Analytics | |
| with gr.Tab("π Database & Analytics"): | |
| gr.Markdown("### View Stored Data and Analytics") | |
| with gr.Row(): | |
| refresh_btn = gr.Button("π Refresh Data", variant="secondary") | |
| export_btn = gr.Button("π€ Export Database", variant="primary") | |
| database_display = gr.Markdown(app.get_personalities_display()) | |
| analytics_display = gr.JSON(label="Analytics Data") | |
        # Event handlers
        def generate_personality_handler(name, model_key, avatar_model, context):
            personality = app.create_personality(name, model_key, context, avatar_model_key=avatar_model)
            return personality, personality["avatar_url"], app.get_personalities_display()

        def simulate_conversation_handler(model_key, trigger):
            if not app.active_personalities:
                return "❌ No personalities available. Create some personalities first!"
            conversation = app.simulate_conversation(trigger, model_key)
            return "\n\n".join(conversation)

        def update_personalities_list():
            personalities = app.db.get_personalities()
            choices = [(f"{p[1]} (ID: {p[0]})", p[0]) for p in personalities]
            return gr.Dropdown(choices=choices, value=None)

        def connect_webhook_handler(personality_id, webhook_url, channel_id):
            if not personality_id or not webhook_url:
                return "❌ Please select a personality and provide a webhook URL"
            # In a real implementation, save the webhook to the database
            personality = next((p for p in app.active_personalities if p.get("id") == personality_id), None)
            if personality:
                personality["webhook_url"] = webhook_url
                personality["channel_id"] = channel_id
                return f"✅ Connected {personality['name']} to webhook"
            return "❌ Personality not found"

        def test_webhook_handler(personality_id, webhook_url):
            personality = next((p for p in app.active_personalities if p.get("id") == personality_id), None)
            if personality and webhook_url:
                success = app.send_webhook_message(
                    webhook_url,
                    "🧪 Testing webhook connection from Hugging Face Cult Simulator!",
                    personality["name"],
                    personality["avatar_url"]
                )
                return "✅ Webhook test successful!" if success else "❌ Webhook test failed"
            return "❌ Invalid personality or webhook URL"

        def export_database_handler():
            personalities = app.db.get_personalities()
            export_data = {
                "timestamp": datetime.now().isoformat(),
                "total_personalities": len(personalities),
                "personalities": []
            }
            for personality in personalities:
                export_data["personalities"].append({
                    "id": personality[0],
                    "name": personality[1],
                    "type": personality[2],
                    "created_at": personality[8]
                })
            return export_data
        # Connect event handlers
        generate_btn.click(
            generate_personality_handler,
            inputs=[name_input, model_selector, avatar_model_selector, context_input],
            outputs=[personality_output, avatar_output, personality_display]
        )
        simulate_btn.click(
            simulate_conversation_handler,
            inputs=[conversation_model, trigger_message],
            outputs=[conversation_output]
        )
        connect_btn.click(
            connect_webhook_handler,
            inputs=[personality_selector, webhook_url, channel_id],
            outputs=[webhook_status]
        )
        test_btn.click(
            test_webhook_handler,
            inputs=[personality_selector, webhook_url],
            outputs=[webhook_status]
        )
        refresh_btn.click(
            app.get_personalities_display,
            outputs=[database_display]
        )
        export_btn.click(
            export_database_handler,
            outputs=[analytics_display]
        )
        # Refresh the personality selector whenever a new personality is created
        generate_btn.click(
            update_personalities_list,
            outputs=[personality_selector]
        )
    return interface
# Launch the application
if __name__ == "__main__":
    interface = create_gradio_interface()
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True
    )
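
# Dependencies implied by the imports above (a plausible requirements.txt for
# this Space; pin versions as needed):
#   gradio
#   transformers
#   torch
#   diffusers
#   huggingface_hub
#   requests
#   accelerate   # needed for device_map="auto" in the pipeline() call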