from __future__ import annotations
import math
from collections import Counter
from enum import Enum
from itertools import product
from typing import Dict, List, Tuple, Union, cast
import numpy as np
from matplotlib.pyplot import subplots
from numpy.random import Generator
from numpy.typing import NDArray
from pytket import Circuit, Qubit
from pytket.circuit import OpType
from pytket.pauli import Pauli, QubitPauliString
from scipy.linalg import fractional_matrix_power # type: ignore
from .qermit_pauli import QermitPauli
class Direction(Enum):
    """Direction in which noise is propagated through a circuit."""

    forward = 1
    backward = 2
class ErrorDistribution:
    """
    Model of a Pauli error channel. Contains utilities to analyse and
    sample from distributions of errors.

    Attributes:
        distribution: Dictionary mapping a string of Pauli errors to the
            probability that they occur.
        rng: Randomness generator.
    """

    distribution: Dict[Tuple[Pauli, ...], float]
    rng: Generator

    def __init__(
        self,
        distribution: Dict[Tuple[Pauli, ...], float],
        rng: Union[Generator, None] = None,
    ):
        """
        :param distribution: Dictionary mapping a string of Pauli errors
            to the probability that they occur.
        :param rng: Randomness generator. If None (the default) a fresh
            generator is created with np.random.default_rng() for this
            instance.
        :raises Exception: Raised if error probabilities sum to greater than 1.
        :raises Exception: Raised if the errors do not all act on the
            same number of qubits.
        """
        total_probability = sum(distribution.values())
        # Totals marginally above 1 are tolerated to allow for
        # floating point rounding.
        if total_probability > 1 and not math.isclose(total_probability, 1):
            raise Exception(
                f"Probabilities sum to {total_probability}"
                + " but should be less than or equal to 1."
            )

        # If the given distribution is empty then no noise will be acted.
        # If it is not empty then we check that the number of qubits
        # in each of the errors match.
        if distribution:
            n_qubits = len(next(iter(distribution)))
            if not all(len(error) == n_qubits for error in distribution):
                raise Exception("Errors must all act on the same number of qubits.")

        self.distribution = distribution
        # The generator is created here, rather than as a default argument
        # value, so that it is not a single object built at definition time
        # and shared by every instance.
        self.rng = np.random.default_rng() if rng is None else rng

    @property
    def identity_error_rate(self) -> float:
        """The rate at which no error occurs.

        :return: Rate at which no error occurs.
            Calculated as 1 minus the total error rate of
            error in this distribution.
        """
        return 1 - sum(self.distribution.values())

    def to_ptm(self) -> Tuple[NDArray, Dict[Tuple[Pauli, ...], int]]:
        """Convert error distribution to Pauli Transfer Matrix (PTM) form.

        :raises Exception: Raised if the resulting PTM fails the internal
            consistency checks.
        :return: PTM of error distribution and Pauli index dictionary.
            The Pauli index dictionary maps Pauli errors to their
            index in the PTM
        """
        n_qubits = self.n_qubits

        # Initialise an empty PTM and index dictionary
        # of the appropriate size. A list (not a set) of Paulis is used so
        # that the index assigned to each Pauli string does not depend on
        # set iteration order.
        ptm = np.zeros((4**n_qubits, 4**n_qubits))
        pauli_index = {
            pauli: index
            for index, pauli in enumerate(
                product([Pauli.I, Pauli.X, Pauli.Y, Pauli.Z], repeat=n_qubits)
            )
        }

        # The errors do not change between PTM entries, so they are
        # converted once rather than once per entry.
        error_pauli_rates = [
            (
                QermitPauli.from_pauli_iterable(
                    pauli_iterable=error,
                    qubit_list=[Qubit(i) for i in range(n_qubits)],
                ),
                error_rate,
            )
            for error, error_rate in self.distribution.items()
        ]
        identity_error_rate = self.identity_error_rate

        # For each pauli, calculate the corresponding
        # PTM entry as a sum of error weights multiplied by +/-1
        # depending on commutation relations.
        for pauli_tuple, index in pauli_index.items():
            pauli = QermitPauli.from_pauli_iterable(
                pauli_iterable=pauli_tuple,
                qubit_list=[Qubit(i) for i in range(n_qubits)],
            )

            # Can add the identity error rate.
            # This will not come up in the following for loop
            # as the error distribution does not save
            # the rate at which no errors occur.
            ptm[index, index] += identity_error_rate

            for error_pauli, error_rate in error_pauli_rates:
                ptm[index, index] += error_rate * QermitPauli.commute_coeff(
                    pauli_one=pauli, pauli_two=error_pauli
                )

        # Some checks that the form of the PTM is correct.
        identity = tuple(Pauli.I for _ in range(n_qubits))
        identity_index = pauli_index[identity]
        if not abs(ptm[identity_index, identity_index] - 1.0) < 10 ** (-6):
            raise Exception(
                "The identity entry of the PTM is incorrect. "
                + "This is a fault in Qermit. "
                + "Please report this as an issue."
            )

        if not self == ErrorDistribution.from_ptm(ptm=ptm, pauli_index=pauli_index):
            raise Exception(
                "From PTM does not match to PTM. "
                + "This is a fault in Qermit. "
                + "Please report this as an issue."
            )

        return ptm, pauli_index

    @classmethod
    def from_ptm(
        cls, ptm: NDArray, pauli_index: Dict[Tuple[Pauli, ...], int]
    ) -> ErrorDistribution:
        """Convert a Pauli Transfer Matrix (PTM) to an error distribution.

        Error rates of at most 10^-6, and the rate of the identity, are
        not included in the returned distribution.

        :param ptm: Pauli Transfer Matrix to convert. Should be a 4^n by 4^n matrix
            where n is the number of qubits.
        :param pauli_index: A dictionary mapping Pauli errors to
            their index in the PTM.
        :raises Exception: Raised if the PTM is not a two dimensional,
            square, diagonal matrix with dimension of the form 4^n.
        :return: The converted error distribution.
        """
        if ptm.ndim != 2:
            raise Exception(
                f"This given matrix is not has dimension {ptm.ndim} "
                + "but should have dimension 2."
            )

        if ptm.shape[0] != ptm.shape[1]:
            raise Exception(
                "The dimensions of the given PTM are "
                + f"{ptm.shape[0]} and {ptm.shape[1]} "
                + "but they should match."
            )

        # Recover the number of qubits with exact integer arithmetic.
        # A floating point logarithm can be inexact for exact powers of 4.
        dimension = int(ptm.shape[0])
        n_qubit = (dimension.bit_length() - 1) // 2
        if dimension < 1 or 4**n_qubit != dimension:
            raise Exception(
                "The given PTM should have a dimension of the form 4^n "
                + "where n is the number of qubits."
            )

        if not np.array_equal(ptm, np.diag(np.diag(ptm))):
            raise Exception("The given PTM is not diagonal as it should be.")

        # Convert each Pauli string once, rather than once per pair.
        indexed_paulis = [
            (
                index,
                QermitPauli.from_pauli_iterable(
                    pauli_iterable=pauli_tuple,
                    qubit_list=[Qubit(i) for i in range(len(pauli_tuple))],
                ),
            )
            for pauli_tuple, index in pauli_index.items()
        ]

        # Calculate the error rates by solving simultaneous
        # linear equations. In particular the matrix to invert
        # is the matrix of commutation values.
        commutation_matrix = np.zeros(ptm.shape)
        for index_one, pauli_one in indexed_paulis:
            for index_two, pauli_two in indexed_paulis:
                commutation_matrix[index_one, index_two] = QermitPauli.commute_coeff(
                    pauli_one=pauli_one, pauli_two=pauli_two
                )

        error_rate_list = np.matmul(ptm.diagonal(), np.linalg.inv(commutation_matrix))

        identity = tuple(Pauli.I for _ in range(n_qubit))
        distribution = {
            error: error_rate_list[index]
            for error, index in pauli_index.items()
            if (error_rate_list[index] > 10 ** (-6)) and error != identity
        }
        return cls(distribution=distribution)

    @property
    def n_qubits(self) -> int:
        """The number of qubits this error distribution acts on.

        :raises IndexError: Raised if the distribution is empty, as the
            number of qubits cannot then be determined.
        """
        if not self.distribution:
            raise IndexError(
                "The number of qubits of an empty distribution is undefined."
            )
        return len(next(iter(self.distribution)))

    def __eq__(self, other: object) -> bool:
        """Check equality of two instances of ErrorDistribution by ensuring
        that all keys in distribution match, and that the probabilities are
        close for each value.

        :param other: Instance of ErrorDistribution to be compared against.
        :return: True if two instances are equal, false otherwise.
        """
        if not isinstance(other, ErrorDistribution):
            return False

        # Check all pauli errors in the two distributions are the same.
        if set(self.distribution.keys()) != set(other.distribution.keys()):
            return False

        # Check all probabilities are close. Note the absolute
        # tolerance of 0.01.
        return all(
            math.isclose(
                self.distribution[error], other.distribution[error], abs_tol=0.01
            )
            for error in self.distribution
        )

    def __str__(self) -> str:
        """Generates string representation of error distribution.

        :return: String representation of error distribution.
        """
        return "".join(f"{key}:{value} \n" for key, value in self.distribution.items())

    @classmethod
    def mixture(cls, distribution_list: List[ErrorDistribution]) -> ErrorDistribution:
        """Generates the distribution corresponding to the mixture of a
        list of distributions.

        :param distribution_list: List of instances of ErrorDistribution.
        :return: Mixture distribution. Errors appear in the order in
            which they are first encountered in distribution_list.
        """
        # dict.fromkeys removes duplicates while keeping first-seen order,
        # so the order of the result does not depend on set iteration order.
        all_errors = dict.fromkeys(
            error
            for distribution in distribution_list
            for error in distribution.distribution
        )
        return cls(
            distribution={
                error: sum(
                    distribution.distribution.get(error, 0)
                    for distribution in distribution_list
                )
                / len(distribution_list)
                for error in all_errors
            }
        )

    def order(self, reverse: bool = True):
        """Reorders the distribution dictionary based on probabilities.

        :param reverse: Order from high to low, defaults to True
        """
        self.distribution = dict(
            sorted(self.distribution.items(), key=lambda x: x[1], reverse=reverse)
        )

    def reset_rng(self, rng: Generator):
        """Reset randomness generator.

        :param rng: Randomness generator.
        """
        self.rng = rng

    def to_dict(self) -> List[Dict[str, Union[List[int], float]]]:
        """Produces json serialisable representation of ErrorDistribution.

        :return: Json serialisable representation of ErrorDistribution.
        """
        return [
            {
                "op_list": [op.value for op in op_list],
                "noise_level": noise_level,
            }
            for op_list, noise_level in self.distribution.items()
        ]

    @classmethod
    def from_dict(
        cls, distribution_dict: List[Dict[str, Union[List[int], float]]]
    ) -> ErrorDistribution:
        """Generates ErrorDistribution from json serialisable representation.

        :param distribution_dict: List of dictionaries, each of which map
            a property of the distribution to its value.
        :return: ErrorDistribution created from serialised representation.
        """
        return cls(
            distribution={
                tuple(Pauli(op) for op in cast(List[int], noise_op["op_list"])): cast(
                    float, noise_op["noise_level"]
                )
                for noise_op in distribution_dict
            }
        )

    def sample(self) -> Union[Tuple[Pauli, ...], None]:
        """Draw sample from distribution.

        :return: Either one of the pauli strings in the support of the
            distribution, or None. None can be returned if the total
            probability of the distribution is not 1, and should be
            interpreted as the unspecified support.
        """
        return_val = self.rng.uniform(0, 1)
        # Walk the cumulative distribution until it reaches the drawn value.
        total = 0.0
        for error, prob in self.distribution.items():
            total += prob
            if total >= return_val:
                return error

        return None

    def plot(self):
        """
        Generates plot of distribution.

        :return: Figure containing a bar chart of the error probabilities.
        """
        fig, ax = subplots()

        errors = list(self.distribution.keys())
        ax.bar(range(len(errors)), list(self.distribution.values()), align="center")
        ax.set_xticks(
            ticks=range(len(errors)),
            labels=["".join(op.name for op in op_tuple) for op_tuple in errors],
        )

        return fig

    def scale(self, scaling_factor: float) -> ErrorDistribution:
        """Scale the error rates of this error distribution.
        This is done by converting the error distribution to a PTM,
        scaling that matrix appropriately, and converting back to a
        new error distribution.

        :param scaling_factor: The factor by which the noise should be scaled.
            It is used as the power to which the PTM is raised.
        :return: A new error distribution with the noise scaled.
        """
        ptm, pauli_index = self.to_ptm()
        scaled_ptm = fractional_matrix_power(ptm, scaling_factor)
        return ErrorDistribution.from_ptm(ptm=scaled_ptm, pauli_index=pauli_index)
class LogicalErrorDistribution:
    """
    Class for managing distributions of logical errors from a noisy
    simulation of a circuit. This differs from ErrorDistribution in that
    the errors are pauli strings rather than tuples of Paulis. That is to
    say that the errors are fixed to qubits.

    Attributes:
        pauli_error_counter: Counts number of pauli errors in distribution
    """

    pauli_error_counter: Counter[QermitPauli]

    def __init__(self, pauli_error_counter: Counter[QermitPauli], **kwargs):
        """Initialisation method. Stores pauli error counter.

        :param pauli_error_counter: Counter of pauli errors.
        :key total: The total number of shots taken when measuring the
            errors. By default this will be taken to be the total
            number of errors.
        """
        self.pauli_error_counter = pauli_error_counter
        if "total" in kwargs:
            self.total = kwargs["total"]
        else:
            self.total = sum(pauli_error_counter.values())

    @property
    def distribution(self) -> Dict[QubitPauliString, float]:
        """Probability distribution equivalent to counts distribution.

        :return: Dictionary mapping QubitPauliString to probability that
            that error occurs.
        """
        probabilities: Dict[QubitPauliString, float] = {}
        for logical_error, n_shots in self.pauli_error_counter.items():
            # The phase is discarded, so errors which differ only by
            # phase have their probabilities added together.
            pauli_string, _phase = logical_error.qubit_pauli_string
            accumulated = probabilities.get(pauli_string, 0)
            probabilities[pauli_string] = accumulated + n_shots / self.total
        return probabilities

    def post_select(self, qubit_list: List[Qubit]) -> LogicalErrorDistribution:
        """Post select based on the given qubits. In particular remove the
        given qubits, and the shots with measurable errors on those qubits.

        :param qubit_list: List of qubits to be post selected on.
        :return: New LogicalErrorDistribution with given qubits removed,
            and those shots where there are measurable errors on those
            qubits removed.
        """
        surviving_errors: Counter[QermitPauli] = Counter()
        # Start from the number of error free shots, then add back
        # each shot whose error passes the post selection.
        n_surviving_shots = self.total - sum(self.pauli_error_counter.values())

        for pauli_error, count in self.pauli_error_counter.items():
            if pauli_error.is_measureable(qubit_list):
                continue
            surviving_errors[pauli_error.reduce_qubits(qubit_list)] += count
            n_surviving_shots += count

        return LogicalErrorDistribution(
            pauli_error_counter=surviving_errors,
            total=n_surviving_shots,
        )
class NoiseModel:
    """
    Module for managing and executing a circuit noise model. In particular
    error models are assigned to each gate, and logical errors from
    circuits can be sampled.

    Attributes:
        noise_model: Mapping from gates to the error model which corresponds
            to that gate.
    """

    noise_model: Dict[OpType, ErrorDistribution]

    def __init__(self, noise_model: Dict[OpType, ErrorDistribution]):
        """
        :param noise_model: Map from gates to their error models.
        """
        self.noise_model = noise_model

    def scale(self, scaling_factor: float) -> NoiseModel:
        """Generate new error model where all error rates have been scaled by
        the given scaling factor.

        :param scaling_factor: Factor by which to scale the error rates.
        :return: New noise model with scaled error rates.
        """
        return NoiseModel(
            noise_model={
                op_type: error_distribution.scale(scaling_factor=scaling_factor)
                for op_type, error_distribution in self.noise_model.items()
            }
        )

    def reset_rng(self, rng: Generator):
        """Reset randomness generator.

        :param rng: Randomness generator to be reset to. The same generator
            is given to the error distribution of every gate.
        """
        for distribution in self.noise_model.values():
            distribution.reset_rng(rng=rng)

    def plot(self):
        """Generates plot of noise model.

        :return: List of figures, one per noisy gate, each titled
            with the name of the gate.
        """
        fig_list = []
        for noisy_gate, distribution in self.noise_model.items():
            fig = distribution.plot()
            ax = fig.axes[0]
            ax.set_title(noisy_gate.name)
            fig_list.append(fig)

        return fig_list

    def __eq__(self, other: object) -> bool:
        """Checks equality by checking all gates in the two noise models match,
        and that the noise models of each gate match.

        :param other: Noise model to be compared against.
        :return: True if equivalent, false otherwise.
        """
        if not isinstance(other, NoiseModel):
            return False

        # Dictionary key views compare like sets, so the gates can be
        # compared without requiring that they can be sorted.
        if self.noise_model.keys() != other.noise_model.keys():
            return False

        return all(
            self.noise_model[op] == other.noise_model[op] for op in self.noise_model
        )

    def to_dict(self) -> Dict[str, List[Dict[str, Union[List[int], float]]]]:
        """Json serialisable object representing noise model.

        :return: Json serialisable object representing noise model.
            Keys are the names of the noisy gates.
        """
        return {
            op.name: distribution.to_dict()
            for op, distribution in self.noise_model.items()
        }

    @classmethod
    def from_dict(
        cls, noise_model_dict: Dict[str, List[Dict[str, Union[List[int], float]]]]
    ) -> NoiseModel:
        """Convert JSON serialised version of noise model back to an instance
        of NoiseModel.

        :param noise_model_dict: JSON serialised version of NoiseModel
        :return: Instance of noise model corresponding to JSON serialised
            version.
        """
        return cls(
            noise_model={
                OpType.from_name(op): ErrorDistribution.from_dict(error_distribution)
                for op, error_distribution in noise_model_dict.items()
            }
        )

    @property
    def noisy_gates(self) -> List[OpType]:
        """List of OpTypes with noise.

        :return: List of OpTypes with noise.
        """
        return list(self.noise_model.keys())

    def get_error_distribution(self, optype: OpType) -> ErrorDistribution:
        """Recovers error model corresponding to particular OpType.

        :param optype: OpType for which noise model should be retrieved.
        :return: Error model corresponding to particular OpType.
        """
        return self.noise_model[optype]

    def get_effective_pre_error_distribution(
        self,
        cliff_circ: Circuit,
        n_rand: int = 1000,
        **kwargs,
    ) -> LogicalErrorDistribution:
        """Retrieve the effective noise model of a given circuit. This is to
        say, repeatedly generate circuits with coherent noise added at random.
        Push all errors to the front of the circuit. Return a counter of the
        errors which have been pushed to the front.

        Additional keyword arguments are accepted but are not used.

        :param cliff_circ: Circuit to be simulated. This should be a Clifford
            circuit.
        :param n_rand: Number of random circuit instances, defaults to 1000
        :return: Resulting distribution of errors.
        """
        error_counter = self.counter_propagate(
            cliff_circ=cliff_circ,
            n_counts=n_rand,
            direction=Direction.backward,
        )

        return LogicalErrorDistribution(error_counter, total=n_rand)

    def counter_propagate(
        self, cliff_circ: Circuit, n_counts: int, **kwargs
    ) -> Counter[QermitPauli]:
        """Generate random noisy instances of the given circuit and propagate
        the noise to create a counter of logical errors. Note that
        kwargs are passed onto `random_propagate`.

        :param cliff_circ: Circuit to be simulated. This should be a Clifford
            circuit.
        :param n_counts: Number of random instances.
        :return: Counter of logical errors. Instances which result in
            the identity are not counted.
        """
        error_counter: Counter[QermitPauli] = Counter()

        # TODO: There is some time wasted here, if for example there is
        # no error in back_propagate_random_error. There may be a saving to
        # be made here if the errors are sampled before the back
        # propagation occurs?
        for _ in range(n_counts):
            pauli_error = self.random_propagate(cliff_circ, **kwargs)

            if not pauli_error.is_identity:
                error_counter[pauli_error] += 1

        return error_counter

    def random_propagate(
        self,
        cliff_circ: Circuit,
        direction: Direction = Direction.backward,
    ) -> QermitPauli:
        """Generate a random noisy instance of the given circuit and
        propagate the noise forward or backward to recover the logical error.

        :param cliff_circ: Circuit to be simulated. This should be a Clifford
            circuit.
        :param direction: Direction in which noise should be propagated,
            defaults to 'backward'
        :raises Exception: Raised if direction is invalid.
        :return: Resulting logical error.
        """
        # Create identity error.
        qubit_list = cliff_circ.qubits
        pauli_error = QermitPauli(
            Z_list=[0] * len(qubit_list),
            X_list=[0] * len(qubit_list),
            qubit_list=qubit_list,
        )

        # Commands are ordered in reverse or original order depending on which
        # way the noise is being pushed. The direction is validated here, so
        # it is known to be one of the two valid values in the loop below.
        if direction == Direction.backward:
            is_backward = True
            command_list = list(reversed(cliff_circ.get_commands()))
        elif direction == Direction.forward:
            is_backward = False
            command_list = cliff_circ.get_commands()
        else:
            raise Exception(
                "Direction must be Direction.backward or Direction.forward."
            )

        # Sampled errors are applied before the total error when pulling
        # backward, and after it when pushing forward.
        if is_backward:
            apply_pauli = pauli_error.pre_apply_pauli
        else:
            apply_pauli = pauli_error.post_apply_pauli

        # For each command in the circuit, add an error as appropriate, and
        # push the total error through the command.
        for command in command_list:
            if command.op.type in [OpType.Measure, OpType.Barrier]:
                continue

            if not is_backward:
                # Apply gate to total error.
                pauli_error.apply_gate(
                    op_type=command.op.type,
                    qubits=cast(List[Qubit], command.args),
                    params=command.op.params,
                )

            # Add noise operation if appropriate.
            if command.op.type in self.noise_model:
                error_distribution = self.get_error_distribution(optype=command.op.type)
                error = error_distribution.sample()

                # A sample of None means no error occurred on this gate.
                if error is not None:
                    for pauli, qubit in zip(error, command.args):
                        apply_pauli(pauli=pauli, qubit=cast(Qubit, qubit))

            if is_backward:
                # Note that here we wish to pull the pauli back through the gate,
                # which has the same effect on the pauli as pushing through the
                # dagger.
                dagger_op = command.op.dagger
                pauli_error.apply_gate(
                    op_type=dagger_op.type,
                    qubits=cast(List[Qubit], command.args),
                    params=dagger_op.params,
                )

        return pauli_error