| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| """ |
| Tests for the TokenizerProcessorStep class. |
| """ |
|
|
| import tempfile |
| from unittest.mock import patch |
|
|
| import pytest |
| import torch |
|
|
| from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature |
| from lerobot.processor import DataProcessorPipeline, TokenizerProcessorStep |
| from lerobot.processor.converters import create_transition, identity_transition |
| from lerobot.types import TransitionKey |
| from lerobot.utils.constants import ( |
| ACTION, |
| OBS_IMAGE, |
| OBS_LANGUAGE, |
| OBS_LANGUAGE_SUBTASK_ATTENTION_MASK, |
| OBS_LANGUAGE_SUBTASK_TOKENS, |
| OBS_STATE, |
| ) |
| from tests.utils import require_package |
|
|
|
|
| class MockTokenizer: |
| """Mock tokenizer for testing that mimics transformers tokenizer interface.""" |
|
|
| def __init__(self, vocab_size: int = 1000): |
| self.vocab_size = vocab_size |
|
|
| def __call__( |
| self, |
| text: str | list[str], |
| max_length: int = 512, |
| truncation: bool = True, |
| padding: str = "max_length", |
| padding_side: str = "right", |
| return_tensors: str = "pt", |
| **kwargs, |
| ) -> dict[str, torch.Tensor]: |
| """Mock tokenization that returns deterministic tokens based on text.""" |
| texts = [text] if isinstance(text, str) else text |
|
|
| batch_size = len(texts) |
|
|
| |
| input_ids = torch.zeros(batch_size, max_length, dtype=torch.long) |
| attention_mask = torch.zeros(batch_size, max_length, dtype=torch.long) |
|
|
| for i, txt in enumerate(texts): |
| |
| text_hash = hash(txt) % self.vocab_size |
| seq_len = min(len(txt.split()), max_length) |
|
|
| |
| for j in range(seq_len): |
| input_ids[i, j] = (text_hash + j) % self.vocab_size |
|
|
| |
| attention_mask[i, :seq_len] = 1 |
|
|
| result = { |
| "input_ids": input_ids, |
| "attention_mask": attention_mask, |
| } |
|
|
| |
| if len(texts) == 1: |
| result = {k: v.squeeze(0) for k, v in result.items()} |
|
|
| return result |
|
|
|
|
@pytest.fixture
def mock_tokenizer():
    """Fixture returning a small-vocabulary mock tokenizer."""
    tokenizer = MockTokenizer(vocab_size=100)
    return tokenizer
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_basic_tokenization(mock_auto_tokenizer):
    """Tokenizing a single task string adds token/attention-mask observations."""
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)

    processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=10)
    transition = create_transition(
        observation={"state": torch.tensor([1.0, 2.0])},
        action=torch.tensor([0.1, 0.2]),
        complementary_data={"task": "pick up the red cube"},
    )

    result = processor(transition)

    # The raw task string must survive untouched.
    assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == "pick up the red cube"

    obs = result[TransitionKey.OBSERVATION]
    tokens_key = f"{OBS_LANGUAGE}.tokens"
    mask_key = f"{OBS_LANGUAGE}.attention_mask"
    assert tokens_key in obs
    assert mask_key in obs

    # Single-string input yields unbatched, max_length-sized tensors.
    for key in (tokens_key, mask_key):
        tensor = obs[key]
        assert isinstance(tensor, torch.Tensor)
        assert tensor.shape == (10,)
|
|
|
|
@require_package("transformers")
def test_basic_tokenization_with_tokenizer_object():
    """Passing a tokenizer instance directly works like tokenizer_name."""
    processor = TokenizerProcessorStep(tokenizer=MockTokenizer(vocab_size=100), max_length=10)

    transition = create_transition(
        observation={"state": torch.tensor([1.0, 2.0])},
        action=torch.tensor([0.1, 0.2]),
        complementary_data={"task": "pick up the red cube"},
    )

    result = processor(transition)

    # The raw task string must survive untouched.
    assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == "pick up the red cube"

    obs = result[TransitionKey.OBSERVATION]
    for key in (f"{OBS_LANGUAGE}.tokens", f"{OBS_LANGUAGE}.attention_mask"):
        assert key in obs
        assert isinstance(obs[key], torch.Tensor)
        # Single-string input yields unbatched, max_length-sized tensors.
        assert obs[key].shape == (10,)
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_list_of_strings_tokenization(mock_auto_tokenizer):
    """A list of task strings is tokenized into batched (N, max_length) tensors."""
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)

    processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=8)
    tasks = ["pick up cube", "place on table"]
    transition = create_transition(
        observation={"state": torch.tensor([1.0, 2.0])},
        action=torch.tensor([0.1, 0.2]),
        complementary_data={"task": tasks},
    )

    result = processor(transition)

    # The original task list is preserved verbatim.
    assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == tasks

    obs = result[TransitionKey.OBSERVATION]
    assert obs[f"{OBS_LANGUAGE}.tokens"].shape == (2, 8)
    assert obs[f"{OBS_LANGUAGE}.attention_mask"].shape == (2, 8)
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_custom_keys(mock_auto_tokenizer):
    """A custom task_key is read from complementary_data instead of 'task'."""
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)

    processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", task_key="instruction", max_length=5)

    result = processor(
        create_transition(
            observation={"state": torch.tensor([1.0, 2.0])},
            action=torch.tensor([0.1, 0.2]),
            complementary_data={"instruction": "move forward"},
        )
    )

    obs = result[TransitionKey.OBSERVATION]
    assert f"{OBS_LANGUAGE}.tokens" in obs
    assert f"{OBS_LANGUAGE}.attention_mask" in obs
    assert obs[f"{OBS_LANGUAGE}.tokens"].shape == (5,)
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_none_complementary_data(mock_auto_tokenizer):
    """A transition without complementary_data raises KeyError for 'task'."""
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)
    processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer")

    transition = create_transition(observation={}, complementary_data=None)

    with pytest.raises(KeyError, match="task"):
        processor(transition)
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_missing_task_key(mock_auto_tokenizer):
    """complementary_data without a 'task' entry raises KeyError."""
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)
    processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer")

    transition = create_transition(observation={}, complementary_data={"other_field": "some value"})

    with pytest.raises(KeyError, match="task"):
        processor(transition)
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_none_task_value(mock_auto_tokenizer):
    """A None task value is rejected with an explicit ValueError."""
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)
    processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer")

    transition = create_transition(observation={}, complementary_data={"task": None})

    with pytest.raises(ValueError, match="Task extracted from Complementary data is None"):
        processor(transition)
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_unsupported_task_type(mock_auto_tokenizer):
    """Non-string tasks (scalar or mixed list) are rejected with ValueError."""
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)
    processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer")

    # An int task and a list containing a non-string both fail the same way.
    for bad_task in (123, ["text", 123, "more text"]):
        transition = create_transition(observation={}, complementary_data={"task": bad_task})
        with pytest.raises(ValueError, match="Task cannot be None"):
            processor(transition)
|
|
|
|
@require_package("transformers")
def test_no_tokenizer_error():
    """Constructing without tokenizer or tokenizer_name raises ValueError."""
    with pytest.raises(ValueError, match="Either 'tokenizer' or 'tokenizer_name' must be provided"):
        TokenizerProcessorStep()
|
|
|
|
@require_package("transformers")
def test_invalid_tokenizer_name_error():
    """A failing AutoTokenizer lookup propagates out of the constructor."""
    with patch("lerobot.processor.tokenizer_processor.AutoTokenizer") as mock_auto_tokenizer:
        mock_auto_tokenizer.from_pretrained.side_effect = Exception("Model not found")

        with pytest.raises(Exception, match="Model not found"):
            TokenizerProcessorStep(tokenizer_name="invalid-tokenizer")
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_get_config_with_tokenizer_name(mock_auto_tokenizer):
    """get_config serializes all settings, including tokenizer_name."""
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)

    processor = TokenizerProcessorStep(
        tokenizer_name="test-tokenizer",
        max_length=256,
        task_key="instruction",
        padding="longest",
        truncation=False,
    )

    assert processor.get_config() == {
        "tokenizer_name": "test-tokenizer",
        "max_length": 256,
        "task_key": "instruction",
        "padding_side": "right",
        "padding": "longest",
        "truncation": False,
    }
|
|
|
|
@require_package("transformers")
def test_get_config_with_tokenizer_object():
    """get_config omits tokenizer_name when a tokenizer object was supplied."""
    processor = TokenizerProcessorStep(
        tokenizer=MockTokenizer(vocab_size=100),
        max_length=256,
        task_key="instruction",
        padding="longest",
        truncation=False,
    )

    config = processor.get_config()

    # The in-memory tokenizer object cannot be serialized, so no name appears.
    assert config == {
        "max_length": 256,
        "task_key": "instruction",
        "padding_side": "right",
        "padding": "longest",
        "truncation": False,
    }
    assert "tokenizer_name" not in config
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_state_dict_methods(mock_auto_tokenizer):
    """state_dict is empty and load_state_dict accepts an empty dict."""
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)
    processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer")

    # The step is stateless: nothing to save, nothing to restore.
    assert processor.state_dict() == {}
    processor.load_state_dict({})
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_reset_method(mock_auto_tokenizer):
    """reset() is a no-op on this stateless step and must not raise."""
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)

    TokenizerProcessorStep(tokenizer_name="test-tokenizer").reset()
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_integration_with_robot_processor(mock_auto_tokenizer):
    """TokenizerProcessorStep composes correctly inside DataProcessorPipeline."""
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)

    pipeline = DataProcessorPipeline(
        [TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=6)],
        to_transition=identity_transition,
        to_output=identity_transition,
    )
    transition = create_transition(
        observation={"state": torch.tensor([1.0, 2.0])},
        action=torch.tensor([0.1, 0.2]),
        complementary_data={"task": "test task"},
    )

    result = pipeline(transition)

    assert TransitionKey.OBSERVATION in result
    obs = result[TransitionKey.OBSERVATION]
    for key in (f"{OBS_LANGUAGE}.tokens", f"{OBS_LANGUAGE}.attention_mask"):
        assert key in obs
        assert obs[key].shape == (6,)

    # Pre-existing tensors pass through the pipeline unchanged.
    assert torch.equal(obs["state"], transition[TransitionKey.OBSERVATION]["state"])
    assert torch.equal(result[TransitionKey.ACTION], transition[TransitionKey.ACTION])
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_save_and_load_pretrained_with_tokenizer_name(mock_auto_tokenizer):
    """A pipeline using tokenizer_name round-trips through save/from_pretrained."""
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)

    pipeline = DataProcessorPipeline(
        [TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=32, task_key="instruction")],
        to_transition=identity_transition,
        to_output=identity_transition,
    )

    with tempfile.TemporaryDirectory() as temp_dir:
        pipeline.save_pretrained(temp_dir)

        # Reload from disk; the tokenizer is re-created from its saved name.
        loaded = DataProcessorPipeline.from_pretrained(
            temp_dir,
            config_filename="dataprocessorpipeline.json",
            to_transition=identity_transition,
            to_output=identity_transition,
        )

        result = loaded(
            create_transition(
                observation={"state": torch.tensor([1.0, 2.0])},
                action=torch.tensor([0.1, 0.2]),
                complementary_data={"instruction": "test instruction"},
            )
        )

        assert TransitionKey.OBSERVATION in result
        obs = result[TransitionKey.OBSERVATION]
        assert f"{OBS_LANGUAGE}.tokens" in obs
        assert f"{OBS_LANGUAGE}.attention_mask" in obs
|
|
|
|
@require_package("transformers")
def test_save_and_load_pretrained_with_tokenizer_object():
    """A tokenizer object must be re-injected via overrides when loading."""
    tokenizer = MockTokenizer(vocab_size=100)

    pipeline = DataProcessorPipeline(
        [TokenizerProcessorStep(tokenizer=tokenizer, max_length=32, task_key="instruction")],
        to_transition=identity_transition,
        to_output=identity_transition,
    )

    with tempfile.TemporaryDirectory() as temp_dir:
        pipeline.save_pretrained(temp_dir)

        # The tokenizer object is not serialized, so it is supplied again here.
        loaded = DataProcessorPipeline.from_pretrained(
            temp_dir,
            config_filename="dataprocessorpipeline.json",
            overrides={"tokenizer_processor": {"tokenizer": tokenizer}},
            to_transition=identity_transition,
            to_output=identity_transition,
        )

        result = loaded(
            create_transition(
                observation={"state": torch.tensor([1.0, 2.0])},
                action=torch.tensor([0.1, 0.2]),
                complementary_data={"instruction": "test instruction"},
            )
        )

        assert TransitionKey.OBSERVATION in result
        obs = result[TransitionKey.OBSERVATION]
        assert f"{OBS_LANGUAGE}.tokens" in obs
        assert f"{OBS_LANGUAGE}.attention_mask" in obs
|
|
|
|
@require_package("transformers")
def test_registry_functionality():
    """TokenizerProcessorStep is registered under 'tokenizer_processor'."""
    from lerobot.processor import ProcessorStepRegistry

    assert "tokenizer_processor" in ProcessorStepRegistry.list()
    assert ProcessorStepRegistry.get("tokenizer_processor") is TokenizerProcessorStep
|
|
|
|
@require_package("transformers")
def test_features_basic():
    """transform_features keeps inputs and adds language token/mask features."""
    processor = TokenizerProcessorStep(tokenizer=MockTokenizer(vocab_size=100), max_length=128)

    input_features = {
        PipelineFeatureType.OBSERVATION: {OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,))},
        PipelineFeatureType.ACTION: {ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(5,))},
    }

    output = processor.transform_features(input_features)

    # Existing features are preserved.
    assert OBS_STATE in output[PipelineFeatureType.OBSERVATION]
    assert ACTION in output[PipelineFeatureType.ACTION]

    # New language features are added with max_length-sized shapes.
    obs_features = output[PipelineFeatureType.OBSERVATION]
    for key in (f"{OBS_LANGUAGE}.tokens", f"{OBS_LANGUAGE}.attention_mask"):
        assert key in obs_features
        assert obs_features[key].type == FeatureType.LANGUAGE
        assert obs_features[key].shape == (128,)
|
|
|
|
@require_package("transformers")
def test_features_with_custom_max_length():
    """Added language features use the processor's max_length as their shape."""
    processor = TokenizerProcessorStep(tokenizer=MockTokenizer(vocab_size=100), max_length=64)

    output = processor.transform_features({PipelineFeatureType.OBSERVATION: {}})
    obs_features = output[PipelineFeatureType.OBSERVATION]

    for key in (f"{OBS_LANGUAGE}.tokens", f"{OBS_LANGUAGE}.attention_mask"):
        assert key in obs_features
        assert obs_features[key].shape == (64,)
|
|
|
|
@require_package("transformers")
def test_features_existing_features():
    """Pre-existing tokenized features are not overwritten by transform_features."""
    processor = TokenizerProcessorStep(tokenizer=MockTokenizer(vocab_size=100), max_length=256)

    existing = {
        PipelineFeatureType.OBSERVATION: {
            f"{OBS_LANGUAGE}.tokens": PolicyFeature(type=FeatureType.LANGUAGE, shape=(100,)),
            f"{OBS_LANGUAGE}.attention_mask": PolicyFeature(type=FeatureType.LANGUAGE, shape=(100,)),
        }
    }

    output = processor.transform_features(existing)

    # Shapes stay at their original (100,), not the processor's max_length.
    obs_features = output[PipelineFeatureType.OBSERVATION]
    assert obs_features[f"{OBS_LANGUAGE}.tokens"].shape == (100,)
    assert obs_features[f"{OBS_LANGUAGE}.attention_mask"].shape == (100,)
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_tokenization_parameters(mock_auto_tokenizer):
    """All configured tokenization kwargs are forwarded to the tokenizer call."""

    class RecordingTokenizer:
        """Tokenizer stub that records the args/kwargs of its most recent call."""

        def __init__(self):
            self.last_call_args = None
            self.last_call_kwargs = None

        def __call__(self, *args, **kwargs):
            self.last_call_args = args
            self.last_call_kwargs = kwargs
            return {
                "input_ids": torch.zeros(16, dtype=torch.long),
                "attention_mask": torch.ones(16, dtype=torch.long),
            }

    recorder = RecordingTokenizer()
    mock_auto_tokenizer.from_pretrained.return_value = recorder

    processor = TokenizerProcessorStep(
        tokenizer_name="test-tokenizer",
        max_length=16,
        padding="longest",
        truncation=False,
        padding_side="left",
    )

    processor(
        create_transition(
            observation={"state": torch.tensor([1.0, 2.0])},
            action=torch.tensor([0.1, 0.2]),
            complementary_data={"task": "test task"},
        )
    )

    # The task is passed as a single-element batch.
    assert recorder.last_call_args == (["test task"],)
    assert recorder.last_call_kwargs["max_length"] == 16
    assert recorder.last_call_kwargs["padding"] == "longest"
    assert recorder.last_call_kwargs["padding_side"] == "left"
    assert recorder.last_call_kwargs["truncation"] is False
    assert recorder.last_call_kwargs["return_tensors"] == "pt"
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_preserves_other_complementary_data(mock_auto_tokenizer):
    """Tokenization leaves unrelated complementary_data fields untouched."""
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)
    processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer")

    extras = {
        "task": "test task",
        "episode_id": 123,
        "timestamp": 456.789,
        "other_field": {"nested": "data"},
    }
    result = processor(
        create_transition(
            observation={"state": torch.tensor([1.0, 2.0])},
            action=torch.tensor([0.1, 0.2]),
            complementary_data=dict(extras),
        )
    )

    # Every input field survives unchanged.
    comp_data = result[TransitionKey.COMPLEMENTARY_DATA]
    for key, value in extras.items():
        assert comp_data[key] == value

    # Tokenized outputs land in the observation.
    obs = result[TransitionKey.OBSERVATION]
    assert f"{OBS_LANGUAGE}.tokens" in obs
    assert f"{OBS_LANGUAGE}.attention_mask" in obs
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_deterministic_tokenization(mock_auto_tokenizer):
    """Running the processor twice on the same input yields identical tensors."""
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)

    processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=10)
    transition = create_transition(
        observation={"state": torch.tensor([1.0, 2.0])},
        action=torch.tensor([0.1, 0.2]),
        complementary_data={"task": "consistent test"},
    )

    first = processor(transition)[TransitionKey.OBSERVATION]
    second = processor(transition)[TransitionKey.OBSERVATION]

    assert torch.equal(first[f"{OBS_LANGUAGE}.tokens"], second[f"{OBS_LANGUAGE}.tokens"])
    assert torch.equal(first[f"{OBS_LANGUAGE}.attention_mask"], second[f"{OBS_LANGUAGE}.attention_mask"])
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_empty_string_task(mock_auto_tokenizer):
    """An empty task string still tokenizes to a max_length-sized tensor."""
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)
    processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=8)

    result = processor(
        create_transition(
            observation={"state": torch.tensor([1.0, 2.0])},
            action=torch.tensor([0.1, 0.2]),
            complementary_data={"task": ""},
        )
    )

    obs = result[TransitionKey.OBSERVATION]
    assert f"{OBS_LANGUAGE}.tokens" in obs
    assert obs[f"{OBS_LANGUAGE}.tokens"].shape == (8,)
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_very_long_task(mock_auto_tokenizer):
    """Long task strings are truncated down to max_length tokens."""
    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)
    processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=5, truncation=True)

    result = processor(
        create_transition(
            observation={"state": torch.tensor([1.0, 2.0])},
            action=torch.tensor([0.1, 0.2]),
            # 100 words — far more than max_length.
            complementary_data={"task": " ".join(["word"] * 100)},
        )
    )

    obs = result[TransitionKey.OBSERVATION]
    assert obs[f"{OBS_LANGUAGE}.tokens"].shape == (5,)
    assert obs[f"{OBS_LANGUAGE}.attention_mask"].shape == (5,)
|
|
|
|
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_custom_padding_side(mock_auto_tokenizer):
    """The configured padding_side is forwarded to every tokenizer call."""

    class PaddingSideRecorder:
        """Tokenizer stub that records the padding_side of each call."""

        def __init__(self):
            self.padding_side_calls = []

        def __call__(
            self,
            text,
            max_length=512,
            truncation=True,
            padding="max_length",
            padding_side="right",
            return_tensors="pt",
            **kwargs,
        ):
            self.padding_side_calls.append(padding_side)
            return {
                "input_ids": torch.zeros(max_length, dtype=torch.long),
                "attention_mask": torch.ones(max_length, dtype=torch.long),
            }

    recorder = PaddingSideRecorder()
    mock_auto_tokenizer.from_pretrained.return_value = recorder

    transition = create_transition(
        observation={"state": torch.tensor([1.0, 2.0])},
        action=torch.tensor([0.1, 0.2]),
        complementary_data={"task": "test task"},
    )

    # Left padding first, then right: the last recorded call must match.
    for side in ("left", "right"):
        processor = TokenizerProcessorStep(
            tokenizer_name="test-tokenizer", max_length=10, padding_side=side
        )
        processor(transition)
        assert recorder.padding_side_calls[-1] == side
|
|
|
|
@require_package("transformers")
def test_device_detection_cpu():
    """Tokenized tensors stay on CPU when all other tensors are on CPU."""
    processor = TokenizerProcessorStep(tokenizer=MockTokenizer(vocab_size=100), max_length=10)

    transition = create_transition(
        observation={OBS_STATE: torch.randn(10)},
        action=torch.randn(5),
        complementary_data={"task": "test task"},
    )

    obs = processor(transition)[TransitionKey.OBSERVATION]

    assert obs[f"{OBS_LANGUAGE}.tokens"].device.type == "cpu"
    assert obs[f"{OBS_LANGUAGE}.attention_mask"].device.type == "cpu"
|
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
@require_package("transformers")
def test_device_detection_cuda():
    """Tokenized tensors follow the other tensors onto CUDA."""
    processor = TokenizerProcessorStep(tokenizer=MockTokenizer(vocab_size=100), max_length=10)

    transition = create_transition(
        observation={OBS_STATE: torch.randn(10).cuda()},
        action=torch.randn(5).cuda(),
        complementary_data={"task": "test task"},
    )

    obs = processor(transition)[TransitionKey.OBSERVATION]
    tokens = obs[f"{OBS_LANGUAGE}.tokens"]

    assert tokens.device.type == "cuda"
    assert obs[f"{OBS_LANGUAGE}.attention_mask"].device.type == "cuda"
    # Default CUDA device is index 0.
    assert tokens.device.index == 0
|
|
|
|
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
@require_package("transformers")
def test_device_detection_multi_gpu():
    """Tokenized tensors land on the same non-default GPU as the inputs."""
    processor = TokenizerProcessorStep(tokenizer=MockTokenizer(vocab_size=100), max_length=10)

    device = torch.device("cuda:1")
    transition = create_transition(
        observation={OBS_STATE: torch.randn(10).to(device)},
        action=torch.randn(5).to(device),
        complementary_data={"task": "multi gpu test"},
    )

    obs = processor(transition)[TransitionKey.OBSERVATION]

    assert obs[f"{OBS_LANGUAGE}.tokens"].device == device
    assert obs[f"{OBS_LANGUAGE}.attention_mask"].device == device
|
|
|
|
@require_package("transformers")
def test_device_detection_no_tensors():
    """Without any reference tensors, tokenized tensors default to CPU."""
    processor = TokenizerProcessorStep(tokenizer=MockTokenizer(vocab_size=100), max_length=10)

    transition = create_transition(
        observation={"metadata": {"key": "value"}},
        complementary_data={"task": "no tensor test"},
    )

    obs = processor(transition)[TransitionKey.OBSERVATION]

    assert obs[f"{OBS_LANGUAGE}.tokens"].device.type == "cpu"
    assert obs[f"{OBS_LANGUAGE}.attention_mask"].device.type == "cpu"
|
|
|
|
@require_package("transformers")
def test_device_detection_mixed_devices():
    """With tensors on several devices, tokens and mask at least share a device."""
    processor = TokenizerProcessorStep(tokenizer=MockTokenizer(vocab_size=100), max_length=10)

    if torch.cuda.is_available():
        observation = {
            "observation.cpu": torch.randn(10),
            "observation.cuda": torch.randn(10).cuda(),
        }
        result = processor(
            create_transition(observation=observation, complementary_data={"task": "mixed device test"})
        )

        obs = result[TransitionKey.OBSERVATION]
        tokens = obs[f"{OBS_LANGUAGE}.tokens"]
        mask = obs[f"{OBS_LANGUAGE}.attention_mask"]

        # Which device wins is implementation-defined; the two must agree.
        assert tokens.device == mask.device
|
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
@require_package("transformers")
def test_device_detection_from_action():
    """Device is inferred from the action tensor when observations have none."""
    processor = TokenizerProcessorStep(tokenizer=MockTokenizer(vocab_size=100), max_length=10)

    transition = create_transition(
        observation={"metadata": {"key": "value"}},
        action=torch.randn(5).cuda(),
        complementary_data={"task": "action device test"},
    )

    obs = processor(transition)[TransitionKey.OBSERVATION]

    assert obs[f"{OBS_LANGUAGE}.tokens"].device.type == "cuda"
    assert obs[f"{OBS_LANGUAGE}.attention_mask"].device.type == "cuda"
|
|
|
|
| @require_package("transformers") |
| def test_device_detection_preserves_dtype(): |
| """Test that device detection doesn't affect dtype of tokenized tensors.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) |
|
|
| |
| observation = {OBS_STATE: torch.randn(10, dtype=torch.float16)} |
| transition = create_transition(observation=observation, complementary_data={"task": "dtype test"}) |
|
|
| result = processor(transition) |
|
|
| |
| tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"] |
| attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"] |
|
|
| assert tokens.dtype == torch.long |
| assert attention_mask.dtype == torch.bool |
|
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
@require_package("transformers")
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
def test_integration_with_device_processor(mock_auto_tokenizer):
    """TokenizerProcessorStep composes with DeviceProcessorStep in a pipeline."""
    from lerobot.processor import DeviceProcessorStep

    mock_auto_tokenizer.from_pretrained.return_value = MockTokenizer(vocab_size=100)

    # Tokenize first, then move everything onto the GPU.
    pipeline = DataProcessorPipeline(
        [
            TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=6),
            DeviceProcessorStep(device="cuda:0"),
        ],
        to_transition=identity_transition,
        to_output=identity_transition,
    )

    sample = create_transition(
        observation={OBS_STATE: torch.randn(10)},
        action=torch.randn(5),
        complementary_data={"task": "pipeline test"},
    )

    out = pipeline(sample)
    obs = out[TransitionKey.OBSERVATION]

    # The device step moved the original tensors to the GPU.
    assert obs[OBS_STATE].device.type == "cuda"
    assert out[TransitionKey.ACTION].device.type == "cuda"

    # The tokenized language tensors followed along.
    assert obs[f"{OBS_LANGUAGE}.tokens"].device.type == "cuda"
    assert obs[f"{OBS_LANGUAGE}.attention_mask"].device.type == "cuda"
|
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
@require_package("transformers")
def test_simulated_accelerate_scenario():
    """Simulate Accelerate: all incoming tensors already live on the GPU."""
    step = TokenizerProcessorStep(tokenizer=MockTokenizer(vocab_size=100), max_length=10)

    device = torch.device("cuda:0")
    sample = create_transition(
        observation={
            OBS_STATE: torch.randn(1, 10).to(device),
            OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device),
        },
        action=torch.randn(1, 5).to(device),
        complementary_data={"task": ["accelerate test"]},
    )

    obs = step(sample)[TransitionKey.OBSERVATION]
    tokens = obs[f"{OBS_LANGUAGE}.tokens"]
    attention_mask = obs[f"{OBS_LANGUAGE}.attention_mask"]

    # Tokenized tensors land on the same CUDA device as the inputs.
    assert tokens.device == device
    assert attention_mask.device == device
    # A single-element task list is squeezed to 1D tensors of max_length.
    assert tokens.shape == (10,)
    assert attention_mask.shape == (10,)
|
|
|
|
| |
| |
| |
|
|
|
|
| @require_package("transformers") |
| def test_get_subtask_missing_key(): |
| """Test get_subtask returns None when subtask key is missing from complementary_data.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) |
|
|
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task"}, |
| ) |
|
|
| result = processor.get_subtask(transition) |
| assert result is None |
|
|
|
|
| @require_package("transformers") |
| def test_get_subtask_none_value(): |
| """Test get_subtask returns None when subtask value is None.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) |
|
|
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task", "subtask": None}, |
| ) |
|
|
| result = processor.get_subtask(transition) |
| assert result is None |
|
|
|
|
| @require_package("transformers") |
| def test_get_subtask_none_complementary_data(): |
| """Test get_subtask returns None when complementary_data is None.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) |
|
|
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data=None, |
| ) |
|
|
| result = processor.get_subtask(transition) |
| assert result is None |
|
|
|
|
| @require_package("transformers") |
| def test_get_subtask_string(): |
| """Test get_subtask returns list with single string when subtask is a string.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) |
|
|
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task", "subtask": "pick up the cube"}, |
| ) |
|
|
| result = processor.get_subtask(transition) |
| assert result == ["pick up the cube"] |
| assert isinstance(result, list) |
| assert len(result) == 1 |
|
|
|
|
| @require_package("transformers") |
| def test_get_subtask_list_of_strings(): |
| """Test get_subtask returns the list when subtask is already a list of strings.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) |
|
|
| subtask_list = ["pick up", "move to target", "place down"] |
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task", "subtask": subtask_list}, |
| ) |
|
|
| result = processor.get_subtask(transition) |
| assert result == subtask_list |
| assert isinstance(result, list) |
| assert len(result) == 3 |
|
|
|
|
| @require_package("transformers") |
| def test_get_subtask_unsupported_type_integer(): |
| """Test get_subtask returns None when subtask is an unsupported type (integer).""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) |
|
|
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task", "subtask": 123}, |
| ) |
|
|
| result = processor.get_subtask(transition) |
| assert result is None |
|
|
|
|
| @require_package("transformers") |
| def test_get_subtask_unsupported_type_mixed_list(): |
| """Test get_subtask returns None when subtask is a list with mixed types.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) |
|
|
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task", "subtask": ["valid string", 123, "another string"]}, |
| ) |
|
|
| result = processor.get_subtask(transition) |
| assert result is None |
|
|
|
|
| @require_package("transformers") |
| def test_get_subtask_unsupported_type_dict(): |
| """Test get_subtask returns None when subtask is a dictionary.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) |
|
|
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task", "subtask": {"key": "value"}}, |
| ) |
|
|
| result = processor.get_subtask(transition) |
| assert result is None |
|
|
|
|
| @require_package("transformers") |
| def test_get_subtask_empty_string(): |
| """Test get_subtask with empty string returns list with empty string.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) |
|
|
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task", "subtask": ""}, |
| ) |
|
|
| result = processor.get_subtask(transition) |
| assert result == [""] |
|
|
|
|
| @require_package("transformers") |
| def test_get_subtask_empty_list(): |
| """Test get_subtask with empty list returns empty list.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) |
|
|
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task", "subtask": []}, |
| ) |
|
|
| result = processor.get_subtask(transition) |
| assert result == [] |
|
|
|
|
| |
| |
| |
|
|
|
|
| @require_package("transformers") |
| def test_subtask_tokenization_when_present(): |
| """Test that subtask is tokenized and added to observation when present.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=8) |
|
|
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task", "subtask": "pick up the red cube"}, |
| ) |
|
|
| result = processor(transition) |
|
|
| |
| observation = result[TransitionKey.OBSERVATION] |
| assert OBS_LANGUAGE_SUBTASK_TOKENS in observation |
| assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK in observation |
|
|
| |
| subtask_tokens = observation[OBS_LANGUAGE_SUBTASK_TOKENS] |
| subtask_attention_mask = observation[OBS_LANGUAGE_SUBTASK_ATTENTION_MASK] |
| assert isinstance(subtask_tokens, torch.Tensor) |
| assert isinstance(subtask_attention_mask, torch.Tensor) |
| assert subtask_tokens.shape == (8,) |
| assert subtask_attention_mask.shape == (8,) |
| assert subtask_attention_mask.dtype == torch.bool |
|
|
|
|
| @require_package("transformers") |
| def test_subtask_tokenization_not_added_when_none(): |
| """Test that subtask tokens are NOT added to observation when subtask is None.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=8) |
|
|
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task"}, |
| ) |
|
|
| result = processor(transition) |
|
|
| |
| observation = result[TransitionKey.OBSERVATION] |
| assert OBS_LANGUAGE_SUBTASK_TOKENS not in observation |
| assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK not in observation |
|
|
| |
| assert f"{OBS_LANGUAGE}.tokens" in observation |
| assert f"{OBS_LANGUAGE}.attention_mask" in observation |
|
|
|
|
| @require_package("transformers") |
| def test_subtask_tokenization_not_added_when_subtask_value_is_none(): |
| """Test that subtask tokens are NOT added when subtask value is explicitly None.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=8) |
|
|
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task", "subtask": None}, |
| ) |
|
|
| result = processor(transition) |
|
|
| |
| observation = result[TransitionKey.OBSERVATION] |
| assert OBS_LANGUAGE_SUBTASK_TOKENS not in observation |
| assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK not in observation |
|
|
|
|
| @require_package("transformers") |
| def test_subtask_tokenization_list_of_strings(): |
| """Test subtask tokenization with list of strings.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=8) |
|
|
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task", "subtask": ["pick up", "place down"]}, |
| ) |
|
|
| result = processor(transition) |
|
|
| |
| observation = result[TransitionKey.OBSERVATION] |
| assert OBS_LANGUAGE_SUBTASK_TOKENS in observation |
| assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK in observation |
|
|
| |
| subtask_tokens = observation[OBS_LANGUAGE_SUBTASK_TOKENS] |
| subtask_attention_mask = observation[OBS_LANGUAGE_SUBTASK_ATTENTION_MASK] |
| assert subtask_tokens.shape == (2, 8) |
| assert subtask_attention_mask.shape == (2, 8) |
|
|
|
|
| @require_package("transformers") |
| def test_subtask_tokenization_device_cpu(): |
| """Test that subtask tokens are on CPU when other tensors are on CPU.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) |
|
|
| |
| observation = {OBS_STATE: torch.randn(10)} |
| action = torch.randn(5) |
| transition = create_transition( |
| observation=observation, |
| action=action, |
| complementary_data={"task": "main task", "subtask": "pick up cube"}, |
| ) |
|
|
| result = processor(transition) |
|
|
| |
| subtask_tokens = result[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_TOKENS] |
| subtask_attention_mask = result[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_ATTENTION_MASK] |
|
|
| assert subtask_tokens.device.type == "cpu" |
| assert subtask_attention_mask.device.type == "cpu" |
|
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
@require_package("transformers")
def test_subtask_tokenization_device_cuda():
    """Subtask tensors are moved to CUDA when the surrounding tensors are on CUDA."""
    step = TokenizerProcessorStep(tokenizer=MockTokenizer(vocab_size=100), max_length=10)

    # All input tensors live on the GPU.
    sample = create_transition(
        observation={OBS_STATE: torch.randn(10).cuda()},
        action=torch.randn(5).cuda(),
        complementary_data={"task": "main task", "subtask": "pick up cube"},
    )

    obs = step(sample)[TransitionKey.OBSERVATION]

    assert obs[OBS_LANGUAGE_SUBTASK_TOKENS].device.type == "cuda"
    assert obs[OBS_LANGUAGE_SUBTASK_ATTENTION_MASK].device.type == "cuda"
|
|
|
|
| @require_package("transformers") |
| def test_subtask_tokenization_preserves_other_observation_data(): |
| """Test that subtask tokenization preserves other observation data.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) |
|
|
| original_state = torch.tensor([1.0, 2.0, 3.0]) |
| transition = create_transition( |
| observation={"state": original_state.clone()}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task", "subtask": "pick up cube"}, |
| ) |
|
|
| result = processor(transition) |
| observation = result[TransitionKey.OBSERVATION] |
|
|
| |
| assert torch.equal(observation["state"], original_state) |
|
|
| |
| assert f"{OBS_LANGUAGE}.tokens" in observation |
| assert f"{OBS_LANGUAGE}.attention_mask" in observation |
| assert OBS_LANGUAGE_SUBTASK_TOKENS in observation |
| assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK in observation |
|
|
|
|
| @require_package("transformers") |
| def test_subtask_attention_mask_dtype(): |
| """Test that subtask attention mask has correct dtype (bool).""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) |
|
|
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task", "subtask": "pick up cube"}, |
| ) |
|
|
| result = processor(transition) |
| observation = result[TransitionKey.OBSERVATION] |
|
|
| subtask_attention_mask = observation[OBS_LANGUAGE_SUBTASK_ATTENTION_MASK] |
| assert subtask_attention_mask.dtype == torch.bool |
|
|
|
|
| @require_package("transformers") |
| def test_subtask_tokenization_deterministic(): |
| """Test that subtask tokenization is deterministic for the same input.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) |
|
|
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task", "subtask": "consistent subtask"}, |
| ) |
|
|
| result1 = processor(transition) |
| result2 = processor(transition) |
|
|
| subtask_tokens1 = result1[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_TOKENS] |
| subtask_tokens2 = result2[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_TOKENS] |
| subtask_mask1 = result1[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_ATTENTION_MASK] |
| subtask_mask2 = result2[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_ATTENTION_MASK] |
|
|
| |
| assert torch.equal(subtask_tokens1, subtask_tokens2) |
| assert torch.equal(subtask_mask1, subtask_mask2) |
|
|
|
|
| @require_package("transformers") |
| @patch("lerobot.processor.tokenizer_processor.AutoTokenizer") |
| def test_subtask_tokenization_integration_with_pipeline(mock_auto_tokenizer): |
| """Test subtask tokenization works correctly with DataProcessorPipeline.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer |
|
|
| tokenizer_processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=6) |
| robot_processor = DataProcessorPipeline( |
| [tokenizer_processor], to_transition=identity_transition, to_output=identity_transition |
| ) |
|
|
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task", "subtask": "subtask instruction"}, |
| ) |
|
|
| result = robot_processor(transition) |
|
|
| |
| assert TransitionKey.OBSERVATION in result |
| observation = result[TransitionKey.OBSERVATION] |
|
|
| |
| assert f"{OBS_LANGUAGE}.tokens" in observation |
| assert f"{OBS_LANGUAGE}.attention_mask" in observation |
|
|
| |
| assert OBS_LANGUAGE_SUBTASK_TOKENS in observation |
| assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK in observation |
|
|
| |
| assert observation[f"{OBS_LANGUAGE}.tokens"].shape == (6,) |
| assert observation[OBS_LANGUAGE_SUBTASK_TOKENS].shape == (6,) |
|
|
|
|
| @require_package("transformers") |
| def test_subtask_not_added_for_unsupported_types(): |
| """Test that subtask tokens are not added when subtask has unsupported type.""" |
| mock_tokenizer = MockTokenizer(vocab_size=100) |
| processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=8) |
|
|
| |
| transition = create_transition( |
| observation={"state": torch.tensor([1.0, 2.0])}, |
| action=torch.tensor([0.1, 0.2]), |
| complementary_data={"task": "main task", "subtask": 123}, |
| ) |
|
|
| result = processor(transition) |
| observation = result[TransitionKey.OBSERVATION] |
|
|
| |
| assert OBS_LANGUAGE_SUBTASK_TOKENS not in observation |
| assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK not in observation |
|
|
| |
| assert f"{OBS_LANGUAGE}.tokens" in observation |
|
|