qwen4bit / run_cloud_training.py
George-API's picture
Upload run_cloud_training.py with huggingface_hub
467f05c verified
raw
history blame
16 kB
#!/usr/bin/env python
"""
Simplified fine-tuning script for DeepSeek-R1-Distill-Qwen-14B-unsloth-bnb-4bit
- Optimized for L40S GPU
- Works with pre-tokenized datasets
- Research training only (no inference)
"""
import os
import logging
import json
import torch
import argparse
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer, AutoConfig, BitsAndBytesConfig
from transformers.data.data_collator import DataCollatorMixin
from peft import LoraConfig, get_peft_model
from dotenv import load_dotenv
# Basic environment setup for L40S
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True,max_split_size_mb:256"
os.environ["TRANSFORMERS_NO_FLASH_ATTENTION"] = "1"
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Create a marker file to indicate training is active
def create_training_marker(output_dir):
os.makedirs(output_dir, exist_ok=True)
with open("TRAINING_ACTIVE", "w") as f:
f.write(f"Training active in {output_dir}")
with open(os.path.join(output_dir, "RESEARCH_TRAINING_ONLY"), "w") as f:
f.write("This model is for research training only. No interactive outputs.")
# Remove the training marker file
def remove_training_marker():
if os.path.exists("TRAINING_ACTIVE"):
os.remove("TRAINING_ACTIVE")
logger.info("Removed training active marker")
# Custom data collator for pre-tokenized data
class PreTokenizedCollator(DataCollatorMixin):
def __init__(self, pad_token_id=0, tokenizer=None):
self.pad_token_id = pad_token_id
self.tokenizer = tokenizer # Keep reference to tokenizer for fallback
def __call__(self, features):
# Extract features properly from the batch
processed_features = []
for feature in features:
# If input_ids is directly available, use it
if 'input_ids' in feature and isinstance(feature['input_ids'], list):
processed_features.append(feature)
continue
# If input_ids is not available, try to extract from conversations
if 'input_ids' not in feature and 'conversations' in feature:
conversations = feature['conversations']
if isinstance(conversations, list) and len(conversations) > 0:
# Case 1: If conversations has 'input_ids' field (pre-tokenized)
if isinstance(conversations[0], dict) and 'input_ids' in conversations[0]:
feature['input_ids'] = conversations[0]['input_ids']
# Case 2: If conversations itself contains input_ids
elif all(isinstance(x, int) for x in conversations):
feature['input_ids'] = conversations
# Case 3: If conversations has 'content' field
elif isinstance(conversations[0], dict) and 'content' in conversations[0]:
content = conversations[0]['content']
# If content is already tokens, use directly
if isinstance(content, list) and all(isinstance(x, int) for x in content):
feature['input_ids'] = content
# If content is a string and we have tokenizer, tokenize as fallback
elif isinstance(content, str) and self.tokenizer:
logger.warning("Tokenizing string content as fallback")
feature['input_ids'] = self.tokenizer.encode(content, add_special_tokens=False)
# Ensure input_ids is present and is a list of integers
if 'input_ids' in feature:
if isinstance(feature['input_ids'], str) and self.tokenizer:
feature['input_ids'] = self.tokenizer.encode(feature['input_ids'], add_special_tokens=False)
elif not isinstance(feature['input_ids'], list):
try:
feature['input_ids'] = list(feature['input_ids'])
except Exception as e:
logger.error(f"Could not convert input_ids to list: {e}")
continue
processed_features.append(feature)
if len(processed_features) == 0:
raise ValueError("No valid examples found. Check dataset structure.")
# Determine max length in this batch
batch_max_len = max(len(x["input_ids"]) for x in processed_features)
# Initialize batch tensors
batch = {
"input_ids": torch.ones((len(processed_features), batch_max_len), dtype=torch.long) * self.pad_token_id,
"attention_mask": torch.zeros((len(processed_features), batch_max_len), dtype=torch.long),
"labels": torch.ones((len(processed_features), batch_max_len), dtype=torch.long) * -100 # -100 is ignored in loss
}
# Fill batch tensors
for i, feature in enumerate(processed_features):
input_ids = feature["input_ids"]
seq_len = len(input_ids)
# Convert to tensor if it's a list
if isinstance(input_ids, list):
input_ids = torch.tensor(input_ids, dtype=torch.long)
# Copy data to batch tensors
batch["input_ids"][i, :seq_len] = input_ids
batch["attention_mask"][i, :seq_len] = 1
# If there are labels, use them, otherwise use input_ids
if "labels" in feature:
labels = feature["labels"]
if isinstance(labels, list):
labels = torch.tensor(labels, dtype=torch.long)
batch["labels"][i, :len(labels)] = labels
else:
batch["labels"][i, :seq_len] = input_ids
return batch
# Load and prepare dataset with proper sorting
def load_and_prepare_dataset(dataset_name, config):
"""Load and prepare the dataset for fine-tuning with proper sorting"""
logger.info(f"Loading dataset: {dataset_name}")
try:
# Load dataset
dataset = load_dataset(dataset_name)
# Extract the split we want to use (usually 'train')
if 'train' in dataset:
dataset = dataset['train']
# Get the dataset config
dataset_config = config.get("dataset_config", {})
sort_field = dataset_config.get("sort_by_field", "prompt_number")
# Sort in ascending order by specified field
logger.info(f"Sorting dataset by {sort_field} in ascending order")
dataset = dataset.sort(sort_field)
# Print dataset info
logger.info(f"Dataset loaded with {len(dataset)} entries")
logger.info(f"Dataset columns: {dataset.column_names}")
# Print sample for debugging
if len(dataset) > 0:
logger.info(f"Sample entry structure: {list(dataset[0].keys())}")
return dataset
except Exception as e:
logger.error(f"Error loading dataset: {str(e)}")
raise
# Main training function
def train(config_path, dataset_name, output_dir):
# Load environment variables
load_dotenv()
# Load config
with open(config_path, 'r') as f:
config = json.load(f)
# Create training marker
create_training_marker(output_dir)
try:
# Extract configs
model_config = config.get("model_config", {})
training_config = config.get("training_config", {})
hardware_config = config.get("hardware_config", {})
lora_config = config.get("lora_config", {})
dataset_config = config.get("dataset_config", {})
# Load and prepare dataset with proper sorting
dataset = load_and_prepare_dataset(dataset_name, config)
# Load model settings
model_name = model_config.get("model_name_or_path")
logger.info(f"Using model: {model_name}")
# Initialize tokenizer
logger.info("Loading tokenizer")
tokenizer = AutoTokenizer.from_pretrained(
model_name,
trust_remote_code=True
)
tokenizer.pad_token = tokenizer.eos_token
# Create quantization config
quant_config = config.get("quantization_config", {})
bnb_config = BitsAndBytesConfig(
load_in_4bit=quant_config.get("load_in_4bit", True),
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_quant_type=quant_config.get("bnb_4bit_quant_type", "nf4"),
bnb_4bit_use_double_quant=quant_config.get("bnb_4bit_use_double_quant", True)
)
# Create model with proper configuration
logger.info("Loading pre-quantized model")
model = AutoModelForCausalLM.from_pretrained(
model_name,
quantization_config=bnb_config,
device_map="auto",
torch_dtype=torch.float16,
trust_remote_code=True,
use_cache=model_config.get("use_cache", False),
attn_implementation=hardware_config.get("attn_implementation", "eager")
)
# Apply rope scaling if configured
if "rope_scaling" in model_config:
logger.info(f"Applying rope scaling: {model_config['rope_scaling']}")
if hasattr(model.config, "rope_scaling"):
model.config.rope_scaling = model_config["rope_scaling"]
# Create LoRA config
logger.info("Creating LoRA configuration")
lora_config_obj = LoraConfig(
r=lora_config.get("r", 16),
lora_alpha=lora_config.get("lora_alpha", 32),
lora_dropout=lora_config.get("lora_dropout", 0.05),
bias=lora_config.get("bias", "none"),
target_modules=lora_config.get("target_modules", ["q_proj", "k_proj", "v_proj", "o_proj"])
)
# Apply LoRA to model
logger.info("Applying LoRA to model")
model = get_peft_model(model, lora_config_obj)
logger.info("Successfully applied LoRA")
# Check for L40S GPU and optimize batch size
if torch.cuda.is_available():
gpu_info = torch.cuda.get_device_properties(0)
logger.info(f"GPU: {gpu_info.name}, VRAM: {gpu_info.total_memory / 1e9:.2f} GB")
# Check if it's an L40S or high-memory GPU
if "L40S" in gpu_info.name or gpu_info.total_memory > 40e9:
logger.info("Detected L40S GPU - optimizing for high-memory GPU")
per_device_train_batch_size = training_config.get("per_device_train_batch_size", 4)
else:
# Use a smaller batch size for other GPUs
per_device_train_batch_size = 2
logger.info(f"Using conservative batch size for non-L40S GPU: {per_device_train_batch_size}")
else:
per_device_train_batch_size = 1
logger.warning("No GPU detected - using minimal batch size")
# Configure reporting backends
reports = training_config.get("report_to", ["tensorboard"])
# Create training arguments
logger.info("Creating training arguments")
training_args = TrainingArguments(
output_dir=output_dir,
num_train_epochs=training_config.get("num_train_epochs", 3),
per_device_train_batch_size=per_device_train_batch_size,
gradient_accumulation_steps=training_config.get("gradient_accumulation_steps", 4),
learning_rate=training_config.get("learning_rate", 2e-5),
lr_scheduler_type=training_config.get("lr_scheduler_type", "cosine"),
warmup_ratio=training_config.get("warmup_ratio", 0.03),
weight_decay=training_config.get("weight_decay", 0.01),
optim=training_config.get("optim", "adamw_torch"),
fp16=hardware_config.get("fp16", True),
bf16=hardware_config.get("bf16", False),
max_grad_norm=training_config.get("max_grad_norm", 0.3),
logging_steps=training_config.get("logging_steps", 10),
save_steps=training_config.get("save_steps", 200),
save_total_limit=training_config.get("save_total_limit", 3),
evaluation_strategy=training_config.get("evaluation_strategy", "steps"),
eval_steps=training_config.get("eval_steps", 200),
load_best_model_at_end=training_config.get("load_best_model_at_end", True),
report_to=reports,
logging_first_step=training_config.get("logging_first_step", True),
disable_tqdm=training_config.get("disable_tqdm", False),
remove_unused_columns=False,
gradient_checkpointing=hardware_config.get("gradient_checkpointing", True),
dataloader_num_workers=training_config.get("dataloader_num_workers", 4)
)
# Create trainer with pre-tokenized collator
logger.info("Creating trainer with pre-tokenized collator")
trainer = Trainer(
model=model,
args=training_args,
train_dataset=dataset,
data_collator=PreTokenizedCollator(
pad_token_id=tokenizer.pad_token_id,
tokenizer=tokenizer
),
)
# Start training
logger.info("Starting training - RESEARCH PHASE ONLY")
trainer.train()
# Save the model
logger.info(f"Saving model to {output_dir}")
trainer.save_model(output_dir)
# Save LoRA adapter separately
lora_output_dir = os.path.join(output_dir, "lora_adapter")
model.save_pretrained(lora_output_dir)
logger.info(f"Saved LoRA adapter to {lora_output_dir}")
# Save tokenizer
tokenizer_output_dir = os.path.join(output_dir, "tokenizer")
tokenizer.save_pretrained(tokenizer_output_dir)
logger.info(f"Saved tokenizer to {tokenizer_output_dir}")
# Save config for reference
with open(os.path.join(output_dir, "training_config.json"), "w") as f:
json.dump(config, f, indent=2)
logger.info("Training complete - RESEARCH PHASE ONLY")
return output_dir
finally:
# Always remove the training marker when done
remove_training_marker()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Fine-tune DeepSeek model (Research Only)")
parser.add_argument("--config", type=str, default="transformers_config.json",
help="Path to the configuration file")
parser.add_argument("--dataset", type=str, default="phi4-cognitive-dataset",
help="Dataset name or path")
parser.add_argument("--output_dir", type=str, default="fine_tuned_model",
help="Output directory for the fine-tuned model")
args = parser.parse_args()
try:
output_path = train(args.config, args.dataset, args.output_dir)
print(f"Research training completed. Model saved to: {output_path}")
except Exception as e:
logging.error(f"Training failed: {str(e)}")
remove_training_marker() # Clean up marker if training fails
raise