Parallel computation using Map

This example shows how to run several CPU-intensive nodes in parallel and aggregate their results.

Example worker

We use an scrypt-based encrypt function as a placeholder for a CPU-intensive worker function. For demonstration purposes we keep the plaintexts short and the work factor small so that the graph finishes quickly (the run recorded below took about 20s).

The following worker is in the Tierkreis GitHub repo at examples/example_workers/auth_worker:

import secrets
from sys import argv
from time import time
from typing import NamedTuple

from tierkreis.controller.data.models import portmapping
import pyscrypt  # type: ignore
from tierkreis import Worker

worker = Worker("auth_worker")


@portmapping
class EncryptionResult(NamedTuple):
    ciphertext: str
    time_taken: float


@worker.task()
def encrypt(plaintext: str, work_factor: int) -> EncryptionResult:
    start_time = time()
    salt = secrets.token_bytes(32)
    ciphertext = pyscrypt.hash(  # type:ignore
        password=plaintext.encode(), salt=salt, N=work_factor, r=1, p=1, dkLen=32
    )
    time_taken = time() - start_time

    return EncryptionResult(ciphertext=str(ciphertext), time_taken=time_taken)


if __name__ == "__main__":
    worker.app(argv)
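
To get a feel for how the work factor drives the cost of this kind of task without installing pyscrypt, a similar computation can be sketched with hashlib.scrypt from the Python standard library. This is a stand-in under stated assumptions, not the worker's code: the helper name timed_scrypt is hypothetical, and hashlib.scrypt is only available when Python is built against an OpenSSL version with scrypt support.

```python
import hashlib
import secrets
from time import perf_counter


def timed_scrypt(plaintext: str, work_factor: int) -> tuple[bytes, float]:
    """Hypothetical stand-in for the worker's encrypt task, using the standard library."""
    salt = secrets.token_bytes(32)
    start = perf_counter()
    # n is the CPU/memory cost and must be a power of two;
    # r=1, p=1 and a 32-byte output mirror the worker above.
    digest = hashlib.scrypt(
        plaintext.encode(), salt=salt, n=work_factor, r=1, p=1, dklen=32
    )
    return digest, perf_counter() - start


if __name__ == "__main__":
    for work_factor in (2**10, 2**12, 2**14):
        digest, seconds = timed_scrypt("plaintext+0", work_factor)
        print(f"N={work_factor}: {len(digest)} bytes in {seconds:.4f}s")
```

Absolute timings from this sketch will likely differ a great deal from those of the pyscrypt worker, so it is only useful for seeing how the cost grows with the work factor.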

Generating stubs

Since this worker uses the Tierkreis Python library, we can automatically generate stub files using the following command. The stub files will provide us with type hints in the graph building process later on.

!cd ../../../examples/example_workers/auth_worker && uv run main.py --stubs-path ../../../docs/source/tutorial/auth_stubs.py > /dev/null 2>&1

Writing a graph

We can import this stub file to help create our graph.

The graph builder manipulates references to values, not the values themselves. The one exception is GraphBuilder.const: it adds the actual value to the graph definition and returns a reference to that value. References are type checked using the TKR type. For example, a reference to an int has the type TKR[int].
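
The distinction between a value and a reference to a value can be illustrated with a toy builder. This is a minimal sketch of the general idea only: Ref and ToyBuilder are hypothetical names invented here and say nothing about how Tierkreis implements TKR or GraphBuilder.

```python
from dataclasses import dataclass
from typing import Any, Generic, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class Ref(Generic[T]):
    """Hypothetical handle to a value that a graph will produce later."""

    node_id: int


class ToyBuilder:
    """Toy builder: records node definitions and hands out references."""

    def __init__(self) -> None:
        self.nodes: list[tuple[str, Any]] = []

    def const(self, value: T) -> Ref[T]:
        # The constant itself is stored in the graph definition...
        self.nodes.append(("const", value))
        # ...but the caller only ever receives a reference to it.
        return Ref(len(self.nodes) - 1)


builder = ToyBuilder()
work_factor = builder.const(2**14)  # statically a Ref[int], not an int
print(work_factor)  # Ref(node_id=0)
print(builder.nodes[work_factor.node_id])  # ('const', 16384)
```

Because the caller holds a Ref[int] rather than an int, a type checker can reject code that tries to do arithmetic on it directly, which is the kind of checking the TKR type provides for real graphs.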

from typing import NamedTuple
from tierkreis.models import EmptyModel, TKR
from tierkreis.builder import GraphBuilder
from tierkreis.builtins.stubs import mean

from auth_stubs import encrypt, EncryptionResult


def map_body():
    g = GraphBuilder(TKR[str], EncryptionResult)
    result = g.task(encrypt(plaintext=g.inputs, work_factor=g.const(2**14)))
    g.outputs(result)
    return g


class GraphOutputs(NamedTuple):
    average_time_taken: TKR[float]
    ciphertexts: TKR[list[str]]


def graph():
    g = GraphBuilder(EmptyModel, GraphOutputs)
    plaintexts = g.const([f"plaintext+{n}" for n in range(20)])
    results = g.map(map_body(), plaintexts)

    ciphertexts = g.map(lambda x: x.ciphertext, results)
    times = g.map(lambda x: x.time_taken, results)

    av = g.task(mean(values=times))
    out = GraphOutputs(ciphertexts=ciphertexts, average_time_taken=av)

    g.outputs(out)
    return g
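
For comparison, the dataflow that this graph describes (map a body over a list, project fields out of the results, then aggregate) looks as follows in ordinary eager Python. This is only an analogy under stated assumptions: fake_encrypt and Result are hypothetical stand-ins with made-up values, and the sketch runs sequentially on real values, whereas the graph above works with references.

```python
from statistics import mean
from typing import NamedTuple


class Result(NamedTuple):
    ciphertext: str
    time_taken: float


def fake_encrypt(plaintext: str) -> Result:
    """Hypothetical stand-in for the encrypt task, with a made-up fixed duration."""
    return Result(ciphertext=plaintext[::-1], time_taken=0.5)


plaintexts = [f"plaintext+{n}" for n in range(20)]

# First map: apply the body to every element of the list.
results = [fake_encrypt(p) for p in plaintexts]

# Second and third maps: project one field out of every result.
ciphertexts = [r.ciphertext for r in results]
times = [r.time_taken for r in results]

# Aggregate: reduce the list of times to a single value.
average_time_taken = mean(times)
print(len(ciphertexts), average_time_taken)  # 20 0.5
```

No element of a map depends on any other element, which is what makes it possible to evaluate the elements in parallel.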

Running the graph

To run a graph we need to choose a storage backend and an executor. In this example we use the simple FileStorage backend and the UvExecutor. For the UvExecutor, the registry path should be a folder containing all the workers we use.

Then we pass the storage, executor and graph into the run_graph function. The final argument holds any graph inputs; this graph takes none, so we pass an empty dictionary.

import json
from pathlib import Path
import time
from uuid import UUID
from tierkreis import run_graph
from tierkreis.executor import UvExecutor
from tierkreis.storage import FileStorage, read_outputs

storage = FileStorage(UUID(int=2048), "auth_graph", do_cleanup=True)
executor = UvExecutor(
    registry_path=Path("../../../examples/example_workers"), logs_path=storage.logs_path
)
start = time.time()
run_graph(storage, executor, graph().data, {})
total_time = time.time() - start

outputs = read_outputs(storage)
av = outputs["average_time_taken"]
ciphertexts = outputs["ciphertexts"]
print(f"Encrypted 20 plaintexts in {total_time:1g}s with mean encryption time {av:1g}")

Encrypted 20 plaintexts in 20.4659s with mean encryption time 17.8141

The mean time to encrypt a single plaintext (about 17.8s) is close to the time taken for the whole workflow (about 20.5s), which indicates that the encryptions ran in parallel rather than one after another.
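
The comparison can be made concrete with a little arithmetic. If the 20 encryptions had run one after another, the workflow would have taken roughly 20 times the mean encryption time. The sketch below computes that estimate from the figures printed above; estimated_speedup is a hypothetical helper name. The result should be read as a rough estimate, since tasks competing for the same CPU cores may each take longer than they would when run alone.

```python
def estimated_speedup(n_tasks: int, mean_task_time: float, wall_time: float) -> float:
    """Ratio of an estimated sequential run time to the observed wall-clock time."""
    sequential_estimate = n_tasks * mean_task_time
    return sequential_estimate / wall_time


# Figures taken from the run shown above.
speedup = estimated_speedup(n_tasks=20, mean_task_time=17.8141, wall_time=20.4659)
print(f"Estimated speedup: {speedup:.1f}x")  # Estimated speedup: 17.4x
```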