
feat: MongoDB Consumer based on MongoDB ChangeStream subscription #258


Draft · arturwojnar wants to merge 15 commits into main

Conversation

@arturwojnar commented on Jul 31, 2025

This PR intends to provide a consumer relying on MongoDB Change Streams.

An example of what got me thinking about this feature 👇

switch (type) {
  case 'B2BInstitutionUsedLicencesIncreased': {
    if (state._disabled) {
      throw new InstitutionInvalidState({
        details: { currentState: 'deleted' },
        message: `Cannot use a licence of a disabled institution.`,
      })
    }

    // Hack.
    const licenceId = generateLicenceId()
    await getEventStore().appendToStream<LicenceEvent>(
      toStreamName(LICENCES_STREAM_TYPE, event.patientId.toString()),
      [
        literalObject<B2BLincenceObtained>({
          type: 'B2BLincenceObtained',
          metadata: {
            patientId: event.patientId,
            institutionId: metadata.institutionId,
            licenceId,
          },
          data: { obtainedAt: event.timestamp, duration: state.licenceDuration as Duration },
        }),
      ],
    )

    return literalObject<B2BInstitution>({
      ...state,
      usedLicences: state.usedLicences + 1,
      availableLicences: state.availableLicences - 1,
    })
  }
}

☝️ This is a clipping of a projection's evolve function. As you can see, to achieve reactiveness I did something very ugly 🤮 - I converted evolve into an asynchronous function, and while processing the B2BInstitutionUsedLicencesIncreased event, I append another event - B2BLincenceObtained - to a different stream.

The latter event is a consequence of the first, but it should not be implicitly and directly coupled to it, especially not inside the projection's internals.

So, reactiveness is the crux of this PR.

MongoDB provides the Change Streams functionality: a simple pull mechanism on top of MongoDB's oplog.

By subscribing to the Change Stream and storing the last processed message's position (called a token here), we effectively implement a simple version of the outbox pattern.
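
A minimal sketch of that mechanism (loadLastToken, storeToken, and handleMessage are assumed helpers, not part of this PR):

// Resume from the previously stored token, if there is one.
const resumeAfter = await loadLastToken();

const changeStream = db
  .collection('events')
  .watch([], resumeAfter ? { resumeAfter } : {});

for await (const change of changeStream) {
  await handleMessage(change);
  // change._id is the resume token; storing it only after handling
  // gives at-least-once delivery - a simple outbox relay.
  await storeToken(change._id);
}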

Here's the fixed version:

consumer.reactor<B2BInstitutionUsedLicencesIncreased>({
  processorId: v4(),
  eachMessage: async (event) => {
    await getEventStore().appendToStream<LicenceEvent>(
      toStreamName(LICENCES_STREAM_TYPE, event.patientId.toString()),
      [
        literalObject<B2BLincenceObtained>({
          type: 'B2BLincenceObtained',
          metadata: {
            patientId: event.patientId,
            institutionId: metadata.institutionId,
            licenceId,
          },
          data: { obtainedAt: event.timestamp, duration: state.licenceDuration },
        }),
      ],
    )
  },
  connectionOptions: {
    client,
  },
});

@arturwojnar marked this pull request as draft on Jul 31, 2025, 08:31
processors?: MongoDBProcessor<MessageType>[];
};

export type EventStoreDBEventStoreConsumerConfig<
Collaborator:

FIX: There are some leftovers from the EventStoreDB inspiration in the naming; this will need to be adjusted 🙂

> = {
consumerId?: string;

processors?: MongoDBProcessor<MessageType>[];
Collaborator:

CONCERN: Do we need MongoDBProcessor here? The intention behind consumers is that you can plug any processors you want, so consuming messages is decoupled from handling.

Author:

No, it was my mistake. Good catch!

connectionString: string;
clientOptions?: MongoClientOptions;
client?: never;
onHandleStart?: (
Collaborator:

QUESTION: Could you expand on the reasoning behind onHandleStart and onHandleEnd?

Author:

Yes. I needed that at some point of the development, but now I don't :) Removed.

| FullDocument<EventType, EventMetaDataType, T>
| UpdateDescription<ReadEvent<EventType, EventMetaDataType>>;

export const mongoDBEventsConsumer = <
Collaborator:

Suggested change:
- export const mongoDBEventsConsumer = <
+ export const mongoDBMessagesConsumer = <

const processors = options.processors ?? [];
const subscribe = _subscribe(
options.changeStreamFullDocumentPolicy,
client.db(),
Collaborator:

SUGGESTION: Eventually, we should also allow passing db, right?

Author:

[image] That happens here, right?

options: { processorId: string; partition?: string; collectionName?: string },
): Promise<ReadProcessorCheckpointResult> => {
const result = await client
.db()
Collaborator:

SUGGESTION: Here, we'll also need to provide an option to pass the db name through options.

};
};

export const generateVersionPolicies = async (db: Db) => {
Collaborator:

QUESTION: Could you explain more about why it is needed and provide some reference links for that?

Author:

It checks the MongoDB version and returns a version-dependent configuration. Hm, actually, there is a difference between versions 5 and 6. What is your minimum supported version?
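
For reference, a sketch of how such a check could look (buildInfo is a real admin command; the policy mapping here is just illustrative):

// Ask the server for its version.
const { version } = await db.admin().command({ buildInfo: 1 });
const [major] = (version as string).split('.').map(Number);

// MongoDB 6.0 added change stream pre-/post-images; older servers
// have to fall back to fullDocument: 'updateLookup'.
const fullDocument = major >= 6 ? 'whenAvailable' : 'updateLookup';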

event.data.productItem.productId,
);

if (event.data.productItem.productId === lastProductItemId) {
Collaborator:

SUGGESTION: You can use the built-in stop-after capabilities here to make this check easier. See
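
Presumably something along these lines (a sketch assuming the consumer exposes a stopAfter option like the ESDB one; handleProductItemAdded is a made-up handler name):

consumer.processor<ProductItemAdded>({
  processorId: v4(),
  // Stop consuming once the last expected product item arrives,
  // instead of comparing ids inside eachMessage.
  stopAfter: (event) =>
    event.data.productItem.productId === lastProductItemId,
  eachMessage: (event) => handleProductItemAdded(event),
});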

}
| { success: false; reason: 'IGNORED' | 'MISMATCH' };

export const storeProcessorCheckpoint = async <Position extends string | null>(
Collaborator:

SUGGESTION: It'd be great to add some integration tests for that.

options: {
processorId: string;
version: number | undefined;
newPosition: null extends Position
Collaborator:

FIX: In the end, we also need to support other types of positions here. We'll need to pass a checkpoint, as we do in ESDB. Without that, the Mongo processor will only be usable with the MongoDB event store, and we eventually want to use it with other event stores as well.

Author:

ESDB supports only bigint:

export const storeProcessorCheckpoint = async <Position extends bigint | null>

Check out my changes - is it something like what you were thinking of?

? { success: true, newPosition: options.newPosition }
: {
success: false,
reason: result.matchedCount === 0 ? 'IGNORED' : 'MISMATCH',
Collaborator:

SUGGESTION: I might be wrong, but this doesn't seem to be correct. IGNORED is meant to be used when the stored checkpoint is bigger than the one we're passing (meaning it was already handled). A zero matched count can have multiple causes.
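
To make that concrete, a sketch (with a hypothetical checkpoints collection) of telling the two cases apart rather than inferring both from the matched count:

const existing = await checkpoints.findOne({ _id: options.processorId });

// Stored checkpoint already at or past the new position:
// the message was handled before, so this store is a no-op.
if (existing && existing.position >= options.newPosition!) {
  return { success: false, reason: 'IGNORED' };
}

// Conditional update that only matches on the expected version.
const result = await checkpoints.updateOne(
  { _id: options.processorId, version: options.version },
  { $set: { position: options.newPosition }, $inc: { version: 1 } },
);

return result.modifiedCount === 1
  ? { success: true, newPosition: options.newPosition }
  : // The expected version didn't match: a concurrent writer won the race.
    { success: false, reason: 'MISMATCH' };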

@oskardudycz left a comment:

@arturwojnar thanks for doing it!

I did a first round of review and added some questions/comments/suggestions, but overall it looks like a great first step. After the clarifications and adding more tests, I'll be happy to merge it!

It'd also be great if you could update the description with the highlights of this PR.

@oskardudycz:
@arturwojnar could you ensure that the linter, build, and tests are passing? See: https://github.com/event-driven-io/emmett/actions/runs/16644021116/job/47194905100?pr=258. Thanks in advance!

@alex-laycalvert left a comment:

I think this is a good start, based on the other consumers. Nothing that @oskardudycz hasn't already mentioned. I think some of the boilerplate can be removed since the addition of the centralized consumer types.

We probably also want to add some documentation stating that this consumer implementation requires change streams to be enabled and can't be used in single-instance environments.


stream = subscribe<Event, CheckpointType>(startFrom);

void (async () => {
Author:

@oskardudycz I do it this way, let's say sequentially, one by one, instead of relying on stream events (stream.on('data')).

This is because, as the tests with three consecutive stream appends show, the value of lastCheckpoint (processors.ts) doesn't have time to change due to long-lasting I/O operations. That causes storeProcessorCheckpoint to throw MISMATCH, and checkpoints don't get persisted.

This solution isn't perfect, though, because there is still a possibility of a race: at line 293 we have asynchronous handling of a processor function.

In my Hermes repo I handle this case by implementing a queue of messages to ACK, so messages can finish processing out of order but will still be stored/ACKed in order, as sketched below.
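
Roughly, that ordered-ACK queue looks like this (a simplified sketch, not the actual Hermes code; storeToken and handleMessage are assumed helpers):

type Pending = { token: unknown; done: boolean };
const pending: Pending[] = [];

const flushInOrder = async () => {
  // ACK the longest completed prefix, preserving arrival order.
  while (pending.length > 0 && pending[0].done) {
    await storeToken(pending.shift()!.token);
  }
};

const process = (message: { _id: unknown }) => {
  const entry: Pending = { token: message._id, done: false };
  pending.push(entry);
  // Handlers may finish out of order; tokens are still stored in order.
  void handleMessage(message).then(() => {
    entry.done = true;
    return flushInOrder();
  });
};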

Collaborator:

@arturwojnar check the ESDB implementation; there it's handled with a custom transform and callbacks.

That gives a guarantee of sequential processing and of passing messages to the producer in order, which should eliminate the out-of-order handling and race conditions. I'm considering applying a similar pattern inside processors as well.
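
For context, that pattern boils down to something like this (a sketch, not the actual ESDB code):

import { Transform } from 'node:stream';

// In object mode a Transform only receives the next chunk after the
// callback fires, so awaiting the handler before calling it back
// guarantees strictly sequential, in-order processing.
const sequentially = (handle: (message: unknown) => Promise<void>) =>
  new Transform({
    objectMode: true,
    transform(message, _encoding, callback) {
      handle(message)
        .then(() => callback(null, message))
        .catch((error) => callback(error as Error));
    },
  });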

}
},
close: async () => {
if (stream) {
Author:

the same as stop... :/
