Skip to content

Commit

Permalink
fixed apply_unitaries
Browse files Browse the repository at this point in the history
  • Loading branch information
NoureldinYosri committed Jun 1, 2023
1 parent d656a38 commit d10ae04
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 31 deletions.
41 changes: 31 additions & 10 deletions cirq-core/cirq/protocols/apply_unitary_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,17 +410,18 @@ def _strat_apply_unitary_from_apply_unitary(


def _strat_apply_unitary_from_unitary(
unitary_value: Any, args: ApplyUnitaryArgs
unitary_value: Any, args: ApplyUnitaryArgs, matrix: Optional[np.ndarray] = None
) -> Optional[np.ndarray]:
# Check for magic method.
method = getattr(unitary_value, '_unitary_', None)
if method is None:
return NotImplemented
if matrix is None:
# Check for magic method.
method = getattr(unitary_value, '_unitary_', None)
if method is None:
return NotImplemented

# Attempt to get the unitary matrix.
matrix = method()
if matrix is NotImplemented or matrix is None:
return matrix
# Attempt to get the unitary matrix.
matrix = method()
if matrix is NotImplemented or matrix is None:
return matrix

if args.slices is None:
val_qid_shape = qid_shape_protocol.qid_shape(unitary_value, default=(2,) * len(args.axes))
Expand Down Expand Up @@ -454,7 +455,27 @@ def _strat_apply_unitary_from_decompose(val: Any, args: ApplyUnitaryArgs) -> Opt
operations, qubits, _ = _try_decompose_into_operations_and_qubits(val)
if operations is None:
return NotImplemented
return apply_unitaries(operations, qubits, args, None)
all_qubits = frozenset([q for op in operations for q in op.qubits])
ancilla = tuple(sorted(all_qubits.difference(qubits)))
if not len(ancilla):
return apply_unitaries(operations, qubits, args, None)
ordered_qubits = ancilla + tuple(qubits)
all_qid_shapes = qid_shape_protocol.qid_shape(ordered_qubits)
state = qis.eye_tensor(all_qid_shapes, dtype=np.complex128)
buffer = np.empty_like(state)
result = apply_unitaries(
operations,
ordered_qubits,
ApplyUnitaryArgs(state, buffer, range(len(ordered_qubits))),
None,
)
if result is None or result is NotImplemented:
return result
result = result.reshape((np.prod(all_qid_shapes, dtype=np.int64), -1))
val_qid_shape = qid_shape_protocol.qid_shape(qubits)
state_vec_length = np.prod(val_qid_shape, dtype=np.int64)
result = result[:state_vec_length, :state_vec_length]
return _strat_apply_unitary_from_unitary(val, args, matrix=result)


def apply_unitaries(
Expand Down
40 changes: 40 additions & 0 deletions cirq-core/cirq/protocols/apply_unitary_protocol_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,3 +717,43 @@ def test_cast_to_complex():
np.ComplexWarning, match='Casting complex values to real discards the imaginary part'
):
cirq.apply_unitary(y0, args)


class NotDecomposableGate(cirq.Gate):
def num_qubits(self):
return 1


class DecomposableGate(cirq.Gate):
def __init__(self, sub_gate: cirq.Gate, allocate_ancilla: bool) -> None:
super().__init__()
self._sub_gate = sub_gate
self._allocate_ancilla = allocate_ancilla

def num_qubits(self):
return 1

def _decompose_(self, qubits):
if self._allocate_ancilla:
yield cirq.Z(cirq.LineQubit(1))
yield self._sub_gate(qubits[0])


def test_strat_apply_unitary_from_decompose():
state = np.eye(2, dtype=np.complex128)
args = cirq.ApplyUnitaryArgs(
target_tensor=state, available_buffer=np.zeros_like(state), axes=(0,)
)
np.testing.assert_allclose(
cirq.apply_unitaries(
[DecomposableGate(cirq.X, False)(cirq.LineQubit(0))], [cirq.LineQubit(0)], args
),
[[0, 1], [1, 0]],
)

with pytest.raises(TypeError):
_ = cirq.apply_unitaries(
[DecomposableGate(NotDecomposableGate(), True)(cirq.LineQubit(0))],
[cirq.LineQubit(0)],
args,
)
8 changes: 3 additions & 5 deletions cirq-core/cirq/protocols/unitary_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,10 @@ def _strat_unitary_from_decompose(val: Any) -> Optional[np.ndarray]:

all_qubits = frozenset(q for op in operations for q in op.qubits)
work_qubits = frozenset(qubits)
ancillas = tuple(sorted(q for q in all_qubits if q not in work_qubits))
ancillas = tuple(sorted(all_qubits.difference(work_qubits)))

ordered_qubits = ancillas + tuple(qubits)
val_qid_shape = (2,) * len(
ancillas
) + val_qid_shape # For now ancillas have only one qid_shape = (2,).
val_qid_shape = qid_shape_protocol.qid_shape(ancillas) + val_qid_shape

# Apply sub-operations' unitary effects to an identity matrix.
state = qis.eye_tensor(val_qid_shape, dtype=np.complex128)
Expand All @@ -200,9 +198,9 @@ def _strat_unitary_from_decompose(val: Any) -> Optional[np.ndarray]:
return None

state_len = np.prod(val_qid_shape, dtype=np.int64)
work_state_len = np.prod(val_qid_shape[len(ancillas) :], dtype=np.int64)
result = result.reshape((state_len, state_len))
# Assuming borrowable qubits are restored to their original state and
# clean qubits restord to the zero state then the desired unitary is
# the upper left square.
work_state_len = np.prod(val_qid_shape[len(ancillas) :], dtype=np.int64)
return result[:work_state_len, :work_state_len]
79 changes: 63 additions & 16 deletions cirq-core/cirq/protocols/unitary_protocol_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
from typing import Optional, cast
import functools

import numpy as np
import pytest
Expand Down Expand Up @@ -94,22 +95,24 @@ def _decompose_(self, qubits):


class GateThatAllocatesAQubit(cirq.Gate):
_target_unitary = np.array([[1, 0], [0, -1]])
def __init__(self, theta: float) -> None:
super().__init__()
self._theta = theta

def _num_qubits_(self):
return 1

def _decompose_(self, q):
anc = cirq.NamedQubit("anc")
yield cirq.CX(*q, anc)
yield (cirq.Z)(anc)
yield (cirq.Z**self._theta)(anc)
yield cirq.CX(*q, anc)

def target_unitary(self) -> np.ndarray:
return np.array([[1, 0], [0, (-1 + 0j) ** self._theta]])

class GateThatAllocatesTwoQubits(cirq.Gate):
# Unitary = (-j I_2) \otimes Z
_target_unitary = np.array([[-1j, 0, 0, 0], [0, 1j, 0, 0], [0, 0, 1j, 0], [0, 0, 0, -1j]])

class GateThatAllocatesTwoQubits(cirq.Gate):
def _num_qubits_(self):
return 2

Expand All @@ -126,6 +129,33 @@ def _decompose_(self, qs):
yield (cirq.Z)(anc[1])
yield cirq.CX(q1, anc[1])

@classmethod
def target_unitary(cls) -> np.ndarray:
# Unitary = (-j I_2) \otimes Z
return np.array([[-1j, 0, 0, 0], [0, 1j, 0, 0], [0, 0, 1j, 0], [0, 0, 0, -1j]])


class GateThatDecomposesIntoNGates(cirq.Gate):
def __init__(self, n: int, sub_gate: cirq.Gate, theta: float) -> None:
super().__init__()
self._n = n
self._subgate = sub_gate
self._name = str(sub_gate)
self._theta = theta

def _num_qubits_(self) -> int:
return self._n

def _decompose_(self, qs):
ancilla = cirq.NamedQubit.range(self._n, prefix=self._name)
yield self._subgate.on_each(ancilla)
yield (cirq.Z**self._theta).on_each(qs)
yield self._subgate.on_each(ancilla)

def target_unitary(self) -> np.ndarray:
U = np.array([[1, 0], [0, (-1 + 0j) ** self._theta]])
return functools.reduce(np.kron, [U] * self._n)


class DecomposableOperation(cirq.Operation):
qubits = ()
Expand Down Expand Up @@ -222,6 +252,31 @@ def test_has_unitary():
assert not cirq.has_unitary(FullyImplemented(False))


@pytest.mark.parametrize('theta', np.linspace(0, 2 * np.pi, 10))
def test_decompose_gate_that_allocates_qubits(theta: float):
from cirq.protocols.unitary_protocol import _strat_unitary_from_decompose

gate = GateThatAllocatesAQubit(theta)
np.testing.assert_allclose(
cast(np.ndarray, _strat_unitary_from_decompose(gate)), gate.target_unitary()
)
np.testing.assert_allclose(
cast(np.ndarray, _strat_unitary_from_decompose(gate(a))), gate.target_unitary()
)


@pytest.mark.parametrize('theta', np.linspace(0, 2 * np.pi, 10))
@pytest.mark.parametrize('n', [*range(1, 6)])
def test_recusive_decomposition(n: int, theta: float):
from cirq.protocols.unitary_protocol import _strat_unitary_from_decompose

g1 = GateThatDecomposesIntoNGates(n, cirq.H, theta)
g2 = GateThatDecomposesIntoNGates(n, g1, theta)
np.testing.assert_allclose(
cast(np.ndarray, _strat_unitary_from_decompose(g2)), g2.target_unitary()
)


def test_decompose_and_get_unitary():
from cirq.protocols.unitary_protocol import _strat_unitary_from_decompose

Expand All @@ -235,21 +290,13 @@ def test_decompose_and_get_unitary():
np.testing.assert_allclose(_strat_unitary_from_decompose(DummyComposite()), np.eye(1))
np.testing.assert_allclose(_strat_unitary_from_decompose(OtherComposite()), m2)

np.testing.assert_allclose(
_strat_unitary_from_decompose(GateThatAllocatesAQubit()),
GateThatAllocatesAQubit._target_unitary,
)
np.testing.assert_allclose(
_strat_unitary_from_decompose(GateThatAllocatesAQubit().on(a)),
GateThatAllocatesAQubit._target_unitary,
)
np.testing.assert_allclose(
_strat_unitary_from_decompose(GateThatAllocatesTwoQubits()),
GateThatAllocatesTwoQubits._target_unitary,
GateThatAllocatesTwoQubits.target_unitary(),
)
np.testing.assert_allclose(
_strat_unitary_from_decompose(GateThatAllocatesTwoQubits().on(a, b)),
GateThatAllocatesTwoQubits._target_unitary,
GateThatAllocatesTwoQubits.target_unitary(),
)


Expand Down

0 comments on commit d10ae04

Please sign in to comment.