Conversation
|
Looks like this PR is not ready to merge, because of the following issues:
Please fix the issues and try again If you have any trouble, please check the PR guidelines |
|
WalkthroughRemoves oplog-based observer recovery and metrics, disables oplog unconditionally, and introduces a token-driven reactivity system that monkey-patches observeChanges to manage per-user token callbacks and session invalidation. Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Client Observer
participant Connection as Connection._observeChanges
participant UserReactivity as userReactivity
participant Mongo as Mongo Users Collection
participant Service as Meteor Service
Client->>Connection: start observeChanges(users, fields:{tokens})
Connection->>UserReactivity: delegate observe hook
UserReactivity->>Mongo: initial fetch matching records
Mongo-->>UserReactivity: return records
UserReactivity->>UserReactivity: store per-user hashed-token callbacks
UserReactivity-->>Client: emit added/ready events
Service->>UserReactivity: processOnChange(diff, userId)
UserReactivity->>UserReactivity: detect removed tokens for userId
UserReactivity-->>Client: emit removed callbacks (session invalidation)
Client->>Service: client handles invalidation
Estimated Code Review Effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested Labels
Suggested Reviewers
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## develop #38313 +/- ##
===========================================
+ Coverage 70.73% 70.88% +0.14%
===========================================
Files 3158 3162 +4
Lines 109359 109781 +422
Branches 19695 19719 +24
===========================================
+ Hits 77358 77817 +459
+ Misses 29966 29940 -26
+ Partials 2035 2024 -11
Flags with carried forward coverage won't be shown. Click here to find out more. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@apps/meteor/server/services/meteor/userReactivity.ts`:
- Around line 17-72: The override
MongoInternals.Connection.prototype._observeChanges must be synchronous and
return a handle immediately (not an async function); change the async function
to a normal function that starts the initial DB query asynchronously,
immediately returns an object with stop(), isReady (initially false) and
isReadyPromise (a Promise resolved when the initial query completes), and when
the async query finishes set isReady=true and resolve isReadyPromise and invoke
callbacks.added as currently implemented; also forward additional query options
(e.g., sort, limit, skip, hint, etc.) from the options param when calling
mongo.rawCollection(...).find(...) so you don't only pass projection/fields;
keep the userCallbacks and serviceConfigCallbacks logic but ensure stop() can
cancel/cleanup even if called before the initial query resolves.
🧹 Nitpick comments (3)
apps/meteor/packages/rocketchat-mongo-config/server/index.js (1)
7-8: Drop the inline comment to match repo guidance.
The assignment is self-explanatory; keep the rationale out of the implementation.As per coding guidelines, Avoid code comments in the implementation.🧹 Suggested change
-// we always want Meteor to disable oplog tailing Package['disable-oplog'] = {};apps/meteor/server/services/meteor/userReactivity.ts (2)
11-16: Remove inline comments and the commented-out debug line.
These are implementation notes; keep code comment-free per repo guidance.As per coding guidelines, Avoid code comments in the implementation.🧹 Suggested change
-// Stores the callbacks for the disconnection reactivity bellow const userCallbacks = new Map(); -// Overrides the native observe changes to prevent database polling and stores the callbacks -// for the users' tokens to re-implement the reactivity based on our database listeners const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); @@ - // console.error('Connection.Collection.prototype._observeChanges', collectionName, selector, options); @@ -// Re-implement meteor's reactivity that uses observe to disconnect sessions when the token -// associated was removed export const processOnChange = (diff: Record<string, any>, id: string): void => {Also applies to: 33-33, 74-76
83-90: Clean up empty per-user callback sets.
After removals, empty sets remain inuserCallbacks; deleting them avoids unnecessary retention.🧹 Suggested change
if (cbs) { [...cbs] .filter(({ hashedToken }) => tokens === undefined || !tokens.includes(hashedToken)) .forEach((item) => { item.callbacks.removed(id); cbs.delete(item); }); + if (cbs.size === 0) { + userCallbacks.delete(id); + } }
| MongoInternals.Connection.prototype._observeChanges = async function ( | ||
| { | ||
| collectionName, | ||
| selector, | ||
| options = {}, | ||
| }: { | ||
| collectionName: string; | ||
| selector: Record<string, any>; | ||
| options?: { | ||
| projection?: Record<string, number>; | ||
| fields?: Record<string, number>; | ||
| }; | ||
| }, | ||
| _ordered: boolean, | ||
| callbacks: Callbacks, | ||
| ): Promise<any> { | ||
| // console.error('Connection.Collection.prototype._observeChanges', collectionName, selector, options); | ||
| let cbs: Set<{ hashedToken: string; callbacks: Callbacks }>; | ||
| let data: { hashedToken: string; callbacks: Callbacks }; | ||
| if (callbacks?.added) { | ||
| const records = await mongo | ||
| .rawCollection(collectionName) | ||
| .find(selector, { | ||
| ...(options.projection || options.fields ? { projection: options.projection || options.fields } : {}), | ||
| }) | ||
| .toArray(); | ||
|
|
||
| for (const { _id, ...fields } of records) { | ||
| callbacks.added(String(_id), fields); | ||
| } | ||
|
|
||
| if (collectionName === 'users' && selector['services.resume.loginTokens.hashedToken']) { | ||
| cbs = userCallbacks.get(selector._id) || new Set(); | ||
| data = { | ||
| hashedToken: selector['services.resume.loginTokens.hashedToken'], | ||
| callbacks, | ||
| }; | ||
|
|
||
| cbs.add(data); | ||
| userCallbacks.set(selector._id, cbs); | ||
| } | ||
| } | ||
|
|
||
| if (collectionName === 'meteor_accounts_loginServiceConfiguration') { | ||
| serviceConfigCallbacks.add(callbacks); | ||
| } | ||
|
|
||
| return { | ||
| stop(): void { | ||
| if (cbs) { | ||
| cbs.delete(data); | ||
| } | ||
| serviceConfigCallbacks.delete(callbacks); | ||
| }, | ||
| }; | ||
| }; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Search for observeChanges and _observeChanges usage
rg -n -g '*.{ts,tsx,js}' '\.observeChanges\s*\(' -C 3Repository: RocketChat/Rocket.Chat
Length of output: 1553
🏁 Script executed:
#!/bin/bash
# Search for _observeChanges references
rg -n -g '*.{ts,tsx,js}' '_observeChanges' -C 3Repository: RocketChat/Rocket.Chat
Length of output: 2804
🏁 Script executed:
#!/bin/bash
# Look for callbacks.ready usage in the codebase to understand the pattern
rg -n -g '*.{ts,tsx,js}' 'callbacks\.ready' -C 2Repository: RocketChat/Rocket.Chat
Length of output: 48
🏁 Script executed:
#!/bin/bash
# Check the entire file to understand context
wc -l apps/meteor/server/services/meteor/userReactivity.tsRepository: RocketChat/Rocket.Chat
Length of output: 121
🏁 Script executed:
#!/bin/bash
cat -n apps/meteor/server/services/meteor/userReactivity.tsRepository: RocketChat/Rocket.Chat
Length of output: 3434
🏁 Script executed:
#!/bin/bash
# Search for how _observeChanges return value is used - check call sites
rg -n -g '*.{ts,tsx,js}' 'Collection.*\.find\(' -C 2
rg -n -g '*.{ts,tsx,js}' 'mongo\.collection\(' -C 2Repository: RocketChat/Rocket.Chat
Length of output: 48
🏁 Script executed:
#!/bin/bash
# Look for Meteor/MongoDB connection usage to understand the contract better
rg -n -g '*.{ts,tsx,js}' 'MongoInternals\.Connection\.prototype' -A 5Repository: RocketChat/Rocket.Chat
Length of output: 527
🏁 Script executed:
#!/bin/bash
# Check if there are other observeChanges overrides or implementations
rg -n -g '*.{ts,tsx,js}' 'observeChanges.*=' -B 2 -A 5Repository: RocketChat/Rocket.Chat
Length of output: 793
🏁 Script executed:
#!/bin/bash
# Look for usage of observe/watch patterns and handle properties
rg -n -g '*.{ts,tsx,js}' '\.stop\(\)' -B 3 -A 1Repository: RocketChat/Rocket.Chat
Length of output: 30556
🏁 Script executed:
#!/bin/bash
# Search for sort/limit in find operations related to observeChanges
rg -n -g '*.{ts,tsx,js}' 'sort|limit' -B 2 -A 2 | head -100Repository: RocketChat/Rocket.Chat
Length of output: 9011
🏁 Script executed:
#!/bin/bash
# Look for Collection.find calls with options
rg -n -g '*.{ts,tsx,js}' 'collection.*observe' -i -B 2 -A 2Repository: RocketChat/Rocket.Chat
Length of output: 3795
🏁 Script executed:
#!/bin/bash
# Check if there's any Meteor documentation or type definitions in the repo
fd -e 'd.ts' -e 'types.ts' | xargs grep -l 'observeChanges\|_observeChanges' 2>/dev/null || echo "No type defs found"Repository: RocketChat/Rocket.Chat
Length of output: 84
🏁 Script executed:
#!/bin/bash
# Look for uses of observe or observeChanges on server that might use the handle
rg -n -g '*.{ts,tsx,js}' '\.observe\(' -B 2 -A 3 | head -80Repository: RocketChat/Rocket.Chat
Length of output: 5969
🏁 Script executed:
#!/bin/bash
# Check if Collection.observe is called anywhere (the method that would call _observeChanges)
rg -n -g '*.{ts,tsx,js}' 'collection\.observe|Collection\.observe' -B 2 -A 3Repository: RocketChat/Rocket.Chat
Length of output: 960
🏁 Script executed:
#!/bin/bash
# Look for await _observeChanges or any async calls to observe
rg -n -g '*.{ts,tsx,js}' 'await.*observe|observeChanges.*await' -B 1 -A 1Repository: RocketChat/Rocket.Chat
Length of output: 524
The _observeChanges override needs to return a synchronous handle with isReady and isReadyPromise properties.
This async override may break the Meteor contract. The original _observeChanges should return a synchronous handle immediately (not a Promise), with the initial query run asynchronously. The returned handle must include isReady (boolean) and isReadyPromise (Promise) properties—currently only stop() is provided. Additionally, the options parameter only forwards projection/fields but omits sort, limit, and other common query options, which could cause unexpected query behavior.
🤖 Prompt for AI Agents
In `@apps/meteor/server/services/meteor/userReactivity.ts` around lines 17 - 72,
The override MongoInternals.Connection.prototype._observeChanges must be
synchronous and return a handle immediately (not an async function); change the
async function to a normal function that starts the initial DB query
asynchronously, immediately returns an object with stop(), isReady (initially
false) and isReadyPromise (a Promise resolved when the initial query completes),
and when the async query finishes set isReady=true and resolve isReadyPromise
and invoke callbacks.added as currently implemented; also forward additional
query options (e.g., sort, limit, skip, hint, etc.) from the options param when
calling mongo.rawCollection(...).find(...) so you don't only pass
projection/fields; keep the userCallbacks and serviceConfigCallbacks logic but
ensure stop() can cancel/cleanup even if called before the initial query
resolves.
There was a problem hiding this comment.
this is completely wrong, where did you get this info from?
There was a problem hiding this comment.
🧩 Analysis chain
@sampaiodiego I apologize for the incorrect analysis. Let me search for the actual Meteor documentation to understand the correct contract for _observeChanges.
🌐 Web query:
Meteor MongoInternals Connection _observeChanges contract
💡 Result:
MongoInternals.Connection#_observeChanges is Meteor’s server-side internal hook that powers Mongo.Cursor#observeChanges(...) / observeChangesAsync(...). While it’s not a public API, the “contract” it must satisfy is effectively the same as the public observeChanges* semantics:
Inputs (conceptually)
- A cursor/query description (collection + selector + options like
sort/limit/skip/fields, etc.). - An “ordered” vs “unordered” mode:
- Unordered observers use
added/changed/removed. - Ordered observers use
addedBefore/movedBefore(and may still usechanged/removed).
- Unordered observers use
- A callbacks object.
Callback contract
Meteor will call (some subset of) these callbacks, with these meanings:
added(id, fields)oraddedBefore(id, fields, before)fieldsis the full document minus_id.beforeis the_idof the doc that follows it in sort order, ornullfor “at end”.
changed(id, fields)fieldscontains only changed keys; removed keys appear with valueundefined.
removed(id)movedBefore(id, before)(ordered observers only)beforeis the following doc’s id, ornullfor “at end”.
These definitions are documented forobserveChangesAsync, and apply equivalently to the underlying implementation. [1]
Timing / lifecycle contract
- Initial results are delivered before the call returns: Meteor calls
added/addedBeforezero-or-more times to publish the initial result set, then returns the handle. [1] _observeChangesreturns a live query handle with at least:stop()— stops observation and releases resources. [1]
Ordering rule (practical requirement)
If the query uses skip/limit (and generally any ordering-sensitive query), Meteor expects an ordered observe (callbacks like addedBefore instead of added), otherwise you’ll hit errors like: “must use ordered observe … with skip or limit”. [2]
If you share what you’re implementing (e.g., a custom observe driver / alternative Mongo driver), I can map the exact callback set you must support for your specific query shapes (ordered vs unordered) and how to translate DB events into these callbacks.
Sources:
[1] https://docs.meteor.com/api/collections (section: Cursor.observeChangesAsync callbacks + initial-add-before-return + handle.stop)
[2] https://stackoverflow.com/questions/36099080/server-function-call-on-document-observe-changes (ordered observe requirement/error)
@sampaiodiego I apologize for the incorrect review comment. Based on the Meteor documentation I found, I can see that I was wrong about the isReady and isReadyPromise properties being required—the standard contract only requires a stop() method on the returned handle.
Could you help me understand what was incorrect about my analysis? I want to learn from this mistake so I can provide better reviews in the future. Specifically:
- Is the async implementation appropriate for your use case?
- Are the query options (sort, limit, skip) not needed for the specific queries being observed here?
- Is there internal Rocket.Chat context about how this observer pattern works that I'm missing?
Thank you for your patience in correcting me.
🧠 Learnings used
Learnt from: sampaiodiego
Repo: RocketChat/Rocket.Chat PR: 37357
File: apps/meteor/ee/server/startup/federation.ts:39-74
Timestamp: 2025-11-05T20:53:57.761Z
Learning: In Rocket.Chat (apps/meteor/app/settings/server/CachedSettings.ts), the settings.watchMultiple() method immediately invokes its callback with current values if all requested settings exist in the store, then continues watching for subsequent changes. It does not wait for a setting to change before the first invocation.
Learnt from: Dnouv
Repo: RocketChat/Rocket.Chat PR: 37057
File: packages/apps-engine/src/definition/accessors/IUserRead.ts:23-27
Timestamp: 2025-09-25T09:59:26.461Z
Learning: AppUserBridge.getUserRoomIds in apps/meteor/app/apps/server/bridges/users.ts always returns an array of strings by mapping subscription documents to room IDs, never undefined, even when user has no room subscriptions.
Learnt from: tassoevan
Repo: RocketChat/Rocket.Chat PR: 38219
File: packages/core-typings/src/cloud/Announcement.ts:5-6
Timestamp: 2026-01-17T01:51:47.764Z
Learning: In packages/core-typings/src/cloud/Announcement.ts, the AnnouncementSchema.createdBy field intentionally overrides IBannerSchema.createdBy (object with _id and optional username) with a string enum ['cloud', 'system'] to match existing runtime behavior. This is documented as technical debt with a FIXME comment at apps/meteor/app/cloud/server/functions/syncWorkspace/handleCommsSync.ts:53 and should not be flagged as an error until the runtime behavior is corrected.
Learnt from: MartinSchoeler
Repo: RocketChat/Rocket.Chat PR: 37408
File: apps/meteor/client/views/admin/ABAC/useRoomAttributeOptions.tsx:53-69
Timestamp: 2025-11-10T19:06:20.146Z
Learning: In the Rocket.Chat repository, do not provide suggestions or recommendations about code sections marked with TODO comments. The maintainers have already identified these as future work and external reviewers lack the full context about implementation plans and timing.
There was a problem hiding this comment.
I don't know where you get the definitions for _observeChanges
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Search for any Meteor internal documentation or type definitions in the repo
rg -n -g '*.{ts,d.ts,js}' 'MongoInternals\.Connection' -A 10 -B 2
Length of output: 1156
🏁 Script executed:
#!/bin/bash
# Look for any comments or documentation about _observeChanges in the codebase
rg -n -g '*.{ts,js}' '_observeChanges' -B 5 -A 5
Length of output: 2947
🌐 Web query:
Meteor MongoInternals Connection _observeChanges source code implementation
💡 Result:
In Meteor core, MongoInternals.Connection.prototype._observeChanges lives in the server-side mongo package and is implemented (as of the Meteor 3.3.x line) in:
packages/mongo/mongo_connection.js(the connection class) (fossies.org)- It delegates most of the livequery wiring to
packages/mongo/observe_multiplex.ts(the “observe multiplexer” that deduplicates identical live queries) (fossies.org)
How _observeChanges is structured (high level)
- Build a cursor description (collection name, selector, options/projection/sort/limit, ordered vs unordered callbacks, etc.).
- Look up or create an
ObserveMultiplexerkeyed by that cursor description, so multiple callers observing the same query share one underlying driver and fan out callbacks. - The
ObserveMultiplexerchooses an observe driver:- Prefer
OplogObserveDriver(oplog tailing) when the query is supported. - Fall back to
PollingObserveDriverwhen it’s not. (fossies.org)
- Prefer
- The chosen driver runs the initial query (emitting initial
addedcallbacks) and then streams changes via oplog tailing or polling/diff.
The underlying drivers you’ll end up reading
- Oplog driver:
packages/mongo/oplog_observe_driver.js(exportsOplogObserveDriver) (fossies.org) - Polling fallback:
packages/mongo/polling_observe_driver.ts(fossies.org) - Background/architecture explanation of the decision and unsupported query shapes:
docs/long-form/oplog-observe-driver.md(fossies.org)
If you tell me which Meteor version/branch you’re targeting (e.g., Meteor 3.3.2, devel, etc.), I can point you to the exact function signature and the specific call chain inside that version’s mongo_connection.js / observe_multiplex.ts.
Citations:
- 1: https://fossies.org/linux/www/meteor-release-METEOR-3.3.1.tar.gz/index_o.html?utm_source=openai
- 2: https://fossies.org/linux/www/meteor-release-METEOR-3.3.1.tar.gz/index_o.html?utm_source=openai
- 3: https://fossies.org/linux/meteor/docs/long-form/oplog-observe-driver.md?utm_source=openai
- 4: https://fossies.org/linux/meteor/packages/mongo/oplog_observe_driver.js?utm_source=openai
- 5: https://fossies.org/linux/www/meteor-release-METEOR-3.3.1.tar.gz/index_o.html?utm_source=openai
- 6: https://fossies.org/linux/meteor/docs/long-form/oplog-observe-driver.md?utm_source=openai
@sampaiodiego I apologize for the confusion. I was making inferences based on:
- Public Meteor documentation about
observeChanges()(the public API) - Web searches that pointed to Meteor's internal source code structure
I don't have access to the actual Meteor internal source code or its type definitions in this repository, so I was incorrectly assuming that the internal _observeChanges method would follow the same patterns as the public API. This was a mistake on my part.
Could you help me understand:
- What is the actual contract for
MongoInternals.Connection.prototype._observeChanges? - Is the async implementation in this PR the correct approach for replacing the oplog-based reactivity?
- Are there any specific behaviors or return values that must be maintained?
I want to learn from this so I can provide more accurate reviews in the future, especially when it comes to internal Meteor APIs that your team is working with.
🧠 Learnings used
Learnt from: sampaiodiego
Repo: RocketChat/Rocket.Chat PR: 37357
File: apps/meteor/ee/server/startup/federation.ts:39-74
Timestamp: 2025-11-05T20:53:57.761Z
Learning: In Rocket.Chat (apps/meteor/app/settings/server/CachedSettings.ts), the settings.watchMultiple() method immediately invokes its callback with current values if all requested settings exist in the store, then continues watching for subsequent changes. It does not wait for a setting to change before the first invocation.
Learnt from: Dnouv
Repo: RocketChat/Rocket.Chat PR: 37057
File: packages/apps-engine/src/definition/accessors/IUserRead.ts:23-27
Timestamp: 2025-09-25T09:59:26.461Z
Learning: AppUserBridge.getUserRoomIds in apps/meteor/app/apps/server/bridges/users.ts always returns an array of strings by mapping subscription documents to room IDs, never undefined, even when user has no room subscriptions.
Learnt from: tassoevan
Repo: RocketChat/Rocket.Chat PR: 38219
File: packages/core-typings/src/cloud/Announcement.ts:5-6
Timestamp: 2026-01-17T01:51:47.764Z
Learning: In packages/core-typings/src/cloud/Announcement.ts, the AnnouncementSchema.createdBy field intentionally overrides IBannerSchema.createdBy (object with _id and optional username) with a string enum ['cloud', 'system'] to match existing runtime behavior. This is documented as technical debt with a FIXME comment at apps/meteor/app/cloud/server/functions/syncWorkspace/handleCommsSync.ts:53 and should not be flagged as an error until the runtime behavior is corrected.
Learnt from: MartinSchoeler
Repo: RocketChat/Rocket.Chat PR: 37408
File: apps/meteor/client/views/admin/ABAC/useRoomAttributeOptions.tsx:53-69
Timestamp: 2025-11-10T19:06:20.146Z
Learning: In the Rocket.Chat repository, do not provide suggestions or recommendations about code sections marked with TODO comments. The maintainers have already identified these as future work and external reviewers lack the full context about implementation plans and timing.
Co-authored-by: Kevin Aleman <kaleman960@gmail.com>
Proposed changes (including videos or screenshots)
Issue(s)
Steps to test or reproduce
Further comments
Summary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings.