Skip to content

Commit af8b927

Browse files
committed
Add more event fields: lsid, txnNumber and disambiguatedPaths
1 parent d0b81aa commit af8b927

File tree

2 files changed

+56
-0
lines changed

2 files changed

+56
-0
lines changed

src/change_stream/event.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,14 @@ pub struct ChangeStreamEvent<T> {
7474
/// The new name for the `ns` collection. Only included for `OperationType::Rename`.
7575
pub to: Option<ChangeNamespace>,
7676

77+
/// The identifier for the session associated with the transaction.
78+
/// Only present if the operation is part of a multi-document transaction.
79+
pub lsid: Option<Document>,
80+
81+
/// Together with the lsid, a number that helps uniquely identify a transaction.
82+
/// Only present if the operation is part of a multi-document transaction.
83+
pub txn_number: Option<i64>,
84+
7785
/// A `Document` that contains the `_id` of the document created or modified by the `insert`,
7886
/// `replace`, `delete`, `update` operations (i.e. CRUD operations). For sharded collections,
7987
/// also displays the full shard key for the document. The `_id` field is not repeated if it is
@@ -126,6 +134,12 @@ pub struct UpdateDescription {
126134

127135
/// Arrays that were truncated in the `Document`.
128136
pub truncated_arrays: Option<Vec<TruncatedArray>>,
137+
138+
/// When an update event reports changes involving ambiguous fields, the disambiguatedPaths
139+
/// document provides the path key with an array listing each path component.
140+
/// Note: The disambiguatedPaths field is only available on change streams started with the
141+
/// showExpandedEvents option
142+
pub disambiguated_paths: Option<Document>,
129143
}
130144

131145
/// Describes an array that has been truncated.

src/test/change_stream.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,48 @@ async fn split_large_event() -> Result<()> {
659659
Ok(())
660660
}
661661

662+
/// Test that transaction fields are parsed correctly
663+
#[tokio::test]
664+
async fn transaction_fields() -> Result<()> {
665+
let (client, coll, mut stream) =
666+
match init_stream("chang_stream_transaction_fields", true).await? {
667+
Some(t) => t,
668+
None => return Ok(()),
669+
};
670+
if client.is_sharded() {
671+
log_uncaptured("skipping change stream test on unsupported topology");
672+
return Ok(());
673+
}
674+
if !client.supports_transactions() {
675+
log_uncaptured(
676+
"skipping change stream transaction_fields test due to lack of transaction support",
677+
);
678+
return Ok(());
679+
}
680+
681+
let mut session = client.start_session().await.unwrap();
682+
let session_id = session.id().get("id").cloned();
683+
assert!(session_id.is_some());
684+
session.start_transaction().await.unwrap();
685+
coll.insert_one(doc! {"_id": 1})
686+
.session(&mut session)
687+
.await?;
688+
session.commit_transaction().await.unwrap();
689+
690+
let next_event = stream.next().await.transpose()?;
691+
assert!(matches!(next_event,
692+
Some(ChangeStreamEvent {
693+
operation_type: OperationType::Insert,
694+
document_key: Some(key),
695+
lsid: Some(lsid),
696+
txn_number: Some(1),
697+
..
698+
}) if key == doc! { "_id": 1 } && lsid.get("id") == session_id.as_ref()
699+
));
700+
701+
Ok(())
702+
}
703+
662704
// Regression test: `Collection::watch` uses the type parameter. This is not flagged as a test to
663705
// run because it's just asserting that this compiles.
664706
#[allow(unreachable_code, unused_variables, clippy::diverging_sub_expression)]

0 commit comments

Comments
 (0)