|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""Tests for build_step_data.""" |
|
|
|
import os |
|
|
|
from absl import flags |
|
import numpy as np |
|
from PIL import Image |
|
import tensorflow as tf |
|
|
|
from deeplab2.data import build_step_data |
|
|
|
FLAGS = flags.FLAGS |
|
|
|
|
|
class BuildStepDataTest(tf.test.TestCase):
    """Tests for `build_step_data._convert_dataset`.

    Every test writes a tiny synthetic sequence (random RGB frames plus
    encoded panoptic maps) below the test temp directory, runs the
    conversion on it and inspects shard 0 of the produced TFRecords.
    """

    def setUp(self):
        super().setUp()
        # The same directory serves as dataset root and as output directory.
        self.data_dir = FLAGS.test_tmpdir
        self.height = 100
        self.width = 100
        self.sequence_id = '010'

    def _create_images(self, split):
        """Writes two dummy frames (ids 101 and 100) for `split`.

        The decoded panoptic map of every frame is stored in
        `self.panoptic_maps`, keyed by the integer frame id.

        Args:
          split: Name of the dataset split sub-folder, e.g. 'train' or 'val'.
        """
        image_path = os.path.join(
            self.data_dir, build_step_data._IMAGE_FOLDER_NAME, split,
            self.sequence_id)
        panoptic_map_path = os.path.join(
            self.data_dir, build_step_data._PANOPTIC_MAP_FOLDER_NAME, split,
            self.sequence_id)

        tf.io.gfile.makedirs(image_path)
        tf.io.gfile.makedirs(panoptic_map_path)
        self.panoptic_maps = {}
        # Frames are deliberately created out of order; the tests look them
        # up through the sorted frame ids.
        for image_id in [101, 100]:
            self.panoptic_maps[image_id] = self._create_image_and_panoptic_map(
                image_path, panoptic_map_path, image_id)

    def _create_image_and_panoptic_map(self, image_path, panoptic_path,
                                       image_id):
        """Creates one dummy image and its panoptic map on disk.

        Args:
          image_path: Directory receiving the RGB frame as '<id:06d>.png'.
          panoptic_path: Directory receiving the encoded panoptic PNG.
          image_id: Integer frame id used for both file names.

        Returns:
          The decoded panoptic map, `semantic * 1000 + instance`, as an int32
          array of shape (height, width).
        """
        image = np.random.randint(
            0, 255, (self.height, self.width, 3), dtype=np.uint8)
        with tf.io.gfile.GFile(
                os.path.join(image_path, '%06d.png' % image_id), 'wb') as f:
            Image.fromarray(image).save(f, format='PNG')

        semantic = np.random.randint(
            0, 20, (self.height, self.width), dtype=np.int32)
        instance = np.random.randint(
            0, 1000, (self.height, self.width), dtype=np.int32)
        # The PNG channels hold: semantic id, high byte and low byte of the
        # instance id.
        encoded_panoptic_map = np.dstack(
            (semantic, instance // 256, instance % 256)).astype(np.uint8)
        with tf.io.gfile.GFile(
                os.path.join(panoptic_path, '%06d.png' % image_id), 'wb') as f:
            Image.fromarray(encoded_panoptic_map).save(f, format='PNG')
        return semantic * 1000 + instance

    def _convert_and_get_first_shard(self, split, num_shards,
                                     **convert_kwargs):
        """Creates the dummy data, converts it and returns the path of shard 0.

        Args:
          split: Dataset split to create and convert.
          num_shards: Shard count expected in the output file name.
          **convert_kwargs: Extra keyword arguments forwarded to
            `build_step_data._convert_dataset` (e.g. `use_two_frames=True`).

        Returns:
          Path of the first output shard; its existence is asserted.
        """
        self._create_images(split)
        build_step_data._convert_dataset(
            step_root=self.data_dir,
            dataset_split=split,
            output_dir=FLAGS.test_tmpdir,
            **convert_kwargs)
        output_record = os.path.join(
            FLAGS.test_tmpdir,
            build_step_data._TF_RECORD_PATTERN % (split, 0, num_shards))
        self.assertTrue(tf.io.gfile.exists(output_record))
        return output_record

    def _read_examples(self, output_record):
        """Yields `(index, tf.train.Example)` for up to five records."""
        dataset = tf.data.TFRecordDataset([output_record]).take(5)
        for i, raw_record in enumerate(dataset):
            yield i, tf.train.Example.FromString(raw_record.numpy())

    def _decode_panoptic_map(self, example, feature_key):
        """Decodes the int32 panoptic map stored under `feature_key`."""
        raw = example.features.feature[feature_key].bytes_list.value[0]
        # np.frombuffer replaces the deprecated binary mode of np.fromstring.
        return np.frombuffer(raw, dtype=np.int32).reshape(
            (self.height, self.width))

    def _assert_video_ids(self, example, image_id):
        """Checks the sequence id and frame id features of `example`."""
        features = example.features.feature
        self.assertEqual(
            features['video/sequence_id'].bytes_list.value[0],
            self.sequence_id.encode('utf-8'))
        self.assertEqual(
            features['video/frame_id'].bytes_list.value[0],
            b'%06d' % image_id)

    def test_build_step_dataset_correct(self):
        """Single-frame conversion of the train split round-trips the maps."""
        output_record = self._convert_and_get_first_shard('train', 2)

        # Records are matched to frames through the sorted frame ids.
        image_ids = sorted(self.panoptic_maps)
        for i, example in self._read_examples(output_record):
            image_id = image_ids[i]
            panoptic_map = self._decode_panoptic_map(
                example, 'image/segmentation/class/encoded')
            np.testing.assert_array_equal(
                panoptic_map, self.panoptic_maps[image_id])
            self._assert_video_ids(example, image_id)

    def test_build_step_dataset_correct_with_two_frames(self):
        """Two-frame conversion also stores the previous frame's map."""
        output_record = self._convert_and_get_first_shard(
            'train', 2, use_two_frames=True)

        image_ids = sorted(self.panoptic_maps)
        for i, example in self._read_examples(output_record):
            image_id = image_ids[i]
            panoptic_map = self._decode_panoptic_map(
                example, 'image/segmentation/class/encoded')
            np.testing.assert_array_equal(
                panoptic_map, self.panoptic_maps[image_id])
            prev_panoptic_map = self._decode_panoptic_map(
                example, 'prev_image/segmentation/class/encoded')
            if i == 0:
                # First frame: the previous frame is the frame itself.
                np.testing.assert_array_equal(panoptic_map, prev_panoptic_map)
            else:
                # Not a first frame: compare with the preceding frame.
                # `self.panoptic_maps` is keyed by frame id (100, 101), so the
                # former lookup of key 0 could only raise KeyError.
                # NOTE(review): assumes records are in ascending frame order,
                # exactly as `image_ids[i]` above already does.
                np.testing.assert_array_equal(
                    prev_panoptic_map, self.panoptic_maps[image_ids[i - 1]])
            self._assert_video_ids(example, image_id)

    def test_build_step_dataset_with_two_frames_shared_by_sequence(self):
        """Two-frame conversion of the val split writes shard 0 of 1.

        NOTE(review): "shared" in the method name presumably means "sharded";
        the name is kept so existing test ids stay valid.
        """
        self._convert_and_get_first_shard('val', 1, use_two_frames=True)
|
|
|
|
|
if __name__ == '__main__':
    # Run every test case in this module through TensorFlow's test runner
    # when the file is executed as a script.
    tf.test.main()
|
|