deeplab2 / data /build_step_data_test.py
akhaliq3
spaces demo
506da10
# coding=utf-8
# Copyright 2021 The Deeplab2 Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for build_step_data."""
import os
from absl import flags
import numpy as np
from PIL import Image
import tensorflow as tf
from deeplab2.data import build_step_data
# Global absl flag values; the tests below read `FLAGS.test_tmpdir` and use it
# as the scratch directory for both the dummy inputs and the converted output.
FLAGS = flags.FLAGS
class BuildStepDataTest(tf.test.TestCase):
  """Tests for `build_step_data._convert_dataset` on a tiny dummy dataset.

  Every test writes two random frames (ids 100 and 101) of one sequence under
  `FLAGS.test_tmpdir`, runs the conversion, and inspects shard 0 of the
  produced TFRecord files.
  """

  def setUp(self):
    """Sets the scratch directory and the geometry of the dummy frames."""
    super().setUp()
    self.data_dir = FLAGS.test_tmpdir
    self.height = 100
    self.width = 100
    self.sequence_id = '010'

  def _create_images(self, split):
    """Writes the dummy frames for `split` and stores their expected labels.

    Populates `self.panoptic_maps`, a dict mapping each frame id to the
    panoptic map the converted record is expected to contain.

    Args:
      split: Name of the dataset split sub-folder to create (e.g. 'train').
    """
    image_path = os.path.join(self.data_dir, build_step_data._IMAGE_FOLDER_NAME,
                              split, self.sequence_id)
    panoptic_map_path = os.path.join(self.data_dir,
                                     build_step_data._PANOPTIC_MAP_FOLDER_NAME,
                                     split, self.sequence_id)

    tf.io.gfile.makedirs(image_path)
    tf.io.gfile.makedirs(panoptic_map_path)
    self.panoptic_maps = {}
    # Frames are written in descending id order while the tests compare
    # against sorted ids (presumably to check the converter orders frames).
    for image_id in [101, 100]:
      self.panoptic_maps[image_id] = self._create_image_and_panoptic_map(
          image_path, panoptic_map_path, image_id)

  def _create_image_and_panoptic_map(self, image_path, panoptic_path, image_id):
    """Creates dummy images and panoptic maps.

    Args:
      image_path: Directory receiving the random RGB image as a PNG.
      panoptic_path: Directory receiving the encoded panoptic map as a PNG.
      image_id: Integer frame id; files are named '%06d.png' % image_id.

    Returns:
      An int32 array of shape (height, width) holding
      `semantic * 1000 + instance` for every pixel.
    """
    # Dummy image.
    image = np.random.randint(
        0, 255, (self.height, self.width, 3), dtype=np.uint8)
    with tf.io.gfile.GFile(
        os.path.join(image_path, '%06d.png' % image_id), 'wb') as f:
      Image.fromarray(image).save(f, format='PNG')

    # Dummy panoptic map.
    semantic = np.random.randint(
        0, 20, (self.height, self.width), dtype=np.int32)
    instance = np.random.randint(
        0, 1000, (self.height, self.width), dtype=np.int32)
    # The three PNG channels carry the semantic id and the high / low byte of
    # the instance id, respectively.
    encoded_panoptic_map = np.dstack(
        (semantic, instance // 256, instance % 256)).astype(np.uint8)
    with tf.io.gfile.GFile(
        os.path.join(panoptic_path, '%06d.png' % image_id), 'wb') as f:
      Image.fromarray(encoded_panoptic_map).save(f, format='PNG')
    decoded_panoptic_map = semantic * 1000 + instance
    return decoded_panoptic_map

  def _shard_path(self, split, num_shards, shard_id=0):
    """Returns the path of one output TFRecord shard in the test tmpdir."""
    return os.path.join(
        FLAGS.test_tmpdir,
        build_step_data._TF_RECORD_PATTERN % (split, shard_id, num_shards))

  def _read_examples(self, record_path, max_records=5):
    """Parses up to `max_records` records of a shard into tf.train.Examples."""
    dataset = tf.data.TFRecordDataset([record_path]).take(max_records)
    return [
        tf.train.Example.FromString(raw_record.numpy())
        for raw_record in dataset
    ]

  def _decode_panoptic_map(self, example, feature_key):
    """Decodes a raw int32 bytes feature into a (height, width) array."""
    raw_bytes = example.features.feature[feature_key].bytes_list.value[0]
    # np.frombuffer replaces the deprecated binary mode of np.fromstring.
    return np.frombuffer(raw_bytes, dtype=np.int32).reshape(
        (self.height, self.width))

  def _assert_video_ids(self, example, image_id):
    """Checks the sequence id and frame id stored in `example`."""
    self.assertEqual(
        example.features.feature['video/sequence_id'].bytes_list.value[0],
        b'010')
    self.assertEqual(
        example.features.feature['video/frame_id'].bytes_list.value[0],
        b'%06d' % image_id)

  def test_build_step_dataset_correct(self):
    """Single-frame conversion stores the expected panoptic map and ids."""
    split = 'train'
    self._create_images(split)
    build_step_data._convert_dataset(
        step_root=self.data_dir,
        dataset_split=split,
        output_dir=FLAGS.test_tmpdir)
    # We will have 2 shards with each shard containing 1 image.
    output_record = self._shard_path(split, num_shards=2)
    self.assertTrue(tf.io.gfile.exists(output_record))

    # Parses tf record.
    examples = self._read_examples(output_record)
    # Guard against the checks below passing vacuously on an empty shard.
    self.assertGreater(len(examples), 0)
    image_ids = sorted(self.panoptic_maps)
    for i, example in enumerate(examples):
      image_id = image_ids[i]
      panoptic_map = self._decode_panoptic_map(
          example, 'image/segmentation/class/encoded')
      np.testing.assert_array_equal(panoptic_map, self.panoptic_maps[image_id])
      self._assert_video_ids(example, image_id)

  def test_build_step_dataset_correct_with_two_frames(self):
    """Two-frame conversion also stores the previous frame's panoptic map."""
    split = 'train'
    self._create_images(split)
    build_step_data._convert_dataset(
        step_root=self.data_dir,
        dataset_split=split,
        output_dir=FLAGS.test_tmpdir, use_two_frames=True)
    output_record = self._shard_path(split, num_shards=2)
    self.assertTrue(tf.io.gfile.exists(output_record))

    # Parses tf record.
    examples = self._read_examples(output_record)
    # Guard against the checks below passing vacuously on an empty shard.
    self.assertGreater(len(examples), 0)
    image_ids = sorted(self.panoptic_maps)
    for i, example in enumerate(examples):
      image_id = image_ids[i]
      panoptic_map = self._decode_panoptic_map(
          example, 'image/segmentation/class/encoded')
      np.testing.assert_array_equal(panoptic_map, self.panoptic_maps[image_id])
      prev_panoptic_map = self._decode_panoptic_map(
          example, 'prev_image/segmentation/class/encoded')
      if i == 0:
        # First frame.
        np.testing.assert_array_equal(panoptic_map, prev_panoptic_map)
      else:
        # Not a first frame. `self.panoptic_maps` is keyed by frame id, so
        # look up the preceding id rather than the (non-existent) key 0.
        np.testing.assert_array_equal(
            prev_panoptic_map, self.panoptic_maps[image_ids[i - 1]])
      self._assert_video_ids(example, image_id)

  def test_build_step_dataset_with_two_frames_shared_by_sequence(self):
    """Two-frame conversion of the val split writes a single shard."""
    split = 'val'
    self._create_images(split)
    build_step_data._convert_dataset(
        step_root=self.data_dir,
        dataset_split=split,
        output_dir=FLAGS.test_tmpdir, use_two_frames=True)
    # Only one shard since there is only one sequence for the val set.
    output_record = self._shard_path(split, num_shards=1)
    self.assertTrue(tf.io.gfile.exists(output_record))
# Hand control to the TensorFlow test runner when this file is executed as a
# script.
if __name__ == '__main__':
  tf.test.main()