# Copyright 2019-2023 Quantinuum
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
from copy import deepcopy
from itertools import repeat
from typing import List, OrderedDict, Sequence, Tuple, Union, cast
import networkx as nx # type: ignore
import numpy as np
from pytket import Bit, Circuit
from pytket.backends import Backend, ResultHandle
from pytket.backends.backendresult import BackendResult
from pytket.utils.outcomearray import OutcomeArray
from .mittask import (
CircuitShots,
IOTask,
MitTask,
Wire,
)
from .task_graph import TaskGraph
[docs]
def backend_compile_circuit_shots_task_gen(
    backend: Backend, optimisation_level: int = 1
) -> MitTask:
    """
    Build a MitTask that compiles every circuit travelling along its wire.

    Each CircuitShots entry has its circuit passed to the
    ``get_compiled_circuit`` method of the given backend; the requested
    number of shots is carried over unchanged.

    :param backend: Backend object whose ``get_compiled_circuit`` method is called.
    :param optimisation_level: Optimisation level forwarded to the backend.
    :return: MitTask with one in wire and one out wire.
    """

    def task(obj, circ_shots: List[CircuitShots]) -> Tuple[List[CircuitShots]]:
        """
        :param circ_shots: Circuits to compile, each paired with a number of shots.
        :return: One CircuitShots per input, holding the compiled circuit
            and the original number of shots.
        """
        compiled: List[CircuitShots] = []
        for experiment in circ_shots:
            compiled_circuit = backend.get_compiled_circuit(
                experiment.Circuit, optimisation_level=optimisation_level
            )
            compiled.append(CircuitShots(compiled_circuit, experiment.Shots))
        return (compiled,)

    return MitTask(
        _label="CompileCircuitShots", _n_in_wires=1, _n_out_wires=1, _method=task
    )
[docs]
def backend_handle_task_gen(backend: Backend) -> MitTask:
    """
    Build a MitTask that submits circuits to the given backend.

    The circuits and their shot counts are separated into two parallel
    lists and handed to ``backend.process_circuits``, which receives the
    shot counts as a per-circuit sequence. The handles it returns can later
    be used to retrieve the result for each circuit.

    :param backend: Backend circuits are run through.
    :return: MitTask with one in wire and one out wire.
    """

    def task(obj, circuit_wires: List[CircuitShots]) -> Tuple[List[ResultHandle]]:
        """
        :param circuit_wires: Circuits to be run on backend, number of shots to run of each.
        :return: ResultHandles from process_circuits method, or an empty
            list when no circuits were passed.
        """
        # Nothing to submit: skip the backend call entirely.
        if len(circuit_wires) == 0:
            return ([],)

        # Transpose [(circuit, shots), ...] into ([circuits], [shots]).
        circuits, shot_counts = (list(column) for column in zip(*circuit_wires))
        handles = backend.process_circuits(
            cast(List[Circuit], circuits), n_shots=cast(Sequence[int], shot_counts)
        )
        return (handles,)

    return MitTask(
        _label="CircuitsToHandles", _n_in_wires=1, _n_out_wires=1, _method=task
    )
[docs]
def backend_res_task_gen(backend: Backend) -> MitTask:
    """
    Build a MitTask that turns ResultHandle objects into BackendResult
    objects by querying the given backend.

    :param backend: backend holding results for handles.
    :return: MitTask with one in wire and one out wire.
    """

    def task(obj, handles: List[ResultHandle]) -> Tuple[List[BackendResult]]:
        """
        :param handles: ResultHandle objects previously produced from backend.
        :return: Whatever ``backend.get_results`` returns for the handles,
            wrapped in a one-element tuple.
        """
        return (backend.get_results(handles),)

    return MitTask(
        _label="HandlesToResults", _n_in_wires=1, _n_out_wires=1, _method=task
    )
[docs]
class MitRes(TaskGraph):
    """
    A TaskGraph extension of mitigation of counts/shots for individual circuits.

    The graph has exactly one in wire, carrying List[CircuitShots], and one
    out wire, carrying List[BackendResult]. Operations that would change
    the number of wires (parallel, add_wire, add_n_wires) are forbidden.
    """

    def __init__(
        self,
        backend: Backend,
        _label: str = "MitRes",
    ) -> None:
        """
        MitRes objects are defined by the backend objects all circuits are executed on.

        :param backend: Pytket backend which tasks are generated from.
        :param _label: Name for identification of MitRes object.
        """
        # set member variables
        self._label = _label
        self.G = None
        self.characterisation: dict = {}

        # default constructor runs all circuits through passed Backend
        self._task_graph = nx.MultiDiGraph()

        # if requested, all data is held in cache and can be accessed after running
        self._cache: OrderedDict[str, Tuple[MitTask, List[Wire]]] = OrderedDict()

        c2h = backend_handle_task_gen(backend)
        h2r = backend_res_task_gen(backend)
        self._i, self._o = IOTask.Input, IOTask.Output

        # Input -> circuits to handles -> handles to results -> Output
        self._task_graph.add_edge(self._i, c2h, key=(0, 0), data=None)
        self._task_graph.add_edge(c2h, h2r, key=(0, 0), data=None)
        self._task_graph.add_edge(h2r, self._o, key=(0, 0), data=None)

    def check_prepend_wires(self, task: Union[MitTask, "TaskGraph"]) -> bool:
        """
        Confirms that the number of out wires the passed task has is equal
        to the number of in wires of this MitRes, and that the number
        of in wires the passed task has is 1. Also checks that task.run
        takes a single argument annotated List[CircuitShots] and that its
        return annotation is Tuple[List[CircuitShots]].

        :param task: MitTask or TaskGraph object for checking wire numbers of.
        :return: True if task is suitable for prepending, False if not.
        """
        sig = inspect.signature(task.run)
        params = list(sig.parameters.values())
        return (
            (task.n_out_wires == self.n_in_wires)
            and (task.n_in_wires == 1)
            and (len(params) == 1)
            and (params[0].annotation == List[CircuitShots])
            and (sig.return_annotation == Tuple[List[CircuitShots]])
        )

    def check_append_wires(self, task: Union[MitTask, "TaskGraph"]) -> bool:
        """
        Confirms that the number of in wires the passed task has is equal
        to the number of out wires of this MitRes, and that the number
        of out wires the passed task has is 1. Also checks that task.run
        takes a single argument annotated List[BackendResult] and that its
        return annotation is Tuple[List[BackendResult]].

        :param task: MitTask or TaskGraph object for checking wire numbers of.
        :return: True if task is suitable for appending, False if not.
        """
        sig = inspect.signature(task.run)
        params = list(sig.parameters.values())
        return (
            (task.n_in_wires == self.n_out_wires)
            and (task.n_out_wires == 1)
            and (len(params) == 1)
            and (params[0].annotation == List[BackendResult])
            and (sig.return_annotation == Tuple[List[BackendResult]])
        )

    def __str__(self) -> str:
        return f"<MitRes::{self._label}>"

    def __call__(  # type: ignore[override]
        self,
        circuits_wire: List[List[CircuitShots]],
        cache: bool = False,
        characterisation: dict = {},
    ) -> Tuple[List[BackendResult]]:
        """
        Runs the task graph by delegating to the parent class run method.

        :param circuits_wire: One list of CircuitShots per in wire.
        :param cache: Forwarded unchanged to the parent run method.
        :param characterisation: Forwarded unchanged to the parent run method.
        :return: Output of the parent run method, typed as a tuple holding
            one list of BackendResult.
        """
        return cast(
            Tuple[List[BackendResult]],
            super().run(
                cast(List[Wire], circuits_wire),
                cache=cache,
                characterisation=characterisation,
            ),
        )

    def from_TaskGraph(self, task_graph: TaskGraph) -> "MitRes":
        """
        Returns a MitRes object from a TaskGraph object.

        The passed graph must have one in wire and one out wire. The run
        method of the task attached to the input must take
        List[CircuitShots] as its first argument and must not require any
        further arguments. The run method of the task attached to the
        output must be annotated as returning Tuple[List[BackendResult]].

        :param task_graph: TaskGraph object to copy tasks from.
        :raises TypeError: If the wire counts or either type signature do
            not match those of MitRes.run.
        :return: Copied TaskGraph as MitRes
        """
        if task_graph.n_in_wires != 1 or task_graph.n_out_wires != 1:
            raise TypeError(
                "Type signature of passed task_graph.run method does not equal MitRes.run type signature. Number of in and out wires does not match."
            )

        # can index as previous check means there should only be one edge
        first_task = list(task_graph._task_graph.out_edges(task_graph._i))[0][1]
        input_parameters = list(
            inspect.signature(first_task.run).parameters.values()
        )
        # Parameters after the first are tolerated only when a caller can
        # omit them (they have a default, or are *args / **kwargs).
        variadic_kinds = (
            inspect.Parameter.VAR_POSITIONAL,
            inspect.Parameter.VAR_KEYWORD,
        )
        extra_required = [
            param
            for param in input_parameters[1:]
            if param.default is inspect.Parameter.empty
            and param.kind not in variadic_kinds
        ]
        # Any single failure is enough to reject the graph. Checking for an
        # empty parameter list first keeps the [0] lookup safe.
        if (
            not input_parameters
            or extra_required
            or input_parameters[0].annotation != List[CircuitShots]
        ):
            raise TypeError(
                "Type signature of passed task_graph.run method does not equal MitRes.run type signature. First MitTask in graph should expect a single argument of List[Circuitshots], but expects {}.".format(
                    input_parameters
                )
            )

        # can index as previous check means there should only be one edge
        last_task = list(task_graph._task_graph.in_edges(task_graph._o))[0][0]
        return_annotation = inspect.signature(last_task.run).return_annotation
        if return_annotation != Tuple[List[BackendResult]]:
            raise TypeError(
                "Type signature of passed task_graph.run method does not equal MitRes.run type signature. Last MitTask in "
                "task graph should return Tuple[List[BackendResult]], but returns {}.".format(
                    return_annotation
                )
            )

        # Copy so later edits to either graph do not affect the other.
        self._task_graph = deepcopy(task_graph._task_graph)
        self._label = task_graph._label
        return self

    def parallel(self, task: Union[MitTask, "TaskGraph"]):
        """
        Requests to add new MitTask/TaskGraph to TaskGraph object in parallel.
        Not permitted for MitRes, raises TypeError.

        :param task: New task to be added in parallel.
        :raises TypeError: Always.
        """
        raise TypeError("MitRes.parallel forbidden.")

    def add_n_wires(self, num_wires: int):
        """
        Requests to add num_wires number of edges between the input vertex
        and output vertex, with no type restrictions. Not permitted for MitRes,
        raises TypeError.

        :param num_wires: Number of edges to add between input and output vertices.
        :raises TypeError: Always.
        """
        raise TypeError("MitRes.add_n_wires forbidden.")

    def add_wire(self):
        """
        Requests to add a single edge between the input vertex and output vertex.
        Not permitted for MitRes, raises TypeError.

        :raises TypeError: Always.
        """
        raise TypeError("MitRes.add_wire forbidden.")

    def run(  # type: ignore[override]
        self,
        circuit_shots: List[CircuitShots],
        cache: bool = False,
        characterisation: dict = {},
    ) -> List[BackendResult]:
        """
        Overloaded run method from TaskGraph class to add type checking.
        A single experiment is defined by a Tuple containing a circuit to be run
        on some backend and the number of shots to take of said circuit.
        For each combination of Circuit and shots run returns a BackendResult object
        containing counts/shots.

        :param circuit_shots: Each tuple in circuit_shots contains a Circuit to run on
            internal backends and the number of shots to take of said circuit.
        :param cache: Forwarded unchanged to ``__call__``.
        :param characterisation: Forwarded unchanged to ``__call__``.
        :return: A BackendResult object for each combination of circuit and shots.
        """
        # Wrap the single wire in a list for __call__ and unwrap its
        # single-element tuple result.
        return self([circuit_shots], cache, characterisation)[0]
[docs]
def split_shots_task_gen(max_shots: int) -> MitTask:
    """
    When the number of shots requested is higher than max_shots, this task
    splits the jobs into several individual jobs, where the total number of
    shots matches that initially requested.

    :param max_shots: The maximum number of shots per job. Must be at least 1.
    :raises ValueError: If max_shots is smaller than 1.
    :return: A task dividing the job into several smaller ones.
    """
    # Zero would divide by zero inside the task and a negative value would
    # silently drop every circuit, so reject both up front.
    if max_shots < 1:
        raise ValueError(
            "max_shots must be a positive integer, got {}.".format(max_shots)
        )

    def task(
        obj, circuit_wires: List[CircuitShots]
    ) -> Tuple[List[CircuitShots], List[int]]:
        """
        This task divides a job into several smaller ones, if the number of
        shots requested is larger than the maximum allowed.

        :param circuit_wires: A list of the circuits to be run, and the number
            of shots to be taken.
        :return: A new list of circuits, some of which repeat with a smaller
            number of shots, and a list of the same length giving, for each
            new job, the position of its circuit in circuit_wires.
        """
        # A new list of circuits and shots
        split_circuit_wires: List[CircuitShots] = []
        # and index of which circuits in the new list correspond to which in
        # the original list. This is to say all circuits in the new list
        # assigned an index i by this index list correspond to original
        # circuit i.
        split_index: List[int] = []

        for i, circ_shots in enumerate(circuit_wires):
            n_full_jobs, remaining_shots = divmod(circ_shots.Shots, max_shots)

            # As many jobs of exactly max_shots as fit (including the case
            # Shots == max_shots), followed by one job for any remainder.
            shot_counts = [max_shots] * n_full_jobs
            if remaining_shots > 0:
                shot_counts.append(remaining_shots)

            # Both outputs are derived from the same list so the jobs and
            # their indexes always stay aligned.
            split_circuit_wires.extend(
                CircuitShots(circ_shots.Circuit, n_shots) for n_shots in shot_counts
            )
            split_index.extend(repeat(i, len(shot_counts)))

        return (split_circuit_wires, split_index)

    return MitTask(_label="SplitShots", _n_in_wires=1, _n_out_wires=2, _method=task)
[docs]
def group_shots_task_gen() -> MitTask:
    """
    Returns task which groups together jobs that correspond to the same
    circuit. Original larger job could have been split by task returned
    by split_shots_task_gen.

    :return: Task merging jobs which correspond to the same circuit.
    """

    def task(
        obj, results_wires: List[BackendResult], split_index: List[int]
    ) -> Tuple[List[BackendResult]]:
        """
        Task which merges jobs which correspond to the same circuit.

        :param results_wires: A list of results, some of which correspond to
            the same circuit.
        :param split_index: A list of indexes identifying which results
            correspond to which circuit. Those results with index i correspond to
            the circuit i.
        :raises ValueError: If some circuit index between 0 and the largest
            index in split_index has no result to merge.
        :return: A new list of results, where results which correspond to the
            same circuit have been grouped together.
        """
        # No jobs were run, so there is nothing to merge.
        if len(split_index) == 0:
            return ([],)

        # Collect, in a single pass, the results belonging to each circuit
        # index. Order within a group follows the order of results_wires.
        results_by_circuit: dict = {}
        for result, circuit_index in zip(results_wires, split_index):
            results_by_circuit.setdefault(circuit_index, []).append(result)

        # A new list of results where some results have been merged.
        merged_results = []
        for circuit_index in range(max(split_index) + 1):
            chunk = results_by_circuit.get(circuit_index, [])
            if not chunk:
                raise ValueError(
                    "No results to merge for circuit {}.".format(circuit_index)
                )

            # Concatenate shots from all results
            combined_shots = np.concatenate([result.get_shots() for result in chunk])
            outcome_array = OutcomeArray.from_readouts(combined_shots)
            # The classical bits are taken from the first result of the group.
            merged_results.append(
                BackendResult(
                    shots=outcome_array, c_bits=cast(Sequence[Bit], chunk[0].c_bits)
                )
            )

        return (merged_results,)

    return MitTask(_label="MergeShots", _n_in_wires=2, _n_out_wires=1, _method=task)
[docs]
def gen_shot_split_MitRes(backend: Backend, max_shots: int) -> MitRes:
    """
    Builds a MitRes whose jobs never request more than max_shots shots.

    A plain MitRes for the backend is copied into a TaskGraph, given one
    additional wire, and then wrapped: the task from split_shots_task_gen
    is prepended and the task from group_shots_task_gen is appended. The
    resulting graph is converted back into a MitRes.

    :param backend: The backend on which to run the jobs.
    :param max_shots: The maximum number of shots that each job should request.
    :return: A MitRes object.
    """
    base_mitres = MitRes(backend=backend)

    # MitRes forbids add_wire, so the extra wire is added on a TaskGraph copy.
    # The splitting task has two out wires and the merging task two in wires.
    wrapped_graph = TaskGraph().from_TaskGraph(base_mitres)
    wrapped_graph.add_wire()

    # Splitting happens before the backend tasks, merging after them.
    wrapped_graph.prepend(split_shots_task_gen(max_shots))
    wrapped_graph.append(group_shots_task_gen())

    return MitRes(backend).from_TaskGraph(wrapped_graph)