Expectation value class
from __future__ import annotations
import copy
import typing
from collections.abc import Sequence
from itertools import accumulate
import numpy as np
from qiskit.circuit import QuantumCircuit
from qiskit.compiler import transpile
from qiskit.providers import BackendV1, BackendV2, Options
from qiskit.quantum_info import Pauli, PauliList
from qiskit.quantum_info.operators.base_operator import BaseOperator
from qiskit.result import Counts, Result
from qiskit.transpiler import PassManager
from .base import BaseEstimator, EstimatorResult
from .primitive_job import PrimitiveJob
from .utils import _circuit_key, _observable_key, init_observable
if typing.TYPE_CHECKING:
from qiskit.opflow import PauliSumOp
def _run_circuits(
circuits: QuantumCircuit | list[QuantumCircuit],
backend: BackendV1 | BackendV2,
) -> tuple[list[Result], list[dict]]:
"""Remove metadata of circuits and run the circuits on a backend.
circuits: The circuits
backend: The backend
monitor: Enable job minotor if True
**run_options: run_options
The result and the metadata of the circuits
if isinstance(circuits, QuantumCircuit):
circuits = [circuits]
metadata = []
for circ in circuits:
circ.metadata = {}
if isinstance(backend, BackendV1):
max_circuits = getattr(backend.configuration(), "max_experiments", None)
elif isinstance(backend, BackendV2):
max_circuits = backend.max_circuits
if max_circuits:
jobs = [
backend.run(circuits[pos : pos + max_circuits], **run_options)
for pos in range(0, len(circuits), max_circuits)
result = [x.result() for x in jobs]
result = [backend.run(circuits, **run_options).result()]
return result, metadata
def _prepare_counts(results: list[Result]):
counts = []
for res in results:
count = res.get_counts()
if not isinstance(count, list):
count = [count]
return counts
class BackendEstimator(BaseEstimator[PrimitiveJob[EstimatorResult]]):
"""Evaluates expectation value using Pauli rotation gates.
The :class:`~.BackendEstimator` class is a generic implementation of the
:class:`~.BaseEstimator` interface that is used to wrap a :class:`~.BackendV2`
(or :class:`~.BackendV1`) object in the :class:`~.BaseEstimator` API. It
facilitates using backends that do not provide a native
:class:`~.BaseEstimator` implementation in places that work with
:class:`~.BaseEstimator`, such as algorithms in :mod:`qiskit.algorithms`
including :class:`~.qiskit.algorithms.minimum_eigensolvers.VQE`. However,
if you're using a provider that has a native implementation of
:class:`~.BaseEstimator`, it is a better choice to leverage that native
implementation as it will likely include additional optimizations and be
a more efficient implementation. The generic nature of this class
precludes doing any provider- or backend-specific optimizations.
def __init__(
backend: BackendV1 | BackendV2,
options: dict | None = None,
abelian_grouping: bool = True,
bound_pass_manager: PassManager | None = None,
skip_transpilation: bool = False,
"""Initalize a new BackendEstimator instance
backend: Required: the backend to run the primitive on
options: Default options.
abelian_grouping: Whether the observable should be grouped into
bound_pass_manager: An optional pass manager to run after
parameter binding.
skip_transpilation: If this is set to True the internal compilation
of the input circuits is skipped and the circuit objects
will be directly executed when this object is called.
self._abelian_grouping = abelian_grouping
self._backend = backend
self._transpile_options = Options()
self._bound_pass_manager = bound_pass_manager
self._preprocessed_circuits: list[tuple[QuantumCircuit, list[QuantumCircuit]]] | None = None
self._transpiled_circuits: list[QuantumCircuit] | None = None
self._grouping = list(zip(range(len(self._circuits)), range(len(self._observables))))
self._skip_transpilation = skip_transpilation
self._circuit_ids = {}
self._observable_ids = {}
def transpile_options(self) -> Options:
"""Return the transpiler options for transpiling the circuits."""
return self._transpile_options
def set_transpile_options(self, **fields):
"""Set the transpiler options for transpiler.
**fields: The fields to update the options
self._transpiled_circuits = None
def preprocessed_circuits(
) -> list[tuple[QuantumCircuit, list[QuantumCircuit]]]:
Transpiled quantum circuits produced by preprocessing
List of the transpiled quantum circuit
self._preprocessed_circuits = self._preprocessing()
return self._preprocessed_circuits
def transpiled_circuits(self) -> list[QuantumCircuit]:
Transpiled quantum circuits.
List of the transpiled quantum circuit
QiskitError: if the instance has been closed.
return self._transpiled_circuits
def backend(self) -> BackendV1 | BackendV2:
The backend which this estimator object based on
return self._backend
def _transpile(self):
"""Split Transpile"""
self._transpiled_circuits = []
for common_circuit, diff_circuits in self.preprocessed_circuits:
if self._skip_transpilation:
transpiled_circuit = common_circuit.copy()
perm_pattern = list(range(common_circuit.num_qubits))
transpiled_circuit = transpile(
common_circuit, self.backend, **self.transpile_options.__dict__
if transpiled_circuit.layout is not None:
layout = transpiled_circuit.layout
virtual_bit_map = layout.initial_layout.get_virtual_bits()
perm_pattern = [virtual_bit_map[v] for v in common_circuit.qubits]
if layout.final_layout is not None:
final_mapping = dict(
perm_pattern = [final_mapping[i] for i in perm_pattern]
perm_pattern = list(range(transpiled_circuit.num_qubits))
transpile_opts = copy.copy(self.transpile_options)
diff_circuits = transpile(diff_circuits, self.backend, **transpile_opts.__dict__)
transpiled_circuits = []
for diff_circuit in diff_circuits:
transpiled_circuit_copy = transpiled_circuit.copy()
for creg in diff_circuit.cregs:
if creg not in transpiled_circuit_copy.cregs:
transpiled_circuit_copy.compose(diff_circuit, inplace=True)
transpiled_circuit_copy.metadata = diff_circuit.metadata
self._transpiled_circuits += transpiled_circuits
def _call(
circuits: Sequence[int],
observables: Sequence[int],
parameter_values: Sequence[Sequence[float]],
) -> EstimatorResult:
self._grouping = list(zip(circuits, observables))
transpiled_circuits = self.transpiled_circuits
num_observables = [len(m) for (_, m) in self.preprocessed_circuits]
accum = [0] + list(accumulate(num_observables))
parameter_dicts = [
dict(zip(self._parameters[i], value)) for i, value in zip(circuits, parameter_values)
bound_circuits = [
if len(p) == 0
else transpiled_circuits[circuit_index].bind_parameters(p)
for i, (p, n) in enumerate(zip(parameter_dicts, num_observables))
for circuit_index in range(accum[i], accum[i] + n)
bound_circuits = self._bound_pass_manager_run(bound_circuits)
result, metadata = _run_circuits(bound_circuits, self._backend, **run_options)
return self._postprocessing(result, accum, metadata)
def _run(
circuits: tuple[QuantumCircuit, ...],
observables: tuple[BaseOperator | PauliSumOp, ...],
parameter_values: tuple[tuple[float, ...], ...],
circuit_indices = []
for circuit in circuits:
index = self._circuit_ids.get(_circuit_key(circuit))
if index is not None:
self._circuit_ids[_circuit_key(circuit)] = len(self._circuits)
observable_indices = []
for observable in observables:
observable = init_observable(observable)
index = self._observable_ids.get(_observable_key(observable))
if index is not None:
self._observable_ids[_observable_key(observable)] = len(self._observables)
job = PrimitiveJob(
self._call, circuit_indices, observable_indices, parameter_values, **run_options
return job
def _measurement_circuit(num_qubits: int, pauli: Pauli):
qubit_indices = np.arange(pauli.num_qubits)[pauli.z | pauli.x]
if not np.any(qubit_indices):
qubit_indices = [0]
meas_circuit = QuantumCircuit(num_qubits, len(qubit_indices))
for clbit, i in enumerate(qubit_indices):
if pauli.x[i]:
if pauli.z[i]:
meas_circuit.measure(i, clbit)
return meas_circuit, qubit_indices
def _preprocessing(self) -> list[tuple[QuantumCircuit, list[QuantumCircuit]]]:
Preprocessing for evaluation of expectation value using pauli rotation gates.
preprocessed_circuits = []
for group in self._grouping:
circuit = self._circuits[group[0]]
observable = self._observables[group[1]]
diff_circuits: list[QuantumCircuit] = []
if self._abelian_grouping:
for obs in observable.group_commuting(qubit_wise=True):
basis = Pauli(
(np.logical_or.reduce(obs.paulis.z), np.logical_or.reduce(obs.paulis.x))
meas_circuit, indices = self._measurement_circuit(circuit.num_qubits, basis)
paulis = PauliList.from_symplectic(
obs.paulis.z[:, indices],
obs.paulis.x[:, indices],
meas_circuit.metadata = {
"paulis": paulis,
"coeffs": np.real_if_close(obs.coeffs),
for basis, obs in zip(observable.paulis, observable):
meas_circuit, indices = self._measurement_circuit(circuit.num_qubits, basis)
paulis = PauliList.from_symplectic(
obs.paulis.z[:, indices],
obs.paulis.x[:, indices],
meas_circuit.metadata = {
"paulis": paulis,
"coeffs": np.real_if_close(obs.coeffs),
preprocessed_circuits.append((circuit.copy(), diff_circuits))
return preprocessed_circuits
def _postprocessing(
self, result: list[Result], accum: list[int], metadata: list[dict]
) -> EstimatorResult:
Postprocessing for evaluation of expectation value using pauli rotation gates.
counts = _prepare_counts(result)
expval_list = []
var_list = []
shots_list = []
for i, j in zip(accum, accum[1:]):
combined_expval = 0.0
combined_var = 0.0
for k in range(i, j):
meta = metadata[k]
paulis = meta["paulis"]
coeffs = meta["coeffs"]
count = counts[k]
expvals, variances = _pauli_expval_with_variance(count, paulis)
combined_expval += np.dot(expvals, coeffs)
combined_var += np.dot(variances, coeffs**2)
metadata = [{"variance": var, "shots": shots} for var, shots in zip(var_list, shots_list)]
return EstimatorResult(np.real_if_close(expval_list), metadata)
def _bound_pass_manager_run(self, circuits):
if self._bound_pass_manager is None:
return circuits
output = self._bound_pass_manager.run(circuits)
if not isinstance(output, list):
output = [output]
return output
def _paulis2inds(paulis: PauliList) -> list[int]:
"""Convert PauliList to diagonal integers.
These are integer representations of the binary string with a
1 where there are Paulis, and 0 where there are identities.
nonid = paulis.z | paulis.x
inds = [0] * paulis.size
packed_vals = np.packbits(nonid, axis=1, bitorder="little")
for i, vals in enumerate(packed_vals):
for j, val in enumerate(vals):
inds[i] += val.item() * (1 << (8 * j))
return inds
def _parity(integer: int) -> int:
"""Return the parity of an integer"""
return bin(integer).count("1") % 2
def _pauli_expval_with_variance(counts: Counts, paulis: PauliList) -> tuple[np.ndarray, np.ndarray]:
"""Return array of expval and variance pairs for input Paulis.
Note: All non-identity Pauli's are treated as Z-paulis, assuming
that basis rotations have been applied to convert them to the
diagonal basis.
size = len(paulis)
diag_inds = _paulis2inds(paulis)
expvals = np.zeros(size, dtype=float)
denom = 0
for bin_outcome, freq in counts.items():
outcome = int(bin_outcome, 2)
denom += freq
for k in range(size):
coeff = (-1) ** _parity(diag_inds[k] & outcome)
expvals[k] += freq * coeff
expvals /= denom
variances = 1 - expvals**2
return expvals, variances