| |
|
| |
|
| |
|
| |
|
| | from caffe2.python import workspace, crf, brew |
| | from caffe2.python.model_helper import ModelHelper |
| | import numpy as np |
| | from scipy.special import logsumexp |
| | import caffe2.python.hypothesis_test_util as hu |
| | import hypothesis.strategies as st |
| | from hypothesis import given, settings |
| |
|
| |
|
| | class TestCRFOp(hu.HypothesisTestCase): |
| |
|
| | @given(num_tags=st.integers(2, 4), |
| | num_words=st.integers(2, 15)) |
| | @settings(deadline=10000) |
| | def test_crf_with_loss_op(self, num_tags, num_words): |
| | model = ModelHelper(name='external') |
| | embeddings_dim = 200 |
| | embeddings = np.random.randn(num_words, embeddings_dim).astype(np.float32) |
| | transitions = np.random.uniform( |
| | low=-1, high=1, size=(num_tags + 2, num_tags + 2) |
| | ).astype(np.float32) |
| | labels = np.random.randint(num_tags, size=(num_words)).astype(np.int64) |
| | embeddings_blob, labels_blob, transitions_blob = ( |
| | model.net.AddExternalInputs( |
| | 'embeddings_blob', |
| | 'labels_blob', |
| | 'crf_transitions') |
| | ) |
| | workspace.FeedBlob(str(embeddings_blob), embeddings) |
| | workspace.FeedBlob(str(labels_blob), labels) |
| | workspace.FeedBlob(str(transitions_blob), transitions) |
| | predictions_blob = brew.fc( |
| | model, |
| | embeddings_blob, "fc_0", |
| | embeddings_dim, num_tags, |
| | ('UniformFill', {'min': -1.0}, {'max': 1.0}), |
| | ('UniformFill', {'min': -1.0}, {'max': 1.0}) |
| | ) |
| | crf_layer = crf.CRFWithLoss(model, num_tags, transitions_blob) |
| | crf_loss = crf_layer.crf_loss(predictions_blob, labels_blob) |
| | model.net.AddGradientOperators([crf_loss]) |
| | workspace.RunNetOnce(model.param_init_net) |
| | workspace.RunNetOnce(model.net) |
| | loss = workspace.FetchBlob(str(crf_loss)) |
| | predictions = workspace.FetchBlob(str(predictions_blob)) |
| | np.testing.assert_allclose( |
| | loss, |
| | self._compute_loss_manual( |
| | predictions, num_tags, labels, transitions |
| | ), |
| | atol=0.001, |
| | rtol=0.001, |
| | err_msg='CRF LOSS is not matching the reference' |
| | ) |
| |
|
| | @given(num_tags=st.integers(1, 4), |
| | num_words=st.integers(2, 4)) |
| | @settings(deadline=10000) |
| | def test_crf_gradient(self, num_tags, num_words): |
| | base_model = ModelHelper(name='base_model') |
| | transitions = np.random.randn( |
| | num_tags + 2, num_tags + 2 |
| | ).astype(np.float32) |
| | predictions = np.random.randn(num_words, 1, num_tags + 2).astype(np.float32) |
| | initial = np.random.randn(1, num_tags + 2).astype(np.float32) |
| | predictions_blob, transitions_blob, initial_blob = ( |
| | base_model.net.AddExternalInputs( |
| | 'predictions_blob', 'crf_transitions', 'inital_blob' |
| | ) |
| | ) |
| |
|
| | workspace.FeedBlob(str(predictions_blob), predictions) |
| | workspace.FeedBlob(str(transitions_blob), transitions) |
| | workspace.FeedBlob(str(initial_blob), initial) |
| |
|
| | crf_layer = crf.CRFWithLoss(base_model, num_tags, transitions_blob) |
| | crf_layer.build_crf_net( |
| | predictions_blob, initial_blob, transitions_blob |
| | ) |
| | op = base_model.net._net.op[-1] |
| | workspace.RunNetOnce(base_model.param_init_net) |
| | gradients_to_check = ( |
| | index for (index, input_name) in enumerate(op.input) |
| | if input_name != "crf_net/zero_segment_id" |
| | ) |
| |
|
| | inputs = [workspace.FetchBlob(name) for name in op.input] |
| | for param in gradients_to_check: |
| | self.assertGradientChecks( |
| | device_option=hu.cpu_do, |
| | op=op, |
| | inputs=inputs, |
| | outputs_to_check=param, |
| | outputs_with_grads=[1], |
| | threshold=0.05, |
| | stepsize=0.001, |
| | ) |
| |
|
| | def _compute_loss_manual(self, predictions, num_tags, labels, transitions): |
| | low_score = -1000 |
| | b_s = np.array( |
| | [[low_score] * num_tags + [0, low_score]] |
| | ).astype(np.float32) |
| | e_s = np.array( |
| | [[low_score] * num_tags + [low_score, 0]] |
| | ).astype(np.float32) |
| | predictions = np.concatenate( |
| | [predictions, low_score * np.ones((predictions.shape[0], 2))], |
| | axis=1 |
| | ) |
| | predictions = np.concatenate( |
| | [b_s, predictions, e_s], |
| | axis=0 |
| | ) |
| | b_id = np.array([num_tags], dtype=np.int32) |
| | e_id = np.array([num_tags + 1], dtype=np.int32) |
| | labels = np.concatenate( |
| | [b_id, labels, e_id], |
| | axis=0 |
| | ) |
| | curr_state = predictions[0] |
| | input_states = predictions[1:] |
| |
|
| | for input_state in input_states: |
| | prev = np.expand_dims(curr_state, axis=1) |
| | curr_input = np.expand_dims(input_state, axis=0) |
| | curr_state = logsumexp(prev + curr_input + transitions, axis=0) |
| |
|
| | total_score = logsumexp(curr_state, axis=0) |
| | |
| | unary_scores = sum(w[labels[i]] for i, w in enumerate(predictions)) |
| | binary_scores = sum( |
| | transitions[a][b] for a, b in zip(labels[:-1], labels[1:]) |
| | ) |
| | loss = total_score - (binary_scores + unary_scores) |
| | return loss |
| |
|