Coverage for /home/runner/work/tket/tket/pytket/pytket/utils/spam.py: 92%
223 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-14 11:30 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-14 11:30 +0000
1# Copyright Quantinuum
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15import itertools
16from collections import Counter, OrderedDict
17from collections.abc import Callable, Iterable
18from functools import lru_cache
19from math import ceil, log2
20from typing import cast
22import numpy as np
24from pytket.backends import Backend
25from pytket.backends.backendresult import BackendResult
26from pytket.circuit import Bit, CircBox, Circuit, Node, OpType, Qubit
27from pytket.passes import DecomposeBoxes, FlattenRegisters
28from pytket.utils.outcomearray import OutcomeArray
29from pytket.utils.results import CountsDict, StateTuple
31ParallelMeasures = list[dict[Qubit, Bit]]
34def compress_counts(
35 counts: dict[StateTuple, float], tol: float = 1e-6, round_to_int: bool = False
36) -> CountsDict:
37 """Filter counts to remove states that have a count value (which can be a
38 floating-point number) below a tolerance, and optionally round to an
39 integer.
41 :param counts: Input counts
42 :type counts: Dict[StateTuple, float]
43 :param tol: Value below which counts are pruned. Defaults to 1e-6.
44 :type tol: float, optional
45 :param round_to_int: Whether to round each count to an integer. Defaults to False.
46 :type round_to_int: bool, optional
48 :return: Filtered counts
49 :rtype: CountsDict
50 """
51 valprocess: Callable[[float], int | float] = lambda x: (
52 int(round(x)) if round_to_int else x
53 )
54 processed_pairs = (
55 (key, valprocess(val)) for key, val in counts.items() if val > tol
56 )
57 return {key: val for key, val in processed_pairs if val > 0}
60@lru_cache(maxsize=128)
61def binary_to_int(bintuple: tuple[int]) -> int:
62 """Convert a binary tuple to corresponding integer, with most significant bit as
63 the first element of tuple.
65 :param bintuple: Binary tuple
66 :type bintuple: Tuple[int]
68 :return:
69 Integer :rtype: int
70 """
71 integer = 0
72 for index, bitset in enumerate(reversed(bintuple)):
73 if bitset:
74 integer |= 1 << index
75 return integer
78@lru_cache(maxsize=128)
79def int_to_binary(val: int, dim: int) -> tuple[int, ...]:
80 """Convert an integer to corresponding binary tuple, with most significant bit as
81 the first element of tuple.
83 :param val: input integer
84 :type val: int
85 :param dim: Bit width
86 :type dim: int
88 :return: Binary tuple of width dim
89 :rtype: Tuple[int, ...]
90 """
91 return tuple(map(int, format(val, f"0{dim}b")))
94#########################################
95### _compute_dot and helper functions ###
96###
97### With thanks to
98### https://math.stackexchange.com/a/3423910
99### and especially
100### https://gist.github.com/ahwillia/f65bc70cb30206d4eadec857b98c4065
101### on which this code is based.
102def _unfold(tens: np.ndarray, mode: int, dims: list[int]) -> np.ndarray:
103 """Unfolds tensor into matrix.
105 :param tens: Tensor with shape equivalent to dimensions
106 :type tens: np.ndarray
107 :param mode: Specifies axis move to front of matrix in unfolding of tensor
108 :type mode: int
109 :param dims: Gives shape of tensor passed
110 :type dims: List[int]
112 :return: Matrix with shape (dims[mode], prod(dims[/mode]))
113 :rtype: np.ndarray
114 """
115 if mode == 0:
116 return tens.reshape(dims[0], -1)
117 return np.moveaxis(tens, mode, 0).reshape(dims[mode], -1)
120def _refold(vec: np.ndarray, mode: int, dims: list[int]) -> np.ndarray:
121 """Refolds vector into tensor.
123 :param vec: Tensor with length equivalent to the product of dimensions given in
124 dims
125 :type vec: np.ndarray
126 :param mode: Axis tensor was unfolded along
127 :type mode: int
128 :param dims: Shape of tensor
129 :type dims: List[int]
131 :return: Tensor folded from vector with shape equivalent to given dimensions
132 :rtype: np.ndarray
133 """
134 if mode == 0:
135 return vec.reshape(dims)
136 # Reshape and then move dims[mode] back to its
137 # appropriate spot (undoing the `unfold` operation).
138 tens = vec.reshape([dims[mode]] + [d for m, d in enumerate(dims) if m != mode])
139 return np.moveaxis(tens, 0, mode)
142def _compute_dot(submatrices: Iterable[np.ndarray], vector: np.ndarray) -> np.ndarray:
143 """Multiplies the kronecker product of the given submatrices with given vector.
145 :param submatrices: Submatrices multiplied
146 :type submatrices: Iterable[np.ndarray]
147 :param vector: Vector multplied
148 :type vector: np.ndarray
150 :return: Kronecker product of arguments
151 :rtype: np.ndarray
152 """
153 dims = [A.shape[0] for A in submatrices]
154 vt = vector.reshape(dims)
155 for i, A in enumerate(submatrices):
156 vt = _refold(A @ _unfold(vt, i, dims), i, dims)
157 return vt.ravel()
160def _bayesian_iteration(
161 submatrices: Iterable[np.ndarray],
162 measurements: np.ndarray,
163 t: np.ndarray,
164 epsilon: float,
165) -> np.ndarray:
166 """Transforms T corresponds to a Bayesian iteration, used to modfiy
167 measurements.
169 :param submatrices: submatrices to be inverted and applied to measurements.
170 :type submatrices: Iterable[np.ndarray]
171 :param measurements: Probability distribution over set of states to be amended.
172 :type measurements: np.ndarray
173 :param t: Some transform to act on measurements.
174 :type t: np.ndarray
175 :param epsilon: A stabilization parameter to define an affine transformation for
176 application to submatrices, eliminating zero probabilities.
177 :type epsilon: float
179 :return: Transformed distribution vector.
180 :rtype: np.ndarray
181 """
182 # Transform t according to the Bayesian iteration
183 # The parameter epsilon is a stabilization parameter which defines an affine
184 # transformation to apply to the submatrices to eliminate zero probabilities. This
185 # transformation preserves the property that all columns sum to 1
186 if epsilon == 0: 186 ↛ 190line 186 didn't jump to line 190 because the condition on line 186 was always true
187 # avoid copying if we don't need to
188 As = submatrices
189 else:
190 As = [
191 epsilon / submatrix.shape[0] + (1 - epsilon) * submatrix
192 for submatrix in submatrices
193 ]
194 z = _compute_dot(As, t)
195 if np.isclose(z, 0).any(): 195 ↛ 196line 195 didn't jump to line 196 because the condition on line 195 was never true
196 raise ZeroDivisionError
197 return cast(
198 np.ndarray, t * _compute_dot([A.transpose() for A in As], measurements / z)
199 )
202def _bayesian_iterative_correct(
203 submatrices: Iterable[np.ndarray],
204 measurements: np.ndarray,
205 tol: float = 1e-5,
206 max_it: int | None = None,
207) -> np.ndarray:
208 """Finds new states to represent application of inversion of submatrices on
209 measurements. Converges when update states within tol range of previously
210 tested states.
212 :param submatrices: Matrices comprising the pure noise characterisation.
213 :type submatrices: Iterable[np.ndarray]
214 :param input_vector: Vector corresponding to some counts distribution.
215 :type input_vector: np.ndarray
216 :param tol: tolerance of closeness of found results
217 :type tol: float
218 :param max_it: Maximum number of inversions attempted to correct results.
219 :type max_it: int
220 """
221 # based on method found in https://arxiv.org/abs/1910.00129
223 vector_size = measurements.size
224 # uniform initial
225 true_states = np.full(vector_size, 1 / vector_size)
226 prev_true = true_states.copy()
227 converged = False
228 count = 0
229 epsilon: float = 0 # stabilization parameter, adjusted dynamically
230 while not converged:
231 if max_it:
232 if count >= max_it:
233 break
234 count += 1
235 try:
236 true_states = _bayesian_iteration(
237 submatrices, measurements, true_states, epsilon
238 )
239 converged = np.allclose(true_states, prev_true, atol=tol)
240 prev_true = true_states.copy()
241 except ZeroDivisionError:
242 # Shift the stabilization parameter up a bit (always < 0.5).
243 epsilon = 0.99 * epsilon + 0.01 * 0.5
245 return true_states
248def reduce_matrix(indices_to_remove: list[int], matrix: np.ndarray) -> np.ndarray:
249 """Removes indices from indices_to_remove from binary associated to indexing of
250 matrix, producing a new transition matrix. To do so, it assigns all transition
251 probabilities as the given state in the remaining indices binary, with the removed
252 binary in state 0. This is an assumption on the noise made because it is likely
253 that unmeasured qubits will be in that state.
255 :param indices_to_remove: Binary index of state matrix is mapping to be removed.
256 :type indices_to_remove: List[int]
257 :param matrix: Transition matrix where indices correspond to some binary state.
258 :type matrix: np.ndarray
260 :return: Transition matrix with removed entries.
261 :rtype: np.ndarray
262 """
264 new_n_qubits = int(log2(matrix.shape[0])) - len(indices_to_remove)
265 if new_n_qubits == 0:
266 return np.array([])
267 bin_map = dict()
268 mat_dim = 1 << new_n_qubits
269 for index in range(mat_dim):
270 # get current binary
271 bina = list(int_to_binary(index, new_n_qubits))
272 # add 0's to fetch old binary to set values from
273 for i in sorted(indices_to_remove):
274 bina.insert(i, 0)
275 # get index of values
276 bin_map[index] = binary_to_int(tuple(bina))
278 new_mat = np.zeros((mat_dim,) * 2, dtype=float)
279 for i in range(len(new_mat)):
280 old_row_index = bin_map[i]
281 for j in range(len(new_mat)):
282 old_col_index = bin_map[j]
283 new_mat[i, j] = matrix[old_row_index, old_col_index]
284 return new_mat
287def reduce_matrices(
288 entries_to_remove: list[tuple[int, int]], matrices: list[np.ndarray]
289) -> list[np.ndarray]:
290 """Removes some dimensions from some matrices.
292 :param entries_to_remove: Via indexing, details dimensions to be removed.
293 :type entries_to_remove: List[Tuple[int, int]]
294 :param matrices: All matrices to have dimensions removed.
295 :type matrices: List[np.ndarray]
297 :return: Matrices with some dimensions removed.
298 :rtype: List[np.ndarray]
299 """
300 organise: dict[int, list] = dict({k: [] for k in range(len(matrices))})
301 for unused in entries_to_remove:
302 # unused[0] is index in matrices
303 # unused[1] is qubit index in matrix
304 organise[unused[0]].append(unused[1])
305 output_matrices = [reduce_matrix(organise[m], matrices[m]) for m in organise]
306 return [
307 mat / np.sum(mat, axis=0) for mat in [x for x in output_matrices if len(x) != 0]
308 ]
311class SpamCorrecter:
312 """A class for generating "state preparation and measurement" (SPAM) calibration
313 experiments for ``pytket`` backends, and correcting counts generated from them.
315 Supports saving calibrated state to a dictionary format, and restoring from the
316 dictionary.
317 """
319 def __init__(self, qubit_subsets: list[list[Node]], backend: Backend | None = None):
320 """Construct a new `SpamCorrecter`.
322 :param qubit_subsets: A list of lists of correlated Nodes of an `Architecture`.
323 Qubits within the same list are assumed to only have SPAM errors correlated
324 with each other. Thus to allow SPAM errors between all qubits you should
325 provide a single list.
326 :type qubit_subsets: List[List[Node]]
327 :param backend: Backend on which the experiments are intended to be run
328 (optional). If provided, the qubits in `qubit_subsets` must be nodes in the
329 backend's associated `Architecture`. If not provided, it is assumed that the
330 experiment will be run on an `Architecture`with the nodes in
331 `qubit_subsets`, and furthermore that the intended architecture natively
332 supports X gates.
334 :raises ValueError: There are repeats in the `qubit_subsets` specification.
335 """
336 self.correlations = qubit_subsets
338 self.all_qbs = [qb for subset in qubit_subsets for qb in subset]
340 def to_tuple(inp: list[Node]) -> tuple:
341 return tuple(inp)
343 self.subsets_matrix_map = OrderedDict.fromkeys(
344 sorted(map(to_tuple, self.correlations), key=len, reverse=True)
345 )
346 # ordered from largest to smallest via OrderedDict & sorted
347 self.subset_dimensions = [len(subset) for subset in self.subsets_matrix_map]
349 if len(self.all_qbs) != len(set(self.all_qbs)): 349 ↛ 350line 349 didn't jump to line 350 because the condition on line 349 was never true
350 raise ValueError("Qubit subsets are not mutually disjoint.")
352 xcirc = Circuit(1).X(0)
353 if backend is not None: 353 ↛ 354line 353 didn't jump to line 354 because the condition on line 353 was never true
354 if backend.backend_info is None:
355 raise ValueError("No architecture associated with backend.")
356 nodes = backend.backend_info.nodes
357 if not all(node in nodes for node in self.all_qbs):
358 raise ValueError("Nodes do not all belong to architecture.")
359 backend.default_compilation_pass().apply(xcirc)
360 FlattenRegisters().apply(xcirc)
362 self.xbox = CircBox(xcirc)
364 def calibration_circuits(self) -> list[Circuit]:
365 """Generate calibration circuits according to the specified correlations.
367 :return: A list of calibration circuits to be run on the machine. The circuits
368 should be processed without compilation. Results from these circuits must
369 be given back to this class (via the `calculate_matrices` method) in the
370 same order.
371 :rtype: List[Circuit]
372 """
374 major_state_dimensions = self.subset_dimensions[0]
375 n_circuits = 1 << major_state_dimensions
376 # output
377 self.prepared_circuits = []
378 self.state_infos = []
380 # set up base circuit for appending xbox to
381 base_circuit = Circuit()
382 c_reg = []
383 for index, qb in enumerate(self.all_qbs):
384 base_circuit.add_qubit(qb)
385 c_bit = Bit(index)
386 c_reg.append(c_bit)
387 base_circuit.add_bit(c_bit)
389 # generate state circuits for given correlations
390 for major_state_index in range(n_circuits):
391 state_circuit = base_circuit.copy()
392 # get bit string corresponding to basis state of biggest subset of qubits
393 major_state = int_to_binary(major_state_index, major_state_dimensions)
394 new_state_dicts = {}
395 # parallelise circuits, run uncorrelated subsets
396 # characterisation in parallel
397 for dim, qubits in zip(self.subset_dimensions, self.subsets_matrix_map):
398 # add state to prepared states
399 new_state_dicts[qubits] = major_state[:dim]
400 # find only qubits that are expected to be in 1 state,
401 # add xbox to given qubits
402 for flipped_qb in itertools.compress(qubits, major_state[:dim]):
403 state_circuit.add_circbox(self.xbox, [flipped_qb])
404 # Decompose boxes, add barriers to preserve circuit, add measures
405 DecomposeBoxes().apply(state_circuit)
406 for qb, cb in zip(self.all_qbs, c_reg):
407 state_circuit.Measure(qb, cb)
409 # add to returned types
410 self.prepared_circuits.append(state_circuit)
411 self.state_infos.append((new_state_dicts, state_circuit.qubit_to_bit_map))
412 return self.prepared_circuits
414 def calculate_matrices(self, results_list: list[BackendResult]) -> None:
415 """Calculate the calibration matrices from the results of running calibration
416 circuits.
418 :param results_list: List of results from Backend. Must be in the same order as
419 the corresponding circuits generated by `calibration_circuits`.
420 :type counts_list: List[BackendResult]
422 :raises RuntimeError: Calibration circuits have not been generated yet.
423 """
424 if not self.state_infos: 424 ↛ 425line 424 didn't jump to line 425 because the condition on line 424 was never true
425 raise RuntimeError(
426 "Ensure calibration states/circuits have been calculated first."
427 )
429 counter = 0
430 self.node_index_dict: dict[Node, tuple[int, int]] = dict()
432 for qbs, dim in zip(self.subsets_matrix_map, self.subset_dimensions):
433 # for a subset with n qubits, create a 2^n by 2^n matrix
434 self.subsets_matrix_map[qbs] = np.zeros((1 << dim,) * 2, dtype=float)
435 for i in range(len(qbs)):
436 qb = qbs[i]
437 self.node_index_dict[qb] = (counter, i)
438 counter += 1
440 for result, state_info in zip(results_list, self.state_infos):
441 state_dict = state_info[0]
442 qb_bit_map = state_info[1]
443 for qb_sub in self.subsets_matrix_map:
444 # bits of counts to consider
445 bits = [qb_bit_map[q] for q in qb_sub]
446 counts_dict = result.get_counts(cbits=bits)
447 for measured_state, count in counts_dict.items():
448 # intended state
449 prepared_state_index = binary_to_int(state_dict[qb_sub])
450 # produced state
451 measured_state_index = binary_to_int(measured_state)
452 # update characterisation matrix
453 M = self.subsets_matrix_map[qb_sub]
454 assert type(M) is np.ndarray
455 M[measured_state_index, prepared_state_index] += count
457 # normalise everything
458 self.characterisation_matrices = [
459 mat / np.sum(cast(np.ndarray, mat), axis=0)
460 for mat in self.subsets_matrix_map.values()
461 ]
463 def get_parallel_measure(self, circuit: Circuit) -> ParallelMeasures:
464 """For a given circuit, produces and returns a ParallelMeasures object required
465 for correcting counts results.
467 :param circuit: Circuit with some Measure operations.
468 :type circuit: Circuit
470 :return: A list of dictionaries mapping Qubit to Bit where each separate
471 dictionary details some set of Measurement operations run in parallel.
472 :rtype: ParallelMeasures
473 """
474 parallel_measure = [circuit.qubit_to_bit_map]
475 # implies mid-circuit measurements, or that at least missing
476 # bits need to be checked for Measure operation
477 if len(parallel_measure[0]) != len(circuit.bits):
478 used_bits = set(parallel_measure[0].values())
479 for mc in circuit.commands_of_type(OpType.Measure):
480 bit = mc.bits[0]
481 if bit not in used_bits:
482 # mid-circuit measure, add as a separate parallel measure
483 parallel_measure.append({mc.qubits[0]: bit})
484 return parallel_measure
486 def correct_counts(
487 self,
488 result: BackendResult,
489 parallel_measures: ParallelMeasures,
490 method: str = "bayesian",
491 options: dict | None = None,
492 ) -> BackendResult:
493 """Modifies count distribution for result, such that the inversion of the pure
494 noise map represented by characterisation matrices is applied to it.
496 :param result: BackendResult object to be negated by pure noise object.
497 :type result: BackendResult
498 :param parallel_measures: Used to permute corresponding BackendResult object so
499 counts order matches noise characterisation and to amend characterisation
500 matrices to correct the right bits. SpamCorrecter.get_parallel_measure
501 returns the required object for a given circuit.
502 :type parallel_measures: ParallelMeasures
504 :raises ValueError: Measured qubit in result not characterised.
506 :return: A new result object with counts modified to reflect SPAM correction.
507 :rtype: BackendResult
508 """
509 # the correction process assumes that when passed a list of matrices
510 # and a distribution to correct, that the j rows of matrix i
511 # corrects for the i, i+1,...i+j states in the passed distribution
512 # given information of which bits are measured on which qubits from
513 # parallel_measures, the following first produces matrices such that
514 # this condition is true
516 char_bits_order = []
517 correction_matrices = []
519 for mapping in parallel_measures:
520 # reduce_matrices removes given qubits corresponding entries from
521 # characterisation matrices
522 unused_qbs = set(self.all_qbs.copy())
523 for q in mapping:
524 # no q duplicates as mapping is dict from qubit to bit
525 if q not in unused_qbs: 525 ↛ 526line 525 didn't jump to line 526 because the condition on line 525 was never true
526 raise ValueError(
527 f"Measured qubit {q} is not characterised by SpamCorrecter"
528 )
529 unused_qbs.remove(q) # type:ignore[arg-type]
530 char_bits_order.append(mapping[q])
531 correction_matrices.extend(
532 reduce_matrices(
533 [self.node_index_dict[q] for q in unused_qbs],
534 self.characterisation_matrices,
535 )
536 )
538 # get counts object for returning later
539 counts = result.get_counts(cbits=char_bits_order)
540 in_vec = np.zeros(1 << len(char_bits_order), dtype=float)
541 # turn from counts to probability distribution
542 for state, count in counts.items():
543 in_vec[binary_to_int(state)] = count
544 Ncounts = np.sum(in_vec)
545 in_vec_norm = in_vec / Ncounts
547 # with counts and characterisation matrices orders matching,
548 # correct distribution
549 if method == "invert":
550 try:
551 subinverts = [
552 np.linalg.inv(submatrix) for submatrix in correction_matrices
553 ]
554 except np.linalg.LinAlgError:
555 raise ValueError(
556 "Unable to invert calibration matrix: please re-run "
557 "calibration experiments or use an alternative correction method."
558 )
559 # assumes that order of rows in flattened subinverts equals
560 # order of bits in input vector
561 outvec = _compute_dot(subinverts, in_vec_norm)
562 # The entries of v will always sum to 1, but they may not all
563 # be in the range [0,1]. In order to make them genuine
564 # probabilities (and thus generate meaningful counts),
565 # we adjust them by setting all negative values to 0 and scaling
566 # the remainder.
567 outvec[outvec < 0] = 0
568 outvec /= sum(outvec)
570 elif method == "bayesian":
571 if options is None:
572 options = {}
573 tol_val = options.get("tol", 1 / Ncounts)
574 maxit = options.get("maxiter", None)
575 outvec = _bayesian_iterative_correct(
576 correction_matrices, in_vec_norm, tol=tol_val, max_it=maxit
577 )
579 else:
580 valid_methods = ("invert", "bayesian")
581 raise ValueError("Method must be one of: ", *valid_methods)
583 outvec *= Ncounts
585 # counter object with binary from distribution
586 corrected_counts = {
587 int_to_binary(index, len(char_bits_order)): Bcount
588 for index, Bcount in enumerate(outvec)
589 }
590 counter = Counter(
591 {
592 OutcomeArray.from_readouts([key]): ceil(val)
593 for key, val in corrected_counts.items()
594 }
595 )
596 # produce and return BackendResult object
597 return BackendResult(counts=counter, c_bits=char_bits_order)
599 def to_dict(self) -> dict:
600 """Get calibration information as a dictionary.
602 :return: Dictionary output
603 :rtype: Dict
604 """
605 correlations = []
606 for subset in self.correlations:
607 correlations.append([(uid.reg_name, uid.index) for uid in subset])
609 node_index_hashable = [
610 ((uid.reg_name, uid.index), self.node_index_dict[uid])
611 for uid in self.node_index_dict
612 ]
613 char_matrices = [m.tolist() for m in self.characterisation_matrices]
614 return {
615 "correlations": correlations,
616 "node_index_dict": node_index_hashable,
617 "characterisation_matrices": char_matrices,
618 }
620 @classmethod
621 def from_dict(class_obj, d: dict) -> "SpamCorrecter":
622 """Build a `SpamCorrecter` instance from a dictionary in the format returned
623 by `to_dict`.
625 :return: Dictionary of calibration information.
626 :rtype: SpamCorrecter
627 """
628 new_inst = class_obj(
629 [
630 [Node(*pair)]
631 for subset_tuple in d["correlations"]
632 for pair in subset_tuple
633 ]
634 )
635 new_inst.node_index_dict = dict(
636 [
637 (Node(*pair[0]), (int(pair[1][0]), int(pair[1][1])))
638 for pair in d["node_index_dict"]
639 ]
640 )
641 new_inst.characterisation_matrices = [
642 np.array(m) for m in d["characterisation_matrices"]
643 ]
644 return new_inst