
Applied events log starts at zero #141

Open
robertlemke opened this issue Apr 27, 2017 · 17 comments
@robertlemke
Member

Here's something which caused quite some trouble for me today:

I introduced a new process manager (the same would apply to any other event listener) which was supposed to run a couple of commands when certain events arrived.

Now, because the "applied events log" starts at 0 if no event has ever been processed by a particular event listener, all events from the beginning of time will be applied. In my case that meant Docker images were built (well, the process manager tried at least) for projects and code which no longer exist, because the events were from a long time ago.

Here's the related code:

    public function getHighestAppliedSequenceNumber(string $eventListenerClassName): int
    {
        $eventListenerIdentifier = $this->renderEventListenerIdentifier($eventListenerClassName);
        $appliedEventsLog = $this->entityManager->find(AppliedEventsLog::class, $eventListenerIdentifier);
        return ($appliedEventsLog instanceof AppliedEventsLog ? $appliedEventsLog->highestAppliedSequenceNumber : 0);
    }

I think that the default should be the currently highest sequence number in the event store instead of 0. Opinions?
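A minimal sketch of that proposed default, with in-memory storage standing in for the Doctrine entities (class and method names are illustrative, not the package's actual API):

```php
<?php
// Hypothetical sketch: if a listener has no entry in the applied events
// log yet, default to the event store's current highest sequence number
// instead of 0, so a brand-new listener does not replay all history.
final class AppliedEventsLogSketch
{
    /** @var array<string, int> listener identifier => highest applied sequence number */
    private $log = [];

    /** @var int the event store's current highest sequence number (assumed injected) */
    private $storeHighestSequenceNumber;

    public function __construct(int $storeHighestSequenceNumber)
    {
        $this->storeHighestSequenceNumber = $storeHighestSequenceNumber;
    }

    public function getHighestAppliedSequenceNumber(string $eventListenerIdentifier): int
    {
        // fall back to "now" rather than 0 for unknown listeners
        return $this->log[$eventListenerIdentifier] ?? $this->storeHighestSequenceNumber;
    }

    public function saveHighestAppliedSequenceNumber(string $eventListenerIdentifier, int $sequenceNumber): void
    {
        $this->log[$eventListenerIdentifier] = $sequenceNumber;
    }
}
```

With this default, a freshly deployed listener would only see events published after its first run.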

@albe
Member

albe commented Apr 27, 2017

Hmm, I'm not sure tbh. I think the default itself is sane. If nothing is in the AppliedEventsLog, it should be assumed that it's a completely new Listener that needs to catch up from the start.

If that is not intended in some circumstances (like with process managers, which have side effects and for which replay/catch-up is therefore wrong), we should maybe provide means (via a CLI command or similar) to "mark all events applied".

@robertlemke
Member Author

we should maybe provide means (via CLI command or sth) to "mark all events applied"

The question is how this could be nicely integrated into an automated deployment. For projections I introduced the --only-empty flag for ./flow projection:replayall, which I can run on every deployment. If I want a projection to be replayed (usually because it has a schema change), I simply provide a Doctrine migration which creates the projection table under a new name (..._v3).

I could imagine a CLI command but it's pretty specific to this case ... Something like:

./flow event:sethighestappliedsequencenumber --listener 'Flownative\Beach\Domain\ProcessManager\AutomaticImageBuildProcessManager' 1403

./flow event:sethighestappliedsequencenumber --listener 'Flownative\Beach\Domain\ProcessManager\AutomaticImageBuildProcessManager' --at-least 1403

The first call would set the highest applied sequence number for the given event listener to 1403, no matter what it was before. The second call would set it to 1403 only if the current value is lower, and leave it alone if it is already higher.
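The two proposed semantics could be sketched like this (the command itself does not exist in the package; a plain array stands in for the applied events log):

```php
<?php
// Hypothetical sketch of the two proposed CLI command variants:
// a plain call overwrites unconditionally, --at-least only raises the value.
function setHighestAppliedSequenceNumber(array &$log, string $listener, int $number, bool $atLeast = false): void
{
    $current = $log[$listener] ?? 0;
    if ($atLeast && $current >= $number) {
        return; // --at-least: leave a higher existing value alone
    }
    $log[$listener] = $number; // plain call: overwrite no matter what
}
```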

@bwaidelich
Member

I agree with what @albe wrote; starting from 0 is the default behavior I would expect.
Maybe we can find another way to solve this. In general projections should always be replayable IMO.

@albe
Member

albe commented Apr 28, 2017

Here's an idea we came up with at the weekly:

Processes have a very clear concept of a lifecycle, as described e.g. in https://leanpub.com/esversioning/read#leanpub-auto-versioning-process-managers. This means that a process knows exactly when it is in effect, or more generically whether it is already active and in use, not yet, or no longer.

There are some ways this could be implemented:

  • let this be a concern of the domain logic, i.e. the user, to just ignore events that come in according to some rules

Pro: nothing to implement from our side, apart from properly document this
Con: all events still need to be passed through every new process; the user needs to take care of everything themselves

  • extend AbstractProcessManager (or create a ProcessManagerInterface?) with some isActive(...some arguments): bool method that needs to be implemented by the user, which decides on a per-case basis if the process should already be in effect and handle incoming events (especially in the case of catchUp after a new deployment). For that, the "some arguments" would probably have to be the event it should receive, including metadata like the sequenceNumber, i.e. the rawEvent.

Pro: it is very explicit and makes the lifecycle a core concept of the ProcessManager
Con: it would still unnecessarily need to load and run through all old events, in order to decide if they need to be skipped

  • Not discussed in meeting, just came up after writing the above: extend AbstractProcessManager with a method like getLifecycleFilter(): EventStreamFilterInterface (probably extended with getMin/MaxOccurenceTimestamp?), which will be applied when fetching events from the store to catchUp the ProcessManager

Pro: Makes it explicit, but on a generic enough level, what the "scope" of the ProcessManager lifecycle is; It is optimal from performance perspective
Con: EventStreamFilter is probably too generic, since it makes little sense for the ProcessManager to restrict itself on a streamName/Prefix (YAGNI), or the EventTypes (which is done automatically anyway); would need to extend EventStreamFilter with some datetime based filtering to be really useful

  • Also not discussed and maybe an "easy way out" for now: Only change AbstractAsynchronousProcessManager::getHighestAppliedSequenceNumber() to return the current store sequenceNumber if 0 is returned from the appliedEventsLogRepository, since this somehow makes sense as default for ProcessManagers (only)
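The second option above (an isActive() hook consulted by the catch-up loop) could look roughly like this; all interface, class, and method names here are assumptions for illustration, not the package's actual API:

```php
<?php
// Hypothetical sketch: the base catch-up logic asks isActive() per raw
// event, so a process manager can skip events from before its lifecycle.
interface RawEventInterface
{
    public function getSequenceNumber(): int;
}

final class SimpleRawEvent implements RawEventInterface
{
    /** @var int */
    private $sequenceNumber;

    public function __construct(int $sequenceNumber)
    {
        $this->sequenceNumber = $sequenceNumber;
    }

    public function getSequenceNumber(): int
    {
        return $this->sequenceNumber;
    }
}

abstract class AbstractProcessManagerSketch
{
    /** Decide per event whether the process is already in effect */
    abstract public function isActive(RawEventInterface $rawEvent): bool;

    /** Returns true if the event was handled, false if it was skipped */
    public function handle(RawEventInterface $rawEvent): bool
    {
        if (!$this->isActive($rawEvent)) {
            return false; // skipped during catch-up
        }
        // ...dispatch to the matching when*() method here...
        return true;
    }
}

final class FromSequenceNumberProcessManager extends AbstractProcessManagerSketch
{
    public function isActive(RawEventInterface $rawEvent): bool
    {
        return $rawEvent->getSequenceNumber() >= 1403; // only "from point X"
    }
}
```

As noted in the con above, this still loads and iterates all old events just to decide they should be skipped.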

@robertlemke
Member Author

So, it happened again, this time on a bigger scale in production. I renamed a PHP namespace which contained a couple of process managers. Can you guess what happened? Because the table neos_eventsourcing_eventlistener_appliedeventslog did not contain any matching entries for the affected process managers, the event loop started to replay all events for these "new" listeners.

As far as I can tell, the worst thing that happened was that lots of notification emails were sent to Beach users, and I need to remove a couple of nonsense events and replay projections. But it could have gone really wrong.

My conclusion for now is that I'd rather create a new (Flownative) package with a process manager which has some more protection measures (like I wrote earlier). Let's see; maybe when I have more practical experience with this, the code can eventually become part of the event sourcing package again.

@albe
Member

albe commented Sep 29, 2017

What a bummer :/ Sorry that happened (again). Would the last point have avoided the issue? As far as I see it would, no?

Only change AbstractAsynchronousProcessManager::getHighestAppliedSequenceNumber() to return the current store sequenceNumber if 0 is returned from the appliedEventsLogRepository, since this somehow makes sense as default for ProcessManagers (only)

We could implement that pretty quickly to have a solution for now and then rethink a broader concept if necessary.

@hlubek
Contributor

hlubek commented Sep 29, 2017

The issue sounds to me like we should have less "magic", i.e. inferred logic, in this part. I like the notion of a "durable name" like in https://nats.io/documentation/streaming/nats-streaming-intro/:

Durable subscriptions - Subscriptions may also specify a “durable name” which will survive client restarts. Durable subscriptions cause the server to track the last acknowledged message sequence number for a client and durable name. When the client restarts/resubscribes, and uses the same client ID and durable name, the server will resume delivery beginning with the earliest unacknowledged message for this durable subscription.

And also a feature where a process manager (or event listener) could specify the start position in the stream:

Historical message replay by subject - New subscriptions may specify a start position in the stream of messages stored for the subscribed subject’s channel. By using this option, message delivery may begin at:
The earliest message stored for this subject
The most recently stored message for this subject, prior to the start of the current subscription. This is commonly thought of as “last value” or “initial value” caching.
A specific date/time in nanoseconds
An historical offset from the current server date/time, e.g. the last 30 seconds.
A specific message sequence number

So the process manager (or event listener) would have more control, and e.g. the start position could be specified in a setting for a specific deployment because it might have a special business need.

As I already discussed with Robert, I think we have quite a few patterns in this package that aren't unique to the event sourcing architecture but are instances of a work queue, message publishing with exactly-once delivery and so on, so we should treat the problems correctly and use the "right" abstractions.

@bwaidelich
Member

bwaidelich commented Sep 29, 2017

I share Christopher's concern about too much magic..
A "durable name" might improve things, but the root cause is that we have at-least-once delivery, so Process Managers should be prepared to process the same event twice.
I'm not sure what exactly that means in terms of implementation, but IMO we should differentiate between the different scenarios:

1. A new PR should only listen to future Events

For this I'd expect a new command that allows to (re)set the highestAppliedSequenceNumber (we need that anyway in order to replay a PR w/o having to update the DB manually).
This command could be called via deployment scripts, too. It's an extra step, but I would highly prefer this explicitness over some "magic default".

Alternatively I could imagine some interface that tells the system from what time the PR should be active (we would need a new EventStore filter for that to be performant probably).

2. A PR should not process Events it has already processed

Instead of (only) relying on the highestAppliedSequenceNumber a good practice for a PR is probably to manage its state in a way that prevents such things.
For example, the common SignUpConfirmationMail-PR:
First of all it should probably skip sign-up events that are older than X days.
And then it should probably keep a state like this (simplified):

correlationId | sent
------------- | ----
abc           | 0
def           | 1
...

and only add new rows for new correlation ids
(the correlationId in that case uniquely references individual sign-ups)
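The per-correlation-id state described above could be sketched like this (class and method names are hypothetical; a real PM would persist this state rather than keep it in memory):

```php
<?php
// Hypothetical sketch: the PM only acts on correlation ids it has not
// seen yet, making duplicate event delivery harmless.
final class SignUpConfirmationState
{
    /** @var array<string, bool> correlationId => mail sent flag */
    private $sent = [];

    /** Returns true exactly once per correlation id (the "sent == 0" row) */
    public function markPending(string $correlationId): bool
    {
        if (isset($this->sent[$correlationId])) {
            return false; // duplicate delivery, skip
        }
        $this->sent[$correlationId] = false; // row added with sent == 0
        return true;
    }

    /** Flip the row to sent == 1 after the mail went out */
    public function markSent(string $correlationId): void
    {
        $this->sent[$correlationId] = true;
    }
}
```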

Implementing a "general-purpose" exactly-once-delivery could also be an option, but this is quite complex and maybe impossible ;)

@albe
Member

albe commented Sep 30, 2017

I like the notion of a "durable name"

Yes, this is a very core concept of pubsub systems and basically what we do already with async EventListeners (and hence PMs). The basic concept is that the information of what messages have been processed is stored with/in the consumer. The difference being that the "name" is decided by the EventListener/ProcessManager class.

Historical message replay by subject - New subscriptions may specify a start position in the stream of messages stored for the subscribed subject’s channel. By using this option, message delivery may begin at:
The earliest message stored for this subject
The most recently stored message for this subject, prior to the start of the current subscription. This is commonly thought of as “last value” or “initial value” caching.
A specific date/time in nanoseconds
An historical offset from the current server date/time, e.g. the last 30 seconds.
A specific message sequence number

I like this a lot. I don't have use cases for all from above in my mind, but they sound reasonable. Is there also a case for "from last" vs. "from next"?

For this I'd expect a new command that allows to (re)set the highestAppliedSequenceNumber

Why a command for starting a process? Do we start processes with a command now already? Processes are rather started by some events (a process is an event-driven state machine), and I wouldn't want to have such "process meta information" in the event payloads (or even event metadata). IMO this is an inherent information of the process definition - it specifies the lifecycle of the process. So we're back at what I wrote earlier, no?

Alternatively I could imagine some interface that tells the system from what time the PR should be active (we would need a new EventStore filter for that to be performant probably).

See point no. 3 from my post from April.

A PR should not process Events it has already processed

First of all, this is not what this issue is really about. A new PM that starts will not have processed anything yet, but it still might want to only start processing "from now / point X" (and possibly even only "until point Y", i.e. have a clear lifecycle).

Anyway, yes, but unless we implement generic exactly-once-processing semantics (which only Apache Kafka managed to do lately), this will be a concern of the process manager implementation. So the best thing we can do (until then), is to clearly document this and give different approaches to achieve the intended goal.

Since we currently only provide at-least-once semantics inside the PM, the PM has to make its processing of events idempotent, meaning that a message that comes in a second time does not change its state (and hence does not produce any side effects, since those only happen on state transition).
This can be done by checking if the incoming message has already been processed, either by storing an "already processed" counter, i.e. the "highestAppliedSequenceNumber", or, as you mentioned, by keeping a list of already processed unique message ids. But here comes the real issue: unless updating the counter/id list happens within a single transaction with the PM state change/side effect, you don't win anything.

So let's go with the example of the e-mail sending process, because it's probably the most obvious use case, and look at what happens in which order:
The PM changes its (FSM) state due to a new incoming event, invoking a method to process the state-change effects:

  • The PM possibly persists some data to a database/storage
  • The PM sends an e-mail
  • The PM persists its "highestAppliedSequenceNumber"

Unless all those three happen transactionally, i.e. all-or-none, this will still be "at-least-once" with possible duplicate processing/side effects. 1. + 3. can be solved by persisting the PM data inside the same database as the "highestAppliedSequenceNumber" and wrapping both in a transaction (let's discuss this in a separate issue, it would be a cool feature for a lot of use cases!). Easy enough as long as we're running on top of a relational database. But the sending of the e-mail cannot be made transactional with the database updates. It may still happen that the e-mail is sent and the update to the "highestAppliedSequenceNumber" fails, rolling back the database transaction. The next time, this will be repeated, leading to duplicate e-mails. So the user still needs to decide whether they prefer "possibly duplicate" or "possibly not sent" e-mails (for the latter they'd need to postpone the actual sending until after persisting the applied sequence number).

Note: the first is much less likely to happen though, because the "update highest processed sequence number" step is very unlikely to fail.

Note 2: it doesn't matter if we push 1. + 2. into another part of the system by just dispatching a "send e-mail" command from the PM. We then just have the two problems of transactionally dispatching the command together with the sequence number update and solving the same "exactly-once processing" issue on the handling side of the command.
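The "1. + 3. in one transaction, e-mail after commit" variant (the "possibly not sent" choice) could be sketched with PDO; all table and column names here are made up for illustration:

```php
<?php
// Hypothetical sketch: persist the PM state and the applied events log
// entry in ONE database transaction, and run the side effect (e-mail)
// only after the commit succeeded. A crash before the send then means
// "not sent" rather than "sent twice".
$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->exec('CREATE TABLE pm_state (correlation_id TEXT PRIMARY KEY, state TEXT)');
$pdo->exec('CREATE TABLE applied_events_log (listener TEXT PRIMARY KEY, sequence_number INTEGER)');

function applyEvent(PDO $pdo, string $listener, string $correlationId, string $newState, int $sequenceNumber, callable $sendMail): void
{
    $pdo->beginTransaction();
    try {
        // 1. persist the PM state change
        $pdo->prepare('INSERT OR REPLACE INTO pm_state (correlation_id, state) VALUES (?, ?)')
            ->execute([$correlationId, $newState]);
        // 3. persist the highest applied sequence number in the SAME transaction
        $pdo->prepare('INSERT OR REPLACE INTO applied_events_log (listener, sequence_number) VALUES (?, ?)')
            ->execute([$listener, $sequenceNumber]);
        $pdo->commit();
    } catch (\Throwable $e) {
        $pdo->rollBack();
        throw $e;
    }
    // 2. the side effect happens only after the commit
    $sendMail();
}
```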

@bwaidelich
Member

bwaidelich commented Oct 2, 2017

Hey,

For this I'd expect a new command that allows to (re)set the highestAppliedSequenceNumber

Why a command for starting a process? Do we start processes with a command now already?

A CLI Command ;)

See point no. 3 from my post from April.

Where did you post that, do you have a link?
Edit: Nevermind, you meant #141 (comment) I assume ;)

A PR should not process Events it has already processed

First of all, this is not what this issue is really about.

Uh no? I think it's at least related and I know that Robert had issues with that.

But here comes the real issue: unless updating the counter/id list happens within a single transaction with the PM state-change/side-effect, you don't win anything.

Right, that's what I tried to demonstrate with the example table:
an entry would be made with sent == 0 when processing the event (unless there is already an entry for the given id).
Another option would be to block any other event handling during that time but that's not a viable solution for longer-running things (like sending an email)

@albe
Member

albe commented Oct 4, 2017

A CLI Command ;)

Dang! Unubiquitous language

Edit: Nevermind, you meant #141 (comment) I assume ;)

Exactly :)

Uh no?

Yeah, "do not do the same work twice" vs. "start doing work from point X". And yes, they are related when the latter is in context of "resume from previous", but not if it starts from new and should just ignore everything prior to X.

Another option would be to block any other event handling during that time

Better not do that unless unavoidable. But I totally did not see the parallel processing issue here. Needs some more thought

@albe
Member

albe commented Oct 11, 2017

Addition regarding

Only change AbstractAsynchronousProcessManager::getHighestAppliedSequenceNumber() to return the current store sequenceNumber if 0 is returned from the appliedEventsLogRepository, since this somehow makes sense as default for ProcessManagers (only)

This is not as easily possible, because for that the ProcessManager would need a direct dependency on the EventStore (via EventStoreManager) and the EventStore would need to open up the getStreamVersion(EventStreamFilter $filter) method, which is suboptimal as it adds coupling.

I'm currently in favor of going with

extend AbstractProcessManager with a method like getLifecycleFilter(): EventStreamFilterInterface (probably extended with getMin/MaxOccurenceTimestamp?), which will be applied when fetching events from the store to catchUp the ProcessManager

Which in turn would fit what Christopher said about Historical message replay by subject, because we'd need a time based filter inside EventStreamFilterInterface for that anyway and hence could provide filters for all the mentioned cases.

What do you all think?

@bwaidelich
Member

I've been thinking about this again, talked to Robert and finally took the time to actually read all the comments here (previously I frankly only skimmed the text between other tasks).

@albe the link to Greg's book was helpful! I especially like the part about Event Sourced Process Managers and I think that would be a solid solution for many issues related to the persistence of PMs (in short it means that a PM works like an aggregate and its state is reconstituted from events whenever it receives a new message).

This won't solve the "this PM should only be active from timestamp x" problem on its own, but IMO your suggested solution of a "lifecycle filter" works well with that approach:

Quite possible that I miss something, but I could imagine the following steps:

1. Make correlationId a core concept

We should be able to publish events with a correlationId and to find those in the EventStore.
A simple implementation could be:

$eventWithCorrelationId = EventWithCorrelationId::fromEvent($myDomainEvent, $someCorrelationId);
$this->eventPublisher->publish($streamName, $eventWithCorrelationId);

(similar to #152)

2. Extend EventStreamFilter

..with minimumRecordedAt, maximumRecordedAt, correlationId (or similar)

3. Add a method to the PM interface that returns an EventStreamFilterInterface

..or maybe, more explicit, new interfaces with getters for the minimumRecordedAt timestamp and another one for the correlationId

4. Replay events on the PM when it's invoked

When an event is to be handled by an Event Sourced PM, we would fetch all relevant events from the store (by applying the filter from the new methods above + the event types the PM handles).
And then reconstitute the PM similar to an aggregate and finally let it handle the new event(s).

Would that work?

@albe
Member

albe commented Oct 12, 2017

  1. Make correlationId a core concept

That would be a good thing I guess. We need to take care of two things though:

  • Adding EventWithCorrelationId will probably lead to some nasty construction chains: EventWithCorrelationId::fromEvent(new EventWithMetadata(new MyDomainEvent($somethingSomething, ...), $metadata), $correlationId).
    A correlationId IMO is part of the event metadata, so maybe we should incorporate that into the existing EventWithMetadata class
  • The concept of correlationId is not obvious, so we need to make sure that it is used correctly. However, for this to be the case, we need to apply some magic dust (e.g. automatically generating/setting Ids on incoming commands/new events), or lots and lots of very comprehensive documentation.

We already started a discussion about correlationId (and causationId) in #6 (comment) ff and #115. So maybe we should just start a new issue only on this topic and move+join the previous discussion points there?

  2. Extend EventStreamFilter

I think we're in agreement on this then :) #readytoimplement

  3. Add a method to the PM interface that returns an EventStreamFilterInterface

I'd prefer this over separate getMinimumRecordedAt/getMaximumRecordedAt/getCorrelationId methods, because it more cleanly encapsulates a single concern (the process lifecycle). Also the method names are rather technical (though exact).

  4. Replay events on the PM when it's invoked

Getting back to the issue about PMs not being able to replay, since they cause side effects: so I guess you're thinking about replaying the PM state projection.
Conceptually yes, but IMO it's an implementation detail of the PM whether it stores its internal state in the form of a snapshot or not. Considering the PM state is generally very concise, since it's just an FSM, I'd lean towards storing snapshots, as that should be very cheap too. Assuming we use the existing Projection for the state, that would already be a given. If we implement PMs more along the lines of an Aggregate, we need to think of a good way to separate the (re)building of the state from triggering the side effects, otherwise we end up with the same issue again.

I could imagine the latter something along these lines:

  • the PM whenFooHappened() methods are building up the state and do not execute any logic
  • an internal logic in the base implementation triggers state changes according to the definition of the FSM and the incoming events
  • the PM implements onSomeStateChange() methods that should actually contain the side-effects (emit command, send some e-mail, yadda yadda)
  • on replay, the triggering of state change callbacks is suppressed
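That split could be sketched along these lines (class, state, and method names are made up; a mail counter stands in for the actual side effect):

```php
<?php
// Hypothetical sketch: when*() methods only perform pure FSM state
// transitions; on*() callbacks hold the side effects and are only
// triggered on live state changes, never during replay.
class RecommendationBonusProcessSketch
{
    /** @var string current FSM state */
    private $state = 'initial';

    /** @var int stands in for "send some e-mail" side effect */
    public $mailsSent = 0;

    public function whenFriendRegistered(bool $replaying = false): void
    {
        $previous = $this->state;
        $this->state = 'bonusGranted'; // pure state transition

        // side effects only fire on a live state change, not on replay
        // and not when the event arrives a second time (idempotent)
        if (!$replaying && $previous !== $this->state) {
            $this->onBonusGranted();
        }
    }

    protected function onBonusGranted(): void
    {
        $this->mailsSent++;
    }

    public function getState(): string
    {
        return $this->state;
    }
}
```

Note that the `$previous !== $this->state` guard also makes a duplicate live delivery of the same event harmless, which ties back to the at-least-once discussion above.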

@bwaidelich
Member

bwaidelich commented Oct 12, 2017

We already started a discussion about correlationId (and causationId) in #6 (comment)

Yes, I already assigned the issue to myself and pushed a first PR: #157

Extend EventStreamFilter

I think we're in agreement on this then

With the PR above I tried to make this easier to extend

Considering the PM state is generally very concise since it's just a FSM

FSM? Fliegendes Spaghettimonster? :)

I think, we can really just re-use most of the logic we already have for the EventSourcedAggregateRoot with a few differences:

  • The state is not "manually" reconstituted via some Repository::get() call, but upon every event that is handled by the PM
  • To find out which events to load, we could look at the event types the PM handles plus the correlationId of the new event (If that's not flexible enough, the PM could have a method getEventStreamFilter(EventInterface $eventToHandle): EventStreamFilterInterface)
    • The reconstitution could be done similar to what we do now: call_user_func($this->eventSourcedProcessManagerClassName . '::reconstituteFromEventStream', $eventStream);
  • Finally the new event(s) are sent to the corresponding when*() methods
  • ...and SomeEventSourcedPM::pullUncommittedEvents() are published (note that an event-sourced PM does not dispatch commands but events, according to Greg's book: "Note that Event Sourced Process Managers tend to only receive and raise events, they will not as example open up a HTTP connection and directly interact with some thing directly.")

Question: to which stream should those events be published? Again I think we can use a mechanism similar to what we already have with the Aggregate Roots, but instead of <Package.Key>:<AggregateClassName>:<AggregateIdentifier> it could be <Package.Key>:<ProcessManagerClassName>:<CorrelationId>.
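The suggested naming convention is trivial to sketch (function name hypothetical):

```php
<?php
// Hypothetical sketch of the suggested stream name convention for
// event-sourced process managers:
// <Package.Key>:<ProcessManagerClassName>:<CorrelationId>
function processManagerStreamName(string $packageKey, string $processManagerClassName, string $correlationId): string
{
    return $packageKey . ':' . $processManagerClassName . ':' . $correlationId;
}
```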

@albe
Member

albe commented Oct 16, 2017

FSM? Fliegendes Spaghettimonster? :)

Finite State Machine. But the Spaghettimonster sometimes would fit too ;)

The state is not "manually" reconstituted via some Repository::get() call, but upon every event that is handled by the PM

As said, I would make that one option, but not the only one (and maybe not even the default). Seeing the PM as an FSM basically means that it just needs to store its current state, which is a single value. Of course you can recreate this state by reapplying all events, but

  • unless we have a good optimization for loading only the events that apply to a single process (i.e. an instance, not a whole process manager), this adds a lot of overhead. The correlationId can help there, but I'm not sure all process-relevant events can be found with a single correlationId. Take the example of a "receive bonus when a recommended friend registers" process - you'll hardly have a single correlationId around over the (infinite) time this process takes.

  • it makes thinking about the process itself a bit harder IMO, because for every state change you first need to come up with that state by following all events. You don't have an anchor point to say: this process was in this state and then X happened (thinking of debugging here and looking at the system internals)

To find out which events to load, we could look at the event types the PM handles plus the correlationId of the new event

What I found nice is to think about each PM and each Projection defining their own EventStream. Technically this boils down to building an index (like https://eventstore.org/blog/20130218/projections-5-indexing/) or, in our case, a specific EventFilter, since in MySQL you don't want to create a lot of indexes and building a separate table is also not a good solution. But conceptually you gain a lot of freedom to think about your specific PM/Projection, because you can basically assume they have their own record of things that belongs only to them.

note that an event-sourced PM does not dispatch commands but events according to Gregs book

I need to (re)read that part. TBH I'm not so sure about this, because it has some consequences and also doesn't quite fit my concept of a "Manager". I recently read a nice quote about them: "Managers don't do useful things on their own, they just order other people to do useful things" - and IMO this also applies to ProcessManagers in this case. Also, if the PM only raises events but does not open a (HTTP) connection - how would it send an e-mail? It somehow needs to either do it itself (don't do that) or order some other service (local or remote) to do it.

@bwaidelich
Member

As said, I would make that one option

Yes, absolutely. I could imagine a new interface (EventSourceProcessManagerInterface or similar) that can be implemented to opt in to this behavior.

I'm not sure all process-relevant events can be found with a single correlationId

mh, yes, good point. But everything I read about PM implementations (I could not find that much tbh) assumes that a PM operates on events with the same correlation id.
And I can't think of a case where that wouldn't work:

Take the example of "receive bonus when a recommended friend registers" process

The process starts with the recommendation, which would get a correlation id. If a user registers upon such an invitation, that "was registered" event gets the same correlation id.

it makes thinking about the process itself a bit harder IMO, because for every state change you first need to come up that state by following all events

I don't really see that. IMO it's exactly the same as with a persistent PM, just with a local state instead of a persisted one. Basically like an aggregate vs a projector.

What I found nice is to think about each PM and each Projection defining their own EventStream

Yes, that could be a general (optional) interface any EventListener can implement. It would be a powerful tool, but quite "low-levelly"

an event-sourced PM does not dispatch commands [...]

I need to (re)read that part. TBH I'm not so sure about this

I'm also not 100% sure yet, why this would be a requirement. But I think it is in order to avoid that a command is dispatched twice (i.e. duplicate email is sent)

how would he send an e-mail?

Easy, it would publish an event that some EmailEventListener listens to.
For the "recommendation bonus" example above, the event alone might be enough to update the (read) model.
