"""
Data Processor for MangoMAS Local Training

This module processes the original MangoMAS datasets from JSONL format
into training-ready datasets with proper splits and preprocessing.
"""

import argparse
import json
import logging
import random
import re
from pathlib import Path
from typing import Dict, List, Tuple

import yaml
from sklearn.model_selection import train_test_split
from tqdm import tqdm

logger = logging.getLogger(__name__)


class MangoMASDataProcessor:
    """Process MangoMAS datasets for local training."""

    def __init__(
        self,
        input_dir: str = None,
        output_dir: str = None,
        min_length: int = 10,
        max_length: int = 2048,
        config_path: str = None,
    ):
        """Initialize with input/output directories or a config path for flexibility."""
        if config_path is not None:
            # Config-driven initialization: read preprocessing limits from the
            # YAML file, falling back to the constructor arguments.
            with open(config_path, "r") as f:
                self.config = yaml.safe_load(f)
            self.data_config = self.config["data"]
            self.agents_config = self.config["agents"]
            self.input_dir = Path(input_dir) if input_dir else None
            self.output_dir = (
                Path(output_dir)
                if output_dir
                else Path("/Volumes/Mango_MAS/data/processed")
            )
            self.min_length = self.data_config.get("preprocessing", {}).get(
                "min_length", min_length
            )
            self.max_length = self.data_config.get("preprocessing", {}).get(
                "max_length", max_length
            )
        else:
            # Direct initialization without a config file.
            self.input_dir = Path(input_dir) if input_dir else None
            self.output_dir = (
                Path(output_dir)
                if output_dir
                else Path("/Volumes/Mango_MAS/data/processed")
            )
            self.min_length = min_length
            self.max_length = max_length
            self.config = None
            self.data_config = None
            self.agents_config = None

        logging.basicConfig(level=logging.INFO)

    def process_datasets(
        self, input_dir: str, output_dir: str = "/Volumes/Mango_MAS/data/processed"
    ) -> None:
        """
        Process all agent datasets from the input directory.

        Args:
            input_dir: Directory containing original JSONL files
            output_dir: Directory to save processed datasets
        """
        input_path = Path(input_dir)
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Map each agent type to its source JSONL file.
        datasets = {
            "infrastructure": input_path
            / "infrastructure_agent_synthetic_prompts.jsonl",
            "devsecops": input_path / "devsecops_agent_synthetic_prompts.jsonl",
            "risk_assessment": input_path
            / "risk_assessment_agent_synthetic_prompts.jsonl",
        }

        for agent_type, file_path in datasets.items():
            if file_path.exists():
                logger.info(f"Processing {agent_type} dataset from {file_path}")
                self._process_single_dataset(file_path, output_path, agent_type)
            else:
                logger.warning(f"Dataset file not found: {file_path}")

    def _process_single_dataset(
        self, input_file: Path, output_dir: Path, agent_type: str
    ) -> None:
        """Process a single agent dataset: load, clean, convert, split, save."""
        data = self._load_jsonl(input_file)
        logger.info(f"Loaded {len(data)} samples for {agent_type}")

        cleaned_data = self._clean_data(data)
        logger.info(f"After cleaning: {len(cleaned_data)} samples")

        training_data = self._convert_to_training_format(cleaned_data, agent_type)

        train_data, val_data, test_data = self._create_splits(training_data)

        self._save_datasets(train_data, val_data, test_data, output_dir, agent_type)

        logger.info(
            f"Saved {agent_type} dataset: "
            f"{len(train_data)} train, {len(val_data)} val, {len(test_data)} test"
        )

    def _load_jsonl(self, file_path: Path) -> List[Dict]:
        """Load data from a JSONL file, skipping blank and malformed lines."""
        data = []
        with open(file_path, "r", encoding="utf-8") as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    data.append(json.loads(line))
                except json.JSONDecodeError as e:
                    logger.warning(f"Skipping invalid JSON on line {line_num}: {e}")
        return data

    def _clean_data(self, data: List[Dict]) -> List[Dict]:
        """Clean and validate the data."""
        cleaned = []

        # Fall back to the constructor limits when no config file was provided.
        preprocessing = (self.data_config or {}).get("preprocessing", {})
        min_length = preprocessing.get("min_length", self.min_length)
        max_length = preprocessing.get("max_length", self.max_length)

        for item in tqdm(data, desc="Cleaning data"):
            # Require the instruction-tuning fields.
            if not all(key in item for key in ["instruction", "input", "output"]):
                continue

            input_text = f"{item['instruction']} {item['input']}"
            output_text = item["output"]

            if len(input_text) < min_length or len(input_text) > max_length:
                continue

            if len(output_text) < min_length or len(output_text) > max_length:
                continue

            cleaned.append(item)

        # De-duplicate unless the config explicitly disables it.
        if preprocessing.get("remove_duplicates", True):
            cleaned = self._remove_duplicates(cleaned)

        return cleaned

    def _remove_duplicates(self, data: List[Dict]) -> List[Dict]:
        """Remove duplicate entries based on input text."""
        seen_inputs = set()
        unique_data = []

        for item in data:
            input_text = f"{item['instruction']} {item['input']}"
            if input_text not in seen_inputs:
                seen_inputs.add(input_text)
                unique_data.append(item)

        logger.info(f"Removed {len(data) - len(unique_data)} duplicates")
        return unique_data

    def _validate_sample(self, sample: Dict) -> bool:
        """Validate a single sample for required fields and length constraints."""
        required_fields = ["instruction", "input", "output", "agent_type"]
        if not all(key in sample for key in required_fields):
            return False

        combined_text = f"{sample['instruction']} {sample['input']} {sample['output']}"
        if len(combined_text) < self.min_length or len(combined_text) > self.max_length:
            return False

        return True

    def _clean_text(self, text: str) -> str:
        """Clean text by collapsing runs of whitespace into single spaces."""
        return re.sub(r"\s+", " ", text.strip())

    def _format_conversation(self, sample: Dict) -> Dict:
        """Format sample into conversation format suitable for training."""
        if sample.get("input", "").strip():
            conversation_text = (
                f"Human: {sample['instruction']}\n{sample['input']}\n\n"
                f"Assistant: {sample['output']}"
            )
        else:
            conversation_text = (
                f"Human: {sample['instruction']}\n\nAssistant: {sample['output']}"
            )

        return {
            "text": conversation_text,
            "agent_type": sample["agent_type"],
            "instruction": sample["instruction"],
            "input": sample["input"],
            "output": sample["output"],
        }

    def _split_dataset(
        self,
        data: List[Dict],
        train_ratio: float = 0.8,
        val_ratio: float = 0.1,
        test_ratio: float = 0.1,
    ) -> Tuple[List[Dict], List[Dict], List[Dict]]:
        """Split dataset into train/validation/test sets."""
        if abs(train_ratio + val_ratio + test_ratio - 1.0) > 1e-6:
            raise ValueError(
                f"Split ratios must sum to 1.0, got {train_ratio + val_ratio + test_ratio}"
            )

        if not data:
            return [], [], []

        # A single sample cannot be split; keep it in the training set.
        if len(data) == 1:
            return data, [], []

        train_data, temp_data = train_test_split(
            data, test_size=(val_ratio + test_ratio), random_state=42, shuffle=True
        )

        # Divide the held-out portion between validation and test.
        if temp_data and val_ratio > 0 and test_ratio > 0:
            val_ratio_normalized = val_ratio / (val_ratio + test_ratio)
            val_data, test_data = train_test_split(
                temp_data,
                test_size=(1 - val_ratio_normalized),
                random_state=42,
                shuffle=True,
            )
        elif val_ratio > 0:
            val_data, test_data = temp_data, []
        else:
            val_data, test_data = [], temp_data

        return train_data, val_data, test_data

    def _calculate_stats(self, data: List[Dict]) -> Dict:
        """Calculate statistics for the dataset."""
        if not data:
            return {
                "total_samples": 0,
                "avg_length": 0,
                "min_length": 0,
                "max_length": 0,
                "agent_distribution": {},
            }

        lengths = [len(item.get("text", "")) for item in data]
        agent_counts = {}

        for item in data:
            agent = item.get("agent_type", "unknown")
            agent_counts[agent] = agent_counts.get(agent, 0) + 1

        return {
            "total_samples": len(data),
            "avg_length": sum(lengths) / len(lengths),
            "min_length": min(lengths),
            "max_length": max(lengths),
            "agent_distribution": agent_counts,
        }

    def _load_agent_data(self, agent_type: str) -> List[Dict]:
        """Load data for a specific agent type."""
        if not self.input_dir:
            return []

        # Collect every JSONL file in the input directory whose name contains
        # the agent type, e.g. "infrastructure_agent_synthetic_prompts.jsonl".
        pattern = f"*{agent_type}*.jsonl"
        matching_files = list(self.input_dir.glob(pattern))

        data = []
        for file_path in matching_files:
            file_data = self._load_jsonl(file_path)
            data.extend(file_data)

        return data

    def _save_jsonl(self, data: List[Dict], output_path: Path) -> None:
        """Save data to JSONL file."""
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "w", encoding="utf-8") as f:
            for item in data:
                f.write(json.dumps(item, ensure_ascii=False) + "\n")

    def _save_stats(self, stats: Dict, output_path: Path) -> None:
        """Save statistics to JSON file."""
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(stats, f, indent=2, ensure_ascii=False)

    def process_agent(
        self,
        agent_type: str,
        train_ratio: float = 0.8,
        val_ratio: float = 0.1,
        test_ratio: float = 0.1,
    ) -> None:
        """Process data for a single agent type."""
        data = self._load_agent_data(agent_type)

        if not data:
            raise ValueError(f"No valid data found for agent type: {agent_type}")

        # Validate and format each sample.
        valid_data = []
        for sample in data:
            if self._validate_sample(sample):
                formatted = self._format_conversation(sample)
                valid_data.append(formatted)

        if not valid_data:
            raise ValueError(
                f"No valid data found after processing for agent type: {agent_type}"
            )

        # De-duplicate, split, and save.
        unique_data = self._remove_duplicates(valid_data)

        train_data, val_data, test_data = self._split_dataset(
            unique_data, train_ratio, val_ratio, test_ratio
        )

        self.output_dir.mkdir(parents=True, exist_ok=True)
        self._save_jsonl(train_data, self.output_dir / f"{agent_type}_train.jsonl")
        self._save_jsonl(val_data, self.output_dir / f"{agent_type}_val.jsonl")
        self._save_jsonl(test_data, self.output_dir / f"{agent_type}_test.jsonl")

        # Record dataset statistics alongside the splits.
        stats = self._calculate_stats(unique_data)
        self._save_stats(stats, self.output_dir / f"{agent_type}_stats.json")

        logger.info(
            f"Processed {agent_type}: {len(train_data)} train, "
            f"{len(val_data)} val, {len(test_data)} test samples"
        )

    def _convert_to_training_format(
        self, data: List[Dict], agent_type: str
    ) -> List[Dict]:
        """Convert cleaned samples into chat-style records suitable for training."""
        training_data = []

        for item in data:
            conversation = {
                "messages": [
                    {
                        "role": "system",
                        "content": f"You are a {agent_type.replace('_', ' ')} specialist. "
                        "Provide expert recommendations and analysis.",
                    },
                    {
                        "role": "user",
                        "content": f"{item['instruction']}\n\n{item['input']}",
                    },
                    {"role": "assistant", "content": item["output"]},
                ],
                "metadata": item.get("metadata", {}),
                "agent_type": agent_type,
            }
            training_data.append(conversation)

        return training_data

    def _create_splits(
        self, data: List[Dict]
    ) -> Tuple[List[Dict], List[Dict], List[Dict]]:
        """Create train/validation/test splits."""
        # Use the configured ratios, falling back to 0.8/0.1/0.1 when no
        # config file was provided.
        data_config = self.data_config or {}
        train_size = data_config.get("train_split", 0.8)
        val_size = data_config.get("validation_split", 0.1)
        test_size = data_config.get("test_split", 0.1)

        # Normalize the ratios so they sum to 1.0.
        total = train_size + val_size + test_size
        train_size /= total
        val_size /= total
        test_size /= total

        # First split off the training set, then divide the remainder into
        # validation and test sets.
        train_data, temp_data = train_test_split(
            data, test_size=(val_size + test_size), random_state=42, shuffle=True
        )

        val_ratio = val_size / (val_size + test_size)
        val_data, test_data = train_test_split(
            temp_data, test_size=(1 - val_ratio), random_state=42, shuffle=True
        )

        return train_data, val_data, test_data

    def _save_datasets(
        self,
        train_data: List[Dict],
        val_data: List[Dict],
        test_data: List[Dict],
        output_dir: Path,
        agent_type: str,
    ) -> None:
        """Save processed datasets to files."""
        datasets = {"train": train_data, "validation": val_data, "test": test_data}

        for split_name, split_data in datasets.items():
            output_file = output_dir / f"{agent_type}_{split_name}.jsonl"

            with open(output_file, "w", encoding="utf-8") as f:
                for item in split_data:
                    f.write(json.dumps(item, ensure_ascii=False) + "\n")

            logger.info(f"Saved {len(split_data)} samples to {output_file}")

    def create_combined_dataset(
        self, output_dir: str = "/Volumes/Mango_MAS/data/processed"
    ) -> None:
        """Create a combined dataset with all agent types for multi-task training."""
        output_path = Path(output_dir)

        all_train_data = []
        all_val_data = []
        all_test_data = []

        # Fall back to the known agent types when no config file was provided.
        agent_types = (
            list(self.agents_config.keys())
            if self.agents_config
            else ["infrastructure", "devsecops", "risk_assessment"]
        )

        # Gather the per-agent split files written earlier.
        for agent_type in agent_types:
            for split in ["train", "validation", "test"]:
                file_path = output_path / f"{agent_type}_{split}.jsonl"
                if file_path.exists():
                    data = self._load_jsonl(file_path)

                    if split == "train":
                        all_train_data.extend(data)
                    elif split == "validation":
                        all_val_data.extend(data)
                    else:
                        all_test_data.extend(data)

        # Shuffle each combined split deterministically.
        random.seed(42)
        random.shuffle(all_train_data)
        random.shuffle(all_val_data)
        random.shuffle(all_test_data)

        combined_datasets = {
            "train": all_train_data,
            "validation": all_val_data,
            "test": all_test_data,
        }

        for split_name, split_data in combined_datasets.items():
            output_file = output_path / f"combined_{split_name}.jsonl"

            with open(output_file, "w", encoding="utf-8") as f:
                for item in split_data:
                    f.write(json.dumps(item, ensure_ascii=False) + "\n")

            logger.info(
                f"Saved combined {split_name} dataset: {len(split_data)} samples"
            )

    def generate_statistics(
        self, output_dir: str = "/Volumes/Mango_MAS/data/processed"
    ) -> Dict:
        """Generate statistics about the processed datasets."""
        output_path = Path(output_dir)
        stats = {}

        # Fall back to the known agent types when no config file was provided.
        agent_types = (
            list(self.agents_config.keys())
            if self.agents_config
            else ["infrastructure", "devsecops", "risk_assessment"]
        )

        for agent_type in agent_types + ["combined"]:
            agent_stats = {}

            for split in ["train", "validation", "test"]:
                file_path = output_path / f"{agent_type}_{split}.jsonl"

                if file_path.exists():
                    data = self._load_jsonl(file_path)

                    # Measure length for chat-style records ("messages") and
                    # plain-text records ("text").
                    lengths = []
                    for item in data:
                        if "messages" in item:
                            total_length = sum(
                                len(msg["content"]) for msg in item["messages"]
                            )
                            lengths.append(total_length)
                        elif "text" in item:
                            lengths.append(len(item["text"]))

                    agent_stats[split] = {
                        "count": len(data),
                        "avg_length": sum(lengths) / len(lengths) if lengths else 0,
                        "min_length": min(lengths) if lengths else 0,
                        "max_length": max(lengths) if lengths else 0,
                    }

            stats[agent_type] = agent_stats

        stats_file = output_path / "dataset_statistics.json"
        with open(stats_file, "w") as f:
            json.dump(stats, f, indent=2)

        logger.info(f"Generated dataset statistics: {stats_file}")
        return stats


def main():
    parser = argparse.ArgumentParser(
        description="Process MangoMAS datasets for local training"
    )
    parser.add_argument(
        "--input_dir",
        type=str,
        default="/Users/iancruickshank/Documents/Model/mangomas-datasets/agents/",
        help="Directory containing original JSONL files",
    )
    parser.add_argument(
        "--output_dir",
        type=str,
        default="/Volumes/Mango_MAS/data/processed",
        help="Directory to save processed datasets",
    )
    parser.add_argument(
        "--config",
        type=str,
        default="config/training/distillation.yaml",
        help="Path to configuration file",
    )
    parser.add_argument(
        "--create_combined",
        action="store_true",
        help="Create combined multi-agent dataset",
    )

    args = parser.parse_args()

    # The config path must be passed as a keyword argument so it is not
    # mistaken for the input directory.
    processor = MangoMASDataProcessor(
        args.input_dir, args.output_dir, config_path=args.config
    )

    processor.process_datasets(args.input_dir, args.output_dir)

    if args.create_combined:
        processor.create_combined_dataset(args.output_dir)

    stats = processor.generate_statistics(args.output_dir)

    print("\nDataset Statistics:")
    print("=" * 50)
    for agent_type, agent_stats in stats.items():
        print(f"\n{agent_type.upper()}:")
        for split, split_stats in agent_stats.items():
            print(
                f"  {split}: {split_stats['count']} samples, "
                f"avg length: {split_stats['avg_length']:.0f} chars"
            )


if __name__ == "__main__":
    main()