File size: 4,668 Bytes
fcd8cdd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# Copyright (c) Facebook, Inc. and its affiliates.
import json
import os
import tempfile
import unittest
from detectron2.utils.events import (
CommonMetricPrinter,
EventStorage,
JSONWriter,
get_event_storage,
has_event_storage,
)
class TestEventWriter(unittest.TestCase):
def testScalar(self):
with tempfile.TemporaryDirectory(
prefix="detectron2_tests"
) as dir, EventStorage() as storage:
json_file = os.path.join(dir, "test.json")
writer = JSONWriter(json_file)
for k in range(60):
storage.put_scalar("key", k, smoothing_hint=False)
if (k + 1) % 20 == 0:
writer.write()
storage.step()
writer.close()
with open(json_file) as f:
data = [json.loads(l) for l in f]
self.assertTrue([int(k["key"]) for k in data] == [19, 39, 59])
def testScalarMismatchedPeriod(self):
with tempfile.TemporaryDirectory(
prefix="detectron2_tests"
) as dir, EventStorage() as storage:
json_file = os.path.join(dir, "test.json")
writer = JSONWriter(json_file)
for k in range(60):
if k % 17 == 0: # write in a differnt period
storage.put_scalar("key2", k, smoothing_hint=False)
storage.put_scalar("key", k, smoothing_hint=False)
if (k + 1) % 20 == 0:
writer.write()
storage.step()
writer.close()
with open(json_file) as f:
data = [json.loads(l) for l in f]
self.assertTrue([int(k.get("key2", 0)) for k in data] == [17, 0, 34, 0, 51, 0])
self.assertTrue([int(k.get("key", 0)) for k in data] == [0, 19, 0, 39, 0, 59])
self.assertTrue([int(k["iteration"]) for k in data] == [17, 19, 34, 39, 51, 59])
def testPrintETA(self):
with EventStorage() as s:
p1 = CommonMetricPrinter(10)
p2 = CommonMetricPrinter()
s.put_scalar("time", 1.0)
s.step()
s.put_scalar("time", 1.0)
s.step()
with self.assertLogs("detectron2.utils.events") as logs:
p1.write()
self.assertIn("eta", logs.output[0])
with self.assertLogs("detectron2.utils.events") as logs:
p2.write()
self.assertNotIn("eta", logs.output[0])
def testPrintNonLosses(self):
with EventStorage() as s:
p1 = CommonMetricPrinter(10)
p2 = CommonMetricPrinter()
s.put_scalar("time", 1.0)
s.put_scalar("[metric]bn_stat", 1.0)
s.step()
s.put_scalar("time", 1.0)
s.put_scalar("[metric]bn_stat", 1.0)
s.step()
with self.assertLogs("detectron2.utils.events") as logs:
p1.write()
self.assertIn("[metric]bn_stat", logs.output[0])
with self.assertLogs("detectron2.utils.events") as logs:
p2.write()
self.assertIn("[metric]bn_stat", logs.output[0])
def testSmoothingWithWindowSize(self):
with tempfile.TemporaryDirectory(
prefix="detectron2_tests"
) as dir, EventStorage() as storage:
json_file = os.path.join(dir, "test.json")
writer = JSONWriter(json_file, window_size=10)
for k in range(20):
storage.put_scalar("key1", k, smoothing_hint=True)
if (k + 1) % 2 == 0:
storage.put_scalar("key2", k, smoothing_hint=True)
if (k + 1) % 5 == 0:
storage.put_scalar("key3", k, smoothing_hint=True)
if (k + 1) % 10 == 0:
writer.write()
storage.step()
num_samples = {k: storage.count_samples(k, 10) for k in ["key1", "key2", "key3"]}
self.assertEqual(num_samples, {"key1": 10, "key2": 5, "key3": 2})
writer.close()
with open(json_file) as f:
data = [json.loads(l) for l in f]
self.assertEqual([k["key1"] for k in data], [4.5, 14.5])
self.assertEqual([k["key2"] for k in data], [5, 15])
self.assertEqual([k["key3"] for k in data], [6.5, 16.5])
def testEventStorage(self):
self.assertFalse(has_event_storage())
with EventStorage() as storage:
self.assertTrue(has_event_storage())
self.assertEqual(storage, get_event_storage())
self.assertFalse(has_event_storage())
|