Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,11 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
transition(to: .syncing(.init(bufferedObjectOperations: [], syncSequence: nil)), userCallbackQueue: userCallbackQueue)
}

// RTO4d: Clear buffered object operations without applying them
if case let .syncing(syncingData) = state {
syncingData.bufferedObjectOperations = []
}

// We only care about the case where HAS_OBJECTS is not set (RTO4b); if it is set then we're going to shortly receive an OBJECT_SYNC instead (RTO4a)
guard !hasObjects else {
return
Expand All @@ -523,7 +528,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo

// I have, for now, not directly implemented the "perform the actions for object sync completion" of RTO4b4 since my implementation doesn't quite match the model given there; here you only have a SyncObjectsPool if you have an OBJECT_SYNC in progress, which you might not have upon receiving an ATTACHED. Instead I've just implemented what seem like the relevant side effects. Can revisit this if "the actions for object sync completion" get more complex.

// RTO4b3, RTO4b4, RTO4b5, RTO5c3, RTO5c4, RTO5c5, RTO5c8
// RTO4b3, RTO4b4, RTO5c3, RTO5c4, RTO5c5, RTO5c8
transition(to: .synced, userCallbackQueue: userCallbackQueue)
}

Expand Down Expand Up @@ -558,9 +563,8 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
// Figure out whether to continue any existing sync sequence or start a new one
let isNewSyncSequence = syncCursor == nil || syncingData.syncSequence?.id != syncCursor?.sequenceID
if isNewSyncSequence {
// RTO5a2a, RTO5a2b: new sequence started, discard previous. Else we continue the existing sequence per RTO5a3
// RTO5a2a: new sequence started, discard previous. Else we continue the existing sequence per RTO5a3
syncingData.syncSequence = nil
syncingData.bufferedObjectOperations = []
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ struct InternalDefaultRealtimeObjectsTests {

// @spec RTO5a2
// @spec RTO5a2a
// @spec RTO5a2b
@Test
func newSequenceIdDiscardsInFlightSync() async throws {
let internalQueue = TestFactories.createInternalQueue()
Expand All @@ -145,7 +144,7 @@ struct InternalDefaultRealtimeObjectsTests {

#expect(realtimeObjects.testsOnly_hasSyncSequence)

// Inject an OBJECT; it will get buffered per RTO8a and subsequently discarded per RTO5a2b
// Inject an OBJECT; it will get buffered per RTO8a
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_handleObjectProtocolMessage(objectMessages: [
TestFactories.mapCreateOperationMessage(objectId: "map:3@789"),
Expand All @@ -172,10 +171,10 @@ struct InternalDefaultRealtimeObjectsTests {
)
}

// Verify only the second sequence's objects were applied (RTO5a2a - previous cleared)
// Verify the second sequence's objects and the buffered OBJECT were applied (RTO5a2a - previous sync pool cleared, but buffered operations retained and applied at sync completion)
let pool = realtimeObjects.testsOnly_objectsPool
#expect(pool.entries["map:1@123"] == nil) // From discarded first sequence
#expect(pool.entries["map:3@789"] == nil) // Check we discarded the OBJECT that was buffered during discarded first sequence (RTO5a2b)
#expect(pool.entries["map:3@789"] != nil) // Buffered OBJECT was retained across new sequence and applied at sync completion
#expect(pool.entries["map:2@456"] != nil) // From completed second sequence
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
}
Expand Down Expand Up @@ -389,14 +388,62 @@ struct InternalDefaultRealtimeObjectsTests {
#expect(realtimeObjects.testsOnly_hasSyncSequence)
}

// MARK: - RTO4d Tests

// @spec RTO4d
@Test
func clearsBufferedOperationsOnAttached() {
let internalQueue = TestFactories.createInternalQueue()
let realtimeObjects = InternalDefaultRealtimeObjectsTests.createDefaultRealtimeObjects(internalQueue: internalQueue)

// Start a sync sequence
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_handleObjectSyncProtocolMessage(
objectMessages: [
TestFactories.mapObjectMessage(objectId: "map:sync1@123"),
],
protocolMessageChannelSerial: "seq1:cursor1",
)
}

#expect(realtimeObjects.testsOnly_hasSyncSequence)

// Buffer some OBJECT messages during the sync (RTO8a)
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_handleObjectProtocolMessage(objectMessages: [
TestFactories.mapCreateOperationMessage(objectId: "map:buffered@456"),
])
}

// Receive ATTACHED with hasObjects = true (RTO4d should clear the buffer)
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_onChannelAttached(hasObjects: true)
}

// Complete a new sync sequence
internalQueue.ably_syncNoDeadlock {
realtimeObjects.nosync_handleObjectSyncProtocolMessage(
objectMessages: [
TestFactories.mapObjectMessage(objectId: "map:sync2@789"),
],
protocolMessageChannelSerial: nil, // Single-message sync
)
}

// Verify the previously buffered operation was NOT applied (it was cleared by RTO4d)
let pool = realtimeObjects.testsOnly_objectsPool
#expect(pool.entries["map:buffered@456"] == nil)
// Verify the new sync's objects were applied
#expect(pool.entries["map:sync2@789"] != nil)
}

// MARK: - RTO4b Tests

// @spec RTO4b1
// @spec RTO4b2
// @spec RTO4b2a
// @spec RTO4b3
// @spec RTO4b4
// @spec RTO4b5
@available(iOS 17.0.0, tvOS 17.0.0, *)
@Test
func handlesHasObjectsFalse() async throws {
Expand Down Expand Up @@ -462,7 +509,7 @@ struct InternalDefaultRealtimeObjectsTests {
#expect(newRoot as AnyObject === originalPool.root as AnyObject) // Should be same instance
#expect(newRoot.testsOnly_data.isEmpty) // Should be zero-valued (empty)

// RTO4b3, RTO4b4, RTO4b5: SyncObjectsPool must be cleared, sync sequence cleared, BufferedObjectOperations cleared
// RTO4b3, RTO4b4: SyncObjectsPool must be cleared, sync sequence cleared
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
}

Expand Down
Loading