Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1318,6 +1318,7 @@ private synchronized void onConnected(ProtocolMessage message) {
maxIdleInterval = connectionDetails.maxIdleInterval;
connectionStateTtl = connectionDetails.connectionStateTtl;
maxMessageSize = connectionDetails.maxMessageSize;
siteCode = connectionDetails.siteCode; // CD2j

/* set the clientId resolved from token, if any */
String clientId = connectionDetails.clientId;
Expand Down Expand Up @@ -2033,6 +2034,7 @@ private boolean isFatalError(ErrorInfo err) {
private CMConnectivityListener connectivityListener;
private long connectionStateTtl = Defaults.connectionStateTtl;
public int maxMessageSize = Defaults.maxMessageSize;
public String siteCode; // CD2j
long maxIdleInterval = Defaults.maxIdleInterval;
private int disconnectedRetryAttempt = 0;

Expand Down
10 changes: 10 additions & 0 deletions lib/src/main/java/io/ably/lib/types/ConnectionDetails.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ public class ConnectionDetails {
* Spec: CD2f, RTN14e, DF1a
*/
public Long connectionStateTtl;
/**
* An opaque string identifying the server instance that the client is connected to.
* Used as a key in siteTimeserials maps for LiveObjects operations.
* <p>
* Spec: CD2j
*/
public String siteCode;

ConnectionDetails() {
maxIdleInterval = Defaults.maxIdleInterval;
Expand Down Expand Up @@ -114,6 +121,9 @@ ConnectionDetails readMsgpack(MessageUnpacker unpacker) throws IOException {
case "connectionStateTtl":
connectionStateTtl = unpacker.unpackLong();
break;
case "siteCode":
siteCode = unpacker.unpackString();
break;
default:
Log.v(TAG, "Unexpected field: " + fieldName);
unpacker.skipValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.ably.lib.objects.type.map.LiveMapValue
import io.ably.lib.realtime.ChannelState
import io.ably.lib.types.AblyException
import io.ably.lib.types.ProtocolMessage
import io.ably.lib.types.PublishResult
import io.ably.lib.util.Log
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
Expand All @@ -31,6 +32,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal

internal var state = ObjectsState.Initialized

/**
* Set of serials for operations applied locally upon ACK, awaiting deduplication of the server echo.
* @spec RTO7b, RTO7b1
*/
internal val appliedOnAckSerials = mutableSetOf<String>()

/**
* @spec RTO4 - Used for handling object messages and object sync messages
*/
Expand Down Expand Up @@ -125,13 +132,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
)
)

// RTO11g - Publish the message
publish(arrayOf(msg))
// RTO11i - publish and apply locally on ACK
publishAndApply(arrayOf(msg))

// RTO11h - Check if object already exists in pool, otherwise create a zero-value object using the sequential scope
return objectsPool.get(objectId) as? LiveMap ?: withContext(sequentialScope.coroutineContext) {
objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveMap
}
// RTO11h2 - Return existing object if found after apply
return objectsPool.get(objectId) as? LiveMap
?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTO11h3d
}

private suspend fun createCounterAsync(initialValue: Number): LiveCounter {
Expand Down Expand Up @@ -161,13 +167,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
)
)

// RTO12g - Publish the message
publish(arrayOf(msg))
// RTO12i - publish and apply locally on ACK
publishAndApply(arrayOf(msg))

// RTO12h - Check if object already exists in pool, otherwise create a zero-value object using the sequential scope
return objectsPool.get(objectId) as? LiveCounter ?: withContext(sequentialScope.coroutineContext) {
objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveCounter
}
// RTO12h2 - Return existing object if found after apply
return objectsPool.get(objectId) as? LiveCounter
?: throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d
}

/**
Expand All @@ -182,15 +187,55 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
/**
* Spec: RTO15
*/
internal suspend fun publish(objectMessages: Array<ObjectMessage>) {
internal suspend fun publish(objectMessages: Array<ObjectMessage>): PublishResult {
// RTO15b, RTL6c - Ensure that the channel is in a valid state for publishing
adapter.throwIfUnpublishableState(channelName)
adapter.ensureMessageSizeWithinLimit(objectMessages)
// RTO15e - Must construct the ProtocolMessage as per RTO15e1, RTO15e2, RTO15e3
val protocolMessage = ProtocolMessage(ProtocolMessage.Action.`object`, channelName)
protocolMessage.state = objectMessages
// RTO15f, RTO15g - Send the ProtocolMessage using the adapter and capture success/failure
adapter.sendAsync(protocolMessage)
return adapter.sendAsync(protocolMessage) // RTO15h
}

/**
* Publishes the given object messages and, upon receiving the ACK, immediately applies them
* locally as synthetic inbound messages using the assigned serial and connection's siteCode.
*
* Spec: RTO20
*/
internal suspend fun publishAndApply(objectMessages: Array<ObjectMessage>) {
// RTO20b - publish, propagate failure
val publishResult = publish(objectMessages)

// RTO20c - validate required info
val siteCode = adapter.connectionManager.siteCode
if (siteCode == null) {
Log.e(tag, "RTO20c1: siteCode not available; operations will be applied when echoed")
return
}
val serials = publishResult.serials
if (serials == null || serials.size != objectMessages.size) {
Log.e(tag, "RTO20c2: PublishResult.serials unavailable or wrong length; operations will be applied when echoed")
return
}

// RTO20d - create synthetic inbound ObjectMessages
val syntheticMessages = mutableListOf<ObjectMessage>()
objectMessages.forEachIndexed { i, msg ->
val serial = serials[i]
if (serial == null) {
Log.d(tag, "RTO20d1: serial null at index $i (conflated), skipping")
return@forEachIndexed
}
syntheticMessages.add(msg.copy(serial = serial, siteCode = siteCode)) // RTO20d2a, RTO20d2b, RTO20d3
}
if (syntheticMessages.isEmpty()) return

// RTO20e, RTO20f - dispatch to sequential scope for ordering
withContext(sequentialScope.coroutineContext) {
objectsManager.applyAckResult(syntheticMessages) // suspends if SYNCING (RTO20e), applies on SYNCED (RTO20f)
}
}

/**
Expand Down Expand Up @@ -272,7 +317,17 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
}
}
ChannelState.detached,
ChannelState.suspended,
ChannelState.failed -> {
// RTO20e1 - fail any publishAndApply operations waiting for sync
val errorReason = try { adapter.getChannel(channelName).reason } catch (e: Exception) { null }
val error = ablyException(
"publishAndApply could not be applied locally: channel entered $state whilst waiting for objects sync",
ErrorCode.PublishAndApplyFailedDueToChannelState,
HttpStatusCode.BadRequest,
cause = errorReason?.let { AblyException.fromErrorInfo(it) }
)
objectsManager.failBufferedAcks(error)
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
objectsPool.clearObjectsData(false)
objectsManager.clearSyncObjectsDataPool()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ internal enum class ErrorCode(public val code: Int) {
// Channel mode and state validation error codes
ChannelModeRequired(40_024),
ChannelStateError(90_001),
PublishAndApplyFailedDueToChannelState(92_008),
}

internal enum class HttpStatusCode(public val code: Int) {
Expand Down
73 changes: 66 additions & 7 deletions liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import io.ably.lib.objects.type.BaseRealtimeObject
import io.ably.lib.objects.type.ObjectUpdate
import io.ably.lib.objects.type.livecounter.DefaultLiveCounter
import io.ably.lib.objects.type.livemap.DefaultLiveMap
import io.ably.lib.types.AblyException
import io.ably.lib.util.Log
import kotlinx.coroutines.CompletableDeferred

/**
* @spec RTO5 - Processes OBJECT and OBJECT_SYNC messages during sync sequences
Expand All @@ -21,6 +23,10 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
* @spec RTO7 - Buffered object operations during sync
*/
private val bufferedObjectOperations = mutableListOf<ObjectMessage>() // RTO7a
/**
* @spec RTO22 - ACK results buffered during sync, with deferred for caller waiting
*/
private val bufferedAcks = mutableListOf<Pair<List<ObjectMessage>, CompletableDeferred<Unit>>>()

/**
* Handles object messages (non-sync messages).
Expand All @@ -39,7 +45,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
}

// Apply messages immediately if synced
applyObjectMessages(objectMessages) // RTO8b
applyObjectMessages(objectMessages, ObjectsOperationSource.CHANNEL) // RTO8b
}

/**
Expand Down Expand Up @@ -77,6 +83,10 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
// need to discard all buffered object operation messages on new sync start
bufferedObjectOperations.clear() // RTO5a2b
syncObjectsDataPool.clear() // RTO5a2a
// RTO21b - clear ACK tracking state on new sync (safety guard; RTO20e1 should have already failed deferreds)
for ((_, deferred) in bufferedAcks) { deferred.cancel() }
bufferedAcks.clear()
realtimeObjects.appliedOnAckSerials.clear()
currentSyncId = syncId
stateChange(ObjectsState.Syncing, false)
}
Expand All @@ -89,14 +99,46 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
internal fun endSync(deferStateEvent: Boolean) {
Log.v(tag, "Ending sync sequence")
applySync()
// should apply buffered object operations after we applied the sync.
// can use regular non-sync object.operation logic
applyObjectMessages(bufferedObjectOperations) // RTO5c6
realtimeObjects.appliedOnAckSerials.clear() // RTO5c9 - sync replaces state

// RTO5c6b: apply buffered ACKs before buffered OBJECT messages (proper dedup ordering)
for ((messages, deferred) in bufferedAcks) {
applyObjectMessages(messages, ObjectsOperationSource.LOCAL)
deferred.complete(Unit) // signal publishAndApply to resume
}
bufferedAcks.clear()

applyObjectMessages(bufferedObjectOperations, ObjectsOperationSource.CHANNEL) // RTO5c6
bufferedObjectOperations.clear() // RTO5c5
syncObjectsDataPool.clear() // RTO5c4
currentSyncId = null // RTO5c3
stateChange(ObjectsState.Synced, deferStateEvent)
stateChange(ObjectsState.Synced, deferStateEvent) // RTO5c8
}

/**
* Called from publishAndApply (via withContext sequentialScope).
* If SYNCED: apply immediately with LOCAL source.
* If SYNCING: buffer, suspend deferred until endSync processes it (RTO20e).
*/
internal suspend fun applyAckResult(messages: List<ObjectMessage>) {
if (realtimeObjects.state == ObjectsState.Synced) {
applyObjectMessages(messages, ObjectsOperationSource.LOCAL) // RTO20f
} else {
val deferred = CompletableDeferred<Unit>()
bufferedAcks.add(Pair(messages, deferred))
deferred.await() // RTO20e - suspends until endSync completes or channel fails (RTO20e1)
}
}

/**
* Fails all buffered ACK deferreds.
* Called when the channel enters DETACHED/SUSPENDED/FAILED (RTO20e1).
*/
internal fun failBufferedAcks(error: AblyException) {
for ((_, deferred) in bufferedAcks) {
deferred.completeExceptionally(error)
}
bufferedAcks.clear()
}

/**
Expand Down Expand Up @@ -162,7 +204,10 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
*
* @spec RTO9 - Creates zero-value objects if they don't exist
*/
private fun applyObjectMessages(objectMessages: List<ObjectMessage>) {
private fun applyObjectMessages(
objectMessages: List<ObjectMessage>,
source: ObjectsOperationSource = ObjectsOperationSource.CHANNEL,
) {
// RTO9a
for (objectMessage in objectMessages) {
if (objectMessage.operation == null) {
Expand All @@ -177,14 +222,26 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
Log.w(tag, "Object operation action is unknown, skipping message: ${objectMessage.id}")
continue
}

// RTO9a3 - skip operations already applied on ACK
if (objectMessage.serial != null &&
realtimeObjects.appliedOnAckSerials.contains(objectMessage.serial)) {
Log.d(tag, "RTO9a3: serial ${objectMessage.serial} already applied on ACK; discarding")
realtimeObjects.appliedOnAckSerials.remove(objectMessage.serial)
continue // discard without taking any further action
}

// RTO9a2a - we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations,
// we can create a zero-value object for the provided object id and apply the operation to that zero-value object.
// this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves,
// since they need to be able to eventually initialize themselves from that *_CREATE op.
// so to simplify operations handling, we always try to create a zero-value object in the pool first,
// and then we can always apply the operation on the existing object in the pool.
val obj = realtimeObjects.objectsPool.createZeroValueObjectIfNotExists(objectOperation.objectId) // RTO9a2a1
obj.applyObject(objectMessage) // RTO9a2a2, RTO9a2a3
val applied = obj.applyObject(objectMessage, source) // RTO9a2a2, RTO9a2a3
if (source == ObjectsOperationSource.LOCAL && applied && objectMessage.serial != null) {
realtimeObjects.appliedOnAckSerials.add(objectMessage.serial) // RTO9a2a4
}
}
}

Expand Down Expand Up @@ -240,6 +297,8 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
}

internal fun dispose() {
for ((_, deferred) in bufferedAcks) { deferred.cancel() }
bufferedAcks.clear()
syncObjectsDataPool.clear()
bufferedObjectOperations.clear()
disposeObjectsStateListeners()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.ably.lib.objects

/** @spec RTO22 */
internal enum class ObjectsOperationSource {
LOCAL, // RTO22a - applied upon receipt of ACK
CHANNEL // RTO22b - received over a Realtime channel
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.ably.lib.objects.type
import io.ably.lib.objects.ObjectMessage
import io.ably.lib.objects.ObjectOperation
import io.ably.lib.objects.ObjectState
import io.ably.lib.objects.ObjectsOperationSource
import io.ably.lib.objects.ObjectsPoolDefaults
import io.ably.lib.objects.objectError
import io.ably.lib.objects.type.livecounter.noOpCounterUpdate
Expand Down Expand Up @@ -66,11 +67,11 @@ internal abstract class BaseRealtimeObject(

/**
* This is invoked by ObjectMessage having updated data with parent `ProtocolMessageAction` as `object`
* @return an update describing the changes
* @return true if the operation was meaningfully applied, false otherwise
*
* @spec RTLM15/RTLC7 - Applies ObjectMessage with object data operations to LiveMap/LiveCounter
*/
internal fun applyObject(objectMessage: ObjectMessage) {
internal fun applyObject(objectMessage: ObjectMessage, source: ObjectsOperationSource): Boolean {
validateObjectId(objectMessage.operation?.objectId)

val msgTimeSerial = objectMessage.serial
Expand All @@ -84,17 +85,18 @@ internal abstract class BaseRealtimeObject(
"Skipping ${objectOperation.action} op: op serial $msgTimeSerial <= site serial ${siteTimeserials[msgSiteCode]}; " +
"objectId=$objectId"
)
return
return false // RTLC7b / RTLM15b
}
// RTLC7c / RTLM15c - only update siteTimeserials for CHANNEL source
if (source == ObjectsOperationSource.CHANNEL) {
siteTimeserials[msgSiteCode!!] = msgTimeSerial!! // RTLC7c, RTLM15c
}
// should update stored site serial immediately. doesn't matter if we successfully apply the op,
// as it's important to mark that the op was processed by the object
siteTimeserials[msgSiteCode!!] = msgTimeSerial!! // RTLC7c, RTLM15c

if (isTombstoned) {
// this object is tombstoned so the operation cannot be applied
return
return false // RTLC7e / RTLM15e
}
applyObjectOperation(objectOperation, objectMessage) // RTLC7d
return applyObjectOperation(objectOperation, objectMessage) // RTLC7d
}

/**
Expand Down Expand Up @@ -166,9 +168,10 @@ internal abstract class BaseRealtimeObject(
*
* @param operation The operation containing the action and data to apply
* @param message The complete object message containing the operation
* @return true if the operation was meaningfully applied, false otherwise
*
*/
abstract fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage)
abstract fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage): Boolean

/**
* Clears the object's data and returns an update describing the changes.
Expand Down
Loading
Loading