""" | |
Fine-tuning script for Iain Morris style article generation | |
Uses QLoRA for efficient training | |
""" | |
import os | |
import json | |
import torch | |
from transformers import ( | |
AutoTokenizer, | |
AutoModelForCausalLM, | |
TrainingArguments, | |
Trainer, | |
DataCollatorForLanguageModeling, | |
BitsAndBytesConfig | |
) | |
from peft import ( | |
LoraConfig, | |
get_peft_model, | |
TaskType, | |
prepare_model_for_kbit_training | |
) | |
from datasets import Dataset, load_from_disk | |
import logging | |
from typing import Dict, List | |
# Set up logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
class IainMorrisFineTuner:
    def __init__(self, model_name: str = "HuggingFaceH4/zephyr-7b-beta"):
        """
        Initialize the fine-tuner.

        Args:
            model_name: Base model to fine-tune. Defaults to Zephyr-7B-Beta,
                which follows instructions well and requires no auth token.
        """
        self.model_name = model_name

        # Configure device: prefer Apple Silicon MPS, then CUDA, then CPU
        if torch.backends.mps.is_available():
            self.device = torch.device("mps")
            self.use_mps = True
            self.use_cuda = False
            logger.info("Using Apple Silicon MPS acceleration")
        elif torch.cuda.is_available():
            self.device = torch.device("cuda")
            self.use_mps = False
            self.use_cuda = True
            logger.info("Using CUDA acceleration")
        else:
            self.device = torch.device("cpu")
            self.use_mps = False
            self.use_cuda = False
            logger.info("Using CPU")
        logger.info(f"Using device: {self.device}")
        # Quantization is only supported on CUDA (bitsandbytes); skip elsewhere
        if self.use_cuda:
            self.bnb_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_compute_dtype=torch.bfloat16,
            )
        else:
            self.bnb_config = None
            if self.use_mps:
                logger.info("Quantization not supported on MPS. Using full precision with memory optimization.")
            else:
                logger.info("Quantization not available on CPU. Using full precision.")

        # LoRA configuration: higher rank on accelerated devices, smaller on CPU
        lora_rank = 16 if (self.use_mps or self.use_cuda) else 8
        self.lora_config = LoraConfig(
            r=lora_rank,      # LoRA rank
            lora_alpha=32,    # Alpha parameter for LoRA scaling
            target_modules=[
                "q_proj",
                "k_proj",
                "v_proj",
                "o_proj",
                "gate_proj",
                "up_proj",
                "down_proj",
                "lm_head",
            ],
            bias="none",
            lora_dropout=0.05,
            task_type=TaskType.CAUSAL_LM,
        )
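        # Rough adapter sizing (a sketch, assuming Zephyr's Mistral-style
        # 4096-d hidden size): each adapted weight W (d_out x d_in) gains
        # r * (d_in + d_out) trainable parameters. For q_proj at 4096 x 4096
        # with r=16 that is 16 * (4096 + 4096) = 131,072 adapter weights,
        # versus ~16.8M frozen weights in the original matrix.
        # print_trainable_parameters(), called in load_model_and_tokenizer(),
        # reports the exact totals.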
    def load_model_and_tokenizer(self):
        """Load the base model and tokenizer"""
        logger.info(f"Loading model: {self.model_name}")

        # Load tokenizer (left padding matches the generation-time setup;
        # the data collator still masks pad tokens out of the loss)
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_name,
            trust_remote_code=True,
            padding_side="left",
        )

        # Add pad token if it doesn't exist
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        # Load model with device-appropriate settings
        model_kwargs = {
            "trust_remote_code": True,
            "low_cpu_mem_usage": True,
        }
        if self.use_cuda:
            # CUDA settings with 4-bit quantization
            model_kwargs.update({
                "quantization_config": self.bnb_config,
                "device_map": "auto",
                "torch_dtype": torch.bfloat16,
            })
        elif self.use_mps:
            # MPS (Apple Silicon) optimized settings
            model_kwargs.update({
                "torch_dtype": torch.float16,  # float16 works well on MPS
                "device_map": None,            # handle device placement manually
            })
        else:
            # CPU settings
            model_kwargs.update({
                "torch_dtype": torch.float32,
                "device_map": None,
            })

        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            **model_kwargs,
        )

        # Move model to device when device_map isn't handling placement
        if not self.use_cuda:
            self.model = self.model.to(self.device)

        # Prepare model for training
        if self.use_cuda:
            self.model = prepare_model_for_kbit_training(self.model)
        else:
            # For MPS/CPU training, just ensure the model is in training mode
            self.model.train()

        # Add LoRA adapters
        self.model = get_peft_model(self.model, self.lora_config)

        # Print trainable parameters
        self.model.print_trainable_parameters()
        logger.info("Model and tokenizer loaded successfully")
    def format_chat_template(self, example: Dict) -> str:
        """
        Format an example using the tokenizer's chat template.

        Args:
            example: Training example with a 'messages' list

        Returns:
            Formatted text
        """
        messages = example['messages']

        # Use the tokenizer's chat template if available
        if hasattr(self.tokenizer, 'apply_chat_template'):
            try:
                return self.tokenizer.apply_chat_template(
                    messages,
                    tokenize=False,
                    add_generation_prompt=False,
                )
            except Exception as e:
                logger.debug(f"Chat template failed, using fallback formatting: {e}")

        # Fallback formatting in Zephyr's <|role|> style
        formatted = ""
        for message in messages:
            role = message['role']
            content = message['content']
            if role == 'system':
                formatted += f"<|system|>\n{content}\n"
            elif role == 'user':
                formatted += f"<|user|>\n{content}\n"
            elif role == 'assistant':
                formatted += f"<|assistant|>\n{content}\n"
        return formatted
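    # For a single exchange, the fallback above produces text like the sketch
    # below (illustrative content only; the real Zephyr template also appends
    # </s> after each turn):
    #
    #   <|system|>
    #   You write in the style of Iain Morris...
    #   <|user|>
    #   Write about 5G hype.
    #   <|assistant|>
    #   Another week, another 5G miracle...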
    def tokenize_function(self, examples: Dict) -> Dict:
        """
        Tokenize a batch of examples for training.

        Args:
            examples: Batch of examples

        Returns:
            Tokenized examples
        """
        # Format each example
        texts = []
        for i in range(len(examples['messages'])):
            example = {'messages': examples['messages'][i]}
            formatted_text = self.format_chat_template(example)
            texts.append(formatted_text)

        # Tokenize; padding is deferred to the data collator
        tokenized = self.tokenizer(
            texts,
            truncation=True,
            padding=False,
            max_length=2048,
            return_overflowing_tokens=False,
        )

        # Set labels for causal language modeling (next-token prediction)
        tokenized["labels"] = tokenized["input_ids"].copy()
        return tokenized
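    # Quick sanity check (a sketch; `tuner` is assumed to be an initialized
    # IainMorrisFineTuner with load_model_and_tokenizer() already run):
    #
    #   batch = tuner.tokenize_function(
    #       {"messages": [[{"role": "user", "content": "Hello"}]]}
    #   )
    #   print(tuner.tokenizer.decode(batch["input_ids"][0]))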
    def load_datasets(self, data_dir: str = "data"):
        """
        Load training and validation datasets.

        Args:
            data_dir: Directory containing the datasets
        """
        logger.info("Loading datasets...")
        try:
            # Try to load HF datasets first
            self.train_dataset = load_from_disk(f"{data_dir}/train_hf_dataset")
            self.val_dataset = load_from_disk(f"{data_dir}/val_hf_dataset")
        except FileNotFoundError:
            # Fall back to JSON files, preferring the enhanced dataset
            try:
                # Enhanced dataset first (includes non-telecom examples)
                with open(f"{data_dir}/enhanced_train_dataset.json", 'r') as f:
                    train_data = json.load(f)
                logger.info("Using enhanced training dataset with non-telecom examples")
            except FileNotFoundError:
                try:
                    # Then the improved dataset (updated system prompts)
                    with open(f"{data_dir}/improved_train_dataset.json", 'r') as f:
                        train_data = json.load(f)
                    logger.info("Using improved training dataset with updated system prompts")
                except FileNotFoundError:
                    # Final fallback to the original dataset
                    with open(f"{data_dir}/train_dataset.json", 'r') as f:
                        train_data = json.load(f)
                    logger.info("Using original training dataset")

            # Load validation dataset (use improved if available)
            try:
                with open(f"{data_dir}/improved_val_dataset.json", 'r') as f:
                    val_data = json.load(f)
                logger.info("Using improved validation dataset")
            except FileNotFoundError:
                with open(f"{data_dir}/val_dataset.json", 'r') as f:
                    val_data = json.load(f)
                logger.info("Using original validation dataset")

            self.train_dataset = Dataset.from_list(train_data)
            self.val_dataset = Dataset.from_list(val_data)

        logger.info(f"Loaded {len(self.train_dataset)} training examples")
        logger.info(f"Loaded {len(self.val_dataset)} validation examples")

        # Tokenize datasets
        logger.info("Tokenizing datasets...")
        self.train_dataset = self.train_dataset.map(
            self.tokenize_function,
            batched=True,
            remove_columns=self.train_dataset.column_names,
        )
        self.val_dataset = self.val_dataset.map(
            self.tokenize_function,
            batched=True,
            remove_columns=self.val_dataset.column_names,
        )
        logger.info("Datasets tokenized successfully")
    def setup_training_args(self, output_dir: str = "models/iain-morris-model-enhanced"):
        """
        Set up training arguments tuned per device.

        Args:
            output_dir: Directory to save the model
        """
        # Base training arguments, following the training guide's recommendations
        training_kwargs = {
            "output_dir": output_dir,
            "num_train_epochs": 4,  # More epochs for better style learning
            "per_device_train_batch_size": 1,
            "per_device_eval_batch_size": 1,
            "gradient_accumulation_steps": 8 if self.use_mps else 4,  # More accumulation on MPS
            "save_steps": 50,
            "logging_steps": 10,
            "learning_rate": 5e-5,  # Lower LR as recommended
            "weight_decay": 0.001,
            "max_grad_norm": 0.3,
            "max_steps": -1,
            "warmup_ratio": 0.03,
            "group_by_length": True,
            "lr_scheduler_type": "constant_with_warmup",  # plain "constant" would ignore warmup_ratio
            "report_to": "none",  # Disable reporting to avoid tensorboard dependency
            "eval_strategy": "steps",
            "eval_steps": 50,
            "save_total_limit": 3,  # Keep several checkpoints for model selection
            "load_best_model_at_end": True,
            "metric_for_best_model": "eval_loss",
            "greater_is_better": False,
            "dataloader_pin_memory": False,
        }

        # Device-specific optimizations
        if self.use_cuda:
            training_kwargs.update({
                "optim": "paged_adamw_32bit",
                "fp16": False,
                "bf16": True,
            })
        elif self.use_mps:
            training_kwargs.update({
                "optim": "adamw_torch",       # Standard optimizer for MPS
                "fp16": False,                # fp16 not supported on MPS in this version
                "bf16": False,                # bf16 not supported on MPS
                "dataloader_num_workers": 0,  # Avoid multiprocessing issues on MPS
            })
        else:
            training_kwargs.update({
                "optim": "adamw_torch",
                "fp16": False,
                "bf16": False,
                "dataloader_num_workers": 0,
            })

        self.training_args = TrainingArguments(**training_kwargs)
        logger.info(f"Training configured for {self.device} with {training_kwargs['num_train_epochs']} epochs")
    def train(self):
        """Train the model"""
        logger.info("Starting training...")

        # Data collator: mlm=False gives causal-LM batches; it pads each batch
        # and masks pad-token positions out of the loss with label -100
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer,
            mlm=False,
        )

        # Initialize trainer
        trainer = Trainer(
            model=self.model,
            args=self.training_args,
            train_dataset=self.train_dataset,
            eval_dataset=self.val_dataset,
            tokenizer=self.tokenizer,
            data_collator=data_collator,
        )

        # Train
        trainer.train()

        # Save the final model
        trainer.save_model()
        self.tokenizer.save_pretrained(self.training_args.output_dir)
        logger.info(f"Training completed. Model saved to {self.training_args.output_dir}")
    def save_lora_adapters(self, output_dir: str = "models/lora_adapters"):
        """
        Save only the LoRA adapters.

        Args:
            output_dir: Directory to save adapters
        """
        os.makedirs(output_dir, exist_ok=True)
        self.model.save_pretrained(output_dir)
        self.tokenizer.save_pretrained(output_dir)
        logger.info(f"LoRA adapters saved to {output_dir}")
    def run_full_pipeline(self, data_dir: str = "data"):
        """
        Run the complete fine-tuning pipeline.

        Args:
            data_dir: Directory containing training data
        """
        try:
            # Load model and tokenizer
            self.load_model_and_tokenizer()

            # Load datasets
            self.load_datasets(data_dir)

            # Set up training arguments
            self.setup_training_args()

            # Train
            self.train()

            # Save LoRA adapters separately
            self.save_lora_adapters()

            logger.info("Fine-tuning pipeline completed successfully!")
        except Exception as e:
            logger.error(f"Error in fine-tuning pipeline: {e}")
            raise
def main():
    """
    Main function to run fine-tuning
    """
    # Report the available accelerator up front
    if torch.cuda.is_available():
        logger.info(f"CUDA available. GPU: {torch.cuda.get_device_name()}")
        logger.info(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
    elif torch.backends.mps.is_available():
        logger.info("Apple Silicon MPS available.")
    else:
        logger.warning("No GPU acceleration available. Training will be slow on CPU.")

    # Initialize fine-tuner
    fine_tuner = IainMorrisFineTuner()

    # Run the pipeline
    fine_tuner.run_full_pipeline()


if __name__ == "__main__":
    main()