feat: MongoDB Consumer based on MongoDB ChangeStream subscription #258
Conversation
processors?: MongoDBProcessor<MessageType>[];
};

export type EventStoreDBEventStoreConsumerConfig<
FIX: There are some leftovers from the EventStoreDB inspiration in the naming; this will need to be adjusted 🙂
> = {
consumerId?: string;

processors?: MongoDBProcessor<MessageType>[];
CONCERN: Do we need MongoDBProcessor here? The intention behind consumers is that you can plug in any processor you want, so consuming messages is decoupled from handling them.
No, it was my mistake. Good catch!
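A minimal sketch of the decoupling described above (the names here are illustrative, not the actual Emmett API): the consumer only depends on a small processor contract, so any processor implementation can be plugged in, and consuming stays separate from handling.

```typescript
// Hypothetical minimal processor contract (not MongoDB-specific):
type Processor<MessageType> = {
  id: string;
  handle: (messages: MessageType[]) => Promise<void>;
};

// Hypothetical consumer config depending only on that contract:
type ConsumerConfig<MessageType> = {
  consumerId?: string;
  processors?: Processor<MessageType>[];
};

// A trivial processor that just collects what it receives:
const collected: string[] = [];
const collector: Processor<string> = {
  id: 'collector',
  handle: async (messages) => {
    collected.push(...messages);
  },
};

const config: ConsumerConfig<string> = { processors: [collector] };
```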
connectionString: string;
clientOptions?: MongoClientOptions;
client?: never;
onHandleStart?: (
QUESTION: Could you expand on the reasoning behind onHandleStart and onHandleEnd?
Yes. I needed that at some point during development, but now I don't :) Removed.
| FullDocument<EventType, EventMetaDataType, T>
| UpdateDescription<ReadEvent<EventType, EventMetaDataType>>;

export const mongoDBEventsConsumer = <
Suggested change:
- export const mongoDBEventsConsumer = <
+ export const mongoDBMessagesConsumer = <
const processors = options.processors ?? [];
const subscribe = _subscribe(
options.changeStreamFullDocumentPolicy,
client.db(),
SUGGESTION: Eventually, we should also allow passing db, right?
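A sketch of what the suggestion could look like (the `database` option name and shapes are assumptions, not the PR's API): let callers pass an explicit database name instead of always falling back to client.db(), which uses the connection string's default database.

```typescript
// Minimal structural type standing in for MongoClient, to keep the sketch
// self-contained:
type HasDb = { db: (name?: string) => { databaseName: string } };

// Hypothetical resolution: explicit name wins, otherwise the client default.
const resolveDb = (client: HasDb, options: { database?: string }) =>
  options.database !== undefined ? client.db(options.database) : client.db();

// Stub client for illustration:
const stubClient: HasDb = {
  db: (name?: string) => ({ databaseName: name ?? 'default-from-uri' }),
};
```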
options: { processorId: string; partition?: string; collectionName?: string },
): Promise<ReadProcessorCheckpointResult> => {
const result = await client
.db()
SUGGESTION: Here, we'll also need to provide an option to pass the db name through options.
};
};

export const generateVersionPolicies = async (db: Db) => {
QUESTION: Could you explain more about why this is needed and provide some reference links for it?
It checks the MongoDB version and returns a version-dependent configuration. Hm, actually, there is a difference between versions 5 and 6. What is your minimum supported version?
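A sketch of the kind of decision generateVersionPolicies could encode (the function and policy names here are assumptions): MongoDB 6.0 added the `fullDocument: 'whenAvailable'` change-stream option and pre-/post-images, while 5.x only offers `'updateLookup'`.

```typescript
// Hypothetical policy selection keyed on the server version string:
type FullDocumentPolicy = 'updateLookup' | 'whenAvailable';

const fullDocumentPolicyFor = (serverVersion: string): FullDocumentPolicy => {
  // 'whenAvailable' exists only on MongoDB 6.0 and newer.
  const major = Number(serverVersion.split('.')[0]);
  return major >= 6 ? 'whenAvailable' : 'updateLookup';
};
```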
event.data.productItem.productId,
);

if (event.data.productItem.productId === lastProductItemId) {
SUGGESTION: You can use the built-in stopAfter capability here to make this check easier. See
Line 189 in bd65823:
stopAfter: (event) =>
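A sketch of the stopAfter idea from the suggestion (the event shape and the terminal id are assumptions taken from this test's context, not copied from the Emmett API): instead of checking the last product id inside the handler, pass a predicate and let the consumer stop once it returns true.

```typescript
// Assumed terminal id from the test scenario:
const lastProductItemId = 'product-42';

// Hypothetical stopAfter predicate over the assumed event shape:
const stopAfter = (event: {
  data: { productItem: { productId: string } };
}): boolean => event.data.productItem.productId === lastProductItemId;
```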
}
| { success: false; reason: 'IGNORED' | 'MISMATCH' };

export const storeProcessorCheckpoint = async <Position extends string | null>(
SUGGESTION: It'd be great to add some integration tests for that.
options: {
processorId: string;
version: number | undefined;
newPosition: null extends Position
FIX: In the end, we need to support other types of positions here as well. We'll need to pass the checkpoint as we do in ESDB. Without that, we'd only be able to use the Mongo processor with the MongoDB event store, and we eventually want to use it with other event stores too.
ESDB supports only bigint:
export const storeProcessorCheckpoint = async <Position extends bigint | null>
Check out my changes. Is that something like what you were thinking of?
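A sketch of what widening the position type could look like (the names are assumptions, not the PR's code), so the same checkpoint store could also serve event stores whose positions are strings, such as MongoDB resume tokens, alongside ESDB's bigint positions:

```typescript
// Hypothetical result type generic over a widened position:
type StoreCheckpointResult<Position extends bigint | string | null> =
  | { success: true; newPosition: Position }
  | { success: false; reason: 'IGNORED' | 'MISMATCH' };

// Helper constructing the success case, preserving the position type:
const succeeded = <Position extends bigint | string | null>(
  newPosition: Position,
): StoreCheckpointResult<Position> => ({ success: true, newPosition });
```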
? { success: true, newPosition: options.newPosition }
: {
success: false,
reason: result.matchedCount === 0 ? 'IGNORED' : 'MISMATCH',
SUGGESTION: I might be wrong, but this doesn't seem to be correct. IGNORED is meant to be used when the stored checkpoint is bigger than the one we're passing (meaning it was already handled). A matched count of zero can have multiple causes.
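One way the two failure reasons could be told apart (this logic is an assumption, not the PR's code): when the conditional update matches nothing, read the stored checkpoint back and compare versions, treating only a stored version at or past the expected one as "already handled".

```typescript
// Hypothetical classification after a failed conditional update:
const classifyFailure = (
  storedVersion: number | undefined,
  expectedVersion: number,
): 'IGNORED' | 'MISMATCH' =>
  storedVersion !== undefined && storedVersion >= expectedVersion
    ? 'IGNORED' // checkpoint already advanced by an earlier write
    : 'MISMATCH'; // missing or older than expected: a real conflict
```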
@arturwojnar thanks for doing it!
I did a first round of review and added some questions/comments/suggestions, but overall it looks like a great first step. After clarifications and adding more tests, I'll be happy to merge it!
It'd also be great if you could update the description with the highlights of this PR.
@arturwojnar could you ensure that the linter, build and tests are passing? See: https://github.com/event-driven-io/emmett/actions/runs/16644021116/job/47194905100?pr=258. Thanks in advance!
I think this is a good start based on the other consumers. Nothing that @oskardudycz hasn't already mentioned. I think some of the boilerplate can be removed since the addition of the centralized consumer types.
We probably want to add documentation noting that this consumer implementation requires change streams to be enabled, so it can't be used in single-instance environments (change streams need a replica set or a sharded cluster).
stream = subscribe<Event, CheckpointType>(startFrom);

void (async () => {
@oskardudycz I do it this way, sequentially, one message at a time, instead of relying on stream events (stream.on('data')).
As the tests with three consecutive stream appends show, the value of lastCheckpoint (processors.ts) doesn't have time to change because of long-lasting I/O operations. That causes storeProcessorCheckpoint to throw MISMATCH, and checkpoints don't get persisted.
This solution isn't perfect, though, because there is still a possibility of a race: at line 293 we have asynchronous handling of a processor function.
In my Hermes repo I handle this case by implementing a queue of messages to ACK, so messages can finish processing out of order but still be stored/ACKed in order.
@arturwojnar check the ESDB implementation; there it's handled with a custom transform and callbacks:
async _transform(
This gives a guarantee of sequential processing and of passing messages to the producer in order, which should eliminate out-of-order handling and race conditions. I'm considering applying a similar pattern inside processors as well.
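A sketch of that Transform-based approach (this is an illustration of the pattern, not the actual Emmett or ESDB code): an object-mode Transform whose async _transform only invokes the callback after handling finishes, so messages are processed strictly one at a time and stay in order, avoiding the lastCheckpoint race described above.

```typescript
import { Transform, type TransformCallback } from 'node:stream';

// Hypothetical wrapper: wraps an async handler in a Transform that
// processes messages sequentially. The next message is only pulled in
// after the previous handler's promise settles.
const sequentialProcessor = <Message>(
  handle: (message: Message) => Promise<void>,
): Transform =>
  new Transform({
    objectMode: true,
    transform(message: Message, _encoding: string, done: TransformCallback) {
      handle(message)
        .then(() => done(null, message)) // signal completion, keep order
        .catch((error: Error) => done(error)); // propagate handler failures
    },
  });
```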
}
},
close: async () => {
if (stream) {
the same as stop... :/
This PR intends to provide a consumer relying on MongoDB Change Streams.
Example of what caused me thinking about this feature 👇
☝️ This is a clipping of a projection's evolve function. As you can see, to achieve reactiveness I did something very ugly 🤮: I converted evolve into an asynchronous function, and on processing the B2BInstitutionUsedLicencesIncreased event, I append another event, B2BLincenceObtained, to a stream. The latter event is a consequence of the first, but it should not be implicitly and directly coupled to it, especially when that happens in the projection's internals.
So, reactiveness is the crux of this PR.
MongoDB provides the Change Stream functionality, a simple pull mechanism on top of MongoDB's oplog.
By subscribing to the Change Stream and storing the last processed message's position (called a token here), we in fact implement a simple version of the outbox pattern.
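The loop described above can be sketched as follows (the shapes and names are assumptions, not the PR's code): handle each change event, then persist its resume token, so that after a restart the change stream can be resumed right after the last handled event.

```typescript
// Minimal stand-in for a change event; in MongoDB the _id field of a
// change event is its resume token.
type ChangeEvent = { _id: string; fullDocument?: unknown };

// Hypothetical consume loop: handle first, then checkpoint the token.
const consume = async (
  events: AsyncIterable<ChangeEvent>,
  handle: (event: ChangeEvent) => Promise<void>,
  storeToken: (token: string) => Promise<void>,
): Promise<void> => {
  for await (const event of events) {
    await handle(event); // process the message
    await storeToken(event._id); // then persist its resume token
  }
};
```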
Here's the fixed version: