
Ordered send_recv pattern for RPCs #7480

Open
@fjetter


We currently support two ways to trigger RPC calls.

  1. PooledRPCs allow many-to-many send_recv communication patterns. Every request uses a dedicated connection that is pooled and potentially reused after successful completion of the RPC. [1]
  2. BatchedComm uses a bidirectional one-to-one connection that is typically used to submit multiple smaller messages; it submits a batch of messages every X ms.

A notable difference between the two methods is that 2.) guarantees message ordering while 1.) does not.
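
The ordering difference can be illustrated with a toy simulation. This is only a sketch using plain asyncio, not the actual comm classes: `deliver`, `pooled` and `batched` are hypothetical names, and the latencies are made up so that the first message is the slowest one.

```python
import asyncio


async def deliver(log, msg, delay):
    # Simulate transfer latency before the remote sees the message
    await asyncio.sleep(delay)
    log.append(msg)


async def pooled(msgs):
    # One independent connection per request: arrival order depends on latency
    log = []
    await asyncio.gather(*(deliver(log, m, d) for m, d in msgs))
    return log


async def batched(msgs):
    # One shared stream: messages are written and read strictly in sequence
    log = []
    stream = asyncio.Queue()
    for m, _ in msgs:
        stream.put_nowait(m)
    while not stream.empty():
        log.append(stream.get_nowait())
    return log


# (message, simulated latency in seconds); the first message is the slowest
MSGS = [("a", 0.09), ("b", 0.06), ("c", 0.03)]
pooled_order = asyncio.run(pooled(MSGS))
batched_order = asyncio.run(batched(MSGS))
```

With these latencies the pooled variant sees the messages in reverse order, while the single stream keeps the submission order.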

For many applications ordering is essential, e.g. to implement consistent distributed transactions.
It is very easy for developers to fall back to the pooled RPC approach, which does not provide ordering, whenever a response is useful. If both a response and ordering are required, there is currently no proper way to implement this.

This issue intends to discuss the possibility of introducing standard functionality to support this use case, i.e. to provide an ordered send_recv functionality.

[1] Using a dedicated connection is primarily useful for submitting large amounts of payload data, so that the stream is not blocked for other, smaller messages while the large payload is being submitted.

Naive approach w/out additional infrastructure

It is possible to write a send_recv-like pattern using the batched stream by breaking a coroutine into two, e.g.

# Server A

async def batched_ordered(bcommToB: BatchedSend):
    # Fire and forget: the reply arrives later in a separate handler
    bcommToB.send({
        "op": "inc",
        "x": 1,
    })

async def handle_inc_response(x):
    assert x == 2


# Server B
async def handle_inc(comm, x):
    # `comm` is the batched stream back to Server A
    comm.send({
        "op": "inc-response",
        "x": x + 1,
    })

This approach has a couple of downsides:

  1. Writing to a BatchedSend in the first place is awkward since we need to assemble the message ourselves. The PooledRPC builds the message for us, mimicking an actual remote call.
  2. The handler that needs/wants to respond cannot simply return the result but needs to take care of writing to the comm itself.
  3. ServerA needs to implement a dedicated response handler, which is cumbersome and decouples the response from the caller, i.e. it is impossible to share local context.

Suggested approach

Ideally, the batched, ordered communication allows an access pattern similar to PooledRPCCall and constructs the message behind the scenes. Further, any returned result would be sent back to the caller.
Effectively, stream handlers and comm handlers would be identical and there would no longer be a need to distinguish between the two.

# Server A

async def batched_ordered(bcommToB: OrderedBatchedSendRcv):
    expected = 2
    # Arguments are passed as keywords so they map onto the handler's signature
    res = await bcommToB.inc(x=1)
    assert res == expected

# Server B
async def handle_inc(x):
    return x + 1

To enable such an API we would need to change both sender and receiver of the stream to deal with responses.

Sender

On the sender side we are striving to provide an API similar to what PooledRPCCall currently offers.
A notable difference between PooledRPCCall and this suggested approach is that the batched comm we are using here is already read in Server.handle_stream, i.e. we cannot wait for the response ourselves. Instead, we can introduce a new handler that listens to all responses (or special-case/inline this into handle_stream).

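import asyncio
import itertools
from collections import defaultdict

class BatchedResponseMixin:  # Could be directly part of the server, ofc

    def __init__(self):
        # Registered per instance; `self` is not available in the class body
        self.stream_handlers = {
            "bcomm_recv": self.handle_send_recv_batched_response
        }
        # Pending futures per comm, keyed by request ID
        self._response_events: defaultdict[Comm, dict[int, asyncio.Future]] = defaultdict(dict)
        self._request_ids = itertools.count()  # unique IDs from a monotonic counter

    async def handle_send_recv_batched_response(self, comm, response, request_id):
        # Resolve and forget the future that belongs to this request
        fut = self._response_events[comm].pop(request_id)
        if response["status"] == "OK":
            fut.set_result(response["result"])
        else:
            # Assumes the remote sent back an exception instance
            fut.set_exception(response["result"])

    def send_recv_batched(self, bcomm, **msg):
        request_id: int = next(self._request_ids)
        msg["__request__id__"] = request_id
        self._response_events[bcomm][request_id] = fut = asyncio.Future()
        bcomm.send(msg)  # the message is sent as a single dict
        return fut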

# Should mimic PooledRPCCall as closely as possible
class RPCCallBatched:
    def __init__(self, addr, server: BatchedResponseMixin):
        self.server = server
        self.bcomm = server.batched_comms[addr]

    def __getattr__(self, key):
        # A reply=False kwarg version could just construct the message, send and return
        # w/out a future to await, or instantiate the future and set a None result immediately
        # for API consistency. This way we could get rid of all the {"op": ...} messages in the
        # code base but we'd have a reply kwarg... not sure what's the lesser evil

        async def send_recv_batched(**kwargs):
            return await self.server.send_recv_batched(
                self.bcomm, op=key, **kwargs
            )
        return send_recv_batched

The sender generates a unique ID for every request (this could simply be a monotonic counter) and attaches it to the request. It also creates an asyncio.Future (or some other synchronization primitive) that is mapped to that request ID and can be awaited to receive the result once available. A dedicated stream handler, handle_send_recv_batched_response, accepts the response and sets the result on the future. The receiver side submits its response to this dedicated handler together with the original request_id.

Receiver

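async def _wrap_stream_handler(bcomm, request_id, handler, **kwargs):
    # Run the handler and send its result (or exception) back to the caller
    try:
        res = await handler(**kwargs)
        response = {"status": "OK", "result": res}
    except Exception as exc:
        # Assumes the exception can be serialized and sent over the stream
        response = {"status": "error", "result": exc}
    bcomm.send({
        "op": "bcomm_recv",  # routes to the sender's response handler
        "response": response,
        "request_id": request_id,
    })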
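
To see sender and receiver working together, here is a minimal, self-contained sketch of the request-ID/future correlation over an in-memory ordered stream. Nothing in it is existing distributed API: `OrderedChannel`, `Caller`, `serve` and `pump_responses` are hypothetical stand-ins for the batched comm, the RPC proxy, the receiver's handle_stream and the sender's response handler. The message layout is simplified as well.

```python
import asyncio
import itertools


class OrderedChannel:
    """Hypothetical stand-in for one direction of a batched comm."""

    def __init__(self):
        self._queue = asyncio.Queue()

    def send(self, msg):
        self._queue.put_nowait(msg)

    async def read(self):
        return await self._queue.get()


class Caller:
    def __init__(self, out):
        self.out = out
        self._ids = itertools.count()
        self._pending = {}  # request_id -> Future

    def __getattr__(self, op):
        # Only reached for unknown attributes: caller.inc(x=1) -> {"op": "inc", "x": 1}
        async def call(**kwargs):
            request_id = next(self._ids)
            fut = asyncio.get_running_loop().create_future()
            self._pending[request_id] = fut
            self.out.send({"op": op, "request_id": request_id, **kwargs})
            return await fut

        return call

    def handle_response(self, request_id, status, result):
        # Resolve the future that was registered for this request
        fut = self._pending.pop(request_id)
        if status == "OK":
            fut.set_result(result)
        else:
            fut.set_exception(result)


async def serve(inbox, out, handlers, n):
    # Receiver: process n requests in arrival order and reply on the return stream
    for _ in range(n):
        msg = await inbox.read()
        handler = handlers[msg.pop("op")]
        request_id = msg.pop("request_id")
        try:
            status, result = "OK", await handler(**msg)
        except Exception as exc:
            status, result = "error", exc
        out.send({"request_id": request_id, "status": status, "result": result})


async def pump_responses(inbox, caller, n):
    # Sender-side reader: forwards n responses to the waiting futures
    for _ in range(n):
        msg = await inbox.read()
        caller.handle_response(**msg)


async def inc(x):
    return x + 1


async def main():
    a_to_b, b_to_a = OrderedChannel(), OrderedChannel()
    caller = Caller(a_to_b)
    server = asyncio.create_task(serve(a_to_b, b_to_a, {"inc": inc}, n=3))
    reader = asyncio.create_task(pump_responses(b_to_a, caller, n=3))
    results = await asyncio.gather(
        caller.inc(x=1), caller.inc(x=10), caller.inc(x="oops"),
        return_exceptions=True,
    )
    await asyncio.gather(server, reader)
    return results


results = asyncio.run(main())
```

The first two calls resolve to 2 and 11, and the third surfaces the handler's TypeError at the call site. The caller keeps its local context across the await, which is what the two-handler approach cannot offer.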

Caveats

The above pseudocode is obviously not complete. I believe the most critical missing component is handling dead remotes. The current send_recv fails naturally since the comm it uses is closed once the remote is dead. Since we are not awaiting the comm.read ourselves, we will need to handle this case explicitly in the handle_stream try/except/finally.
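
One possible shape for that cleanup is sketched below, under the assumption that all in-flight futures of a stream are tracked in one place. `PendingRequests` and this simplified `handle_stream` are hypothetical, and the builtin ConnectionError merely stands in for whatever error the real comm raises when it is closed.

```python
import asyncio


class PendingRequests:
    """Hypothetical registry of in-flight requests for one batched stream."""

    def __init__(self):
        self._futures = {}

    def register(self, request_id):
        fut = asyncio.get_running_loop().create_future()
        self._futures[request_id] = fut
        return fut

    def fail_all(self, exc):
        # No response will ever arrive, so wake up every waiter with an error
        for fut in self._futures.values():
            if not fut.done():
                fut.set_exception(exc)
        self._futures.clear()


async def handle_stream(read, pending):
    # `read` is any coroutine function that returns the next message
    try:
        while True:
            await read()  # dispatching responses is omitted in this sketch
    except ConnectionError:
        pass  # remote went away; cleanup happens in `finally`
    finally:
        pending.fail_all(ConnectionError("remote closed before responding"))


async def main():
    pending = PendingRequests()
    fut = pending.register(0)

    async def read():
        # Simulate a remote that dies before answering
        raise ConnectionResetError("stream closed")

    await handle_stream(read, pending)
    try:
        await fut
    except ConnectionError as exc:
        return str(exc)


outcome = asyncio.run(main())
```

Failing the futures in `finally` means callers are also released when the read loop exits for any other reason, instead of waiting forever.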

cc @hendrikmakait @graingert


Labels: asyncio, core, enhancement, feature
