"""
Data Processor for MangoMAS Local Training
This module processes the original MangoMAS datasets from JSONL format
into training-ready datasets with proper splits and preprocessing.
"""
import argparse
import json
import logging
import random
import re
from pathlib import Path
from typing import Dict, List, Tuple
import yaml
from sklearn.model_selection import train_test_split
from tqdm import tqdm
logger = logging.getLogger(__name__)
class MangoMASDataProcessor:
"""Process MangoMAS datasets for local training."""
def __init__(
self,
input_dir,
output_dir=None,
min_length: int = 10,
max_length: int = 2048,
config_path: str = None,
):
"""Initialize with input/output directories or config path for flexibility."""
# Support both test interface and config-driven approach
if config_path is not None:
# Config-driven initialization (original functionality)
with open(config_path, "r") as f:
self.config = yaml.safe_load(f)
self.data_config = self.config["data"]
self.agents_config = self.config["agents"]
self.input_dir = Path(input_dir) if input_dir else None
self.output_dir = Path(output_dir) if output_dir else Path("/Volumes/Mango_MAS/data/processed")
self.min_length = self.data_config.get("preprocessing", {}).get(
"min_length", min_length
)
self.max_length = self.data_config.get("preprocessing", {}).get(
"max_length", max_length
)
else:
# Direct initialization (test interface)
self.input_dir = Path(input_dir)
self.output_dir = Path(output_dir) if output_dir else Path("/Volumes/Mango_MAS/data/processed")
self.min_length = min_length
self.max_length = max_length
self.config = None
self.data_config = None
self.agents_config = None
logging.basicConfig(level=logging.INFO)
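    # Illustrative usage (names and paths are examples, not shipped defaults):
    # the class supports both construction styles.
    #
    #     proc = MangoMASDataProcessor("data/raw", "data/processed")
    #     proc = MangoMASDataProcessor(None, None, config_path="config.yaml")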
def process_datasets(
self, input_dir: str, output_dir: str = "/Volumes/Mango_MAS/data/processed"
) -> None:
"""
Process all agent datasets from input directory.
Args:
input_dir: Directory containing original JSONL files
output_dir: Directory to save processed datasets
"""
input_path = Path(input_dir)
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# Define dataset mappings
datasets = {
"infrastructure": input_path
/ "infrastructure_agent_synthetic_prompts.jsonl",
"devsecops": input_path / "devsecops_agent_synthetic_prompts.jsonl",
"risk_assessment": input_path
/ "risk_assessment_agent_synthetic_prompts.jsonl",
}
for agent_type, file_path in datasets.items():
if file_path.exists():
logger.info(f"Processing {agent_type} dataset from {file_path}")
self._process_single_dataset(file_path, output_path, agent_type)
else:
logger.warning(f"Dataset file not found: {file_path}")
def _process_single_dataset(
self, input_file: Path, output_dir: Path, agent_type: str
) -> None:
"""Process a single agent dataset."""
# Load data
data = self._load_jsonl(input_file)
logger.info(f"Loaded {len(data)} samples for {agent_type}")
# Clean and preprocess
cleaned_data = self._clean_data(data)
logger.info(f"After cleaning: {len(cleaned_data)} samples")
# Convert to training format
training_data = self._convert_to_training_format(cleaned_data, agent_type)
# Create splits
train_data, val_data, test_data = self._create_splits(training_data)
# Save processed datasets
self._save_datasets(train_data, val_data, test_data, output_dir, agent_type)
logger.info(
f"Saved {agent_type} dataset: "
f"{len(train_data)} train, {len(val_data)} val, {len(test_data)} test"
)
def _load_jsonl(self, file_path: Path) -> List[Dict]:
"""Load data from JSONL file."""
data = []
with open(file_path, "r", encoding="utf-8") as f:
for line_num, line in enumerate(f, 1):
try:
data.append(json.loads(line.strip()))
except json.JSONDecodeError as e:
logger.warning(f"Skipping invalid JSON on line {line_num}: {e}")
return data
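    # A minimal sketch of the JSONL record shape this loader expects, one
    # object per line (field values are illustrative):
    #
    #     {"instruction": "Review this Terraform plan", "input": "...",
    #      "output": "...", "agent_type": "infrastructure"}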
    def _clean_data(self, data: List[Dict]) -> List[Dict]:
        """Clean and validate the data."""
        cleaned = []
        for item in tqdm(data, desc="Cleaning data"):
            # Check required fields
            if not all(key in item for key in ["instruction", "input", "output"]):
                continue
            # Check text lengths; self.min_length/self.max_length are set in
            # both the config-driven and direct initialization paths, so this
            # does not crash when no config was loaded.
            input_text = f"{item['instruction']} {item['input']}"
            output_text = item["output"]
            if not (self.min_length <= len(input_text) <= self.max_length):
                continue
            if not (self.min_length <= len(output_text) <= self.max_length):
                continue
            cleaned.append(item)
        # Remove duplicates if configured (defaults to True when no config
        # was loaded, so the direct/test interface still deduplicates)
        remove_duplicates = True
        if self.data_config is not None:
            remove_duplicates = self.data_config.get("preprocessing", {}).get(
                "remove_duplicates", True
            )
        if remove_duplicates:
            cleaned = self._remove_duplicates(cleaned)
        return cleaned
def _remove_duplicates(self, data: List[Dict]) -> List[Dict]:
"""Remove duplicate entries based on input text."""
seen_inputs = set()
unique_data = []
for item in data:
input_text = f"{item['instruction']} {item['input']}"
if input_text not in seen_inputs:
seen_inputs.add(input_text)
unique_data.append(item)
logger.info(f"Removed {len(data) - len(unique_data)} duplicates")
return unique_data
def _validate_sample(self, sample: Dict) -> bool:
"""Validate a single sample for required fields and length constraints."""
# Check required fields
required_fields = ["instruction", "input", "output", "agent_type"]
if not all(key in sample for key in required_fields):
return False
# Check text lengths
combined_text = f"{sample['instruction']} {sample['input']} {sample['output']}"
if len(combined_text) < self.min_length or len(combined_text) > self.max_length:
return False
return True
    def _clean_text(self, text: str) -> str:
        """Clean text by collapsing runs of whitespace into single spaces."""
        # `re` is imported at module level
        return re.sub(r"\s+", " ", text.strip())
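    # For example, _clean_text("  a\n\tb  ") returns "a b".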
def _format_conversation(self, sample: Dict) -> Dict:
"""Format sample into conversation format suitable for training."""
# Create conversation text
if sample.get("input", "").strip():
conversation_text = f"Human: {sample['instruction']}\n{sample['input']}\n\nAssistant: {sample['output']}"
else:
conversation_text = (
f"Human: {sample['instruction']}\n\nAssistant: {sample['output']}"
)
return {
"text": conversation_text,
"agent_type": sample["agent_type"],
"instruction": sample["instruction"],
"input": sample["input"],
"output": sample["output"],
}
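    # For a sample with a non-empty "input", the resulting "text" field looks
    # like this (placeholders are illustrative):
    #
    #     Human: <instruction>
    #     <input>
    #
    #     Assistant: <output>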
def _split_dataset(
self,
data: List[Dict],
train_ratio: float = 0.8,
val_ratio: float = 0.1,
test_ratio: float = 0.1,
) -> Tuple[List[Dict], List[Dict], List[Dict]]:
"""Split dataset into train/validation/test sets."""
if abs(train_ratio + val_ratio + test_ratio - 1.0) > 1e-6:
raise ValueError(
f"Split ratios must sum to 1.0, got {train_ratio + val_ratio + test_ratio}"
)
if not data:
return [], [], []
        # Use sklearn's train_test_split (imported at module level) for
        # consistent, reproducible splitting
# First split: train vs (val + test)
if len(data) == 1:
return data, [], []
train_data, temp_data = train_test_split(
data, test_size=(val_ratio + test_ratio), random_state=42, shuffle=True
)
        # Second split: val vs test (needs at least two samples to split)
        if len(temp_data) >= 2 and val_ratio > 0 and test_ratio > 0:
val_ratio_normalized = val_ratio / (val_ratio + test_ratio)
val_data, test_data = train_test_split(
temp_data,
test_size=(1 - val_ratio_normalized),
random_state=42,
shuffle=True,
)
elif val_ratio > 0:
val_data, test_data = temp_data, []
else:
val_data, test_data = [], temp_data
return train_data, val_data, test_data
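    # Sketch of the default split arithmetic: with 1000 samples and ratios
    # 0.8/0.1/0.1, the first split holds out 200 samples, and the second split
    # divides them with a normalized val ratio of 0.1 / (0.1 + 0.1) = 0.5,
    # yielding roughly 800 train / 100 val / 100 test.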
def _calculate_stats(self, data: List[Dict]) -> Dict:
"""Calculate statistics for the dataset."""
if not data:
return {
"total_samples": 0,
"avg_length": 0,
"min_length": 0,
"max_length": 0,
"agent_distribution": {},
}
lengths = [len(item.get("text", "")) for item in data]
agent_counts = {}
for item in data:
agent = item.get("agent_type", "unknown")
agent_counts[agent] = agent_counts.get(agent, 0) + 1
return {
"total_samples": len(data),
"avg_length": sum(lengths) / len(lengths),
"min_length": min(lengths),
"max_length": max(lengths),
"agent_distribution": agent_counts,
}
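    # Example return value (numbers are illustrative):
    #
    #     {"total_samples": 900, "avg_length": 742.3, "min_length": 58,
    #      "max_length": 2048, "agent_distribution": {"devsecops": 900}}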
def _load_agent_data(self, agent_type: str) -> List[Dict]:
"""Load data for a specific agent type."""
if not self.input_dir:
return []
# Look for files matching the agent type. We intentionally call glob even
# if the directory may not exist in test environments, since tests patch
# pathlib.Path.glob.
pattern = f"*{agent_type}*.jsonl"
matching_files = list(self.input_dir.glob(pattern))
data = []
for file_path in matching_files:
file_data = self._load_jsonl(file_path)
data.extend(file_data)
return data
def _save_jsonl(self, data: List[Dict], output_path: Path) -> None:
"""Save data to JSONL file."""
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w", encoding="utf-8") as f:
for item in data:
f.write(json.dumps(item, ensure_ascii=False) + "\n")
def _save_stats(self, stats: Dict, output_path: Path) -> None:
"""Save statistics to JSON file."""
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w", encoding="utf-8") as f:
json.dump(stats, f, indent=2, ensure_ascii=False)
def process_agent(
self,
agent_type: str,
train_ratio: float = 0.8,
val_ratio: float = 0.1,
test_ratio: float = 0.1,
) -> None:
"""Process data for a single agent type."""
# Load data
data = self._load_agent_data(agent_type)
if not data:
raise ValueError(f"No valid data found for agent type: {agent_type}")
# Validate and clean data
valid_data = []
for sample in data:
if self._validate_sample(sample):
formatted = self._format_conversation(sample)
valid_data.append(formatted)
if not valid_data:
raise ValueError(
f"No valid data found after processing for agent type: {agent_type}"
)
# Remove duplicates
unique_data = self._remove_duplicates(valid_data)
# Split dataset
train_data, val_data, test_data = self._split_dataset(
unique_data, train_ratio, val_ratio, test_ratio
)
# Save datasets
self.output_dir.mkdir(parents=True, exist_ok=True)
self._save_jsonl(train_data, self.output_dir / f"{agent_type}_train.jsonl")
self._save_jsonl(val_data, self.output_dir / f"{agent_type}_val.jsonl")
self._save_jsonl(test_data, self.output_dir / f"{agent_type}_test.jsonl")
# Save statistics
stats = self._calculate_stats(unique_data)
self._save_stats(stats, self.output_dir / f"{agent_type}_stats.json")
logger.info(
f"Processed {agent_type}: {len(train_data)} train, {len(val_data)} val, {len(test_data)} test samples"
)
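    # Illustrative end-to-end call for the direct (test-style) interface;
    # paths are examples:
    #
    #     proc = MangoMASDataProcessor("data/raw", "data/processed")
    #     proc.process_agent("devsecops", 0.8, 0.1, 0.1)
    #     # -> writes devsecops_{train,val,test}.jsonl and devsecops_stats.json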
def _convert_to_training_format(
self, data: List[Dict], agent_type: str
) -> List[Dict]:
"""Convert to format suitable for training."""
training_data = []
for item in data:
# Create conversation format suitable for language modeling
conversation = {
"messages": [
{
"role": "system",
"content": f"You are a {agent_type.replace('_', ' ')} specialist. "
f"Provide expert recommendations and analysis.",
},
{
"role": "user",
"content": f"{item['instruction']}\n\n{item['input']}",
},
{"role": "assistant", "content": item["output"]},
],
"metadata": item.get("metadata", {}),
"agent_type": agent_type,
}
training_data.append(conversation)
return training_data
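    # One converted record, in chat-message format (values illustrative):
    #
    #     {"messages": [
    #        {"role": "system",
    #         "content": "You are a risk assessment specialist. Provide expert recommendations and analysis."},
    #        {"role": "user", "content": "<instruction>\n\n<input>"},
    #        {"role": "assistant", "content": "<output>"}],
    #      "metadata": {}, "agent_type": "risk_assessment"}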
def _create_splits(
self, data: List[Dict]
) -> Tuple[List[Dict], List[Dict], List[Dict]]:
"""Create train/validation/test splits."""
train_size = self.data_config["train_split"]
val_size = self.data_config["validation_split"]
test_size = self.data_config["test_split"]
# Normalize splits to sum to 1
total = train_size + val_size + test_size
train_size /= total
val_size /= total
test_size /= total
# First split: train vs (val + test)
train_data, temp_data = train_test_split(
data, test_size=(val_size + test_size), random_state=42, shuffle=True
)
# Second split: val vs test
val_ratio = val_size / (val_size + test_size)
val_data, test_data = train_test_split(
temp_data, test_size=(1 - val_ratio), random_state=42, shuffle=True
)
return train_data, val_data, test_data
def _save_datasets(
self,
train_data: List[Dict],
val_data: List[Dict],
test_data: List[Dict],
output_dir: Path,
agent_type: str,
) -> None:
"""Save processed datasets to files."""
datasets = {"train": train_data, "validation": val_data, "test": test_data}
for split_name, split_data in datasets.items():
output_file = output_dir / f"{agent_type}_{split_name}.jsonl"
with open(output_file, "w", encoding="utf-8") as f:
for item in split_data:
f.write(json.dumps(item, ensure_ascii=False) + "\n")
logger.info(f"Saved {len(split_data)} samples to {output_file}")
def create_combined_dataset(self, output_dir: str = "/Volumes/Mango_MAS/data/processed") -> None:
"""Create combined dataset with all agent types for multi-task training."""
output_path = Path(output_dir)
# Collect all processed data
all_train_data = []
all_val_data = []
all_test_data = []
for agent_type in self.agents_config.keys():
for split in ["train", "validation", "test"]:
file_path = output_path / f"{agent_type}_{split}.jsonl"
if file_path.exists():
data = self._load_jsonl(file_path)
if split == "train":
all_train_data.extend(data)
elif split == "validation":
all_val_data.extend(data)
else:
all_test_data.extend(data)
        # Shuffle combined datasets with a fixed seed for reproducibility
        # (`random` is imported at module level)
        random.seed(42)
random.shuffle(all_train_data)
random.shuffle(all_val_data)
random.shuffle(all_test_data)
# Save combined datasets
combined_datasets = {
"train": all_train_data,
"validation": all_val_data,
"test": all_test_data,
}
for split_name, split_data in combined_datasets.items():
output_file = output_path / f"combined_{split_name}.jsonl"
with open(output_file, "w", encoding="utf-8") as f:
for item in split_data:
f.write(json.dumps(item, ensure_ascii=False) + "\n")
logger.info(
f"Saved combined {split_name} dataset: {len(split_data)} samples"
)
def generate_statistics(self, output_dir: str = "/Volumes/Mango_MAS/data/processed") -> Dict:
"""Generate statistics about the processed datasets."""
output_path = Path(output_dir)
stats = {}
for agent_type in list(self.agents_config.keys()) + ["combined"]:
agent_stats = {}
for split in ["train", "validation", "test"]:
file_path = output_path / f"{agent_type}_{split}.jsonl"
if file_path.exists():
data = self._load_jsonl(file_path)
# Calculate statistics
lengths = []
for item in data:
if "messages" in item:
# Calculate total text length
total_length = sum(
len(msg["content"]) for msg in item["messages"]
)
lengths.append(total_length)
agent_stats[split] = {
"count": len(data),
"avg_length": sum(lengths) / len(lengths) if lengths else 0,
"min_length": min(lengths) if lengths else 0,
"max_length": max(lengths) if lengths else 0,
}
stats[agent_type] = agent_stats
# Save statistics
stats_file = output_path / "dataset_statistics.json"
with open(stats_file, "w") as f:
json.dump(stats, f, indent=2)
logger.info(f"Generated dataset statistics: {stats_file}")
return stats
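    # The statistics file nests per-agent, per-split entries (numbers are
    # illustrative):
    #
    #     {"devsecops": {"train": {"count": 800, "avg_length": 731.0,
    #                              "min_length": 60, "max_length": 2040},
    #                    "validation": {...}, "test": {...}},
    #      "combined": {...}}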
def main():
parser = argparse.ArgumentParser(
description="Process MangoMAS datasets for local training"
)
parser.add_argument(
"--input_dir",
type=str,
default="/Users/iancruickshank/Documents/Model/mangomas-datasets/agents/",
help="Directory containing original JSONL files",
)
parser.add_argument(
"--output_dir",
type=str,
default="/Volumes/Mango_MAS/data/processed",
help="Directory to save processed datasets",
)
parser.add_argument(
"--config",
type=str,
default="config/training/distillation.yaml",
help="Path to configuration file",
)
parser.add_argument(
"--create_combined",
action="store_true",
help="Create combined multi-agent dataset",
)
args = parser.parse_args()
    # Initialize processor in config-driven mode; the first positional
    # argument is input_dir, so the config path must be passed by keyword
    processor = MangoMASDataProcessor(
        args.input_dir, args.output_dir, config_path=args.config
    )
# Process datasets
processor.process_datasets(args.input_dir, args.output_dir)
# Create combined dataset if requested
if args.create_combined:
processor.create_combined_dataset(args.output_dir)
# Generate statistics
stats = processor.generate_statistics(args.output_dir)
print("\nDataset Statistics:")
print("=" * 50)
for agent_type, agent_stats in stats.items():
print(f"\n{agent_type.upper()}:")
for split, split_stats in agent_stats.items():
print(
f" {split}: {split_stats['count']} samples, "
f"avg length: {split_stats['avg_length']:.0f} chars"
)
if __name__ == "__main__":
main()
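# Example invocation (the script name and paths are illustrative; adjust to
# your layout):
#
#     python data_processor.py \
#         --input_dir ./mangomas-datasets/agents \
#         --output_dir ./data/processed \
#         --config config/training/distillation.yaml \
#         --create_combined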