llm / app_simplified.py
lemms's picture
Upload app_simplified.py with huggingface_hub
aef4134 verified
#!/usr/bin/env python3
"""
OpenLLM Inference Space - Simplified Gradio Interface
Loads models from Hugging Face repositories to avoid storage limits
"""
import gradio as gr
import torch
import json
import os
import math
from pathlib import Path
from typing import Dict, Any, Optional
import logging
from dataclasses import dataclass
import torch.nn as nn
import torch.nn.functional as F
# Configure the root logger at import time and create this module's logger.
# INFO level makes the engine's load/generation progress messages visible.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class GPTConfig:
    """Hyperparameters describing the shape of a GPT model.

    The field order is part of the interface (positional construction),
    so it must not be rearranged.
    """

    # Rows in the token embedding table and width of the output logits.
    vocab_size: int = 32000
    # Number of stacked transformer blocks.
    n_layer: int = 6
    # Attention heads per block; must divide ``n_embd`` evenly.
    n_head: int = 8
    # Width of the embeddings / residual stream.
    n_embd: int = 512
    # Longest sequence the model accepts (size of the position table and mask).
    block_size: int = 1024
    # Probability used by every ``nn.Dropout`` layer in the model.
    dropout: float = 0.1
    # Whether the attention and MLP linear layers carry a bias term.
    bias: bool = True
    # Free-form label for the configuration.
    model_name: str = "gpt-small"
class CausalSelfAttention(nn.Module):
    """Multi-head self-attention where a position only sees itself and the past."""

    def __init__(self, config):
        """Build the projections, dropouts and causal mask from ``config``.

        Reads ``n_embd``, ``n_head``, ``block_size``, ``dropout`` and ``bias``.
        """
        super().__init__()
        assert config.n_embd % config.n_head == 0
        self.config = config
        self.n_head = config.n_head
        self.n_embd = config.n_embd
        self.head_dim = self.n_embd // self.n_head

        # A single fused projection yields queries, keys and values at once.
        self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias)
        self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)
        self.attn_dropout = nn.Dropout(config.dropout)
        self.resid_dropout = nn.Dropout(config.dropout)

        # Lower-triangular mask of ones, stored as a buffer named "bias"
        # (the name is part of the state_dict, so it is kept as is).
        size = config.block_size
        lower_triangle = torch.tril(torch.ones(size, size))
        self.register_buffer("bias", lower_triangle.view(1, 1, size, size))

    def forward(self, x):
        """Apply causal attention to ``x`` of size (batch, time, channels)."""
        batch, seq_len, width = x.size()

        def to_heads(tensor):
            # (batch, time, channels) -> (batch, heads, time, head_dim)
            return tensor.view(batch, seq_len, self.n_head, self.head_dim).transpose(1, 2)

        queries, keys, values = (
            to_heads(part) for part in self.c_attn(x).split(self.n_embd, dim=2)
        )

        scale = 1.0 / math.sqrt(self.head_dim)
        scores = (queries @ keys.transpose(-2, -1)) * scale

        # Positions where the mask is zero lie in the future: exclude them.
        hidden = self.bias[:, :, :seq_len, :seq_len] == 0
        scores = scores.masked_fill(hidden, float("-inf"))

        weights = self.attn_dropout(F.softmax(scores, dim=-1))

        # Merge the heads back into a single channel dimension.
        merged = (weights @ values).transpose(1, 2).contiguous().view(batch, seq_len, width)
        return self.resid_dropout(self.c_proj(merged))
class MLP(nn.Module):
    """Position-wise feed-forward network used inside a transformer block."""

    def __init__(self, config):
        """Create the expand/contract projections from ``config``.

        Reads ``n_embd``, ``dropout`` and ``bias``.
        """
        super().__init__()
        inner_width = 4 * config.n_embd  # hidden layer is four times wider
        self.c_fc = nn.Linear(config.n_embd, inner_width, bias=config.bias)
        self.gelu = nn.GELU()
        self.c_proj = nn.Linear(inner_width, config.n_embd, bias=config.bias)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        """Expand, apply GELU, project back, then apply dropout."""
        return self.dropout(self.c_proj(self.gelu(self.c_fc(x))))
class Block(nn.Module):
    """One transformer layer: attention then MLP, each with a residual link."""

    def __init__(self, config):
        """Create the two layer norms and the two sub-layers from ``config``."""
        super().__init__()
        self.ln_1 = nn.LayerNorm(config.n_embd)
        self.attn = CausalSelfAttention(config)
        self.ln_2 = nn.LayerNorm(config.n_embd)
        self.mlp = MLP(config)

    def forward(self, x):
        """Normalise before each sub-layer and add its output back to the input."""
        after_attention = x + self.attn(self.ln_1(x))
        return after_attention + self.mlp(self.ln_2(after_attention))
class GPTModel(nn.Module):
    """Complete GPT language model.

    Token and position embeddings feed a stack of ``Block`` layers, a final
    layer norm and a linear head that produces vocabulary logits. The token
    embedding matrix and the output head share one weight tensor.
    """

    def __init__(self, config):
        """Build the model from ``config``.

        Reads ``vocab_size``, ``n_embd``, ``block_size``, ``dropout`` and
        ``n_layer`` (the blocks read further fields themselves).
        """
        super().__init__()
        self.config = config
        self.transformer = nn.ModuleDict(
            dict(
                wte=nn.Embedding(config.vocab_size, config.n_embd),
                wpe=nn.Embedding(config.block_size, config.n_embd),
                drop=nn.Dropout(config.dropout),
                h=nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
                ln_f=nn.LayerNorm(config.n_embd),
            )
        )
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        # Weight tying: the input embedding reuses the output head's matrix.
        self.transformer.wte.weight = self.lm_head.weight
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialise Linear/Embedding weights from N(0, 0.02) and zero biases."""
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, input_ids, attention_mask=None, labels=None):
        """Compute logits and, when ``labels`` is given, the next-token loss.

        Args:
            input_ids: Long tensor of token ids, size (batch, time).
            attention_mask: Accepted for interface compatibility; not used.
            labels: Optional tensor shaped like ``input_ids``. Entries equal
                to ``-1`` are ignored by the loss.

        Returns:
            ``(loss, logits)`` when ``labels`` is given, otherwise
            ``(logits,)``.

        Raises:
            AssertionError: If the sequence is longer than ``block_size``.
        """
        device = input_ids.device
        b, t = input_ids.size()
        assert t <= self.config.block_size, (
            f"sequence length {t} exceeds block size {self.config.block_size}"
        )

        tok_emb = self.transformer.wte(input_ids)
        pos = torch.arange(0, t, dtype=torch.long, device=device)
        pos_emb = self.transformer.wpe(pos)  # broadcast over the batch below

        x = self.transformer.drop(tok_emb + pos_emb)
        for block in self.transformer.h:
            x = block(x)
        x = self.transformer.ln_f(x)

        logits = self.lm_head(x)

        loss = None
        if labels is not None:
            # Shift so that the logits at position i are scored against token i + 1.
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()
            loss = F.cross_entropy(
                shift_logits.view(-1, shift_logits.size(-1)),
                shift_labels.view(-1),
                ignore_index=-1,
            )

        return (loss, logits) if loss is not None else (logits,)

    def generate(self, input_ids, max_length=100, temperature=1.0, **kwargs):
        """Autoregressively extend ``input_ids`` up to ``max_length`` tokens.

        Args:
            input_ids: Long tensor of prompt ids, size (batch, time).
            max_length: Target *total* sequence length, prompt included. No
                tokens are added if the prompt is already that long.
            temperature: Divisor applied to the logits before sampling. A
                value of zero or below selects the most likely token instead
                of sampling (avoids a division by zero).
            **kwargs: Accepted and ignored.

        Returns:
            The input tensor with the generated ids appended along dim 1.
        """
        # Remember the caller's mode so generation does not silently leave the
        # model in training mode (dropout enabled) afterwards.
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                for _ in range(int(max_length) - input_ids.size(1)):
                    # Keep only the most recent block_size tokens as context;
                    # a shorter sequence is returned whole by this slice.
                    idx_cond = input_ids[:, -self.config.block_size:]
                    logits = self(idx_cond)[0][:, -1, :]
                    if temperature <= 0:
                        idx_next = torch.argmax(logits, dim=-1, keepdim=True)
                    else:
                        probs = F.softmax(logits / temperature, dim=-1)
                        idx_next = torch.multinomial(probs, num_samples=1)
                    input_ids = torch.cat((input_ids, idx_next), dim=1)
        finally:
            self.train(was_training)
        return input_ids
class OpenLLMInferenceEngine:
    """Simplified inference engine that loads models from Hugging Face repositories.

    Models are downloaded on first use and cached in ``self.models`` keyed by
    model ID. ``self.model_configs`` is the registry of known model IDs.
    """

    def __init__(self):
        """Create empty caches and build the registry of available models."""
        self.models = {}
        self.tokenizers = {}
        self.current_model = None
        self.current_tokenizer = None

        # (training steps in thousands, short description). Order matters: it
        # is the order in which the models are listed to the user.
        variants = (
            (4, "Early training stage"),
            (6, "Improved coherence"),
            (7, "Enhanced quality"),
            (8, "Sophisticated understanding"),
            (9, "Best performing model"),
            (10, "Latest extended training"),
        )

        # Model configurations with Hugging Face repository IDs
        self.model_configs = {}
        for thousands, blurb in variants:
            tag = f"{thousands}k"
            self.model_configs[f"openllm-small-extended-{tag}"] = {
                "name": f"OpenLLM Small ({tag} steps)",
                "description": f"Small model trained for {thousands * 1000:,} steps - {blurb}",
                "hf_repo": f"lemms/openllm-small-extended-{tag}",
                "local_path": f"models/small-extended-{tag}",
                "checkpoint": "best_model.pt",
                "config": "config.json",
            }

        logger.info("πŸš€ OpenLLM Inference Engine initialized")
        logger.info(f"πŸ“‹ Available models: {list(self.model_configs.keys())}")

    def load_model_from_hf(self, model_id: str) -> bool:
        """Download a registered model and cache it in ``self.models``.

        Args:
            model_id: Key of ``self.model_configs``.

        Returns:
            ``True`` when the model was built and cached, ``False`` for an
            unknown ID, a missing config file, or any error (which is logged).
        """
        config = self.model_configs.get(model_id)
        if not config:
            # Reject unknown IDs before importing or downloading anything.
            logger.error(f"❌ Unknown model ID: {model_id}")
            return False

        try:
            from huggingface_hub import snapshot_download

            logger.info(f"πŸ“₯ Loading model from HF: {config['hf_repo']}")

            # Download model files from Hugging Face
            local_dir = snapshot_download(
                repo_id=config['hf_repo'],
                repo_type="model",
                local_dir=f"temp_{model_id}",
                allow_patterns=["*.pt", "*.json", "*.model"],
            )
            logger.info(f"βœ… Downloaded model to: {local_dir}")

            # File names come from the registry entry rather than being
            # repeated here as literals.
            config_path = os.path.join(local_dir, config["config"])
            if not os.path.exists(config_path):
                logger.error(f"❌ Config file not found: {config_path}")
                return False

            with open(config_path, "r", encoding="utf-8") as f:
                config_data = json.load(f)

            hyper = config_data["model_config"]
            model_config = GPTConfig(
                vocab_size=hyper["vocab_size"],
                n_layer=hyper["n_layer"],
                n_head=hyper["n_head"],
                n_embd=hyper["n_embd"],
                block_size=hyper["block_size"],
                dropout=hyper["dropout"],
                bias=hyper["bias"],
            )
            model = GPTModel(model_config)

            model_path = os.path.join(local_dir, config["checkpoint"])
            if os.path.exists(model_path):
                # NOTE(security): torch.load unpickles the file. Only load
                # checkpoints from repositories you trust.
                checkpoint = torch.load(model_path, map_location="cpu")
                # Accept a full training checkpoint that nests the weights
                # under "model_state_dict" as well as a bare state dict.
                if isinstance(checkpoint, dict) and "model_state_dict" in checkpoint:
                    checkpoint = checkpoint["model_state_dict"]
                model.load_state_dict(checkpoint)
                logger.info("βœ… Loaded model weights")
            else:
                # Without this the model would be served with random weights
                # and no indication that anything is wrong.
                logger.warning(
                    "Checkpoint not found at %s; model keeps its randomly "
                    "initialised weights",
                    model_path,
                )

            self.models[model_id] = model
            self.current_model = model_id
            logger.info(f"βœ… Successfully loaded model: {model_id}")
            return True
        except Exception as e:
            logger.error(f"❌ Failed to load model from HF {model_id}: {e}")
            return False

    def generate_text(self, prompt: str, model_id: str, max_length: int = 100, temperature: float = 0.7) -> str:
        """Generate text with the given model, loading it first if needed.

        Tokenisation is a character-level placeholder (code point modulo the
        model's vocabulary size), not the model's real tokenizer.

        Args:
            prompt: Text to continue. Must not be empty.
            model_id: Key of ``self.model_configs``.
            max_length: Passed to the model's ``generate`` as an ``int``.
            temperature: Passed to the model's ``generate``.

        Returns:
            The decoded output, or a message describing the failure. This
            method does not raise.
        """
        try:
            if not prompt:
                # An empty sequence cannot be extended; fail with a clear
                # message instead of an index error from inside the model.
                raise ValueError("Prompt must not be empty")

            # Load model if not already loaded
            if model_id not in self.models:
                if not self.load_model_from_hf(model_id):
                    return f"❌ Failed to load model: {model_id}"

            model = self.models[model_id]
            model.eval()

            # Simple character-based tokenization (for demo purposes). Ids are
            # folded into the loaded model's vocabulary so they are always
            # valid embedding indices.
            vocab_size = model.config.vocab_size
            tokens = [ord(c) % vocab_size for c in prompt]
            input_ids = torch.tensor([tokens], dtype=torch.long)

            with torch.no_grad():
                outputs = model.generate(
                    input_ids,
                    # UI sliders may deliver floats; generate() needs an int.
                    max_length=int(max_length),
                    temperature=temperature,
                )

            # Simple detokenization
            generated_text = ''.join(chr(t % 65536) for t in outputs[0].tolist())
            return generated_text
        except Exception as e:
            logger.error(f"❌ Generation failed: {e}")
            return f"❌ Generation failed: {str(e)}"
# Module-level engine instance shared by the Gradio callbacks and UI below.
# Constructing it only builds the model registry; no model is downloaded here.
inference_engine = OpenLLMInferenceEngine()
def generate_text_interface(prompt: str, model_choice: str, max_length: int, temperature: float) -> str:
    """Gradio callback: forward the UI values to the shared inference engine.

    Returns the engine's output, or an error message if the call raises.
    """
    request = {
        "prompt": prompt,
        "model_id": model_choice,
        "max_length": max_length,
        "temperature": temperature,
    }
    try:
        return inference_engine.generate_text(**request)
    except Exception as e:
        return f"❌ Error: {str(e)}"
def get_model_info(model_choice: str) -> str:
    """Return a Markdown summary of a registered model, or an error message."""
    entry = inference_engine.model_configs.get(model_choice)
    if not entry:
        return "❌ Unknown model selected"

    return f"""
**Model Information:**
- **Name**: {entry['name']}
- **Description**: {entry['description']}
- **Repository**: {entry['hf_repo']}
- **Status**: Ready to load
"""
# Create Gradio interface
# Layout: a header, then one row with a model-selection column and a wider
# text-generation column, then a static table of the available models.
default_model_id = "openllm-small-extended-10k"

with gr.Blocks(title="OpenLLM Inference Space", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# πŸš€ OpenLLM Inference Space")
    gr.Markdown("Welcome to the OpenLLM Inference Space! Select a model and generate text.")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## 🎯 Model Selection")
            model_choice = gr.Dropdown(
                choices=list(inference_engine.model_configs.keys()),
                value=default_model_id,
                label="Select Model",
                info="Choose from our trained models"
            )
            # Show the preselected model's details straight away: the change
            # event below only fires once the user picks a different entry.
            model_info = gr.Markdown(get_model_info(default_model_id))

            def update_model_info(choice):
                """Refresh the info pane for the newly selected model."""
                return get_model_info(choice)

            model_choice.change(fn=update_model_info, inputs=model_choice, outputs=model_info)

        with gr.Column(scale=2):
            gr.Markdown("## ✍️ Text Generation")
            prompt_input = gr.Textbox(
                label="Enter your prompt",
                placeholder="The future of artificial intelligence...",
                lines=3
            )
            with gr.Row():
                max_length = gr.Slider(
                    minimum=10,
                    maximum=500,
                    value=100,
                    step=10,
                    label="Max Length",
                    # The model treats this as the total sequence length, so
                    # the prompt counts towards it.
                    info="Maximum total length in tokens (prompt + generated)"
                )
                temperature = gr.Slider(
                    minimum=0.1,
                    maximum=2.0,
                    value=0.7,
                    step=0.1,
                    label="Temperature",
                    info="Controls randomness (higher = more random)"
                )
            generate_btn = gr.Button("πŸš€ Generate Text", variant="primary")
            output_text = gr.Textbox(label="Generated Text", lines=10)

    gr.Markdown("## πŸ“Š Available Models")
    gr.Markdown("""
| Model | Training Steps | Description | Best Loss |
|-------|---------------|-------------|-----------|
| **4k Model** | 4,000 | Early training stage, basic language patterns | ~6.2 |
| **6k Model** | 6,000 | Improved coherence, better vocabulary usage | ~5.8 |
| **7k Model** | 7,000 | Enhanced text generation quality | ~5.5 |
| **8k Model** | 8,000 | More sophisticated language understanding | ~5.3 |
| **9k Model** | 9,000 | Best performing model (latest training) | ~5.2 |
| **10k Model** | 10,000 | Latest extended training, maximum performance | ~5.22 |
""")

    # Connect the generate button
    generate_btn.click(
        fn=generate_text_interface,
        inputs=[prompt_input, model_choice, max_length, temperature],
        outputs=output_text
    )

# Launch the app
if __name__ == "__main__":
    demo.launch()