# herutriana44's picture
# First Commit
# b7d9967 verified
# This code is part of Qiskit.
#
# (C) Copyright IBM 2017, 2019, 2021.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Objects to represent the information at a node in the DAGCircuit."""
import uuid
from typing import Iterable
from qiskit.circuit import (
Qubit,
Clbit,
ClassicalRegister,
ControlFlowOp,
IfElseOp,
WhileLoopOp,
SwitchCaseOp,
ForLoopOp,
Parameter,
)
from qiskit.circuit.classical import expr
def _legacy_condition_eq(cond1, cond2, bit_indices1, bit_indices2):
    """Compare two legacy conditions by value and by the indices of the bits they target.

    Each condition is either ``None`` or a ``(target, value)`` pair whose target is a
    :class:`.Clbit` or a :class:`.ClassicalRegister`.

    Args:
        cond1: The first condition, or ``None``.
        cond2: The second condition, or ``None``.
        bit_indices1 (dict): Mapping from bit to index used to resolve the target of ``cond1``.
        bit_indices2 (dict): Mapping from bit to index used to resolve the target of ``cond2``.

    Returns:
        bool: ``True`` if both conditions are ``None``, or if the values are equal and the
        targets are of the same kind and resolve to the same indices; ``False`` otherwise.
    """
    if cond1 is None and cond2 is None:
        return True
    if None in (cond1, cond2):
        # Exactly one side carries a condition.
        return False

    lhs_target, lhs_value = cond1
    rhs_target, rhs_value = cond2
    if lhs_value != rhs_value:
        return False

    if isinstance(lhs_target, Clbit) and isinstance(rhs_target, Clbit):
        return bit_indices1[lhs_target] == bit_indices2[rhs_target]

    if isinstance(lhs_target, ClassicalRegister) and isinstance(rhs_target, ClassicalRegister):
        if lhs_target.size != rhs_target.size:
            return False
        return all(
            bit_indices1[lhs_bit] == bit_indices2[rhs_bit]
            for lhs_bit, rhs_bit in zip(lhs_target, rhs_target)
        )

    # Targets of different kinds (or of an unrecognised kind) never match.
    return False
def _circuit_to_dag(circuit, node_qargs, node_cargs, bit_indices):
    """Get a :class:`.DAGCircuit` of the given :class:`.QuantumCircuit`.

    The bits of the output are placed in a canonical order: each bit of ``circuit`` is paired
    positionally with the corresponding entry of ``node_qargs`` / ``node_cargs``, and the pairs
    are sorted by the index that ``bit_indices`` assigns to the outer bit.

    Args:
        circuit: The circuit to convert.
        node_qargs: The outer qubits, paired positionally with ``circuit.qubits``.
        node_cargs: The outer clbits, paired positionally with ``circuit.clbits``.
        bit_indices (dict): Mapping from outer bit to its index.

    Returns:
        The result of ``circuit_to_dag`` called with ``copy_operations=False`` and the
        canonical qubit and clbit orders.
    """
    from qiskit.converters import circuit_to_dag  # pylint: disable=cyclic-import

    def canonical_order(outer_bits, inner_bits):
        # Sort the (outer, inner) pairs by the outer bit's index and keep only the inner bits.
        ordered_pairs = sorted(
            zip(outer_bits, inner_bits), key=lambda pair: bit_indices[pair[0]]
        )
        return [inner for _outer, inner in ordered_pairs]

    return circuit_to_dag(
        circuit,
        copy_operations=False,
        qubit_order=canonical_order(node_qargs, circuit.qubits),
        clbit_order=canonical_order(node_cargs, circuit.clbits),
    )
def _make_expr_key(bit_indices):
    """Build the key function passed to ``expr.structurally_equivalent`` for one circuit.

    Args:
        bit_indices (dict): Mapping from bit to index.

    Returns:
        A one-argument callable that maps a :class:`.Clbit` to ``bit_indices.get(bit)``, a
        :class:`.ClassicalRegister` to the list of such lookups for each of its bits, and
        anything else to ``None``.
    """

    def key(var):
        if isinstance(var, Clbit):
            return bit_indices.get(var)
        if not isinstance(var, ClassicalRegister):
            return None
        return list(map(bit_indices.get, var))

    return key
def _condition_op_eq(node1, node2, bit_indices1, bit_indices2):
    """Compare two nodes whose operations carry a ``condition`` and a sequence of ``blocks``.

    Registered in the control-flow dispatch table for ``IfElseOp`` and ``WhileLoopOp``.

    Args:
        node1: The first node.
        node2: The second node.
        bit_indices1 (dict): Mapping from bit to index for the circuit containing ``node1``.
        bit_indices2 (dict): Mapping from bit to index for the circuit containing ``node2``.

    Returns:
        bool: ``True`` if the conditions match and the blocks are pairwise equal once
        converted to DAGs in canonical bit order; ``False`` otherwise.
    """
    lhs_cond = node1.op.condition
    rhs_cond = node2.op.condition
    lhs_is_expr = isinstance(lhs_cond, expr.Expr)
    rhs_is_expr = isinstance(rhs_cond, expr.Expr)

    if lhs_is_expr and rhs_is_expr:
        conditions_match = expr.structurally_equivalent(
            lhs_cond, rhs_cond, _make_expr_key(bit_indices1), _make_expr_key(bit_indices2)
        )
    elif lhs_is_expr or rhs_is_expr:
        # An expression condition never matches a legacy (or absent) condition.
        conditions_match = False
    else:
        conditions_match = _legacy_condition_eq(lhs_cond, rhs_cond, bit_indices1, bit_indices2)
    if not conditions_match:
        return False

    lhs_blocks = node1.op.blocks
    rhs_blocks = node2.op.blocks
    if len(lhs_blocks) != len(rhs_blocks):
        return False
    return all(
        _circuit_to_dag(lhs_block, node1.qargs, node1.cargs, bit_indices1)
        == _circuit_to_dag(rhs_block, node2.qargs, node2.cargs, bit_indices2)
        for lhs_block, rhs_block in zip(lhs_blocks, rhs_blocks)
    )
def _switch_case_eq(node1, node2, bit_indices1, bit_indices2):
    """Compare two nodes whose operations have a ``target``, case labels and ``blocks``.

    Registered in the control-flow dispatch table for ``SwitchCaseOp``.

    Args:
        node1: The first node.
        node2: The second node.
        bit_indices1 (dict): Mapping from bit to index for the circuit containing ``node1``.
        bit_indices2 (dict): Mapping from bit to index for the circuit containing ``node2``.

    Returns:
        bool: ``True`` if the targets match, the cases carry the same label sets in the same
        order, and the blocks are pairwise equal once converted to DAGs in canonical bit order;
        ``False`` otherwise.
    """
    lhs_target = node1.op.target
    rhs_target = node2.op.target

    if isinstance(lhs_target, expr.Expr) and isinstance(rhs_target, expr.Expr):
        targets_match = expr.structurally_equivalent(
            lhs_target, rhs_target, _make_expr_key(bit_indices1), _make_expr_key(bit_indices2)
        )
    elif isinstance(lhs_target, Clbit) and isinstance(rhs_target, Clbit):
        targets_match = bit_indices1[lhs_target] == bit_indices2[rhs_target]
    elif isinstance(lhs_target, ClassicalRegister) and isinstance(rhs_target, ClassicalRegister):
        targets_match = lhs_target.size == rhs_target.size and all(
            bit_indices1[lhs_bit] == bit_indices2[rhs_bit]
            for lhs_bit, rhs_bit in zip(lhs_target, rhs_target)
        )
    else:
        # Targets of different kinds never match.
        targets_match = False
    if not targets_match:
        return False

    lhs_cases = [labels for labels, _block in node1.op.cases_specifier()]
    rhs_cases = [labels for labels, _block in node2.op.cases_specifier()]
    if len(lhs_cases) != len(rhs_cases):
        return False
    # The order of labels within a single case is irrelevant, so compare them as sets.
    for lhs_labels, rhs_labels in zip(lhs_cases, rhs_cases):
        if set(lhs_labels) != set(rhs_labels):
            return False

    lhs_blocks = node1.op.blocks
    rhs_blocks = node2.op.blocks
    if len(lhs_blocks) != len(rhs_blocks):
        return False
    return all(
        _circuit_to_dag(lhs_block, node1.qargs, node1.cargs, bit_indices1)
        == _circuit_to_dag(rhs_block, node2.qargs, node2.cargs, bit_indices2)
        for lhs_block, rhs_block in zip(lhs_blocks, rhs_blocks)
    )
def _for_loop_eq(node1, node2, bit_indices1, bit_indices2):
    """Compare two nodes whose operation ``params`` are ``(indexset, loop_parameter, body)``.

    Registered in the control-flow dispatch table for ``ForLoopOp``.

    Args:
        node1: The first node.
        node2: The second node.
        bit_indices1 (dict): Mapping from bit to index for the circuit containing ``node1``.
        bit_indices2 (dict): Mapping from bit to index for the circuit containing ``node2``.

    Returns:
        bool: ``False`` if the index sets differ or if only one side has a loop parameter;
        otherwise whether the two bodies are equal once converted to DAGs in canonical bit
        order, with both loop parameters first replaced by one shared placeholder.
    """
    lhs_indexset, lhs_param, lhs_body = node1.op.params
    rhs_indexset, rhs_param, rhs_body = node2.op.params

    if lhs_indexset != rhs_indexset:
        return False
    if (lhs_param is None) != (rhs_param is None):
        return False

    if lhs_param is not None:
        # Both sides have a loop parameter here. Rebind each to the same freshly named
        # placeholder so the bodies are compared without regard to the parameters' own names.
        sentinel = Parameter(str(uuid.uuid4()))
        if lhs_param in lhs_body.parameters:
            lhs_body = lhs_body.assign_parameters({lhs_param: sentinel}, inplace=False)
        if rhs_param in rhs_body.parameters:
            rhs_body = rhs_body.assign_parameters({rhs_param: sentinel}, inplace=False)

    lhs_dag = _circuit_to_dag(lhs_body, node1.qargs, node1.cargs, bit_indices1)
    rhs_dag = _circuit_to_dag(rhs_body, node2.qargs, node2.cargs, bit_indices2)
    return lhs_dag == rhs_dag
# Dispatch table consulted by ``DAGNode.semantic_eq``: maps a concrete control-flow operation
# class to the helper that compares two nodes holding an operation of that class.
_SEMANTIC_EQ_CONTROL_FLOW = {
IfElseOp: _condition_op_eq,
WhileLoopOp: _condition_op_eq,
SwitchCaseOp: _switch_case_eq,
ForLoopOp: _for_loop_eq,
}
# Operation names for which ``DAGNode.semantic_eq`` compares the qarg and carg indices as
# unordered sets instead of as ordered lists.
_SEMANTIC_EQ_SYMMETRIC = frozenset({"barrier", "swap", "break_loop", "continue_loop"})
class DAGNode:
    """Parent class for DAGOpNode, DAGInNode, and DAGOutNode."""

    __slots__ = ["_node_id"]

    def __init__(self, nid=-1):
        """Create a node.

        Args:
            nid (int): The node id; defaults to ``-1``.
        """
        self._node_id = nid

    def __lt__(self, other):
        """Order nodes by their node id."""
        return self._node_id < other._node_id

    def __gt__(self, other):
        """Order nodes by their node id."""
        return self._node_id > other._node_id

    def __str__(self):
        # TODO is this used anywhere other than in DAG drawing?
        # needs to be unique as it is what pydot uses to distinguish nodes
        return str(id(self))

    @staticmethod
    def semantic_eq(node1, node2, bit_indices1, bit_indices2):
        """
        Check if DAG nodes are considered equivalent, e.g., as a node_match for
        :func:`rustworkx.is_isomorphic_node_match`.

        Args:
            node1 (DAGOpNode, DAGInNode, DAGOutNode): A node to compare.
            node2 (DAGOpNode, DAGInNode, DAGOutNode): The other node to compare.
            bit_indices1 (dict): Dictionary mapping Bit instances to their index
                within the circuit containing node1
            bit_indices2 (dict): Dictionary mapping Bit instances to their index
                within the circuit containing node2

        Returns:
            bool: ``True`` if the nodes are considered equivalent, ``False`` otherwise.

        Raises:
            RuntimeError: If both operations are control-flow operations of the same type
                and no comparer is registered for that type.
        """
        # Unless *both* nodes are operation nodes, they can only match when they are the same
        # kind of node on the same wire index. The type comparison short-circuits first, so
        # ``.wire`` is never read from a ``DAGOpNode`` and ``.op`` is never read from an
        # in/out node further down.
        if not isinstance(node1, DAGOpNode) or not isinstance(node2, DAGOpNode):
            return type(node1) is type(node2) and bit_indices1.get(node1.wire) == bit_indices2.get(
                node2.wire
            )
        if isinstance(node1.op, ControlFlowOp) and isinstance(node2.op, ControlFlowOp):
            # While control-flow operations aren't represented natively in the DAG, we have to do
            # some unpleasant dispatching and very manual handling.  Once they have more first-class
            # support we'll still be dispatching, but it'll look more appropriate (like the dispatch
            # based on `DAGOpNode`/`DAGInNode`/`DAGOutNode` that already exists) and less like we're
            # duplicating code from the `ControlFlowOp` classes.
            if type(node1.op) is not type(node2.op):
                return False
            comparer = _SEMANTIC_EQ_CONTROL_FLOW.get(type(node1.op))
            if comparer is None:  # pragma: no cover
                raise RuntimeError(f"unhandled control-flow operation: {type(node1.op)}")
            return comparer(node1, node2, bit_indices1, bit_indices2)

        node1_qargs = [bit_indices1[qarg] for qarg in node1.qargs]
        node1_cargs = [bit_indices1[carg] for carg in node1.cargs]

        node2_qargs = [bit_indices2[qarg] for qarg in node2.qargs]
        node2_cargs = [bit_indices2[carg] for carg in node2.cargs]

        # For the operations named in `_SEMANTIC_EQ_SYMMETRIC` (e.g. barriers), argument order
        # is not significant, so compare the indices as sets.
        if node1.op.name == node2.op.name and node1.name in _SEMANTIC_EQ_SYMMETRIC:
            node1_qargs = set(node1_qargs)
            node1_cargs = set(node1_cargs)
            node2_qargs = set(node2_qargs)
            node2_cargs = set(node2_cargs)

        return (
            node1_qargs == node2_qargs
            and node1_cargs == node2_cargs
            and _legacy_condition_eq(
                getattr(node1.op, "condition", None),
                getattr(node2.op, "condition", None),
                bit_indices1,
                bit_indices2,
            )
            and node1.op == node2.op
        )
class DAGOpNode(DAGNode):
    """A node of the DAGCircuit that holds an Instruction and the bits it is applied to."""

    __slots__ = ["op", "qargs", "cargs", "sort_key"]

    def __init__(self, op, qargs: Iterable[Qubit] = (), cargs: Iterable[Clbit] = ()):
        """Create an Instruction node.

        Args:
            op: The operation stored on the node.
            qargs: The qubits of the node; stored as a tuple.
            cargs: The clbits of the node; stored as a tuple.
        """
        super().__init__()
        self.op = op
        qubits = tuple(qargs)
        clbits = tuple(cargs)
        self.qargs = qubits
        self.cargs = clbits
        # The sort key is the string form of the qubit tuple.
        self.sort_key = str(qubits)

    @property
    def name(self):
        """The name of the Instruction held in ``op``."""
        return self.op.name

    @name.setter
    def name(self, new_name):
        """Rename the Instruction held in ``op``."""
        self.op.name = new_name

    def __repr__(self):
        """Return a representation of the DAGOpNode."""
        return f"DAGOpNode(op={self.op}, qargs={self.qargs}, cargs={self.cargs})"
class DAGInNode(DAGNode):
    """A node of the DAGCircuit that marks an incoming wire."""

    __slots__ = ["wire", "sort_key"]

    def __init__(self, wire):
        """Create an incoming node.

        Args:
            wire: The wire this node belongs to.
        """
        super().__init__()
        # TODO sort_key which is used in dagcircuit.topological_nodes
        # only works as str([]) for DAGInNodes. Need to figure out why.
        empty_key = str([])
        self.wire = wire
        self.sort_key = empty_key

    def __repr__(self):
        """Return a representation of the DAGInNode."""
        return f"DAGInNode(wire={self.wire})"
class DAGOutNode(DAGNode):
    """A node of the DAGCircuit that marks an outgoing wire."""

    __slots__ = ["wire", "sort_key"]

    def __init__(self, wire):
        """Create an outgoing node.

        Args:
            wire: The wire this node belongs to.
        """
        super().__init__()
        # TODO sort_key which is used in dagcircuit.topological_nodes
        # only works as str([]) for DAGOutNodes. Need to figure out why.
        empty_key = str([])
        self.wire = wire
        self.sort_key = empty_key

    def __repr__(self):
        """Return a representation of the DAGOutNode."""
        return f"DAGOutNode(wire={self.wire})"