# Copyright Quantinuum
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
from collections import Counter, OrderedDict
from collections.abc import Callable, Iterable
from functools import lru_cache
from math import ceil, log2
from typing import cast

import numpy as np

from pytket.backends import Backend
from pytket.backends.backendresult import BackendResult
from pytket.circuit import Bit, CircBox, Circuit, Node, OpType, Qubit
from pytket.passes import DecomposeBoxes, FlattenRegisters
from pytket.utils.outcomearray import OutcomeArray
from pytket.utils.results import CountsDict, StateTuple

ParallelMeasures = list[dict[Qubit, Bit]]


def compress_counts(
    counts: dict[StateTuple, float], tol: float = 1e-6, round_to_int: bool = False
) -> CountsDict:
    """Filter counts to remove states that have a count value (which can be a
    floating-point number) below a tolerance, and optionally round to an
    integer.

    :param counts: Input counts
    :type counts: Dict[StateTuple, float]
    :param tol: Value below which counts are pruned. Defaults to 1e-6.
    :type tol: float, optional
    :param round_to_int: Whether to round each count to an integer. Defaults to False.
    :type round_to_int: bool, optional

    :return: Filtered counts
    :rtype: CountsDict
    """
    valprocess: Callable[[float], int | float] = lambda x: (
        round(x) if round_to_int else x
    )
    processed_pairs = (
        (key, valprocess(val)) for key, val in counts.items() if val > tol
    )
    return {key: val for key, val in processed_pairs if val > 0}
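
# Illustrative usage of compress_counts (editor's sketch, not part of the
# library): sub-tolerance entries are dropped, and rounding removes any
# fractional counts that round to zero.
#
#     >>> compress_counts({(0, 0): 99.7, (0, 1): 0.3, (1, 1): 1e-9},
#     ...                 round_to_int=True)
#     {(0, 0): 100}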


@lru_cache(maxsize=128)
def binary_to_int(bintuple: tuple[int, ...]) -> int:
    """Convert a binary tuple to the corresponding integer, with the most significant
    bit as the first element of the tuple.

    :param bintuple: Binary tuple
    :type bintuple: Tuple[int, ...]

    :return: Integer
    :rtype: int
    """
    integer = 0
    for index, bitset in enumerate(reversed(bintuple)):
        if bitset:
            integer |= 1 << index
    return integer


@lru_cache(maxsize=128)
def int_to_binary(val: int, dim: int) -> tuple[int, ...]:
    """Convert an integer to the corresponding binary tuple, with the most significant
    bit as the first element of the tuple.

    :param val: Input integer
    :type val: int
    :param dim: Bit width
    :type dim: int

    :return: Binary tuple of width dim
    :rtype: Tuple[int, ...]
    """
    return tuple(map(int, format(val, f"0{dim}b")))
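
# Quick sanity check of the two conversions (editor's sketch): the first
# element of the tuple is the most significant bit, and the two functions
# are mutually inverse for a fixed width.
#
#     >>> binary_to_int((1, 0, 1))
#     5
#     >>> int_to_binary(5, 4)
#     (0, 1, 0, 1)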


#########################################
### _compute_dot and helper functions ###
###
### With thanks to
### https://math.stackexchange.com/a/3423910
### and especially
### https://gist.github.com/ahwillia/f65bc70cb30206d4eadec857b98c4065
### on which this code is based.
def _unfold(tens: np.ndarray, mode: int, dims: list[int]) -> np.ndarray:
    """Unfolds tensor into matrix.

    :param tens: Tensor with shape equivalent to dimensions
    :type tens: np.ndarray
    :param mode: Specifies the axis moved to the front in the unfolding of the tensor
    :type mode: int
    :param dims: Gives the shape of the tensor passed
    :type dims: List[int]

    :return: Matrix with shape (dims[mode], prod(dims) / dims[mode])
    :rtype: np.ndarray
    """
    if mode == 0:
        return tens.reshape(dims[0], -1)
    return np.moveaxis(tens, mode, 0).reshape(dims[mode], -1)


def _refold(vec: np.ndarray, mode: int, dims: list[int]) -> np.ndarray:
    """Refolds vector into tensor.

    :param vec: Vector with length equivalent to the product of the dimensions given
        in dims
    :type vec: np.ndarray
    :param mode: Axis tensor was unfolded along
    :type mode: int
    :param dims: Shape of tensor
    :type dims: List[int]

    :return: Tensor folded from vector with shape equivalent to given dimensions
    :rtype: np.ndarray
    """
    if mode == 0:
        return vec.reshape(dims)
    # Reshape and then move dims[mode] back to its
    # appropriate spot (undoing the `unfold` operation).
    tens = vec.reshape([dims[mode]] + [d for m, d in enumerate(dims) if m != mode])
    return np.moveaxis(tens, 0, mode)


def _compute_dot(submatrices: Iterable[np.ndarray], vector: np.ndarray) -> np.ndarray:
    """Multiplies the Kronecker product of the given submatrices with the given
    vector.

    :param submatrices: Submatrices multiplied
    :type submatrices: Iterable[np.ndarray]
    :param vector: Vector multiplied
    :type vector: np.ndarray

    :return: Result of multiplying the Kronecker product of the submatrices by the
        vector
    :rtype: np.ndarray
    """
    dims = [A.shape[0] for A in submatrices]
    vt = vector.reshape(dims)
    for i, A in enumerate(submatrices):
        vt = _refold(A @ _unfold(vt, i, dims), i, dims)
    return vt.ravel()
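
# Editor's sketch: _compute_dot agrees with multiplying by the explicit
# Kronecker product, but never materialises the full matrix.
# (np.random.default_rng and np.kron are standard numpy; nothing here is
# pytket API.)
#
#     >>> rng = np.random.default_rng(0)
#     >>> A, B = rng.random((2, 2)), rng.random((4, 4))
#     >>> v = rng.random(8)
#     >>> np.allclose(_compute_dot([A, B], v), np.kron(A, B) @ v)
#     True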


def _bayesian_iteration(
    submatrices: Iterable[np.ndarray],
    measurements: np.ndarray,
    t: np.ndarray,
    epsilon: float,
) -> np.ndarray:
    """Transforms t according to a Bayesian iteration, used to modify
    measurements.

    :param submatrices: Submatrices to be inverted and applied to measurements.
    :type submatrices: Iterable[np.ndarray]
    :param measurements: Probability distribution over set of states to be amended.
    :type measurements: np.ndarray
    :param t: Some transform to act on measurements.
    :type t: np.ndarray
    :param epsilon: A stabilization parameter to define an affine transformation for
        application to submatrices, eliminating zero probabilities.
    :type epsilon: float

    :return: Transformed distribution vector.
    :rtype: np.ndarray
    """
    # Transform t according to the Bayesian iteration
    # The parameter epsilon is a stabilization parameter which defines an affine
    # transformation to apply to the submatrices to eliminate zero probabilities. This
    # transformation preserves the property that all columns sum to 1
    if epsilon == 0:
        # avoid copying if we don't need to
        As = submatrices
    else:
        As = [
            epsilon / submatrix.shape[0] + (1 - epsilon) * submatrix
            for submatrix in submatrices
        ]
    z = _compute_dot(As, t)
    if np.isclose(z, 0).any():
        raise ZeroDivisionError
    return cast(
        "np.ndarray", t * _compute_dot([A.transpose() for A in As], measurements / z)
    )


def _bayesian_iterative_correct(
    submatrices: Iterable[np.ndarray],
    measurements: np.ndarray,
    tol: float = 1e-5,
    max_it: int | None = None,
) -> np.ndarray:
    """Finds new states to represent application of inversion of submatrices on
    measurements. Converges when the updated states are within tol of the
    previously tested states.

    :param submatrices: Matrices comprising the pure noise characterisation.
    :type submatrices: Iterable[np.ndarray]
    :param measurements: Vector corresponding to some counts distribution.
    :type measurements: np.ndarray
    :param tol: Tolerance of closeness of found results.
    :type tol: float
    :param max_it: Maximum number of inversions attempted to correct results.
    :type max_it: int

    :return: Amended probability distribution vector.
    :rtype: np.ndarray
    """
    # based on method found in https://arxiv.org/abs/1910.00129

    vector_size = measurements.size
    # uniform initial
    true_states = np.full(vector_size, 1 / vector_size)
    prev_true = true_states.copy()
    converged = False
    count = 0
    epsilon: float = 0  # stabilization parameter, adjusted dynamically
    while not converged:
        if max_it:
            if count >= max_it:
                break
            count += 1
        try:
            true_states = _bayesian_iteration(
                submatrices, measurements, true_states, epsilon
            )
            converged = np.allclose(true_states, prev_true, atol=tol)
            prev_true = true_states.copy()
        except ZeroDivisionError:
            # Shift the stabilization parameter up a bit (always < 0.5).
            epsilon = 0.99 * epsilon + 0.01 * 0.5

    return true_states
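
# Editor's sketch of the iterative correction on a single qubit, with a
# made-up confusion matrix whose columns sum to 1; the iteration recovers
# the true distribution from the noise-distorted one.
#
#     >>> confusion = np.array([[0.9, 0.2],
#     ...                       [0.1, 0.8]])
#     >>> noisy = confusion @ np.array([0.9, 0.1])  # distort [0.9, 0.1]
#     >>> corrected = _bayesian_iterative_correct([confusion], noisy, tol=1e-8)
#     >>> np.allclose(corrected, [0.9, 0.1], atol=1e-3)
#     True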


def _reduce_matrix(indices_to_remove: list[int], matrix: np.ndarray) -> np.ndarray:
    """Removes the bit positions in indices_to_remove from the binary indexing of the
    matrix, producing a new transition matrix. To do so, it takes the transition
    probabilities for the remaining indices with the removed bits fixed in state 0.
    This is an assumption on the noise, made because unmeasured qubits are likely to
    be in that state.

    :param indices_to_remove: Binary indices of the state mapped by the matrix to be
        removed.
    :type indices_to_remove: List[int]
    :param matrix: Transition matrix where indices correspond to some binary state.
    :type matrix: np.ndarray

    :return: Transition matrix with removed entries.
    :rtype: np.ndarray
    """

    new_n_qubits = int(log2(matrix.shape[0])) - len(indices_to_remove)
    if new_n_qubits == 0:
        return np.array([])
    bin_map = {}
    mat_dim = 1 << new_n_qubits
    for index in range(mat_dim):
        # get current binary
        bina = list(int_to_binary(index, new_n_qubits))
        # add 0's to fetch old binary to set values from
        for i in sorted(indices_to_remove):
            bina.insert(i, 0)
        # get index of values
        bin_map[index] = binary_to_int(tuple(bina))

    new_mat = np.zeros((mat_dim,) * 2, dtype=float)
    for i in range(len(new_mat)):
        old_row_index = bin_map[i]
        for j in range(len(new_mat)):
            old_col_index = bin_map[j]
            new_mat[i, j] = matrix[old_row_index, old_col_index]
    return new_mat
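
# Editor's sketch: removing bit position 0 from a two-qubit transition
# matrix keeps only the rows and columns where that qubit reads 0, giving
# the top-left block here.
#
#     >>> M = np.arange(16, dtype=float).reshape(4, 4)
#     >>> _reduce_matrix([0], M)
#     array([[0., 1.],
#            [4., 5.]])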


def _reduce_matrices(
    entries_to_remove: list[tuple[int, int]], matrices: list[np.ndarray]
) -> list[np.ndarray]:
    """Removes some dimensions from some matrices.

    :param entries_to_remove: Pairs of indices detailing, for each entry, which qubit
        dimension to remove from which matrix.
    :type entries_to_remove: List[Tuple[int, int]]
    :param matrices: All matrices to have dimensions removed.
    :type matrices: List[np.ndarray]

    :return: Matrices with some dimensions removed.
    :rtype: List[np.ndarray]
    """
    organise: dict[int, list] = {k: [] for k in range(len(matrices))}
    for unused in entries_to_remove:
        # unused[0] is the index in matrices
        # unused[1] is the qubit index in the matrix
        organise[unused[0]].append(unused[1])
    output_matrices = [_reduce_matrix(organise[m], matrices[m]) for m in organise]
    return [
        mat / np.sum(mat, axis=0) for mat in [x for x in output_matrices if len(x) != 0]
    ]


class SpamCorrecter:
    """A class for generating "state preparation and measurement" (SPAM) calibration
    experiments for ``pytket`` backends, and correcting counts generated from them.

    Supports saving calibrated state to a dictionary format, and restoring from the
    dictionary.
    """

    def __init__(self, qubit_subsets: list[list[Node]], backend: Backend | None = None):
        """Construct a new `SpamCorrecter`.

        :param qubit_subsets: A list of lists of correlated Nodes of an `Architecture`.
            Qubits within the same list are assumed to only have SPAM errors correlated
            with each other. Thus to allow SPAM errors between all qubits you should
            provide a single list.
        :type qubit_subsets: List[List[Node]]
        :param backend: Backend on which the experiments are intended to be run
            (optional). If provided, the qubits in `qubit_subsets` must be nodes in the
            backend's associated `Architecture`. If not provided, it is assumed that
            the experiment will be run on an `Architecture` with the nodes in
            `qubit_subsets`, and furthermore that the intended architecture natively
            supports X gates.

        :raises ValueError: There are repeats in the `qubit_subsets` specification.
        """
        self.correlations = qubit_subsets

        self.all_qbs = [qb for subset in qubit_subsets for qb in subset]

        def to_tuple(inp: list[Node]) -> tuple:
            return tuple(inp)

        self.subsets_matrix_map = OrderedDict.fromkeys(
            sorted(map(to_tuple, self.correlations), key=len, reverse=True)
        )
        # ordered from largest to smallest via OrderedDict & sorted
        self.subset_dimensions = [len(subset) for subset in self.subsets_matrix_map]

        if len(self.all_qbs) != len(set(self.all_qbs)):
            raise ValueError("Qubit subsets are not mutually disjoint.")

        xcirc = Circuit(1).X(0)
        if backend is not None:
            if backend.backend_info is None:
                raise ValueError("No architecture associated with backend.")
            nodes = backend.backend_info.nodes
            if not all(node in nodes for node in self.all_qbs):
                raise ValueError("Nodes do not all belong to architecture.")
            backend.default_compilation_pass().apply(xcirc)
            FlattenRegisters().apply(xcirc)

        self.xbox = CircBox(xcirc)

    def calibration_circuits(self) -> list[Circuit]:
        """Generate calibration circuits according to the specified correlations.

        :return: A list of calibration circuits to be run on the machine. The circuits
            should be processed without compilation. Results from these circuits must
            be given back to this class (via the `calculate_matrices` method) in the
            same order.
        :rtype: List[Circuit]
        """

        major_state_dimensions = self.subset_dimensions[0]
        n_circuits = 1 << major_state_dimensions
        # output
        self.prepared_circuits = []
        self.state_infos = []

        # set up base circuit for appending xbox to
        base_circuit = Circuit()
        c_reg = []
        for index, qb in enumerate(self.all_qbs):
            base_circuit.add_qubit(qb)
            c_bit = Bit(index)
            c_reg.append(c_bit)
            base_circuit.add_bit(c_bit)

        # generate state circuits for given correlations
        for major_state_index in range(n_circuits):
            state_circuit = base_circuit.copy()
            # get bit string corresponding to basis state of biggest subset of qubits
            major_state = int_to_binary(major_state_index, major_state_dimensions)
            new_state_dicts = {}
            # parallelise circuits: run the characterisation of
            # uncorrelated subsets in parallel
            for dim, qubits in zip(
                self.subset_dimensions, self.subsets_matrix_map, strict=False
            ):
                # add state to prepared states
                new_state_dicts[qubits] = major_state[:dim]
                # find only qubits that are expected to be in 1 state,
                # add xbox to given qubits
                for flipped_qb in itertools.compress(qubits, major_state[:dim]):
                    state_circuit.add_circbox(self.xbox, [flipped_qb])
            # decompose boxes and add measures
            DecomposeBoxes().apply(state_circuit)
            for qb, cb in zip(self.all_qbs, c_reg, strict=False):
                state_circuit.Measure(qb, cb)

            # add to returned types
            self.prepared_circuits.append(state_circuit)
            self.state_infos.append((new_state_dicts, state_circuit.qubit_to_bit_map))
        return self.prepared_circuits
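
    # Editor's note: the number of circuits is set by the largest subset.
    # For example, with subsets of sizes [2, 1] there are 1 << 2 = 4
    # circuits, preparing 00, 01, 10 and 11 on the two-qubit subset while
    # the one-qubit subset is prepared in 0, 0, 1 and 1 in parallel.
    #
    #     >>> sc = SpamCorrecter([[Node(0), Node(1)], [Node(2)]])
    #     >>> len(sc.calibration_circuits())
    #     4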

    def calculate_matrices(self, results_list: list[BackendResult]) -> None:
        """Calculate the calibration matrices from the results of running calibration
        circuits.

        :param results_list: List of results from Backend. Must be in the same order as
            the corresponding circuits generated by `calibration_circuits`.
        :type results_list: List[BackendResult]

        :raises RuntimeError: Calibration circuits have not been generated yet.
        """
        if not self.state_infos:
            raise RuntimeError(
                "Ensure calibration states/circuits have been calculated first."
            )

        counter = 0
        self.node_index_dict: dict[Node, tuple[int, int]] = {}

        for qbs, dim in zip(
            self.subsets_matrix_map, self.subset_dimensions, strict=False
        ):
            # for a subset with n qubits, create a 2^n by 2^n matrix
            self.subsets_matrix_map[qbs] = np.zeros((1 << dim,) * 2, dtype=float)
            for i in range(len(qbs)):
                qb = qbs[i]
                self.node_index_dict[qb] = (counter, i)
            counter += 1  # noqa: SIM113

        for result, state_info in zip(results_list, self.state_infos, strict=False):
            state_dict = state_info[0]
            qb_bit_map = state_info[1]
            for qb_sub in self.subsets_matrix_map:
                # bits of counts to consider
                bits = [qb_bit_map[q] for q in qb_sub]
                counts_dict = result.get_counts(cbits=bits)
                for measured_state, count in counts_dict.items():
                    # intended state
                    prepared_state_index = binary_to_int(state_dict[qb_sub])
                    # produced state
                    measured_state_index = binary_to_int(measured_state)
                    # update characterisation matrix
                    M = self.subsets_matrix_map[qb_sub]
                    assert type(M) is np.ndarray
                    M[measured_state_index, prepared_state_index] += count

        # normalise everything
        self.characterisation_matrices = [
            mat / np.sum(cast("np.ndarray", mat), axis=0)
            for mat in self.subsets_matrix_map.values()
        ]

    def get_parallel_measure(self, circuit: Circuit) -> ParallelMeasures:
        """For a given circuit, produces and returns a ParallelMeasures object required
        for correcting counts results.

        :param circuit: Circuit with some Measure operations.
        :type circuit: Circuit

        :return: A list of dictionaries mapping Qubit to Bit where each separate
            dictionary details some set of Measurement operations run in parallel.
        :rtype: ParallelMeasures
        """
        parallel_measure = [circuit.qubit_to_bit_map]
        # a mismatch implies mid-circuit measurement, or at least that the
        # missing bits need to be checked for Measure operations
        if len(parallel_measure[0]) != len(circuit.bits):
            used_bits = set(parallel_measure[0].values())
            for mc in circuit.commands_of_type(OpType.Measure):
                bit = mc.bits[0]
                if bit not in used_bits:
                    # mid-circuit measure, add as a separate parallel measure
                    parallel_measure.append({mc.qubits[0]: bit})
        return parallel_measure

    def correct_counts(
        self,
        result: BackendResult,
        parallel_measures: ParallelMeasures,
        method: str = "bayesian",
        options: dict | None = None,
    ) -> BackendResult:
        """Modifies the count distribution of the result by applying the inversion of
        the pure noise map represented by the characterisation matrices.

        :param result: BackendResult object whose counts are to be corrected.
        :type result: BackendResult
        :param parallel_measures: Used to permute the corresponding BackendResult
            object so the counts order matches the noise characterisation, and to
            amend the characterisation matrices to correct the right bits.
            SpamCorrecter.get_parallel_measure returns the required object for a
            given circuit.
        :type parallel_measures: ParallelMeasures

        :raises ValueError: Measured qubit in result not characterised.

        :return: A new result object with counts modified to reflect SPAM correction.
        :rtype: BackendResult
        """
        # The correction process assumes that, when passed a list of matrices
        # and a distribution to correct, the j rows of matrix i correct for
        # the i, i+1, ..., i+j states in the passed distribution. Given the
        # information from parallel_measures of which bits are measured on
        # which qubits, the following first produces matrices such that this
        # condition holds.

        char_bits_order = []
        correction_matrices = []

        for mapping in parallel_measures:
            # _reduce_matrices removes the entries corresponding to the given
            # qubits from the characterisation matrices
            unused_qbs = set(self.all_qbs.copy())
            for q in mapping:
                # no q duplicates as mapping is dict from qubit to bit
                if q not in unused_qbs:
                    raise ValueError(
                        f"Measured qubit {q} is not characterised by SpamCorrecter"
                    )
                unused_qbs.remove(q)  # type:ignore[arg-type]
                char_bits_order.append(mapping[q])
            correction_matrices.extend(
                _reduce_matrices(
                    [self.node_index_dict[q] for q in unused_qbs],
                    self.characterisation_matrices,
                )
            )

        # get counts object for returning later
        counts = result.get_counts(cbits=char_bits_order)
        in_vec = np.zeros(1 << len(char_bits_order), dtype=float)
        # turn from counts to probability distribution
        for state, count in counts.items():
            in_vec[binary_to_int(state)] = count
        Ncounts = np.sum(in_vec)
        in_vec_norm = in_vec / Ncounts

        # with counts and characterisation matrices orders matching,
        # correct distribution
        if method == "invert":
            try:
                subinverts = [
                    np.linalg.inv(submatrix) for submatrix in correction_matrices
                ]
            except np.linalg.LinAlgError:
                raise ValueError(  # noqa: B904
                    "Unable to invert calibration matrix: please re-run "
                    "calibration experiments or use an alternative correction method."
                )
            # assumes that order of rows in flattened subinverts equals
            # order of bits in input vector
            outvec = _compute_dot(subinverts, in_vec_norm)
            # The entries of outvec will always sum to 1, but they may not all
            # be in the range [0,1]. In order to make them genuine
            # probabilities (and thus generate meaningful counts),
            # we adjust them by setting all negative values to 0 and scaling
            # the remainder.
            outvec[outvec < 0] = 0
            outvec /= sum(outvec)

        elif method == "bayesian":
            if options is None:
                options = {}
            tol_val = options.get("tol", 1 / Ncounts)
            maxit = options.get("maxiter", None)
            outvec = _bayesian_iterative_correct(
                correction_matrices, in_vec_norm, tol=tol_val, max_it=maxit
            )

        else:
            valid_methods = ("invert", "bayesian")
            raise ValueError("Method must be one of: ", *valid_methods)

        outvec *= Ncounts

        # counter object with binary from distribution
        corrected_counts = {
            int_to_binary(index, len(char_bits_order)): Bcount
            for index, Bcount in enumerate(outvec)
        }
        counter = Counter(
            {
                OutcomeArray.from_readouts([key]): ceil(val)
                for key, val in corrected_counts.items()
            }
        )
        # produce and return BackendResult object
        return BackendResult(counts=counter, c_bits=char_bits_order)
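
    # End-to-end usage sketch (editor's illustration; ``backend`` stands in
    # for any pytket Backend instance, ``circ`` for a compiled circuit that
    # was run on it, and the shot count is arbitrary):
    #
    #     >>> spam = SpamCorrecter([backend.backend_info.nodes], backend)
    #     >>> handles = backend.process_circuits(
    #     ...     spam.calibration_circuits(), n_shots=1000
    #     ... )
    #     >>> spam.calculate_matrices(backend.get_results(handles))
    #     >>> result = backend.run_circuit(circ, n_shots=1000)
    #     >>> corrected = spam.correct_counts(
    #     ...     result, spam.get_parallel_measure(circ)
    #     ... )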

    def to_dict(self) -> dict:
        """Get calibration information as a dictionary.

        :return: Dictionary output
        :rtype: Dict
        """
        correlations = []
        for subset in self.correlations:
            correlations.append([(uid.reg_name, uid.index) for uid in subset])  # noqa: PERF401

        node_index_hashable = [
            ((uid.reg_name, uid.index), self.node_index_dict[uid])
            for uid in self.node_index_dict
        ]
        char_matrices = [m.tolist() for m in self.characterisation_matrices]
        return {
            "correlations": correlations,
            "node_index_dict": node_index_hashable,
            "characterisation_matrices": char_matrices,
        }

    @classmethod
    def from_dict(class_obj, d: dict) -> "SpamCorrecter":
        """Build a `SpamCorrecter` instance from a dictionary in the format returned
        by `to_dict`.

        :param d: Dictionary of calibration information.
        :type d: Dict

        :return: A `SpamCorrecter` with the calibration information from `d`.
        :rtype: SpamCorrecter
        """
        new_inst = class_obj(
            [
                [Node(*pair)]
                for subset_tuple in d["correlations"]
                for pair in subset_tuple
            ]
        )
        new_inst.node_index_dict = {
            Node(*pair[0]): (int(pair[1][0]), int(pair[1][1]))
            for pair in d["node_index_dict"]
        }
        new_inst.characterisation_matrices = [
            np.array(m) for m in d["characterisation_matrices"]
        ]
        return new_inst
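
# Serialisation round trip (editor's sketch): calibration data can be stored
# as JSON-friendly structures and restored later without re-running the
# experiments.
#
#     >>> data = spam.to_dict()  # e.g. json.dump(data, fp)
#     >>> restored = SpamCorrecter.from_dict(data)
#     >>> all(
#     ...     np.allclose(a, b)
#     ...     for a, b in zip(
#     ...         restored.characterisation_matrices,
#     ...         spam.characterisation_matrices,
#     ...     )
#     ... )
#     True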