Parallel computation using Map
An example of how to run several CPU-intensive nodes in parallel and aggregate their results.
Example worker
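We use the scrypt key-derivation function (via the pure-Python `pyscrypt` package) as a placeholder for a CPU-intensive worker function. In this example the task is called `encrypt` and its output is called the ciphertext. For demonstration purposes we keep the plaintexts short and the work factor (scrypt's cost parameter `N`) small, so that the graph finishes quickly; the run shown at the end of this page took about 20 s.
The following worker is in the Tierkreis GitHub repo at `examples/example_workers/auth_worker`: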
```python
import secrets
from sys import argv
from time import time
from typing import NamedTuple

from tierkreis.controller.data.models import portmapping
import pyscrypt  # type: ignore

from tierkreis import Worker

# Worker named "auth_worker", like the folder it lives in.
worker = Worker("auth_worker")


@portmapping
class EncryptionResult(NamedTuple):
    ciphertext: str
    time_taken: float


@worker.task()
def encrypt(plaintext: str, work_factor: int) -> EncryptionResult:
    start_time = time()
    salt = secrets.token_bytes(32)  # fresh random 32-byte salt for each call
    # scrypt with cost parameter N=work_factor: larger values take longer.
    ciphertext = pyscrypt.hash(  # type: ignore
        password=plaintext.encode(), salt=salt, N=work_factor, r=1, p=1, dkLen=32
    )
    time_taken = time() - start_time
    return EncryptionResult(ciphertext=str(ciphertext), time_taken=time_taken)


if __name__ == "__main__":
    worker.app(argv)
```
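To get a feel for the cost of a single task without installing Tierkreis or pyscrypt, the same computation can be sketched with the standard library's `hashlib.scrypt`. This is a swapped-in implementation, not the worker's own code: it assumes Python was built against an OpenSSL that provides scrypt, and `encrypt_locally` is a hypothetical name. Because OpenSSL's scrypt is compiled C, expect it to be much faster than the pure-Python pyscrypt used by the worker, so the timings will not match.

```python
import hashlib
import secrets
from time import time
from typing import NamedTuple


class EncryptionResult(NamedTuple):
    ciphertext: str
    time_taken: float


def encrypt_locally(plaintext: str, work_factor: int) -> EncryptionResult:
    """Hypothetical stand-in for the `encrypt` task, using hashlib.scrypt instead of pyscrypt."""
    start_time = time()
    salt = secrets.token_bytes(32)  # fresh random salt, as in the worker
    digest = hashlib.scrypt(
        plaintext.encode(), salt=salt, n=work_factor, r=1, p=1, dklen=32
    )
    # Hex-encode the 32-byte digest so the result is a plain string.
    return EncryptionResult(ciphertext=digest.hex(), time_taken=time() - start_time)


# scrypt needs roughly 128 * r * n bytes of memory: 2 MiB for n = 2**14, r = 1.
result = encrypt_locally("plaintext+0", 2**14)
print(f"{result.time_taken:.3f}s, {len(result.ciphertext)} hex chars")
```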
Generating stubs
Since this worker uses the Tierkreis Python library, we can generate a stub file for it (`auth_stubs.py`) automatically with the following command. The stubs provide type hints when we build the graph later on.
```
!cd ../../../examples/example_workers/auth_worker && uv run main.py --stubs-path ../../../docs/source/tutorial/auth_stubs.py > /dev/null 2>&1
```
Writing a graph
We can import this stub file to help create our graph.
The graph builder manipulates references to values, not the values themselves. (The one exception to this rule is when we add a constant value to a graph using `GraphBuilder.const`. Then the actual value is added to the graph definition and `GraphBuilder.const` returns a reference to this value.)
The references are type checked using the `TKR` type: a reference to an `int` has the type `TKR[int]`.
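The distinction between references and values can be illustrated with a toy builder. Everything in this sketch (`Ref`, `ToyBuilder`, `const`, `add`) is hypothetical and only mimics the idea described above; it is not the Tierkreis API. Adding a node returns a typed reference, playing the role of `TKR[T]`, and only `const` puts an actual value into the graph definition.

```python
from dataclasses import dataclass, field
from typing import Any, Generic, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class Ref(Generic[T]):
    """Hypothetical typed reference to a node's output (the role TKR[T] plays)."""
    node_id: int


@dataclass
class ToyBuilder:
    """Hypothetical graph builder that records nodes instead of computing values."""
    nodes: list[tuple[str, Any]] = field(default_factory=list)

    def _add(self, kind: str, payload: Any) -> int:
        self.nodes.append((kind, payload))
        return len(self.nodes) - 1

    def const(self, value: T) -> Ref[T]:
        # The only place where a concrete value enters the graph definition.
        return Ref(self._add("const", value))

    def add(self, a: Ref[int], b: Ref[int]) -> Ref[int]:
        # Records an "add" node wired to two earlier nodes; nothing is computed.
        return Ref(self._add("add", (a.node_id, b.node_id)))


g = ToyBuilder()
x: Ref[int] = g.const(2)
y: Ref[int] = g.const(3)
z: Ref[int] = g.add(x, y)  # z refers to node 2; it is not the number 5
print(z, g.nodes)
```

A static type checker would reject passing a `Ref[str]` to `add`, which is the kind of mistake the `TKR` annotations are meant to catch while the graph is being built.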
```python
from typing import NamedTuple

from tierkreis.models import EmptyModel, TKR
from tierkreis.builder import GraphBuilder
from tierkreis.builtins.stubs import mean

from auth_stubs import encrypt, EncryptionResult


def map_body():
    # Subgraph run for each element: one plaintext in, one EncryptionResult out.
    g = GraphBuilder(TKR[str], EncryptionResult)
    result = g.task(encrypt(plaintext=g.inputs, work_factor=g.const(2**14)))
    g.outputs(result)
    return g


class GraphOutputs(NamedTuple):
    average_time_taken: TKR[float]
    ciphertexts: TKR[list[str]]


def graph():
    g = GraphBuilder(EmptyModel, GraphOutputs)
    plaintexts = g.const([f"plaintext+{n}" for n in range(20)])
    # Apply map_body to every plaintext; the runs are independent of each other.
    results = g.map(map_body(), plaintexts)
    # Split the results into separate lists of ciphertexts and timings.
    ciphertexts = g.map(lambda x: x.ciphertext, results)
    times = g.map(lambda x: x.time_taken, results)
    av = g.task(mean(values=times))
    out = GraphOutputs(ciphertexts=ciphertexts, average_time_taken=av)
    g.outputs(out)
    return g
```
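For comparison, the dataflow of `graph()` can be written as ordinary, eager Python: map a function over the plaintexts, project out the two fields, and take the mean. This is only an analogue: it runs the 20 tasks one after another, whereas Tierkreis can schedule the independent `map` elements in parallel. It substitutes the standard library's `hashlib.scrypt` for the worker's pyscrypt call (assuming an OpenSSL build with scrypt), and `encrypt_one` and `run_sequentially` are hypothetical names.

```python
import hashlib
import secrets
from statistics import mean
from time import time
from typing import NamedTuple


class EncryptionResult(NamedTuple):
    ciphertext: str
    time_taken: float


def encrypt_one(plaintext: str, work_factor: int = 2**14) -> EncryptionResult:
    # Same shape as the worker's task, with the stdlib scrypt as a stand-in.
    start = time()
    digest = hashlib.scrypt(
        plaintext.encode(), salt=secrets.token_bytes(32), n=work_factor, r=1, p=1, dklen=32
    )
    return EncryptionResult(digest.hex(), time() - start)


def run_sequentially(n: int = 20) -> tuple[float, list[str]]:
    plaintexts = [f"plaintext+{i}" for i in range(n)]  # like g.const([...])
    results = [encrypt_one(p) for p in plaintexts]      # like g.map(map_body(), ...)
    ciphertexts = [r.ciphertext for r in results]       # like g.map(lambda x: x.ciphertext, ...)
    times = [r.time_taken for r in results]             # like g.map(lambda x: x.time_taken, ...)
    return mean(times), ciphertexts                     # like g.task(mean(values=times))


average_time_taken, ciphertexts = run_sequentially()
print(f"{len(ciphertexts)} ciphertexts, mean time {average_time_taken:.3f}s")
```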
Running the graph
In order to run a graph we need to choose a storage backend and an executor. In this example we use a simple file-based storage backend (`FileStorage`) and the uv executor (`UvExecutor`). For the uv executor, the registry path should be a folder containing all the workers the graph uses.
We then pass the storage, executor and graph definition to the `run_graph` function.
Its last argument holds any additional graph inputs; here it is an empty dict, because the graph takes no inputs (`EmptyModel`).
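In this example the registry is `examples/example_workers`, and each worker sits in a subfolder named after it (here `auth_worker`, which contains the `main.py` used in the stub-generation step). A small check along the following lines can catch a wrong `registry_path` before the graph is run. It assumes every worker folder contains a `main.py`; that is the layout of this example, not necessarily the full set of rules `UvExecutor` applies, and `find_workers` is a hypothetical helper.

```python
import tempfile
from pathlib import Path


def find_workers(registry_path: Path) -> list[str]:
    """Hypothetical helper: names of registry subfolders that contain a main.py."""
    return sorted(
        child.name
        for child in registry_path.iterdir()
        if child.is_dir() and (child / "main.py").is_file()
    )


# Demonstration on a throwaway registry with one worker folder and one other folder.
with tempfile.TemporaryDirectory() as tmp:
    registry = Path(tmp)
    (registry / "auth_worker").mkdir()
    (registry / "auth_worker" / "main.py").write_text("")
    (registry / "not_a_worker").mkdir()
    found = find_workers(registry)

print(found)  # ['auth_worker']
```

Against the real registry, `find_workers(Path("../../../examples/example_workers"))` should then list `auth_worker` among the results.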
```python
from pathlib import Path
import time
from uuid import UUID

from tierkreis import run_graph
from tierkreis.executor import UvExecutor
from tierkreis.storage import FileStorage, read_outputs

# File-based storage for this run, identified by a fixed UUID and the name "auth_graph".
storage = FileStorage(UUID(int=2048), "auth_graph", do_cleanup=True)
# The registry path is the folder containing all the workers the graph uses.
executor = UvExecutor(
    registry_path=Path("../../../examples/example_workers"), logs_path=storage.logs_path
)

start = time.time()
run_graph(storage, executor, graph().data, {})  # {}: no additional graph inputs
total_time = time.time() - start

outputs = read_outputs(storage)
av = outputs["average_time_taken"]
ciphertexts = outputs["ciphertexts"]
print(f"Encrypted 20 plaintexts in {total_time:1g}s with mean encryption time {av:1g}")
```
Encrypted 20 plaintexts in 20.4659s with mean encryption time 17.8141
The mean time to encrypt a single plaintext (about 17.8 s) is close to the wall-clock time of the whole workflow (about 20.5 s). Run one after another, the 20 encryptions would have taken roughly 20 times the mean, so this indicates that they ran in parallel.
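A rough speedup can be computed from the printed figures by comparing a sequential estimate (number of tasks times mean task time) with the observed wall-clock time. This is a back-of-the-envelope measure that ignores scheduling and worker start-up overhead; `estimated_speedup` is a hypothetical helper, and the inputs are the numbers from the run above.

```python
def estimated_speedup(n_tasks: int, mean_task_time: float, total_time: float) -> float:
    """Estimated sequential runtime (n_tasks * mean) divided by the observed wall-clock time."""
    return n_tasks * mean_task_time / total_time


# Figures from the run above: 20 tasks, mean 17.8141 s each, 20.4659 s in total.
sequential_estimate = 20 * 17.8141
speedup = estimated_speedup(20, 17.8141, 20.4659)
print(f"sequential estimate ~ {sequential_estimate:.1f}s, speedup ~ {speedup:.1f}x")
# prints: sequential estimate ~ 356.3s, speedup ~ 17.4x
```

A value near the number of tasks means nearly all of them overlapped; it cannot exceed the number of tasks, and in practice it is also limited by the number of available CPU cores.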