# Copyright Quantinuum
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
from collections import Counter, OrderedDict
from collections.abc import Callable, Iterable
from functools import lru_cache
from math import ceil, log2
from typing import cast

import numpy as np

from pytket.backends import Backend
from pytket.backends.backendresult import BackendResult
from pytket.circuit import Bit, CircBox, Circuit, Node, OpType, Qubit
from pytket.passes import DecomposeBoxes, FlattenRegisters
from pytket.utils.outcomearray import OutcomeArray


def compress_counts(
    counts: dict[tuple[int, ...], float], tol: float = 1e-6, round_to_int: bool = False
) -> dict[tuple[int, ...], int | float]:
    """Filter counts to remove states that have a count value (which can be a
    floating-point number) below a tolerance, and optionally round to an
    integer.

    :param counts: Input counts
    :param tol: Value below which counts are pruned. Defaults to 1e-6.
    :param round_to_int: Whether to round each count to an integer. Defaults to False.

    :return: Filtered counts
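
    Example, as a minimal doctest with made-up values (the inputs are
    illustrative, not from a real experiment)::

        >>> compress_counts({(0, 0): 99.6, (0, 1): 1e-9, (1, 1): 0.4},
        ...                 round_to_int=True)
        {(0, 0): 100}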

43 """ 

44 valprocess: Callable[[float], int | float] = lambda x: ( 

45 round(x) if round_to_int else x 

46 ) 

47 processed_pairs = ( 

48 (key, valprocess(val)) for key, val in counts.items() if val > tol 

49 ) 

50 return {key: val for key, val in processed_pairs if val > 0} 

51 

52 

@lru_cache(maxsize=128)
def binary_to_int(bintuple: tuple[int, ...]) -> int:
    """Convert a binary tuple to the corresponding integer, with the most
    significant bit as the first element of the tuple.

    :param bintuple: Binary tuple

    :return: Integer
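
    Example (doctest with an arbitrary illustrative tuple)::

        >>> binary_to_int((1, 0, 1))
        5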

61 """ 

62 integer = 0 

63 for index, bitset in enumerate(reversed(bintuple)): 

64 if bitset: 

65 integer |= 1 << index 

66 return integer 

67 

68 

@lru_cache(maxsize=128)
def int_to_binary(val: int, dim: int) -> tuple[int, ...]:
    """Convert an integer to the corresponding binary tuple, with the most
    significant bit as the first element of the tuple.

    :param val: Input integer
    :param dim: Bit width

    :return: Binary tuple of width dim
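
    Example (doctest; 5 rendered in 4 bits)::

        >>> int_to_binary(5, 4)
        (0, 1, 0, 1)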

78 """ 

79 return tuple(map(int, format(val, f"0{dim}b"))) 

80 

81 

82######################################### 

83### _compute_dot and helper functions ### 

84### 

85### With thanks to 

86### https://math.stackexchange.com/a/3423910 

87### and especially 

88### https://gist.github.com/ahwillia/f65bc70cb30206d4eadec857b98c4065 

89### on which this code is based. 

def _unfold(tens: np.ndarray, mode: int, dims: list[int]) -> np.ndarray:
    """Unfolds tensor into matrix.

    :param tens: Tensor with shape equivalent to dimensions
    :param mode: Specifies the axis moved to the front of the matrix in the
        unfolding of the tensor
    :param dims: Gives shape of tensor passed

    :return: Matrix with shape (dims[mode], product of the remaining dimensions)
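
    Example (doctest; an arbitrary 2x2x2 tensor unfolded along axis 1)::

        >>> import numpy as np
        >>> _unfold(np.arange(8).reshape(2, 2, 2), 1, [2, 2, 2]).shape
        (2, 4)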

98 """ 

99 if mode == 0: 

100 return tens.reshape(dims[0], -1) 

101 return np.moveaxis(tens, mode, 0).reshape(dims[mode], -1) 

102 

103 

104def _refold(vec: np.ndarray, mode: int, dims: list[int]) -> np.ndarray: 

105 """Refolds vector into tensor. 

106 

107 :param vec: Tensor with length equivalent to the product of dimensions given in 

108 dims 

109 :param mode: Axis tensor was unfolded along 

110 :param dims: Shape of tensor 

111 

112 :return: Tensor folded from vector with shape equivalent to given dimensions 
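
    Example (doctest; refolding undoes unfolding)::

        >>> import numpy as np
        >>> t = np.arange(8.0).reshape(2, 2, 2)
        >>> m = _unfold(t, 2, [2, 2, 2])
        >>> np.array_equal(_refold(m.ravel(), 2, [2, 2, 2]), t)
        True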

113 """ 

114 if mode == 0: 

115 return vec.reshape(dims) 

116 # Reshape and then move dims[mode] back to its 

117 # appropriate spot (undoing the `unfold` operation). 

118 tens = vec.reshape([dims[mode]] + [d for m, d in enumerate(dims) if m != mode]) 

119 return np.moveaxis(tens, 0, mode) 

120 

121 

122def _compute_dot(submatrices: Iterable[np.ndarray], vector: np.ndarray) -> np.ndarray: 

123 """Multiplies the kronecker product of the given submatrices with given vector. 

124 

125 :param submatrices: Submatrices multiplied 

126 :param vector: Vector multplied 

127 

128 :return: Kronecker product of arguments 
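
    Example (doctest; checks against the dense Kronecker product on small,
    arbitrary matrices)::

        >>> import numpy as np
        >>> A, B = np.eye(2), np.fliplr(np.eye(2))
        >>> v = np.arange(4.0)
        >>> np.allclose(_compute_dot([A, B], v), np.kron(A, B) @ v)
        True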

129 """ 

130 dims = [A.shape[0] for A in submatrices] 

131 vt = vector.reshape(dims) 

132 for i, A in enumerate(submatrices): 

133 vt = _refold(A @ _unfold(vt, i, dims), i, dims) 

134 return vt.ravel() 

135 

136 

137def _bayesian_iteration( 

138 submatrices: Iterable[np.ndarray], 

139 measurements: np.ndarray, 

140 t: np.ndarray, 

141 epsilon: float, 

142) -> np.ndarray: 

143 """Transforms T corresponds to a Bayesian iteration, used to modfiy 

144 measurements. 

145 

146 :param submatrices: submatrices to be inverted and applied to measurements. 

147 :param measurements: Probability distribution over set of states to be amended. 

148 :param t: Some transform to act on measurements. 

149 :param epsilon: A stabilization parameter to define an affine transformation for 

150 application to submatrices, eliminating zero probabilities. 

151 

152 :return: Transformed distribution vector. 

153 """ 

154 # Transform t according to the Bayesian iteration 

155 # The parameter epsilon is a stabilization parameter which defines an affine 

156 # transformation to apply to the submatrices to eliminate zero probabilities. This 

157 # transformation preserves the property that all columns sum to 1 

158 if epsilon == 0: 158 ↛ 162line 158 didn't jump to line 162 because the condition on line 158 was always true

159 # avoid copying if we don't need to 

160 As = submatrices 

161 else: 

162 As = [ 

163 epsilon / submatrix.shape[0] + (1 - epsilon) * submatrix 

164 for submatrix in submatrices 

165 ] 

166 z = _compute_dot(As, t) 

167 if np.isclose(z, 0).any(): 167 ↛ 168line 167 didn't jump to line 168 because the condition on line 167 was never true

168 raise ZeroDivisionError 

169 return cast( 

170 "np.ndarray", t * _compute_dot([A.transpose() for A in As], measurements / z) 

171 ) 

172 

173 

def _bayesian_iterative_correct(
    submatrices: Iterable[np.ndarray],
    measurements: np.ndarray,
    tol: float = 1e-5,
    max_it: int | None = None,
) -> np.ndarray:
    """Finds the distribution obtained by applying the inverse of the
    submatrices to the measurements. Converges when the updated states are
    within tol of the previously computed states.

    :param submatrices: Matrices comprising the pure noise characterisation.
    :param measurements: Vector corresponding to some counts distribution.
    :param tol: Tolerance of closeness of found results.
    :param max_it: Maximum number of inversions attempted to correct results.

    :return: Corrected distribution vector.
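
    Example (doctest; a single-qubit confusion matrix with made-up entries,
    applied exactly and then inverted back)::

        >>> import numpy as np
        >>> M = np.array([[0.9, 0.2], [0.1, 0.8]])
        >>> noisy = M @ np.array([0.75, 0.25])
        >>> np.round(_bayesian_iterative_correct([M], noisy, tol=1e-10), 3)
        array([0.75, 0.25])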

188 """ 

189 # based on method found in https://arxiv.org/abs/1910.00129 

190 

191 vector_size = measurements.size 

192 # uniform initial 

193 true_states = np.full(vector_size, 1 / vector_size) 

194 prev_true = true_states.copy() 

195 converged = False 

196 count = 0 

197 epsilon: float = 0 # stabilization parameter, adjusted dynamically 

198 while not converged: 

199 if max_it: 

200 if count >= max_it: 

201 break 

202 count += 1 

203 try: 

204 true_states = _bayesian_iteration( 

205 submatrices, measurements, true_states, epsilon 

206 ) 

207 converged = np.allclose(true_states, prev_true, atol=tol) 

208 prev_true = true_states.copy() 

209 except ZeroDivisionError: 

210 # Shift the stabilization parameter up a bit (always < 0.5). 

211 epsilon = 0.99 * epsilon + 0.01 * 0.5 

212 

213 return true_states 

214 

215 

def _reduce_matrix(indices_to_remove: list[int], matrix: np.ndarray) -> np.ndarray:
    """Removes the positions in indices_to_remove from the binary indexing of
    matrix, producing a new transition matrix. To do so, it takes the transition
    probabilities for the remaining positions from the entries in which the
    removed positions are in state 0. This is an assumption on the noise, made
    because unmeasured qubits are likely to be in that state.

    :param indices_to_remove: Positions in the binary index (i.e. qubits) to be
        removed from the matrix's state indexing.
    :param matrix: Transition matrix where indices correspond to some binary state.

    :return: Transition matrix with removed entries.
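
    Example (doctest; removing the most significant qubit from an arbitrary
    4x4 matrix keeps the block in which that qubit is in state 0)::

        >>> import numpy as np
        >>> _reduce_matrix([0], np.arange(16.0).reshape(4, 4))
        array([[0., 1.],
               [4., 5.]])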

227 """ 

228 

229 new_n_qubits = int(log2(matrix.shape[0])) - len(indices_to_remove) 

230 if new_n_qubits == 0: 

231 return np.array([]) 

232 bin_map = {} 

233 mat_dim = 1 << new_n_qubits 

234 for index in range(mat_dim): 

235 # get current binary 

236 bina = list(int_to_binary(index, new_n_qubits)) 

237 # add 0's to fetch old binary to set values from 

238 for i in sorted(indices_to_remove): 

239 bina.insert(i, 0) 

240 # get index of values 

241 bin_map[index] = binary_to_int(tuple(bina)) 

242 

243 new_mat = np.zeros((mat_dim,) * 2, dtype=float) 

244 for i in range(len(new_mat)): 

245 old_row_index = bin_map[i] 

246 for j in range(len(new_mat)): 

247 old_col_index = bin_map[j] 

248 new_mat[i, j] = matrix[old_row_index, old_col_index] 

249 return new_mat 

250 

251 

252def _reduce_matrices( 

253 entries_to_remove: list[tuple[int, int]], matrices: list[np.ndarray] 

254) -> list[np.ndarray]: 

255 """Removes some dimensions from some matrices. 

256 

257 :param entries_to_remove: Via indexing, details dimensions to be removed. 

258 :param matrices: All matrices to have dimensions removed. 

259 

260 :return: Matrices with some dimensions removed. 

261 """ 

262 organise: dict[int, list] = {k: [] for k in range(len(matrices))} 

263 for unused in entries_to_remove: 

264 # unused[0] is index in matrices 

265 # unused[1] is qubit index in matrix 

266 organise[unused[0]].append(unused[1]) 

267 output_matrices = [_reduce_matrix(organise[m], matrices[m]) for m in organise] 

268 return [ 

269 mat / np.sum(mat, axis=0) for mat in [x for x in output_matrices if len(x) != 0] 

270 ] 

271 

272 

class SpamCorrecter:
    """A class for generating "state preparation and measurement" (SPAM) calibration
    experiments for ``pytket`` backends, and correcting counts generated from them.

    Supports saving calibrated state to a dictionary format, and restoring from the
    dictionary.
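
    A typical calibration workflow, as a sketch (``backend``, ``node_subsets``
    and the shot count are assumptions supplied by the user; per
    `calibration_circuits`, the calibration circuits are processed without
    compilation)::

        spam = SpamCorrecter(node_subsets, backend)
        cal_circs = spam.calibration_circuits()
        handles = backend.process_circuits(cal_circs, n_shots=1000)
        spam.calculate_matrices(backend.get_results(handles))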

279 """ 

280 

281 def __init__(self, qubit_subsets: list[list[Node]], backend: Backend | None = None): 

282 """Construct a new `SpamCorrecter`. 

283 

284 :param qubit_subsets: A list of lists of correlated Nodes of an `Architecture`. 

285 Qubits within the same list are assumed to only have SPAM errors correlated 

286 with each other. Thus to allow SPAM errors between all qubits you should 

287 provide a single list. 

288 :param backend: Backend on which the experiments are intended to be run 

289 (optional). If provided, the qubits in `qubit_subsets` must be nodes in the 

290 backend's associated `Architecture`. If not provided, it is assumed that the 

291 experiment will be run on an `Architecture`with the nodes in 

292 `qubit_subsets`, and furthermore that the intended architecture natively 

293 supports X gates. 

294 

295 :raises ValueError: There are repeats in the `qubit_subsets` specification. 

296 """ 

297 self.correlations = qubit_subsets 

298 

299 self.all_qbs = [qb for subset in qubit_subsets for qb in subset] 

300 

301 def to_tuple(inp: list[Node]) -> tuple: 

302 return tuple(inp) 

303 

304 self.subsets_matrix_map = OrderedDict.fromkeys( 

305 sorted(map(to_tuple, self.correlations), key=len, reverse=True) 

306 ) 

307 # ordered from largest to smallest via OrderedDict & sorted 

308 self.subset_dimensions = [len(subset) for subset in self.subsets_matrix_map] 

309 

310 if len(self.all_qbs) != len(set(self.all_qbs)): 310 ↛ 311line 310 didn't jump to line 311 because the condition on line 310 was never true

311 raise ValueError("Qubit subsets are not mutually disjoint.") 

312 

313 xcirc = Circuit(1).X(0) 

314 if backend is not None: 314 ↛ 315line 314 didn't jump to line 315 because the condition on line 314 was never true

315 if backend.backend_info is None: 

316 raise ValueError("No architecture associated with backend.") 

317 nodes = backend.backend_info.nodes 

318 if not all(node in nodes for node in self.all_qbs): 

319 raise ValueError("Nodes do not all belong to architecture.") 

320 backend.default_compilation_pass().apply(xcirc) 

321 FlattenRegisters().apply(xcirc) 

322 

323 self.xbox = CircBox(xcirc) 

324 

    def calibration_circuits(self) -> list[Circuit]:
        """Generate calibration circuits according to the specified correlations.

        :return: A list of calibration circuits to be run on the machine. The circuits
            should be processed without compilation. Results from these circuits must
            be given back to this class (via the `calculate_matrices` method) in the
            same order.

332 """ 

333 

334 major_state_dimensions = self.subset_dimensions[0] 

335 n_circuits = 1 << major_state_dimensions 

336 # output 

337 self.prepared_circuits = [] 

338 self.state_infos = [] 

339 

340 # set up base circuit for appending xbox to 

341 base_circuit = Circuit() 

342 c_reg = [] 

343 for index, qb in enumerate(self.all_qbs): 

344 base_circuit.add_qubit(qb) 

345 c_bit = Bit(index) 

346 c_reg.append(c_bit) 

347 base_circuit.add_bit(c_bit) 

348 

349 # generate state circuits for given correlations 

350 for major_state_index in range(n_circuits): 

351 state_circuit = base_circuit.copy() 

352 # get bit string corresponding to basis state of biggest subset of qubits 

353 major_state = int_to_binary(major_state_index, major_state_dimensions) 

354 new_state_dicts = {} 

355 # parallelise circuits, run uncorrelated subsets 

356 # characterisation in parallel 

357 for dim, qubits in zip( 

358 self.subset_dimensions, self.subsets_matrix_map, strict=False 

359 ): 

360 # add state to prepared states 

361 new_state_dicts[qubits] = major_state[:dim] 

362 # find only qubits that are expected to be in 1 state, 

363 # add xbox to given qubits 

364 for flipped_qb in itertools.compress(qubits, major_state[:dim]): 

365 state_circuit.add_circbox(self.xbox, [flipped_qb]) 

366 # Decompose boxes, add barriers to preserve circuit, add measures 

367 DecomposeBoxes().apply(state_circuit) 

368 for qb, cb in zip(self.all_qbs, c_reg, strict=False): 

369 state_circuit.Measure(qb, cb) 

370 

371 # add to returned types 

372 self.prepared_circuits.append(state_circuit) 

373 self.state_infos.append((new_state_dicts, state_circuit.qubit_to_bit_map)) 

374 return self.prepared_circuits 

375 

    def calculate_matrices(self, results_list: list[BackendResult]) -> None:
        """Calculate the calibration matrices from the results of running calibration
        circuits.

        :param results_list: List of results from Backend. Must be in the same order as
            the corresponding circuits generated by `calibration_circuits`.

        :raises RuntimeError: Calibration circuits have not been generated yet.
        """
        if not self.state_infos:
            raise RuntimeError(
                "Ensure calibration states/circuits have been calculated first."
            )

        counter = 0
        self.node_index_dict: dict[Node, tuple[int, int]] = {}

        for qbs, dim in zip(
            self.subsets_matrix_map, self.subset_dimensions, strict=False
        ):
            # for a subset with n qubits, create a 2^n by 2^n matrix
            self.subsets_matrix_map[qbs] = np.zeros((1 << dim,) * 2, dtype=float)
            for i in range(len(qbs)):
                qb = qbs[i]
                self.node_index_dict[qb] = (counter, i)
            counter += 1  # noqa: SIM113

        for result, state_info in zip(results_list, self.state_infos, strict=False):
            state_dict = state_info[0]
            qb_bit_map = state_info[1]
            for qb_sub in self.subsets_matrix_map:
                # bits of counts to consider
                bits = [qb_bit_map[q] for q in qb_sub]
                counts_dict = result.get_counts(cbits=bits)
                for measured_state, count in counts_dict.items():
                    # intended state
                    prepared_state_index = binary_to_int(state_dict[qb_sub])
                    # produced state
                    measured_state_index = binary_to_int(measured_state)
                    # update characterisation matrix
                    M = self.subsets_matrix_map[qb_sub]
                    assert type(M) is np.ndarray
                    M[measured_state_index, prepared_state_index] += count

        # normalise everything
        self.characterisation_matrices = [
            mat / np.sum(cast("np.ndarray", mat), axis=0)
            for mat in self.subsets_matrix_map.values()
        ]

    def get_parallel_measure(self, circuit: Circuit) -> list[dict[Qubit, Bit]]:
        """For a given circuit, produces and returns the `list[dict[Qubit, Bit]]`
        object required for correcting counts results.

        :param circuit: Circuit with some Measure operations.

        :return: A list of dictionaries mapping Qubit to Bit where each separate
            dictionary details some set of Measurement operations run in parallel.
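
        Example, as a sketch (``spam`` is assumed to be an instance of this
        class)::

            circ = Circuit(2, 2).X(0).measure_all()
            parallel_measures = spam.get_parallel_measure(circ)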

434 """ 

435 parallel_measure = [circuit.qubit_to_bit_map] 

436 # implies mid-circuit measurements, or that at least missing 

437 # bits need to be checked for Measure operation 

438 if len(parallel_measure[0]) != len(circuit.bits): 

439 used_bits = set(parallel_measure[0].values()) 

440 for mc in circuit.commands_of_type(OpType.Measure): 

441 bit = mc.bits[0] 

442 if bit not in used_bits: 

443 # mid-circuit measure, add as a separate parallel measure 

444 parallel_measure.append({mc.qubits[0]: bit}) 

445 return parallel_measure 

446 

    def correct_counts(
        self,
        result: BackendResult,
        parallel_measures: list[dict[Qubit, Bit]],
        method: str = "bayesian",
        options: dict | None = None,
    ) -> BackendResult:
        """Modifies the count distribution of the result by applying the inversion
        of the pure noise map represented by the characterisation matrices.

        :param result: BackendResult object whose counts are to be corrected for
            the characterised pure noise.
        :param parallel_measures: Used to permute corresponding BackendResult object so
            counts order matches noise characterisation and to amend characterisation
            matrices to correct the right bits. SpamCorrecter.get_parallel_measure
            returns the required object for a given circuit.
        :param method: Correction method: "bayesian" (default) or "invert".
        :param options: Options for the "bayesian" method: "tol" (convergence
            tolerance; defaults to the reciprocal of the total number of counts) and
            "maxiter" (maximum number of iterations; defaults to no limit).

        :raises ValueError: Measured qubit in result not characterised.

        :return: A new result object with counts modified to reflect SPAM correction.
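
        Example, as a sketch (``spam`` is assumed to be a calibrated
        ``SpamCorrecter``, and ``res`` the result of running a measured circuit
        ``circ`` on the characterised device)::

            parallel = spam.get_parallel_measure(circ)
            corrected = spam.correct_counts(res, parallel, method="invert")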

466 """ 

467 # the correction process assumes that when passed a list of matrices 

468 # and a distribution to correct, that the j rows of matrix i 

469 # corrects for the i, i+1,...i+j states in the passed distribution 

470 # given information of which bits are measured on which qubits from 

471 # parallel_measures, the following first produces matrices such that 

472 # this condition is true 

473 

474 char_bits_order = [] 

475 correction_matrices = [] 

476 

477 for mapping in parallel_measures: 

478 # _reduce_matrices removes given qubits corresponding entries from 

479 # characterisation matrices 

480 unused_qbs = set(self.all_qbs.copy()) 

481 for q in mapping: 

482 # no q duplicates as mapping is dict from qubit to bit 

483 if q not in unused_qbs: 483 ↛ 484line 483 didn't jump to line 484 because the condition on line 483 was never true

484 raise ValueError( 

485 f"Measured qubit {q} is not characterised by SpamCorrecter" 

486 ) 

487 unused_qbs.remove(q) # type:ignore[arg-type] 

488 char_bits_order.append(mapping[q]) 

489 correction_matrices.extend( 

490 _reduce_matrices( 

491 [self.node_index_dict[q] for q in unused_qbs], 

492 self.characterisation_matrices, 

493 ) 

494 ) 

495 

496 # get counts object for returning later 

497 counts = result.get_counts(cbits=char_bits_order) 

498 in_vec = np.zeros(1 << len(char_bits_order), dtype=float) 

499 # turn from counts to probability distribution 

500 for state, count in counts.items(): 

501 in_vec[binary_to_int(state)] = count 

502 Ncounts = np.sum(in_vec) 

503 in_vec_norm = in_vec / Ncounts 

504 

505 # with counts and characterisation matrices orders matching, 

506 # correct distribution 

507 if method == "invert": 

508 try: 

509 subinverts = [ 

510 np.linalg.inv(submatrix) for submatrix in correction_matrices 

511 ] 

512 except np.linalg.LinAlgError: 

513 raise ValueError( # noqa: B904 

514 "Unable to invert calibration matrix: please re-run " 

515 "calibration experiments or use an alternative correction method." 

516 ) 

517 # assumes that order of rows in flattened subinverts equals 

518 # order of bits in input vector 

519 outvec = _compute_dot(subinverts, in_vec_norm) 

520 # The entries of v will always sum to 1, but they may not all 

521 # be in the range [0,1]. In order to make them genuine 

522 # probabilities (and thus generate meaningful counts), 

523 # we adjust them by setting all negative values to 0 and scaling 

524 # the remainder. 

525 outvec[outvec < 0] = 0 

526 outvec /= sum(outvec) 

527 

528 elif method == "bayesian": 

529 if options is None: 

530 options = {} 

531 tol_val = options.get("tol", 1 / Ncounts) 

532 maxit = options.get("maxiter", None) 

533 outvec = _bayesian_iterative_correct( 

534 correction_matrices, in_vec_norm, tol=tol_val, max_it=maxit 

535 ) 

536 

537 else: 

538 valid_methods = ("invert", "bayesian") 

539 raise ValueError("Method must be one of: ", *valid_methods) 

540 

541 outvec *= Ncounts 

542 

543 # counter object with binary from distribution 

544 corrected_counts = { 

545 int_to_binary(index, len(char_bits_order)): Bcount 

546 for index, Bcount in enumerate(outvec) 

547 } 

548 counter = Counter( 

549 { 

550 OutcomeArray.from_readouts([key]): ceil(val) 

551 for key, val in corrected_counts.items() 

552 } 

553 ) 

554 # produce and return BackendResult object 

555 return BackendResult(counts=counter, c_bits=char_bits_order) 

556 

    def to_dict(self) -> dict:
        """Get calibration information as a dictionary.

        :return: Dictionary output
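
        Round-trip sketch (``spam`` is assumed to be a calibrated instance; the
        dictionary is intended to be suitable for JSON serialisation)::

            import json

            spam2 = SpamCorrecter.from_dict(json.loads(json.dumps(spam.to_dict())))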

561 """ 

562 correlations = [] 

563 for subset in self.correlations: 

564 correlations.append( # noqa: PERF401 

565 [(uid.reg_name, uid.index) for uid in subset] 

566 ) 

567 

568 node_index_hashable = [ 

569 ((uid.reg_name, uid.index), self.node_index_dict[uid]) 

570 for uid in self.node_index_dict 

571 ] 

572 char_matrices = [m.tolist() for m in self.characterisation_matrices] 

573 return { 

574 "correlations": correlations, 

575 "node_index_dict": node_index_hashable, 

576 "characterisation_matrices": char_matrices, 

577 } 

578 

579 @classmethod 

580 def from_dict(class_obj, d: dict) -> "SpamCorrecter": 

581 """Build a `SpamCorrecter` instance from a dictionary in the format returned 

582 by `to_dict`. 

583 

584 :return: Dictionary of calibration information. 

585 """ 

586 new_inst = class_obj( 

587 [ 

588 [Node(*pair)] 

589 for subset_tuple in d["correlations"] 

590 for pair in subset_tuple 

591 ] 

592 ) 

593 new_inst.node_index_dict = { 

594 Node(*pair[0]): (int(pair[1][0]), int(pair[1][1])) 

595 for pair in d["node_index_dict"] 

596 } 

597 new_inst.characterisation_matrices = [ 

598 np.array(m) for m in d["characterisation_matrices"] 

599 ] 

600 return new_inst