# Source: neural-mesh — test/test_ipo_grouped_sampling.py
# Uploaded by hjkim00 from the Full-pipeline-relative_0827 branch (commit f50dc54, verified).
#!/usr/bin/env python3
"""
IPO Grouped Sampling ํ…Œ์ŠคํŠธ
IPO ๊ทธ๋ฃน ์ƒ˜ํ”Œ๋ง์ด ์ œ๋Œ€๋กœ ์ž‘๋™ํ•˜๋Š”์ง€ ํ™•์ธ
"""
import sys
import os
sys.path.append('/home/ubuntu/RLVR/TestTime-RLVR-v2')
sys.path.append('/home/ubuntu/RLVR/verl')
import pandas as pd
import numpy as np
from transformers import AutoTokenizer
from absolute_zero_reasoner.utils.dataset.ttrlvr_dataset import TTRLVRDataset
from absolute_zero_reasoner.utils.dataset.ipo_grouped_sampler import IPOGroupedBatchSampler
def create_test_data():
    """Create a synthetic parquet fixture for the sampler tests.

    Builds 3 IPO groups, each holding one record per task type
    (induction, deduction, abduction), and writes them to /tmp.

    Returns:
        str: path of the written parquet file.
    """
    # 3 IPO groups x 3 tasks = 9 records, built in one comprehension.
    data = [
        {
            'prompt': f"Test prompt for {task_type} task from IPO {ipo_idx}",
            'ground_truth': f"Expected solution for {task_type}",
            'uid': f"Mbpp_2_round_1_{task_type}_{ipo_idx}",
            'ipo_group_id': f"Mbpp_2_program_var_0_ipo_{ipo_idx}",
            'problem': {
                'input': f"test_input_{ipo_idx}",
                'output': f"test_output_{ipo_idx}",
                'snippet': f"def test_func_{ipo_idx}(): pass",
            },
            'basic_accuracy': 0.0,
        }
        for ipo_idx in range(3)
        for task_type in ('induction', 'deduction', 'abduction')
    ]

    # Persist as parquet so the dataset loader can read it back.
    df = pd.DataFrame(data)
    test_file = '/tmp/test_ipo_grouped.parquet'
    df.to_parquet(test_file)

    print(f"โœ… Created test data with {len(data)} samples in {len(df['ipo_group_id'].unique())} IPO groups")
    print(f" Saved to: {test_file}")
    return test_file
def test_ipo_grouped_sampler():
    """Exercise IPOGroupedBatchSampler and report batch composition.

    Checks (by printed inspection) that each batch is drawn from a
    single IPO group, first with shuffle off, then with shuffle on.
    """
    print("\n๐Ÿ”ง Testing IPO Grouped Sampler")
    print("=" * 60)

    # Build the fixture parquet and load it through the dataset class.
    test_file = create_test_data()
    tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-0.5B-Instruct")
    dataset = TTRLVRDataset(parquet_files=test_file, tokenizer=tokenizer)
    print(f"\n๐Ÿ“Š Dataset loaded: {len(dataset)} samples")

    # batch_size == 3 so the 3 tasks of one IPO group fill one batch.
    batch_size = 3
    sampler = IPOGroupedBatchSampler(
        dataset=dataset,
        batch_size=batch_size,
        shuffle=False,  # deterministic order for inspection
        drop_last=False,
    )
    print(f"\n๐ŸŽฏ Sampler created with batch_size={batch_size}")
    print(f" Total batches: {len(sampler)}")

    # Walk every batch and verify its members share one IPO group id.
    print("\n๐Ÿ“ฆ Checking batch composition:")
    for batch_idx, batch_indices in enumerate(sampler):
        print(f"\n Batch {batch_idx + 1}: {len(batch_indices)} samples")
        ipo_groups = []
        for idx in batch_indices:
            row = dataset.dataframe.iloc[idx]
            ipo_group = row['ipo_group_id']
            uid = row['uid']
            ipo_groups.append(ipo_group)
            print(f" - idx={idx}: {uid} (IPO: {ipo_group})")
        unique_groups = set(ipo_groups)
        if len(unique_groups) == 1:
            print(f" โœ… All samples from same IPO group!")
        else:
            print(f" โš ๏ธ Mixed IPO groups: {unique_groups}")

    # Repeat with shuffling enabled to see the group order randomized.
    print("\n\n๐Ÿ”€ Testing with shuffle=True:")
    sampler_shuffled = IPOGroupedBatchSampler(
        dataset=dataset,
        batch_size=batch_size,
        shuffle=True,
        seed=42,
    )
    batch_order = []
    for batch_idx, batch_indices in enumerate(sampler_shuffled):
        # The first index is enough to identify the batch's IPO group.
        row = dataset.dataframe.iloc[batch_indices[0]]
        ipo_group = row['ipo_group_id']
        batch_order.append(ipo_group)
        print(f" Batch {batch_idx + 1}: IPO group = {ipo_group}")

    print("\nโœ… IPO Grouped Sampler test completed!")
    return True
def test_verl_integration():
    """Check that VeRL's create_rl_sampler honors IPO group sampling.

    Builds a minimal data config with `use_ipo_grouping` enabled, asks
    VeRL for a sampler, and prints its type and the first few batches.
    """
    print("\n\n๐Ÿ”ง Testing VeRL Integration")
    print("=" * 60)

    # Config mirroring the trainer's data section, with IPO grouping on.
    from omegaconf import OmegaConf
    data_config = OmegaConf.create({
        'train_batch_size': 3,
        'shuffle': True,
        'use_ipo_grouping': True,  # activates the grouped batch sampler
        'drop_last': False,
        'seed': 42,
    })

    # Reuse the fixture parquet if an earlier test already wrote it.
    test_file = '/tmp/test_ipo_grouped.parquet'
    if not os.path.exists(test_file):
        test_file = create_test_data()

    tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-0.5B-Instruct")
    dataset = TTRLVRDataset(parquet_files=test_file, tokenizer=tokenizer)

    # Let VeRL construct the sampler from the config.
    from verl.trainer.main_ppo import create_rl_sampler
    sampler = create_rl_sampler(data_config, dataset)

    print(f"Sampler type: {type(sampler).__name__}")
    if hasattr(sampler, '__len__'):
        print(f"Number of batches: {len(sampler)}")

    # When it behaves like a BatchSampler, show the first three batches.
    if hasattr(sampler, '__iter__'):
        print("\nFirst 3 batches:")
        for i, batch in enumerate(sampler):
            if i >= 3:
                break
            if isinstance(batch, list):
                print(f" Batch {i+1}: {len(batch)} samples - indices: {batch}")
            else:
                print(f" Batch {i+1}: {batch}")

    print("\nโœ… VeRL integration test completed!")
    return True
if __name__ == "__main__":
print("๐Ÿš€ Starting IPO Grouped Sampling Tests")
print("=" * 80)
# ๊ธฐ๋ณธ ์ƒ˜ํ”Œ๋Ÿฌ ํ…Œ์ŠคํŠธ
test_ipo_grouped_sampler()
# VeRL ํ†ตํ•ฉ ํ…Œ์ŠคํŠธ
test_verl_integration()
print("\n" + "=" * 80)
print("๐ŸŽ‰ All tests completed successfully!")