Coverage for /home/runner/work/tket/tket/pytket/pytket/utils/spam.py: 91%
221 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-02 12:44 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-02 12:44 +0000
1# Copyright Quantinuum
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15import itertools
16from collections import Counter, OrderedDict
17from collections.abc import Callable, Iterable
18from functools import lru_cache
19from math import ceil, log2
20from typing import cast
22import numpy as np
24from pytket.backends import Backend
25from pytket.backends.backendresult import BackendResult
26from pytket.circuit import Bit, CircBox, Circuit, Node, OpType, Qubit
27from pytket.passes import DecomposeBoxes, FlattenRegisters
28from pytket.utils.outcomearray import OutcomeArray
31def compress_counts(
32 counts: dict[tuple[int, ...], float], tol: float = 1e-6, round_to_int: bool = False
33) -> dict[tuple[int, ...], int | float]:
34 """Filter counts to remove states that have a count value (which can be a
35 floating-point number) below a tolerance, and optionally round to an
36 integer.
38 :param counts: Input counts
39 :param tol: Value below which counts are pruned. Defaults to 1e-6.
40 :param round_to_int: Whether to round each count to an integer. Defaults to False.
42 :return: Filtered counts
43 """
44 valprocess: Callable[[float], int | float] = lambda x: (
45 round(x) if round_to_int else x
46 )
47 processed_pairs = (
48 (key, valprocess(val)) for key, val in counts.items() if val > tol
49 )
50 return {key: val for key, val in processed_pairs if val > 0}
53@lru_cache(maxsize=128)
54def binary_to_int(bintuple: tuple[int]) -> int:
55 """Convert a binary tuple to corresponding integer, with most significant bit as
56 the first element of tuple.
58 :param bintuple: Binary tuple
60 :return: Integer
61 """
62 integer = 0
63 for index, bitset in enumerate(reversed(bintuple)):
64 if bitset:
65 integer |= 1 << index
66 return integer
69@lru_cache(maxsize=128)
70def int_to_binary(val: int, dim: int) -> tuple[int, ...]:
71 """Convert an integer to corresponding binary tuple, with most significant bit as
72 the first element of tuple.
74 :param val: input integer
75 :param dim: Bit width
77 :return: Binary tuple of width dim
78 """
79 return tuple(map(int, format(val, f"0{dim}b")))
82#########################################
83### _compute_dot and helper functions ###
84###
85### With thanks to
86### https://math.stackexchange.com/a/3423910
87### and especially
88### https://gist.github.com/ahwillia/f65bc70cb30206d4eadec857b98c4065
89### on which this code is based.
90def _unfold(tens: np.ndarray, mode: int, dims: list[int]) -> np.ndarray:
91 """Unfolds tensor into matrix.
93 :param tens: Tensor with shape equivalent to dimensions
94 :param mode: Specifies axis move to front of matrix in unfolding of tensor
95 :param dims: Gives shape of tensor passed
97 :return: Matrix with shape (dims[mode], prod(dims[/mode]))
98 """
99 if mode == 0:
100 return tens.reshape(dims[0], -1)
101 return np.moveaxis(tens, mode, 0).reshape(dims[mode], -1)
104def _refold(vec: np.ndarray, mode: int, dims: list[int]) -> np.ndarray:
105 """Refolds vector into tensor.
107 :param vec: Tensor with length equivalent to the product of dimensions given in
108 dims
109 :param mode: Axis tensor was unfolded along
110 :param dims: Shape of tensor
112 :return: Tensor folded from vector with shape equivalent to given dimensions
113 """
114 if mode == 0:
115 return vec.reshape(dims)
116 # Reshape and then move dims[mode] back to its
117 # appropriate spot (undoing the `unfold` operation).
118 tens = vec.reshape([dims[mode]] + [d for m, d in enumerate(dims) if m != mode])
119 return np.moveaxis(tens, 0, mode)
122def _compute_dot(submatrices: Iterable[np.ndarray], vector: np.ndarray) -> np.ndarray:
123 """Multiplies the kronecker product of the given submatrices with given vector.
125 :param submatrices: Submatrices multiplied
126 :param vector: Vector multplied
128 :return: Kronecker product of arguments
129 """
130 dims = [A.shape[0] for A in submatrices]
131 vt = vector.reshape(dims)
132 for i, A in enumerate(submatrices):
133 vt = _refold(A @ _unfold(vt, i, dims), i, dims)
134 return vt.ravel()
137def _bayesian_iteration(
138 submatrices: Iterable[np.ndarray],
139 measurements: np.ndarray,
140 t: np.ndarray,
141 epsilon: float,
142) -> np.ndarray:
143 """Transforms T corresponds to a Bayesian iteration, used to modfiy
144 measurements.
146 :param submatrices: submatrices to be inverted and applied to measurements.
147 :param measurements: Probability distribution over set of states to be amended.
148 :param t: Some transform to act on measurements.
149 :param epsilon: A stabilization parameter to define an affine transformation for
150 application to submatrices, eliminating zero probabilities.
152 :return: Transformed distribution vector.
153 """
154 # Transform t according to the Bayesian iteration
155 # The parameter epsilon is a stabilization parameter which defines an affine
156 # transformation to apply to the submatrices to eliminate zero probabilities. This
157 # transformation preserves the property that all columns sum to 1
158 if epsilon == 0: 158 ↛ 162line 158 didn't jump to line 162 because the condition on line 158 was always true
159 # avoid copying if we don't need to
160 As = submatrices
161 else:
162 As = [
163 epsilon / submatrix.shape[0] + (1 - epsilon) * submatrix
164 for submatrix in submatrices
165 ]
166 z = _compute_dot(As, t)
167 if np.isclose(z, 0).any(): 167 ↛ 168line 167 didn't jump to line 168 because the condition on line 167 was never true
168 raise ZeroDivisionError
169 return cast(
170 "np.ndarray", t * _compute_dot([A.transpose() for A in As], measurements / z)
171 )
174def _bayesian_iterative_correct(
175 submatrices: Iterable[np.ndarray],
176 measurements: np.ndarray,
177 tol: float = 1e-5,
178 max_it: int | None = None,
179) -> np.ndarray:
180 """Finds new states to represent application of inversion of submatrices on
181 measurements. Converges when update states within tol range of previously
182 tested states.
184 :param submatrices: Matrices comprising the pure noise characterisation.
185 :param input_vector: Vector corresponding to some counts distribution.
186 :param tol: tolerance of closeness of found results
187 :param max_it: Maximum number of inversions attempted to correct results.
188 """
189 # based on method found in https://arxiv.org/abs/1910.00129
191 vector_size = measurements.size
192 # uniform initial
193 true_states = np.full(vector_size, 1 / vector_size)
194 prev_true = true_states.copy()
195 converged = False
196 count = 0
197 epsilon: float = 0 # stabilization parameter, adjusted dynamically
198 while not converged:
199 if max_it:
200 if count >= max_it:
201 break
202 count += 1
203 try:
204 true_states = _bayesian_iteration(
205 submatrices, measurements, true_states, epsilon
206 )
207 converged = np.allclose(true_states, prev_true, atol=tol)
208 prev_true = true_states.copy()
209 except ZeroDivisionError:
210 # Shift the stabilization parameter up a bit (always < 0.5).
211 epsilon = 0.99 * epsilon + 0.01 * 0.5
213 return true_states
216def _reduce_matrix(indices_to_remove: list[int], matrix: np.ndarray) -> np.ndarray:
217 """Removes indices from indices_to_remove from binary associated to indexing of
218 matrix, producing a new transition matrix. To do so, it assigns all transition
219 probabilities as the given state in the remaining indices binary, with the removed
220 binary in state 0. This is an assumption on the noise made because it is likely
221 that unmeasured qubits will be in that state.
223 :param indices_to_remove: Binary index of state matrix is mapping to be removed.
224 :param matrix: Transition matrix where indices correspond to some binary state.
226 :return: Transition matrix with removed entries.
227 """
229 new_n_qubits = int(log2(matrix.shape[0])) - len(indices_to_remove)
230 if new_n_qubits == 0:
231 return np.array([])
232 bin_map = {}
233 mat_dim = 1 << new_n_qubits
234 for index in range(mat_dim):
235 # get current binary
236 bina = list(int_to_binary(index, new_n_qubits))
237 # add 0's to fetch old binary to set values from
238 for i in sorted(indices_to_remove):
239 bina.insert(i, 0)
240 # get index of values
241 bin_map[index] = binary_to_int(tuple(bina))
243 new_mat = np.zeros((mat_dim,) * 2, dtype=float)
244 for i in range(len(new_mat)):
245 old_row_index = bin_map[i]
246 for j in range(len(new_mat)):
247 old_col_index = bin_map[j]
248 new_mat[i, j] = matrix[old_row_index, old_col_index]
249 return new_mat
252def _reduce_matrices(
253 entries_to_remove: list[tuple[int, int]], matrices: list[np.ndarray]
254) -> list[np.ndarray]:
255 """Removes some dimensions from some matrices.
257 :param entries_to_remove: Via indexing, details dimensions to be removed.
258 :param matrices: All matrices to have dimensions removed.
260 :return: Matrices with some dimensions removed.
261 """
262 organise: dict[int, list] = {k: [] for k in range(len(matrices))}
263 for unused in entries_to_remove:
264 # unused[0] is index in matrices
265 # unused[1] is qubit index in matrix
266 organise[unused[0]].append(unused[1])
267 output_matrices = [_reduce_matrix(organise[m], matrices[m]) for m in organise]
268 return [
269 mat / np.sum(mat, axis=0) for mat in [x for x in output_matrices if len(x) != 0]
270 ]
273class SpamCorrecter:
274 """A class for generating "state preparation and measurement" (SPAM) calibration
275 experiments for ``pytket`` backends, and correcting counts generated from them.
277 Supports saving calibrated state to a dictionary format, and restoring from the
278 dictionary.
279 """
281 def __init__(self, qubit_subsets: list[list[Node]], backend: Backend | None = None):
282 """Construct a new `SpamCorrecter`.
284 :param qubit_subsets: A list of lists of correlated Nodes of an `Architecture`.
285 Qubits within the same list are assumed to only have SPAM errors correlated
286 with each other. Thus to allow SPAM errors between all qubits you should
287 provide a single list.
288 :param backend: Backend on which the experiments are intended to be run
289 (optional). If provided, the qubits in `qubit_subsets` must be nodes in the
290 backend's associated `Architecture`. If not provided, it is assumed that the
291 experiment will be run on an `Architecture`with the nodes in
292 `qubit_subsets`, and furthermore that the intended architecture natively
293 supports X gates.
295 :raises ValueError: There are repeats in the `qubit_subsets` specification.
296 """
297 self.correlations = qubit_subsets
299 self.all_qbs = [qb for subset in qubit_subsets for qb in subset]
301 def to_tuple(inp: list[Node]) -> tuple:
302 return tuple(inp)
304 self.subsets_matrix_map = OrderedDict.fromkeys(
305 sorted(map(to_tuple, self.correlations), key=len, reverse=True)
306 )
307 # ordered from largest to smallest via OrderedDict & sorted
308 self.subset_dimensions = [len(subset) for subset in self.subsets_matrix_map]
310 if len(self.all_qbs) != len(set(self.all_qbs)): 310 ↛ 311line 310 didn't jump to line 311 because the condition on line 310 was never true
311 raise ValueError("Qubit subsets are not mutually disjoint.")
313 xcirc = Circuit(1).X(0)
314 if backend is not None: 314 ↛ 315line 314 didn't jump to line 315 because the condition on line 314 was never true
315 if backend.backend_info is None:
316 raise ValueError("No architecture associated with backend.")
317 nodes = backend.backend_info.nodes
318 if not all(node in nodes for node in self.all_qbs):
319 raise ValueError("Nodes do not all belong to architecture.")
320 backend.default_compilation_pass().apply(xcirc)
321 FlattenRegisters().apply(xcirc)
323 self.xbox = CircBox(xcirc)
325 def calibration_circuits(self) -> list[Circuit]:
326 """Generate calibration circuits according to the specified correlations.
328 :return: A list of calibration circuits to be run on the machine. The circuits
329 should be processed without compilation. Results from these circuits must
330 be given back to this class (via the `calculate_matrices` method) in the
331 same order.
332 """
334 major_state_dimensions = self.subset_dimensions[0]
335 n_circuits = 1 << major_state_dimensions
336 # output
337 self.prepared_circuits = []
338 self.state_infos = []
340 # set up base circuit for appending xbox to
341 base_circuit = Circuit()
342 c_reg = []
343 for index, qb in enumerate(self.all_qbs):
344 base_circuit.add_qubit(qb)
345 c_bit = Bit(index)
346 c_reg.append(c_bit)
347 base_circuit.add_bit(c_bit)
349 # generate state circuits for given correlations
350 for major_state_index in range(n_circuits):
351 state_circuit = base_circuit.copy()
352 # get bit string corresponding to basis state of biggest subset of qubits
353 major_state = int_to_binary(major_state_index, major_state_dimensions)
354 new_state_dicts = {}
355 # parallelise circuits, run uncorrelated subsets
356 # characterisation in parallel
357 for dim, qubits in zip(
358 self.subset_dimensions, self.subsets_matrix_map, strict=False
359 ):
360 # add state to prepared states
361 new_state_dicts[qubits] = major_state[:dim]
362 # find only qubits that are expected to be in 1 state,
363 # add xbox to given qubits
364 for flipped_qb in itertools.compress(qubits, major_state[:dim]):
365 state_circuit.add_circbox(self.xbox, [flipped_qb])
366 # Decompose boxes, add barriers to preserve circuit, add measures
367 DecomposeBoxes().apply(state_circuit)
368 for qb, cb in zip(self.all_qbs, c_reg, strict=False):
369 state_circuit.Measure(qb, cb)
371 # add to returned types
372 self.prepared_circuits.append(state_circuit)
373 self.state_infos.append((new_state_dicts, state_circuit.qubit_to_bit_map))
374 return self.prepared_circuits
376 def calculate_matrices(self, results_list: list[BackendResult]) -> None:
377 """Calculate the calibration matrices from the results of running calibration
378 circuits.
380 :param results_list: List of results from Backend. Must be in the same order as
381 the corresponding circuits generated by `calibration_circuits`.
383 :raises RuntimeError: Calibration circuits have not been generated yet.
384 """
385 if not self.state_infos: 385 ↛ 386line 385 didn't jump to line 386 because the condition on line 385 was never true
386 raise RuntimeError(
387 "Ensure calibration states/circuits have been calculated first."
388 )
390 counter = 0
391 self.node_index_dict: dict[Node, tuple[int, int]] = {}
393 for qbs, dim in zip(
394 self.subsets_matrix_map, self.subset_dimensions, strict=False
395 ):
396 # for a subset with n qubits, create a 2^n by 2^n matrix
397 self.subsets_matrix_map[qbs] = np.zeros((1 << dim,) * 2, dtype=float)
398 for i in range(len(qbs)):
399 qb = qbs[i]
400 self.node_index_dict[qb] = (counter, i)
401 counter += 1 # noqa: SIM113
403 for result, state_info in zip(results_list, self.state_infos, strict=False):
404 state_dict = state_info[0]
405 qb_bit_map = state_info[1]
406 for qb_sub in self.subsets_matrix_map:
407 # bits of counts to consider
408 bits = [qb_bit_map[q] for q in qb_sub]
409 counts_dict = result.get_counts(cbits=bits)
410 for measured_state, count in counts_dict.items():
411 # intended state
412 prepared_state_index = binary_to_int(state_dict[qb_sub])
413 # produced state
414 measured_state_index = binary_to_int(measured_state)
415 # update characterisation matrix
416 M = self.subsets_matrix_map[qb_sub]
417 assert type(M) is np.ndarray
418 M[measured_state_index, prepared_state_index] += count
420 # normalise everything
421 self.characterisation_matrices = [
422 mat / np.sum(cast("np.ndarray", mat), axis=0)
423 for mat in self.subsets_matrix_map.values()
424 ]
426 def get_parallel_measure(self, circuit: Circuit) -> list[dict[Qubit, Bit]]:
427 """For a given circuit, produces and returns a `list[dict[Qubit, Bit]]` object required
428 for correcting counts results.
430 :param circuit: Circuit with some Measure operations.
432 :return: A list of dictionaries mapping Qubit to Bit where each separate
433 dictionary details some set of Measurement operations run in parallel.
434 """
435 parallel_measure = [circuit.qubit_to_bit_map]
436 # implies mid-circuit measurements, or that at least missing
437 # bits need to be checked for Measure operation
438 if len(parallel_measure[0]) != len(circuit.bits):
439 used_bits = set(parallel_measure[0].values())
440 for mc in circuit.commands_of_type(OpType.Measure):
441 bit = mc.bits[0]
442 if bit not in used_bits:
443 # mid-circuit measure, add as a separate parallel measure
444 parallel_measure.append({mc.qubits[0]: bit})
445 return parallel_measure
447 def correct_counts(
448 self,
449 result: BackendResult,
450 parallel_measures: list[dict[Qubit, Bit]],
451 method: str = "bayesian",
452 options: dict | None = None,
453 ) -> BackendResult:
454 """Modifies count distribution for result, such that the inversion of the pure
455 noise map represented by characterisation matrices is applied to it.
457 :param result: BackendResult object to be negated by pure noise object.
458 :param parallel_measures: Used to permute corresponding BackendResult object so
459 counts order matches noise characterisation and to amend characterisation
460 matrices to correct the right bits. SpamCorrecter.get_parallel_measure
461 returns the required object for a given circuit.
463 :raises ValueError: Measured qubit in result not characterised.
465 :return: A new result object with counts modified to reflect SPAM correction.
466 """
467 # the correction process assumes that when passed a list of matrices
468 # and a distribution to correct, that the j rows of matrix i
469 # corrects for the i, i+1,...i+j states in the passed distribution
470 # given information of which bits are measured on which qubits from
471 # parallel_measures, the following first produces matrices such that
472 # this condition is true
474 char_bits_order = []
475 correction_matrices = []
477 for mapping in parallel_measures:
478 # _reduce_matrices removes given qubits corresponding entries from
479 # characterisation matrices
480 unused_qbs = set(self.all_qbs.copy())
481 for q in mapping:
482 # no q duplicates as mapping is dict from qubit to bit
483 if q not in unused_qbs: 483 ↛ 484line 483 didn't jump to line 484 because the condition on line 483 was never true
484 raise ValueError(
485 f"Measured qubit {q} is not characterised by SpamCorrecter"
486 )
487 unused_qbs.remove(q) # type:ignore[arg-type]
488 char_bits_order.append(mapping[q])
489 correction_matrices.extend(
490 _reduce_matrices(
491 [self.node_index_dict[q] for q in unused_qbs],
492 self.characterisation_matrices,
493 )
494 )
496 # get counts object for returning later
497 counts = result.get_counts(cbits=char_bits_order)
498 in_vec = np.zeros(1 << len(char_bits_order), dtype=float)
499 # turn from counts to probability distribution
500 for state, count in counts.items():
501 in_vec[binary_to_int(state)] = count
502 Ncounts = np.sum(in_vec)
503 in_vec_norm = in_vec / Ncounts
505 # with counts and characterisation matrices orders matching,
506 # correct distribution
507 if method == "invert":
508 try:
509 subinverts = [
510 np.linalg.inv(submatrix) for submatrix in correction_matrices
511 ]
512 except np.linalg.LinAlgError:
513 raise ValueError( # noqa: B904
514 "Unable to invert calibration matrix: please re-run "
515 "calibration experiments or use an alternative correction method."
516 )
517 # assumes that order of rows in flattened subinverts equals
518 # order of bits in input vector
519 outvec = _compute_dot(subinverts, in_vec_norm)
520 # The entries of v will always sum to 1, but they may not all
521 # be in the range [0,1]. In order to make them genuine
522 # probabilities (and thus generate meaningful counts),
523 # we adjust them by setting all negative values to 0 and scaling
524 # the remainder.
525 outvec[outvec < 0] = 0
526 outvec /= sum(outvec)
528 elif method == "bayesian":
529 if options is None:
530 options = {}
531 tol_val = options.get("tol", 1 / Ncounts)
532 maxit = options.get("maxiter", None)
533 outvec = _bayesian_iterative_correct(
534 correction_matrices, in_vec_norm, tol=tol_val, max_it=maxit
535 )
537 else:
538 valid_methods = ("invert", "bayesian")
539 raise ValueError("Method must be one of: ", *valid_methods)
541 outvec *= Ncounts
543 # counter object with binary from distribution
544 corrected_counts = {
545 int_to_binary(index, len(char_bits_order)): Bcount
546 for index, Bcount in enumerate(outvec)
547 }
548 counter = Counter(
549 {
550 OutcomeArray.from_readouts([key]): ceil(val)
551 for key, val in corrected_counts.items()
552 }
553 )
554 # produce and return BackendResult object
555 return BackendResult(counts=counter, c_bits=char_bits_order)
557 def to_dict(self) -> dict:
558 """Get calibration information as a dictionary.
560 :return: Dictionary output
561 """
562 correlations = []
563 for subset in self.correlations:
564 correlations.append( # noqa: PERF401
565 [(uid.reg_name, uid.index) for uid in subset]
566 )
568 node_index_hashable = [
569 ((uid.reg_name, uid.index), self.node_index_dict[uid])
570 for uid in self.node_index_dict
571 ]
572 char_matrices = [m.tolist() for m in self.characterisation_matrices]
573 return {
574 "correlations": correlations,
575 "node_index_dict": node_index_hashable,
576 "characterisation_matrices": char_matrices,
577 }
579 @classmethod
580 def from_dict(class_obj, d: dict) -> "SpamCorrecter":
581 """Build a `SpamCorrecter` instance from a dictionary in the format returned
582 by `to_dict`.
584 :return: Dictionary of calibration information.
585 """
586 new_inst = class_obj(
587 [
588 [Node(*pair)]
589 for subset_tuple in d["correlations"]
590 for pair in subset_tuple
591 ]
592 )
593 new_inst.node_index_dict = {
594 Node(*pair[0]): (int(pair[1][0]), int(pair[1][1]))
595 for pair in d["node_index_dict"]
596 }
597 new_inst.characterisation_matrices = [
598 np.array(m) for m in d["characterisation_matrices"]
599 ]
600 return new_inst