## Description
We currently support two ways to trigger RPC calls:

1. `PooledRPC`s allow many-to-many `send_recv` communication patterns. Every request uses a dedicated connection that is pooled and potentially reused after successful completion of the RPC. [1]
2. `BatchedComm` uses a bidirectional one-to-one connection that is typically used to submit multiple smaller messages and flushes a batch of messages every X ms.

A notable difference between the two methods is that 2.) guarantees message ordering while 1.) does not. For many applications ordering is essential, e.g. to implement consistent distributed transactions.

Whenever a response is needed, it is very easy for developers to fall back to the pooled RPC approach, which does not provide ordering. If both a response and ordering are required, there is currently no proper way to implement this.

This issue intends to discuss the possibility of introducing standard functionality to support this use case, i.e. to provide ordered `send_recv` functionality.

[1] Using a dedicated connection is primarily useful to submit large amounts of payload data such that the stream is not blocked for other smaller messages while the large payload is being submitted.
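For concreteness, a minimal sketch of the two call styles as described above; `rpc`, `inc`, and the exact signatures are illustrative placeholders, not the actual API:

```python
# 1.) Pooled RPC: request/response over a dedicated, pooled connection.
async def via_pooled_rpc(rpc, addr):
    # The proxy builds the {"op": "inc", ...} message for us and awaits
    # the response. Ordering across concurrent calls is NOT guaranteed.
    return await rpc(addr).inc(x=1)

# 2.) Batched comm: fire-and-forget messages, flushed every X ms, in order.
def via_batched_comm(bcomm):
    # We assemble the message ourselves; there is no response to await.
    bcomm.send({"op": "inc", "x": 1})
```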
## Naive approach w/out additional infrastructure
It is possible to write a `send_recv`-like pattern using the batched stream by breaking a coroutine into two, e.g.:
```python
# Server A
async def batched_ordered():
    bcommToB: BatchedSend  # an established batched connection to Server B
    bcommToB.send({
        "op": "inc",
        "ix": 1,
    })

# Server A: the response arrives at a separate stream handler,
# decoupled from the coroutine that sent the request
async def handle_inc_response(x):
    assert x == 2

# Server B
async def handle_inc(comm, x):
    comm.write({
        "op": "inc-response",
        "ix": x + 1,
    })
```
This approach has a couple of downsides:

- Writing to a `BatchedSend` in the first place is awkward since we need to assemble the message ourselves. The `PooledRPC` builds the message for us, mimicking an actual remote call.
- The handler that needs/wants to respond cannot simply return the result but has to take care of writing to the comm itself.
- Server A needs to implement a dedicated response handler, which is cumbersome and decouples the response from the caller, i.e. it is impossible to share local context.
## Suggested approach
Ideally, the batched, ordered communication would allow an access pattern similar to `PooledRPCCall`s and construct the message behind the scenes. Further, any returned result would be sent back to the caller.

Effectively, stream handlers and comm handlers would be identical and there would no longer be a need to distinguish the two.
```python
# Server A
async def batched_ordered():
    bcommToB: OrderedBatchedSendRcv
    expected = 2
    res = await bcommToB.inc(1)
    assert res == expected

# Server B
async def handle_inc(x):
    return x + 1
```
To enable such an API, we would need to change both the sender and the receiver side of the stream to deal with responses.
### Sender
On the sender side we are striving to provide an API similar to what `PooledRPCCall` currently offers.

A notable difference between `PooledRPCCall` and this suggested approach is that the batched comm we are using here is already being read in `Server.handle_stream`, i.e. we cannot await the response ourselves. Instead, we can introduce a new handler that listens to all responses (or special-case/inline this into `handle_stream`):
```python
from __future__ import annotations

import asyncio


class BatchedResponseMixin:  # Could be directly part of the server, ofc
    def __init__(self):
        self.stream_handlers = {
            "bcomm_recv": self.handle_send_recv_batched_response,
        }
        # One future per in-flight request: comm -> {request_id: future}
        self._response_events: dict[Comm, dict[int, asyncio.Future]] = {}

    async def handle_send_recv_batched_response(self, comm, response, request_id):
        # Pop the future so completed requests do not accumulate
        fut = self._response_events[comm].pop(request_id)
        if response["status"] == "OK":
            fut.set_result(response["result"])
        else:
            # Assumes the error payload is (or deserializes to) an exception
            fut.set_exception(response["result"])

    def send_recv_batched(self, bcomm, msg):
        request_id: int = _gen_unique_id()  # e.g. a monotonic counter
        msg["__request_id__"] = request_id
        fut = asyncio.Future()
        self._response_events.setdefault(bcomm, {})[request_id] = fut
        bcomm.send(msg)
        return fut


# Should mimic PooledRPCCall as closely as possible
class RPCCallBatched:
    def __init__(self, addr, server: BatchedResponseMixin):
        self.server = server
        self.bcomm = server.batched_comms[addr]

    def __getattr__(self, key):
        # A reply=False kwarg version could just construct the message, send
        # and return w/out a future to await, or instantiate the future and
        # set a None result immediately for API consistency. This way we could
        # get rid of all the {"op": ...} messages in the code base but we'd
        # have a reply kwarg... not sure what's the lesser evil
        async def send_recv_batched(**kwargs):
            return await self.server.send_recv_batched(
                self.bcomm, {"op": key, **kwargs}
            )

        return send_recv_batched
```
The sender generates a unique ID for every request (this could simply be a monotonic counter) and attaches it to the request. It also creates an `asyncio.Future` (or some other synchronization primitive) that is mapped to that request ID and can be awaited to receive the result once available. A dedicated stream handler, `handle_send_recv_batched_response`, accepts the response and sets the result on the future. The receiver side submits the response to this dedicated response handler together with the original `request_id`.
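For illustration, `_gen_unique_id` could be as simple as a per-process monotonic counter, as suggested above (the helper name comes from the pseudo code and is not an existing API):

```python
import itertools

# Monotonic counter; unique per server process, which is sufficient to
# disambiguate in-flight requests since responses are matched per comm.
_id_counter = itertools.count()

def _gen_unique_id() -> int:
    return next(_id_counter)
```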
### Receiver
```python
async def _wrap_stream_handler(bcomm, request_id, handler, **kwargs):
    try:
        res = await handler(**kwargs)
        response = {
            "status": "OK",
            "result": res,
        }
    except Exception as e:
        # Report handler failures back to the caller instead of
        # leaving its future pending forever
        response = {
            "status": "error",
            "result": e,
        }
    bcomm.send({
        "op": "bcomm_recv",
        "response": response,
        "request_id": request_id,
    })
```
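To tie the two sides together, `handle_stream` (or an equivalent dispatch loop) would need to route messages that carry a request ID through this wrapper. A hedged sketch, where `dispatch` is a hypothetical helper:

```python
async def dispatch(bcomm, stream_handlers, msg):
    # Hypothetical routing inside the stream-handling loop
    handler = stream_handlers[msg.pop("op")]
    request_id = msg.pop("__request_id__", None)
    if request_id is not None:
        # The caller awaits a response: wrap the handler so its return
        # value (or exception) is sent back with the request id
        await _wrap_stream_handler(bcomm, request_id, handler, **msg)
    else:
        # Ordinary fire-and-forget stream message
        await handler(**msg)
```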
## Caveats
The above pseudo code is obviously not complete. I believe the most critical missing component is dealing with dead remotes. The current `send_recv` will naturally fail since the comm it uses is closed once the remote is dead. Since we are not awaiting the `comm.read` ourselves, we will need to handle this case explicitly in the `handle_stream` try/except/finally.
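For example, the cleanup in `handle_stream`'s except/finally could fail all in-flight futures tied to the closed comm so that callers of `send_recv_batched` do not hang forever. A sketch under the assumptions above; `_fail_pending_requests` is a hypothetical method on the mixin:

```python
def _fail_pending_requests(self, comm, exc: Exception) -> None:
    # Called once the remote is dead: every future still waiting on
    # this comm gets the connection error instead of hanging
    for fut in self._response_events.pop(comm, {}).values():
        if not fut.done():
            fut.set_exception(exc)
```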