Source code for qermit.noise_model.transpiler_backend

import multiprocessing
import uuid
from collections import Counter
from typing import Dict, Iterable, Iterator, List, Optional, Sequence, Tuple

from pytket import Bit, Circuit
from pytket.backends.backendresult import BackendResult
from pytket.backends.resulthandle import ResultHandle
from pytket.extensions.qiskit import AerBackend
from pytket.passes import BasePass, CustomPass
from pytket.utils.outcomearray import OutcomeArray


[docs] class TranspilerBackend: """ Provides a backend like interface for noise simulation via compiler passes. In particular, for each shot a new circuit is generated by applying the given compiler pass. Attributes: transpiler: Compiler pass to apply to simulate noise on a single instance of a circuit. max_batch_size: Shots are simulated in batches. This is the largest shot batch size permitted. result_dict: A dictionary mapping handles to results. backend: Backend used to simulate compiled circuits. n_cores: The number of cores used when simulating shots in parallel. """ transpiler: BasePass max_batch_size: int result_dict: Dict[ResultHandle, BackendResult] n_cores: int backend = AerBackend() def __init__( self, transpiler: BasePass, result_dict: Dict[ResultHandle, BackendResult] = {}, max_batch_size: int = 1000, n_cores: int = 1, ): """Initialisation method. :param transpiler: Compiler to use during noise simulation. :param max_batch_size: Size of the largest batch of shots, defaults to 1000. The total number of shots is distributed between bathes of size 1000 plus a smaller batch for left over shot. These batches will be distributed to multiple cores. :param result_dict: Results dictionary, may be used to store existing results within backend, defaults to {} :param n_cores: Shots will be taken in parallel. This parameter specifies the number of cores to use. The default is to use one core. """ self.transpiler = transpiler self.max_batch_size = max_batch_size self.result_dict = result_dict self.n_cores = n_cores
[docs] def default_compilation_pass(self, **kwargs) -> BasePass: """Return a compiler pass which has no affect on the circuit.""" return CustomPass(transform=lambda circuit: circuit)
[docs] def rebase_pass(self) -> BasePass: """Return a compiler pass which has no affect on the circuit.""" return CustomPass(transform=lambda circuit: circuit)
[docs] def run_circuit( self, circuit: Circuit, n_shots: int, **kwargs, ) -> BackendResult: """Return results of running one circuit. :param circuit: Circuit to run :param n_shots: Number of shots to be taken from circuit. :return: Result of running circuit. """ handle = self.process_circuit(circuit, n_shots, **kwargs) return self.get_result(handle=handle)
[docs] def process_circuits( self, circuits: Sequence[Circuit], n_shots: Sequence[int], ) -> List[ResultHandle]: """Processes a collection of circuits by making use multiple calls to process_circuit. :param circuits: A collection of circuit to run. :param n_shots: The number of shots which should be taken from each circuit. :return: The result handle for each circuit. """ return [ self.process_circuit(circuit=circuit, n_shots=n) for circuit, n in zip(circuits, n_shots) ]
[docs] def process_circuit( self, circuit: Circuit, n_shots: int, **kwargs, ) -> ResultHandle: """[summary] :param circuit: Submits circuit to run on noisy backend. :param n_shots: Number of shots to take from circuit. :return: Handle identifying results in `result_dict`. """ handle = ResultHandle(str(uuid.uuid4())) counts = self.get_counts( circuit=circuit, n_shots=n_shots, cbits=circuit.bits, ) self.result_dict[handle] = BackendResult( counts=Counter( {OutcomeArray.from_readouts([key]): val for key, val in counts.items()} ), c_bits=circuit.bits, ) return handle
[docs] def get_results(self, handles: Iterable[ResultHandle]) -> List[BackendResult]: """Get the results corresponding to a collection of result handles. :param handles: A collection of handles to retrieve. :return: The results corresponding to the given collection of """ return [self.get_result(handle) for handle in handles]
[docs] def get_result(self, handle: ResultHandle) -> BackendResult: """Retrieve result from backend. :param handle: Handle identifying result. :return: Result corresponding to handle. """ return self.result_dict[handle]
def _gen_transpiled_circuit(self, circuit: Circuit) -> Circuit: """Generate compiled circuit by copying and compiling it. :param circuit: Circuit to be compiled. :return: Compiled circuit. """ transpiled_circuit = circuit.copy() self.transpiler.apply(transpiled_circuit) self.backend.rebase_pass().apply(transpiled_circuit) return transpiled_circuit def _gen_batches(self, circuit: Circuit, n_shots: int) -> Iterator[List[Circuit]]: """Iterator generating lists of circuits of size `max_batch_size` until all shots have been accounted for. :param circuit: Circuit to batch into shots. :param n_shots: Number of shots to take from circuit. :return: List of compiled circuits, which is to say noisy circuits. """ # Return lists of size max_batch_size containing unique # compiled instances of the given circuit. for _ in range(n_shots // self.max_batch_size): yield [ self._gen_transpiled_circuit(circuit) for _ in range(self.max_batch_size) ] # If less than max_batch_size shots remain to be returned, # return what's left. if n_shots % self.max_batch_size > 0: yield [ self._gen_transpiled_circuit(circuit) for _ in range(n_shots % self.max_batch_size) ] @staticmethod def _get_batch_counts( circuit_list: List[Circuit], cbits_list: Optional[List[List]], ) -> Counter[Tuple[int, ...]]: """Run each circuit in the given list for one shot, collating the results into a single counter. :param circuit_list: The list of circuits to run for one shot each. :param cbits_list: The classical bits to return the measurements of :return: The collated counter object. """ if cbits_list is not None: cbits = [Bit.from_list(cbit_list) for cbit_list in cbits_list] else: cbits = None backend = AerBackend() result_list = backend.run_circuits(circuit_list, n_shots=1) return sum( (result.get_counts(cbits=cbits) for result in result_list), Counter() )
[docs] def get_counts( self, circuit: Circuit, n_shots: int, cbits: Optional[List[Bit]] = None, ) -> Counter[Tuple[int, ...]]: """Generate shots from the given circuit. :param circuit: Circuit to take shots from. :param n_shots: Number of shots to take from circuit. :param cbits: Classical bits to return shots from, defaults to returning all. :return: Counter detailing shots from circuit. """ if self.n_cores > 1: if cbits is not None: cbits_list = [cbit.to_list() for cbit in cbits] else: cbits_list = None with multiprocessing.Pool(self.n_cores) as pool: processes = [ pool.apply_async( self._get_batch_counts, args=(circuit_list, cbits_list) ) for circuit_list in self._gen_batches(circuit, n_shots) ] counter_list = [p.get() for p in processes] return sum(counter_list, Counter()) else: counter: Counter = Counter() for circuit_list in self._gen_batches(circuit, n_shots): result_list = self.backend.run_circuits(circuit_list, n_shots=1) counter += sum( (result.get_counts(cbits=cbits) for result in result_list), Counter(), ) return counter