File size: 16,648 Bytes
b7d9967
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
# This code is part of Qiskit.
#
# (C) Copyright IBM 2020, 2023.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""CVaRMeasurement class."""


from typing import Callable, Optional, Tuple, Union, cast, Dict

import numpy as np

from qiskit.circuit import ParameterExpression
from qiskit.opflow.exceptions import OpflowError
from qiskit.opflow.list_ops import ListOp, SummedOp, TensoredOp
from qiskit.opflow.operator_base import OperatorBase
from qiskit.opflow.primitive_ops import PauliOp, PauliSumOp
from qiskit.opflow.state_fns.circuit_state_fn import CircuitStateFn
from qiskit.opflow.state_fns.dict_state_fn import DictStateFn
from qiskit.opflow.state_fns.operator_state_fn import OperatorStateFn
from qiskit.opflow.state_fns.state_fn import StateFn
from qiskit.opflow.state_fns.vector_state_fn import VectorStateFn
from qiskit.quantum_info import Statevector
from qiskit.utils.deprecation import deprecate_func


class CVaRMeasurement(OperatorStateFn):
    r"""Deprecated: A specialized measurement class to compute CVaR expectation values.

        See https://arxiv.org/pdf/1907.04769.pdf for further details.



    Used in :class:`~qiskit.opflow.CVaRExpectation`, see there for more details.

    """

    primitive: OperatorBase

    # TODO allow normalization somehow?
    @deprecate_func(

        since="0.24.0",

        additional_msg="For code migration guidelines, visit https://qisk.it/opflow_migration.",

    )
    def __init__(

        self,

        primitive: OperatorBase = None,

        alpha: float = 1.0,

        coeff: Union[complex, ParameterExpression] = 1.0,

    ) -> None:
        """

        Args:

            primitive: The ``OperatorBase`` which defines the diagonal operator

                       measurement.

            coeff: A coefficient by which to multiply the state function

            alpha: A real-valued parameter between 0 and 1 which specifies the

                   fraction of observed samples to include when computing the

                   objective value. alpha = 1 corresponds to a standard observable

                   expectation value. alpha = 0 corresponds to only using the single

                   sample with the lowest energy. alpha = 0.5 corresponds to ranking each

                   observation by lowest energy and using the best



        Raises:

            ValueError: TODO remove that this raises an error

            ValueError: If alpha is not in [0, 1].

            OpflowError: If the primitive is not diagonal.

        """
        if primitive is None:
            raise ValueError

        if not 0 <= alpha <= 1:
            raise ValueError("The parameter alpha must be in [0, 1].")
        self._alpha = alpha

        if not _check_is_diagonal(primitive):
            raise OpflowError(
                "Input operator to CVaRMeasurement must be diagonal, but is not:", str(primitive)
            )

        super().__init__(primitive, coeff=coeff, is_measurement=True)

    @property
    def alpha(self) -> float:
        """A real-valued parameter between 0 and 1 which specifies the

           fraction of observed samples to include when computing the

           objective value. alpha = 1 corresponds to a standard observable

           expectation value. alpha = 0 corresponds to only using the single

           sample with the lowest energy. alpha = 0.5 corresponds to ranking each

           observation by lowest energy and using the best half.



        Returns:

            The parameter alpha which was given at initialization

        """
        return self._alpha

    @property
    def settings(self) -> Dict:
        """Return settings."""
        return {"primitive": self._primitive, "coeff": self._coeff, "alpha": self._alpha}

    def add(self, other: OperatorBase) -> SummedOp:
        return SummedOp([self, other])

    def adjoint(self):
        """The adjoint of a CVaRMeasurement is not defined.



        Returns:

            Does not return anything, raises an error.



        Raises:

            OpflowError: The adjoint of a CVaRMeasurement is not defined.

        """
        raise OpflowError("Adjoint of a CVaR measurement not defined")

    def mul(self, scalar: Union[complex, ParameterExpression]) -> "CVaRMeasurement":
        if not isinstance(scalar, (int, float, complex, ParameterExpression)):
            raise ValueError(
                "Operators can only be scalar multiplied by float or complex, not "
                "{} of type {}.".format(scalar, type(scalar))
            )

        return self.__class__(self.primitive, coeff=self.coeff * scalar, alpha=self._alpha)

    def tensor(self, other: OperatorBase) -> Union["OperatorStateFn", TensoredOp]:
        if isinstance(other, OperatorStateFn):
            return OperatorStateFn(
                self.primitive.tensor(other.primitive), coeff=self.coeff * other.coeff
            )
        return TensoredOp([self, other])

    def to_density_matrix(self, massive: bool = False):
        """Not defined."""
        raise NotImplementedError

    def to_matrix_op(self, massive: bool = False):
        """Not defined."""
        raise NotImplementedError

    def to_matrix(self, massive: bool = False):
        """Not defined."""
        raise NotImplementedError

    def to_circuit_op(self):
        """Not defined."""
        raise NotImplementedError

    def __str__(self) -> str:
        return f"CVaRMeasurement({str(self.primitive)}) * {self.coeff}"

    def eval(

        self, front: Union[str, dict, np.ndarray, OperatorBase, Statevector] = None

    ) -> complex:
        r"""

        Given the energies of each sampled measurement outcome (H_i) as well as the

        sampling probability of each measurement outcome (p_i, we can compute the

        CVaR as H_j + 1/α*(sum_i<j p_i*(H_i - H_j)). Note that index j corresponds

        to the measurement outcome such that only some of the samples with

        measurement outcome j will be used in computing CVaR. Note also that the

        sampling probabilities serve as an alternative to knowing the counts of each

        observation.



        This computation is broken up into two subroutines. One which evaluates each

        measurement outcome and determines the sampling probabilities of each. And one

        which carries out the above calculation. The computation is split up this way

        to enable a straightforward calculation of the variance of this estimator.



        Args:

            front: A StateFn or primitive which specifies the results of evaluating

                      a quantum state.



        Returns:

            The CVaR of the diagonal observable specified by self.primitive and

                the sampled quantum state described by the inputs

                (energies, probabilities). For index j (described above), the CVaR

                is computed as H_j + 1/α*(sum_i<j p_i*(H_i - H_j))

        """
        energies, probabilities = self.get_outcome_energies_probabilities(front)
        return self.compute_cvar(energies, probabilities)

    def eval_variance(

        self, front: Optional[Union[str, dict, np.ndarray, OperatorBase]] = None

    ) -> complex:
        r"""

        Given the energies of each sampled measurement outcome (H_i) as well as the

        sampling probability of each measurement outcome (p_i, we can compute the

        variance of the CVaR estimator as

        H_j^2 + 1/α * (sum_i<j p_i*(H_i^2 - H_j^2)).

        This follows from the definition that Var[X] = E[X^2] - E[X]^2.

        In this case, X = E[<bi|H|bi>], where H is the diagonal observable and bi

        corresponds to measurement outcome i. Given this, E[X^2] = E[<bi|H|bi>^2]



        Args:

            front: A StateFn or primitive which specifies the results of evaluating

                      a quantum state.



        Returns:

            The Var[CVaR] of the diagonal observable specified by self.primitive

                and the sampled quantum state described by the inputs

                (energies, probabilities). For index j (described above), the CVaR

                is computed as H_j^2 + 1/α*(sum_i<j p_i*(H_i^2 - H_j^2))

        """
        energies, probabilities = self.get_outcome_energies_probabilities(front)
        sq_energies = [energy**2 for energy in energies]
        return self.compute_cvar(sq_energies, probabilities) - self.eval(front) ** 2

    def get_outcome_energies_probabilities(

        self, front: Optional[Union[str, dict, np.ndarray, OperatorBase, Statevector]] = None

    ) -> Tuple[list, list]:
        r"""

        In order to compute the  CVaR of an observable expectation, we require

        the energies of each sampled measurement outcome as well as the sampling

        probability of each measurement outcome. Note that the counts for each

        measurement outcome will also suffice (and this is often how the CVaR

        is presented).



        Args:

            front: A StateFn or a primitive which defines a StateFn.

                   This input holds the results of a sampled/simulated circuit.



        Returns:

            Two lists of equal length. `energies` contains the energy of each

                unique measurement outcome computed against the diagonal observable

                stored in self.primitive. `probabilities` contains the corresponding

                sampling probability for each measurement outcome in `energies`.



        Raises:

            ValueError: front isn't a DictStateFn or VectorStateFn

        """
        if isinstance(front, CircuitStateFn):
            front = cast(StateFn, front.eval())

        # Standardize the inputs to a dict
        if isinstance(front, DictStateFn):
            data = front.primitive
        elif isinstance(front, VectorStateFn):
            vec = front.primitive.data
            # Determine how many bits are needed
            key_len = int(np.ceil(np.log2(len(vec))))
            # Convert the vector primitive into a dict. The formatting here ensures
            # that the proper number of leading `0` characters are added.
            data = {format(index, "0" + str(key_len) + "b"): val for index, val in enumerate(vec)}
        else:
            raise ValueError("Unsupported input to CVaRMeasurement.eval:", type(front))

        obs = self.primitive
        outcomes = list(data.items())
        # add energy evaluation
        for i, outcome in enumerate(outcomes):
            key = outcome[0]
            outcomes[i] += (obs.eval(key).adjoint().eval(key),)  # type: ignore

        # Sort each observation based on it's energy
        outcomes = sorted(outcomes, key=lambda x: x[2])  # type: ignore

        # Here probabilities are the (root) probabilities of
        # observing each state. energies are the expectation
        # values of each state with the provided Hamiltonian.
        _, root_probabilities, energies = zip(*outcomes)

        # Square the dict values
        # (since CircuitSampler takes the root...)
        probabilities = [p_i * np.conj(p_i) for p_i in root_probabilities]
        return list(energies), probabilities

    def compute_cvar(self, energies: list, probabilities: list) -> complex:
        r"""

        Given the energies of each sampled measurement outcome (H_i) as well as the

        sampling probability of each measurement outcome (p_i, we can compute the

        CVaR. Note that the sampling probabilities serve as an alternative to knowing

        the counts of each observation and that the input energies are assumed to be

        sorted in increasing order.



        Consider the outcome with index j, such that only some of the samples with

        measurement outcome j will be used in computing CVaR. The CVaR calculation

        can then be separated into two parts. First we sum each of the energies for

        outcomes i < j, weighted by the probability of observing that outcome (i.e

        the normalized counts). Second, we add the energy for outcome j, weighted by

        the difference (α  - \sum_i<j p_i)



        Args:

            energies: A list containing the energies (H_i) of each sample measurement

                      outcome, sorted in increasing order.

            probabilities: The sampling probabilities (p_i) for each corresponding

                           measurement outcome.



        Returns:

            The CVaR of the diagonal observable specified by self.primitive and

                the sampled quantum state described by the inputs

                (energies, probabilities). For index j (described above), the CVaR

                is computed as H_j + 1/α * (sum_i<j p_i*(H_i - H_j))



        Raises:

            ValueError: front isn't a DictStateFn or VectorStateFn

        """
        alpha = self._alpha

        # Determine j, the index of the measurement outcome such
        # that only some samples with this outcome will be used to
        # compute the CVaR.
        j = 0
        running_total = 0
        for i, p_i in enumerate(probabilities):
            running_total += p_i
            j = i
            if running_total > alpha:
                break

        h_j = energies[j]
        cvar = alpha * h_j

        if alpha == 0 or j == 0:
            return self.coeff * h_j

        energies = energies[:j]
        probabilities = probabilities[:j]
        # Let H_i be the energy associated with outcome i
        # and let the outcomes be sorted by ascending energy.
        # Let p_i be the probability of observing outcome i.
        # CVaR = H_j + 1/α*(sum_i<j p_i*(H_i - H_j))
        for h_i, p_i in zip(energies, probabilities):
            cvar += p_i * (h_i - h_j)

        return self.coeff * cvar / alpha

    def traverse(

        self, convert_fn: Callable, coeff: Optional[Union[complex, ParameterExpression]] = None

    ) -> OperatorBase:
        r"""

        Apply the convert_fn to the internal primitive if the primitive is an Operator (as in

        the case of ``OperatorStateFn``). Otherwise do nothing. Used by converters.



        Args:

            convert_fn: The function to apply to the internal OperatorBase.

            coeff: A coefficient to multiply by after applying convert_fn.

                If it is None, self.coeff is used instead.



        Returns:

            The converted StateFn.

        """
        if coeff is None:
            coeff = self.coeff

        if isinstance(self.primitive, OperatorBase):
            return self.__class__(convert_fn(self.primitive), coeff=coeff, alpha=self._alpha)
        return self

    def sample(self, shots: int = 1024, massive: bool = False, reverse_endianness: bool = False):
        raise NotImplementedError


def _check_is_diagonal(operator: OperatorBase) -> bool:
    """Check whether ``operator`` is diagonal.



    Args:

        operator: The operator to check for diagonality.



    Returns:

        True, if the operator is diagonal, False otherwise.



    Raises:

        OpflowError: If the operator is not diagonal.

    """
    if isinstance(operator, PauliOp):
        # every X component must be False
        return not np.any(operator.primitive.x)

    # For sums (PauliSumOp and SummedOp), we cover the case of sums of diagonal paulis, but don't
    # raise since there might be summand canceling the non-diagonal parts. That case is checked
    # in the inefficient matrix check at the bottom.
    if isinstance(operator, PauliSumOp):
        if not np.any(operator.primitive.paulis.x):
            return True

    elif isinstance(operator, SummedOp):
        if all(isinstance(op, PauliOp) and not np.any(op.primitive.x) for op in operator.oplist):
            return True

    elif isinstance(operator, ListOp):
        return all(operator.traverse(_check_is_diagonal))

    # cannot efficiently check if a operator is diagonal, converting to matrix
    matrix = operator.to_matrix()
    return np.all(matrix == np.diag(np.diagonal(matrix)))