File size: 6,087 Bytes
f50dc54 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
#!/usr/bin/env python3
"""
IPO Grouped Sampling ํ
์คํธ
IPO ๊ทธ๋ฃน ์ํ๋ง์ด ์ ๋๋ก ์๋ํ๋์ง ํ์ธ
"""
import sys
import os
sys.path.append('/home/ubuntu/RLVR/TestTime-RLVR-v2')
sys.path.append('/home/ubuntu/RLVR/verl')
import pandas as pd
import numpy as np
from transformers import AutoTokenizer
from absolute_zero_reasoner.utils.dataset.ttrlvr_dataset import TTRLVRDataset
from absolute_zero_reasoner.utils.dataset.ipo_grouped_sampler import IPOGroupedBatchSampler
def create_test_data():
"""ํ
์คํธ์ฉ parquet ํ์ผ ์์ฑ"""
# 3๊ฐ์ IPO ๊ทธ๋ฃน, ๊ฐ๊ฐ 3๊ฐ์ task (induction, deduction, abduction)
data = []
for ipo_idx in range(3):
ipo_group_id = f"Mbpp_2_program_var_0_ipo_{ipo_idx}"
# ๊ฐ IPO ๊ทธ๋ฃน์ 3๊ฐ์ task
for task_type in ['induction', 'deduction', 'abduction']:
record = {
'prompt': f"Test prompt for {task_type} task from IPO {ipo_idx}",
'ground_truth': f"Expected solution for {task_type}",
'uid': f"Mbpp_2_round_1_{task_type}_{ipo_idx}",
'ipo_group_id': ipo_group_id,
'problem': {
'input': f"test_input_{ipo_idx}",
'output': f"test_output_{ipo_idx}",
'snippet': f"def test_func_{ipo_idx}(): pass"
},
'basic_accuracy': 0.0
}
data.append(record)
# DataFrame ์์ฑ ๋ฐ ์ ์ฅ
df = pd.DataFrame(data)
test_file = '/tmp/test_ipo_grouped.parquet'
df.to_parquet(test_file)
print(f"โ
Created test data with {len(data)} samples in {len(df['ipo_group_id'].unique())} IPO groups")
print(f" Saved to: {test_file}")
return test_file
def test_ipo_grouped_sampler():
    """Verify that IPOGroupedBatchSampler groups samples by IPO group id.

    Creates a small synthetic dataset, then checks (a) that with
    shuffle=False every batch contains samples from exactly one IPO group,
    and (b) that shuffle=True with a fixed seed still yields whole-group
    batches, only in a different order.

    Returns:
        True when the walkthrough completes without raising.
    """
    print("\n🔧 Testing IPO Grouped Sampler")
    print("=" * 60)

    # 1. Create the test data.
    test_file = create_test_data()

    # 2. Load the tokenizer.
    tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-0.5B-Instruct")

    # 3. Build the dataset.
    dataset = TTRLVRDataset(
        parquet_files=test_file,
        tokenizer=tokenizer
    )
    print(f"\n📊 Dataset loaded: {len(dataset)} samples")

    # 4. Create the IPO grouped sampler.
    batch_size = 3  # the 3 tasks of one IPO group should fill exactly one batch
    sampler = IPOGroupedBatchSampler(
        dataset=dataset,
        batch_size=batch_size,
        shuffle=False,  # disabled so batch order is deterministic for this check
        drop_last=False
    )
    print(f"\n🎯 Sampler created with batch_size={batch_size}")
    print(f"   Total batches: {len(sampler)}")

    # 5. Inspect batch composition.
    print("\n📦 Checking batch composition:")
    for batch_idx, batch_indices in enumerate(sampler):
        print(f"\n  Batch {batch_idx + 1}: {len(batch_indices)} samples")
        # Collect the IPO group of every sample in this batch.
        ipo_groups = []
        for idx in batch_indices:
            row = dataset.dataframe.iloc[idx]
            ipo_group = row['ipo_group_id']
            uid = row['uid']
            ipo_groups.append(ipo_group)
            print(f"    - idx={idx}: {uid} (IPO: {ipo_group})")
        # All samples in a batch must come from the same IPO group.
        unique_groups = set(ipo_groups)
        if len(unique_groups) == 1:
            print(f"    ✅ All samples from same IPO group!")
        else:
            print(f"    ⚠️ Mixed IPO groups: {unique_groups}")

    # 6. Shuffle test.
    print("\n\n🔀 Testing with shuffle=True:")
    sampler_shuffled = IPOGroupedBatchSampler(
        dataset=dataset,
        batch_size=batch_size,
        shuffle=True,
        seed=42
    )
    batch_order = []
    for batch_idx, batch_indices in enumerate(sampler_shuffled):
        first_idx = batch_indices[0]
        row = dataset.dataframe.iloc[first_idx]
        ipo_group = row['ipo_group_id']
        batch_order.append(ipo_group)
        print(f"  Batch {batch_idx + 1}: IPO group = {ipo_group}")

    print("\n✅ IPO Grouped Sampler test completed!")
    return True
def test_verl_integration():
    """Verify that VeRL's create_rl_sampler honors the IPO-grouping config.

    Builds an OmegaConf data config with use_ipo_grouping=True, feeds it to
    verl.trainer.main_ppo.create_rl_sampler together with a TTRLVRDataset,
    and prints the type and first few batches of the resulting sampler.

    Returns:
        True when the walkthrough completes without raising.
    """
    print("\n\n🔧 Testing VeRL Integration")
    print("=" * 60)

    # Test configuration.
    from omegaconf import OmegaConf
    data_config = OmegaConf.create({
        'train_batch_size': 3,
        'shuffle': True,
        'use_ipo_grouping': True,  # enable IPO grouped sampling
        'drop_last': False,
        'seed': 42
    })

    # Reuse the test data from the previous test if it already exists.
    test_file = '/tmp/test_ipo_grouped.parquet'
    if not os.path.exists(test_file):
        test_file = create_test_data()

    tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-0.5B-Instruct")
    dataset = TTRLVRDataset(
        parquet_files=test_file,
        tokenizer=tokenizer
    )

    # Call create_rl_sampler.
    from verl.trainer.main_ppo import create_rl_sampler
    sampler = create_rl_sampler(data_config, dataset)

    # Check the sampler type.
    print(f"Sampler type: {type(sampler).__name__}")
    if hasattr(sampler, '__len__'):
        print(f"Number of batches: {len(sampler)}")

    # Inspect batches (when the sampler is a BatchSampler).
    if hasattr(sampler, '__iter__'):
        print("\nFirst 3 batches:")
        for i, batch in enumerate(sampler):
            if i >= 3:
                break
            if isinstance(batch, list):
                print(f"  Batch {i+1}: {len(batch)} samples - indices: {batch}")
            else:
                print(f"  Batch {i+1}: {batch}")

    print("\n✅ VeRL integration test completed!")
    return True
if __name__ == "__main__":
print("๐ Starting IPO Grouped Sampling Tests")
print("=" * 80)
# ๊ธฐ๋ณธ ์ํ๋ฌ ํ
์คํธ
test_ipo_grouped_sampler()
# VeRL ํตํฉ ํ
์คํธ
test_verl_integration()
print("\n" + "=" * 80)
print("๐ All tests completed successfully!") |