# Source code for tierkreis.worker.callback

"""Callback client."""

from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Dict
from urllib.parse import urlparse

from grpclib.client import Channel

import tierkreis.core.protos.tierkreis.v1alpha1.runtime as pr
from tierkreis.client.runtime_client import RuntimeClient
from tierkreis.client.server_client import ServerRuntime
from tierkreis.core.signature import Signature
from tierkreis.core.tierkreis_graph import Location, TierkreisGraph
from tierkreis.core.values import TierkreisValue


class Callback(RuntimeClient):
    """Runtime client for use within a worker function.

    Lets the worker submit graphs back to the parent runtime. Every
    operation is forwarded to a ``ServerRuntime`` built on the supplied
    channel, with the stored location passed along on each call.
    """

    def __init__(self, channel: Channel, loc: Location):
        """Wrap ``channel`` in a ``ServerRuntime`` and remember ``loc``.

        :param channel: gRPC channel used to build the ``ServerRuntime``.
        :param loc: Location forwarded with every delegated call.
        """
        self.runtime = ServerRuntime(channel)
        self.loc = loc

    async def get_signature(self) -> Signature:
        """Return the signature reported by the runtime for ``self.loc``."""
        signature = await self.runtime.get_signature(self.loc)
        return signature

    async def run_graph(
        self,
        graph: TierkreisGraph,
        /,
        **py_inputs: Any,
    ) -> Dict[str, TierkreisValue]:
        """Run ``graph`` on the wrapped runtime at ``self.loc``.

        :param graph: Graph to execute (positional-only).
        :param py_inputs: Keyword inputs, passed through unchanged.
        :return: Whatever mapping the wrapped runtime returns.
        """
        outputs = await self.runtime.run_graph(graph, self.loc, **py_inputs)
        return outputs

    async def type_check_graph(self, graph: TierkreisGraph) -> TierkreisGraph:
        """Type check ``graph`` on the wrapped runtime at ``self.loc``.

        :param graph: Graph to check.
        :return: The graph returned by the wrapped runtime.
        """
        checked = await self.runtime.type_check_graph(graph, self.loc)
        return checked
@asynccontextmanager
async def callback_server(callback: pr.Callback) -> AsyncIterator[RuntimeClient]:
    """Context manager for connection to a callback server.

    Parses ``callback.uri``, opens a gRPC ``Channel`` to its host and port,
    and yields a ``Callback`` client bound to ``callback.loc``. The channel
    is closed when the ``async with`` block exits.

    :param callback: Callback descriptor providing ``uri`` and ``loc``.
    :raises ValueError: If ``callback.uri`` contains no hostname.
    """
    url = urlparse(callback.uri)
    host, port = url.hostname, url.port
    # Validate explicitly rather than with ``assert``: assertions are
    # removed under ``python -O``, which would let ``None`` reach
    # ``Channel`` instead of failing here.
    # NOTE(review): grpclib's Channel appears to fall back to its own
    # default host when given ``None`` -- confirm against grpclib docs.
    if host is None:
        raise ValueError(f"Callback URI has no hostname: {callback.uri!r}")
    async with Channel(host, port) as channel:
        yield Callback(channel, callback.loc)