-
Notifications
You must be signed in to change notification settings - Fork 200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat!: message pickup live mode support #1638
feat!: message pickup live mode support #1638
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice @genaris :)
@@ -48,15 +48,15 @@ export class MessageSender { | |||
public constructor( | |||
envelopeService: EnvelopeService, | |||
transportService: TransportService, | |||
@inject(InjectionSymbols.MessageRepository) messageRepository: MessageRepository, | |||
@inject(InjectionSymbols.MessagePickupRepository) messagePickupRepository: MessagePickupRepository, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should rename repository, as it conflicts with all the other repositories that are based on the storage service.
Maybe we can use messagePickupQueue? Or messagePickupStorage?
Or maybe I should change my thinking on what a repository can be. As repository does make sense
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the beginning I named it MessagePickupQueue
but discussing about it with my colleagues we noticed that we are not using it as a queue, in the sense that we are enqueuing data to it but not exactly dequeuing data when retrieving it (as we must store until it is actually acknowledged by the mediatee). Also, the data on it is somewhat organized (i.e. by connectionId, recipientKey, state, etc.) so the term 'Repository' feels to me more appropriate than 'Storage', which is usually used to generically save data.
Having said this, I'm not happy because the unfortunate name clash with Repository
interface, so I'm more than open to opinions about how to improve the naming!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think MessageRepository
is best i.e. it's the place where you store and fetch messages. I think xxxQueue
would tend to incorrectly enforce the underlying technology, when any persistent storage may actually work. Pickup
is just a fragment of the interface (we're not 'picking up' messages received that cannot be delivered, rather we're queuing them for later pickup), so I would just drop it to avoid narrowing the exhibited responsibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem with MessageRepository
is that it collides with Repository
classes used within the framework, most notably DidCommMessageRepository
, so leaving such generic name would be more confusing. The 'Pickup' word tries to specify that it is used for message pickup protocol.
It seems it is still not clear, as this MessagePickupRepository
is an interface rather than an implementation, like the other Repository
.
@@ -176,7 +176,7 @@ export class MessageSender { | |||
// If the other party shared a queue service endpoint in their did doc we queue the message | |||
if (queueService) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we at some point change this to only queue when it is allowed? So a connection must have queueing enabled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes, I remember this from #1199. I think it will be important to find a way to configure this. I'm trying to find the best place to update the code to support this, as I don't want to make MessageSender
dependent on Mediator or Pickup modules API (we'll have some circular dependencies).
@@ -1,5 +1,5 @@ | |||
export const InjectionSymbols = { | |||
MessageRepository: Symbol('MessageRepository'), | |||
MessagePickupRepository: Symbol('MessagePickupRepository'), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we make this a module config option, and expose the config on the new API, we don't need the injection symbol anymore (as you can inject the api and then do api.config.messagePickupRepository)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. However I think I'll need to update a bit the logic in order to avoid dependency cycles between MessageSender
and MessagePickupApi
. Probably adding some parameter to sendMessage
or even triggering an internal event that can be listened in Message Pickup.
* Deliver messages in the Message Pickup Queue for a given connection and key (if specified). | ||
* | ||
* Note that this is only available when there is an active session with the recipient. Message | ||
* Pickup protocol messages themselves are not added to pickup queue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Smart! :)
packages/core/src/modules/message-pìckup/protocol/v2/V2MessagePickupProtocol.ts
Outdated
Show resolved
Hide resolved
packages/core/src/modules/message-pìckup/protocol/v2/V2MessagePickupProtocol.ts
Outdated
Show resolved
Hide resolved
packages/core/src/modules/message-pìckup/MessagePickupEvents.ts
Outdated
Show resolved
Hide resolved
* Pickup protocol messages themselves are not added to pickup queue | ||
* | ||
*/ | ||
public async deliverQueuedMessages(options: DeliverQueuedMessagesOptions) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this take a sessionId instead, so you wont have the case where there's no session? Maybe that'll complicate it without benefits
I think this should at least return an object, where it e.g. optionally returns the message, and a state which indicates how the delivery went (e.g. for now no open session, or delivered). Also will this deliver all messages? Or should you call this in a loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to make it more flexible, I've added an API method to get live sessions and make this one to use sessionId instead. This way, the only possibility is that it will proceed with the delivery (e.g. the delivery message has been properly submitted) or an error happened (MessageSendingError or so), so it will throw and can be handled by the caller.
This method, now called deliverMessagesFromQueue
, will retrieve any pending messages from the queue and send the initial batch of batchSize
messages, starting a pickup loop that will be finished once all messages are delivered (the client will automatically request more messages when the status shows that there are pending messages). This initial batchSize
number is sent as an input parameter or taken from configuration.
There is also another method called deliverMessages
which is similar but intended to send specific messages. It will not retrieve other messages from the queue. This can be more appropriate when several messages for a given mediatee are received within a short time frame, as there will not be need of querying the queue multiple times.
Hi, as per the live-mode, I think the proposed approach will fix the problems we observed with websockets, where messages were irremediably lost, being enqueued without being acknowledged. So thanks for that, looking forward to it! |
I'm less convinced that the "central Message Pickup Repository" will address #1625. The "central" wording suggests that this repo will run in a separate instance. IMHO, the only robust solution is to rely on a notification mechanism that exists outside the mediator itself. Then each mediator instance subscribes to that notification platform (for example AWS EventBridge), and the responsibility of sending notifications lies entirely with the storage layer for queued messages. In short, the API should simply make it possible to configure a notification listener that will invoke the appropriate AFJ message pickup API. Also worth noting, sending those notifications should only happen after storage, not before, and should also not be a responsibility of AFJ because that's easily achievable using standard middleware (Redis or a CDC can do that). In even shorter, AFJ should handle notifications, not provide an internalised notification platform. All I need is an entry point that I can call from my app to tell it that a message for recipient x has been stored. That entry point will check whether the corresponding mediatee is connected and if so forward the message to them (as described in the original PR comment)(if not it does nothing). Sorry if that sounds a bit negative, I might simply be misunderstanding what is being proposed. Happy to help with testing. |
Hey @ericvergnaud, I think you're correct this PR doesn't directly address the issue of multiple mediator instances, but it does add methods to build w plugin on top of AFJ that does this. There's new events added that allow you to know when a message has been queued. The idea is that a) you hook into this event and emit some kind of notification that will be received by other mediators (we use redis pub/sub) and b) in the other mediator you call deliver messages method to deliver the messages to the recipient when a notification is received. A potential next step would be to make some default implementations plugins for notification, so it's easier to set up a mediator. Does that resolve your uncertainties with this PR, or are there things we should do differently? |
@TimoGlastra thanks for the clarification. I guess the current wording is slightly misleading then... One thing I remain concerned with is putting the responsibility of sending notifications with the mediator. Although it seems convenient, it won't be 100% reliable unless the mediator implements 2-phase commit:
I don't think the mediator should implement 2-phase commit. Rather I recommend delegating that task to the storage itself. PostgreSQL comes with a CDC that is guaranteed to run for every data change, even after a crash, because it relies on the journal. Most database engines provide the same, including sqlite. That said, the proposed mechanism can be very useful for testing, so I would keep it. But not advertise it as the standard way of syncing instances in a mediator cluster, rather provide examples of a correct end-to-end implementation. |
That makes sense, thank you for the explanation. I think CDC is a great solution to this. Is there anything we can do in AFJ to make this more easy to set up? CC @swcurran @dbluhm I think ACA-Py kafka / redis plugin would be susceptible to the same problem that @ericvergnaud describes here. |
I guess a useful thing that can be done:
(since sqlite cannot act as a server, not sure it makes sense to provide a cdc sample sample for it) Another useful thing is updating the documentation for describing a 'supported' cluster setup. For NR testing:
|
In this PR description, when I referred to a a'central message pickup repository' I was meaning a common storage shared among all AFJ instances (e.g. a PostgreSQL or Mongo database). That does not mean that it will reside in one of them. Through What is different from previous implementation is that now the framework gives a way for each mediator instance to push messages for a certain mediatee on-demand, so if it receives a notification that a new message has been added for it, it can send it immediately (by calling Also, there are new events triggered when Live Mode Sessions are opened and closed. This allows each mediator instance to keep track of all mediatees are connected to it, so it can e.g. subscribe to receive notifications only when there are new messages for them instead of listening to every change in message pickup database. Whether you want to use a CDC to notify about changes, or create a pub/sub mechanism where each AFJ instance publish a message to a channel and other instances listen to it, it is up to each implementation. It is not something enforced by thir PR.
I agree on this. I hope to have been clear that the goal of this PR is to make it possible to build an architecture like the one you are describing here. |
Something I want to clarify is that the new events are more related to session opening/closing than individual message queuing. You can of course hook into either I agree that this should be clearly documented, and add some reference plug-ins to show a full implementation. I'll do some more changes to this PR based on these comments and some experience I had recently while testing it, and submit it for review. |
Signed-off-by: Ariel Gentile <gentilester@gmail.com>
…ation#1644) Signed-off-by: Berend Sliedrecht <sliedrecht@berend.io> Signed-off-by: Ariel Gentile <gentilester@gmail.com>
…oundation#1648) Signed-off-by: Timo Glastra <timo@animo.id> Signed-off-by: Ariel Gentile <gentilester@gmail.com>
Signed-off-by: Ariel Gentile <gentilester@gmail.com>
Signed-off-by: Ariel Gentile <gentilester@gmail.com>
Signed-off-by: Ariel Gentile <gentilester@gmail.com>
Signed-off-by: Ariel Gentile <gentilester@gmail.com>
Signed-off-by: Ariel Gentile <gentilester@gmail.com> feat: deliver messages individually, not fetching from the queue every time Signed-off-by: Ariel Gentile <gentilester@gmail.com> chore: revert to free runners (openwallet-foundation#1662) Signed-off-by: Ry Jones <ry@linux.com> Signed-off-by: Ariel Gentile <gentilester@gmail.com> chore: create settings.yml (openwallet-foundation#1663) Signed-off-by: Ry Jones <ry@linux.com> Signed-off-by: Ariel Gentile <gentilester@gmail.com> chore: fix ci and add note to readme (openwallet-foundation#1669) Signed-off-by: Timo Glastra <timo@animo.id> Signed-off-by: Ariel Gentile <gentilester@gmail.com> docs: update active maintainers (openwallet-foundation#1664) Signed-off-by: Karim Stekelenburg <karim@animo.id> Signed-off-by: Ariel Gentile <gentilester@gmail.com> feat: did:peer:2 and did:peer:4 support in DID Exchange (openwallet-foundation#1550) Signed-off-by: Ariel Gentile <gentilester@gmail.com> feat(presentation-exchange): added PresentationExchangeService (openwallet-foundation#1672) Signed-off-by: Berend Sliedrecht <sliedrecht@berend.io> Signed-off-by: Ariel Gentile <gentilester@gmail.com> chore: removed jan as maintainer (openwallet-foundation#1678) Signed-off-by: Timo Glastra <timo@animo.id> Signed-off-by: Ariel Gentile <gentilester@gmail.com> ci: change secret (openwallet-foundation#1679) Signed-off-by: Timo Glastra <timo@animo.id> Signed-off-by: Ariel Gentile <gentilester@gmail.com> chore: add meta to rxjs timeout errors (openwallet-foundation#1683) Signed-off-by: Timo Glastra <timo@animo.id> Signed-off-by: Ariel Gentile <gentilester@gmail.com> build(deps): bump ws and @types/ws (openwallet-foundation#1686) Signed-off-by: dependabot[bot] <support@github.com> Signed-off-by: Ariel Gentile <gentilester@gmail.com> build(deps): bump follow-redirects from 1.15.2 to 1.15.4 (openwallet-foundation#1694) Signed-off-by: dependabot[bot] <support@github.com> Signed-off-by: Ariel Gentile <gentilester@gmail.com> chore: update shared components libraries (openwallet-foundation#1691) Signed-off-by: Timo Glastra <timo@animo.id> Signed-off-by: Ariel Gentile <gentilester@gmail.com> fix: properly print key class (openwallet-foundation#1684) Signed-off-by: Timo Glastra <timo@animo.id> Signed-off-by: Ariel Gentile <gentilester@gmail.com> feat(present-proof): add support for aries RFC 510 (openwallet-foundation#1676) Signed-off-by: Berend Sliedrecht <sliedrecht@berend.io> Signed-off-by: Ariel Gentile <gentilester@gmail.com> fix(present-proof): isolated tests (openwallet-foundation#1696) Signed-off-by: Ariel Gentile <gentilester@gmail.com> feat(indy-vdr): register revocation registry definitions and status list (openwallet-foundation#1693) Signed-off-by: Ariel Gentile <gentilester@gmail.com> chore: rename to credo-ts (openwallet-foundation#1703) Signed-off-by: Ry Jones <ry@linux.com> Signed-off-by: Ariel Gentile <gentilester@gmail.com> ci: fix git checkout path and update setup-node (openwallet-foundation#1709) Signed-off-by: Ariel Gentile <gentilester@gmail.com> fix: remove check for DifPresentationExchangeService dependency (openwallet-foundation#1702) Signed-off-by: Sai Ranjit Tummalapalli <sairanjit.tummalapalli@ayanworks.com> Signed-off-by: Ariel Gentile <gentilester@gmail.com> docs: update zoom meeting link (openwallet-foundation#1706) Signed-off-by: Timo Glastra <timo@animo.id> Signed-off-by: Ariel Gentile <gentilester@gmail.com> refactor(oob)!: make label optional (openwallet-foundation#1680) Signed-off-by: Timo Glastra <timo@animo.id> Co-authored-by: Ariel Gentile <gentilester@gmail.com> Signed-off-by: Ariel Gentile <gentilester@gmail.com> feat: support short legacy connectionless invitations (openwallet-foundation#1705) Signed-off-by: Timo Glastra <timo@animo.id> Signed-off-by: Ariel Gentile <gentilester@gmail.com> feat(dids)!: did caching (openwallet-foundation#1710) feat: did caching Signed-off-by: Timo Glastra <timo@animo.id> Signed-off-by: Ariel Gentile <gentilester@gmail.com> fix: jsonld document loader node 18 (openwallet-foundation#1454) Signed-off-by: Timo Glastra <timo@animo.id> Signed-off-by: Ariel Gentile <gentilester@gmail.com> build(deps): bump amannn/action-semantic-pull-request from 5.3.0 to 5.4.0 (openwallet-foundation#1656) build(deps): bump amannn/action-semantic-pull-request Bumps [amannn/action-semantic-pull-request](https://github.com/amannn/action-semantic-pull-request) from 5.3.0 to 5.4.0. - [Release notes](https://github.com/amannn/action-semantic-pull-request/releases) - [Changelog](https://github.com/amannn/action-semantic-pull-request/blob/main/CHANGELOG.md) - [Commits](amannn/action-semantic-pull-request@v5.3.0...v5.4.0) --- updated-dependencies: - dependency-name: amannn/action-semantic-pull-request dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Timo Glastra <timo@animo.id> Signed-off-by: Ariel Gentile <gentilester@gmail.com> feat: did rotate (openwallet-foundation#1699) Signed-off-by: Ariel Gentile <gentilester@gmail.com> refactor: pickup protocol method names Signed-off-by: Ariel Gentile <gentilester@gmail.com>
f8c03a3
to
5afcd33
Compare
Signed-off-by: Ariel Gentile <gentilester@gmail.com>
Signed-off-by: Ariel Gentile <gentilester@gmail.com>
Signed-off-by: Ariel Gentile <gentilester@gmail.com>
Signed-off-by: Ariel Gentile <gentilester@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work @genaris! I have some nits, but those can also be addressed later.
packages/core/src/modules/message-pickup/protocol/MessagePickupProtocolOptions.ts
Show resolved
Hide resolved
packages/core/src/modules/message-pickup/protocol/MessagePickupProtocolOptions.ts
Outdated
Show resolved
Hide resolved
packages/core/src/modules/message-pickup/protocol/v1/V1MessagePickupProtocol.ts
Outdated
Show resolved
Hide resolved
* - `MessageForwardingStrategy.DirectDelivery` - Deliver message directly. Do not add into queue (it might be manually added after, e.g. in case of failure) | ||
* | ||
* @default MessageForwardingStrategy.DirectDelivery | ||
* @todo Update default to QueueAndLiveModeDelivery |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When do we want to change the defualt?
Isn't this something that can be determined automatically? E.g. first try direct delivery (in case the mediatee has an endpoint), if it fails, try live delivery (if an open session), if it fails add to queue.
I think you'd often want a combination of 2 and 3 right? Or is direct delivery only direct live delivery (I'm assuming it is when you can send it directly to an endpoint, but maybe that's wrong. If so, maybe we should call it DirectLiveDelivery
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is more about selecting if we want to encapsulate forwarded messages in Message Pickup's Delivery
message (so we can have explicit acknowledge) or we want to simply send them to either the mediatee endpoint or an existing session with them. Do you have a better wording to reflect what this setting does?
I think this way of sending forwarded messages should be deprecated at some point, but the encapsulated one should be used only in case the other party supports Message Pickup V2, and that's something that probably can be determined automatically if we do some smarter logic on the agent (e.g. we can query the other party for protocol support or mark it as supported as soon as we receive the first Pickup V2 protocol loop from them).
Signed-off-by: Ariel Gentile <gentilester@gmail.com>
As promised, some progress in supporting missing features from RFC 0685, most notably the "Live Mode", in such a way that might be compatible with a multi-instance architecture as discussed in #1625.
Conceptually, it consists mostly in changes in Mediator role, where we can choose our strategy when a Forward message is received:
MessagePickupRepository
, so it will be in charge of manually trigger a delivery of queued messages: this can be possible becauseMessagePickupModule
now emits events when Live Sessions are opened and closed (so any consumer of an AFJ instance can subscribe to a central Message Pickup Repository and know if a newly queued message belongs to any of its connected clients), andMessagePickupApi
exposes a method to deliver any queued message using V2 protocol.On
mediatee
side, there is a newMediatorPickupStrategy
calledPickUpV2LiveMode
where, once it connects to Mediator WebSocket, it will set Live Delivery mode on (retrieving, as usual, any pending message). CurrentPickUpV2
mode will work in polling mode (sending astatus-request
message everymediatorPollingInterval
milliseconds. This represents a breaking change in terms of settings, as previously it was just sending a Trust Ping and expected the mediator to send messages implicitly.Some changes done:
MediatorModule
now has a config parameter to select its message forwarding strategyMessagePickupApi
includes Live Mode session service that keeps track of all connected clients using V2 protocol in Live Mode and exposes a method to deliver any queued message to them. When new sessions are added or removed, it emits eventsMessageRepository
was renamed toMessagePickupRepository
(any better naming is more than welcome!) and methods were changed to allow removal of messages by their id and filter by connectionId/recipientKeyMessagePickupRepository
is now completely responsibility ofMessagePickupModule
: an in-memory default implementation will be instantiated if not specified in its config or defined externally before instantiating the AgentTransportService
now emits events when sessions are created and removed. This was done mainly to be consumed byMessagePickupSessionService
, but they can also be useful externallyThere are several TODOs that I would like to handle in further PRs and probably move to 0.5.x or 0.6.0 so we can have more time to fix and improve the architecture:
MessageSender
dependency on messagePickupRepository, e.g. by emitting an event when a packed message should be added to the pickup queue (which will be caught by MessagePickupApi). This will allow to remove InjectionSymbol 'MessagePickupRepository'.Some issues related to the spec that we'll need to double check:
delivery
message always come as a response to adelivery-request
. Otherwise, how can I know what's the limit of messages I can batch on it (or if I should set recipient_key parameter)?