Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -241,18 +241,6 @@ internal class OperationRepo(
queue.forEach { it.operation.translateIds(response.idTranslations) }
}
response.idTranslations.values.forEach { _newRecordState.add(it) }
// Stall processing the queue so the backend's DB has to time
// reflect the change before we do any other operations to it.
// NOTE: Future: We could run this logic in a
// coroutineScope.launch() block so other operations not
// effecting this these id's can still be done in parallel,
// however other parts of the system don't currently account
// for this so this is not safe to do.
val waitTime = _configModelStore.model.opRepoPostCreateDelay
delay(waitTime)
synchronized(queue) {
if (queue.isNotEmpty()) waiter.wake(LoopWaiterMessage(false, waitTime))
}
}

var highestRetries = 0
Expand Down Expand Up @@ -316,7 +304,11 @@ internal class OperationRepo(
}
}

// wait for retry and post create waiters to start next operation
delayBeforeNextExecution(highestRetries, response.retryAfterSeconds)
if (response.idTranslations != null) {
delayForPostCreate(_configModelStore.model.opRepoPostCreateDelay)
}
} catch (e: Throwable) {
Logging.log(LogLevel.ERROR, "Error attempting to execute operation: $ops", e)

Expand Down Expand Up @@ -345,6 +337,22 @@ internal class OperationRepo(
}
}

/**
* Stall processing the queue so the backend's DB has to time
* reflect the change before we do any other operations to it.
* NOTE: Future: We could run this logic in a
* coroutineScope.launch() block so other operations not
* effecting this these id's can still be done in parallel,
* however other parts of the system don't currently account
* for this so this is not safe to do.
*/
suspend fun delayForPostCreate(postCreateDelay: Long) {
delay(postCreateDelay)
synchronized(queue) {
if (queue.isNotEmpty()) waiter.wake(LoopWaiterMessage(false, postCreateDelay))
}
}

internal fun getNextOps(bucketFilter: Int): List<OperationQueueItem>? {
return synchronized(queue) {
val startingOp =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ internal class LoginUserOperationExecutor(
loginUserOp: LoginUserOperation,
operations: List<Operation>,
): ExecutionResponse {
// Handle a bad state that can happen in User Model 5.1.27 or earlier versions that old Login
// request is not removed after processing if app is force-closed within the PostCreateDelay.
// Anonymous Login being processed alone will surely be rejected, so we need to drop the request
val containsSubscriptionOperation = operations.any { it is CreateSubscriptionOperation || it is TransferSubscriptionOperation }
if (!containsSubscriptionOperation && loginUserOp.externalId == null) {
return ExecutionResponse(ExecutionResult.FAIL_NORETRY)
}
if (loginUserOp.existingOnesignalId == null || loginUserOp.externalId == null) {
// When there is no existing user to attempt to associate with the externalId provided, we go right to
// createUser. If there is no externalId provided this is an insert, if there is this will be an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,32 @@ class OperationRepoTests : FunSpec({
}
}

// operations not removed from the queue may get stuck in the queue if app is force closed within the delay
test("execution of an operation with translation IDs removes the operation from queue before delay") {
// Given
val mocks = Mocks()
mocks.configModelStore.model.opRepoPostCreateDelay = 100
val operation = mockOperation(groupComparisonType = GroupComparisonType.NONE)
val opId = operation.id
val idTranslation = mapOf("local-id1" to "id1")
coEvery {
mocks.executor.execute(listOf(operation))
} returns ExecutionResponse(ExecutionResult.SUCCESS, idTranslation)

// When
mocks.operationRepo.start()
val response = mocks.operationRepo.enqueueAndWait(operation)

// Then
response shouldBe true
coVerifyOrder {
// ensure the order: IDs are translated, operation removed from the store, then delay for postCreateDelay
operation.translateIds(idTranslation)
mocks.operationModelStore.remove(opId)
mocks.operationRepo.delayBeforeNextExecution(any(), any())
}
}

// We want to prevent a misbehaving app stuck in a loop from continuously
// sending updates every opRepoExecutionInterval (5 seconds currently).
// By waiting for the dust to settle we ensure the app is done making
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ class LoginUserOperationExecutorTests : FunSpec({
val localSubscriptionId2 = "local-subscriptionId2"
val remoteSubscriptionId1 = "remote-subscriptionId1"
val remoteSubscriptionId2 = "remote-subscriptionId2"
val createSubscriptionOperation =
CreateSubscriptionOperation(
appId,
localOneSignalId,
"subscriptionId1",
SubscriptionType.PUSH,
true,
"pushToken1",
SubscriptionStatus.SUBSCRIBED,
)

test("login anonymous user successfully creates user") {
// Given
Expand All @@ -58,7 +68,7 @@ class LoginUserOperationExecutorTests : FunSpec({
val loginUserOperationExecutor =
LoginUserOperationExecutor(
mockIdentityOperationExecutor,
MockHelper.applicationService(),
AndroidMockHelper.applicationService(),
MockHelper.deviceService(),
mockUserBackendService,
mockIdentityModelStore,
Expand All @@ -67,7 +77,11 @@ class LoginUserOperationExecutorTests : FunSpec({
MockHelper.configModelStore(),
MockHelper.languageContext(),
)
val operations = listOf<Operation>(LoginUserOperation(appId, localOneSignalId, null, null))
val operations =
listOf<Operation>(
LoginUserOperation(appId, localOneSignalId, null, null),
createSubscriptionOperation,
)

// When
val response = loginUserOperationExecutor.execute(operations)
Expand Down Expand Up @@ -98,7 +112,7 @@ class LoginUserOperationExecutorTests : FunSpec({
val loginUserOperationExecutor =
LoginUserOperationExecutor(
mockIdentityOperationExecutor,
MockHelper.applicationService(),
AndroidMockHelper.applicationService(),
MockHelper.deviceService(),
mockUserBackendService,
mockIdentityModelStore,
Expand All @@ -107,7 +121,11 @@ class LoginUserOperationExecutorTests : FunSpec({
MockHelper.configModelStore(),
MockHelper.languageContext(),
)
val operations = listOf<Operation>(LoginUserOperation(appId, localOneSignalId, null, null))
val operations =
listOf<Operation>(
LoginUserOperation(appId, localOneSignalId, null, null),
createSubscriptionOperation,
)

// When
val response = loginUserOperationExecutor.execute(operations)
Expand All @@ -130,8 +148,12 @@ class LoginUserOperationExecutorTests : FunSpec({
val mockSubscriptionsModelStore = mockk<SubscriptionModelStore>()

val loginUserOperationExecutor =
LoginUserOperationExecutor(mockIdentityOperationExecutor, MockHelper.applicationService(), MockHelper.deviceService(), mockUserBackendService, mockIdentityModelStore, mockPropertiesModelStore, mockSubscriptionsModelStore, MockHelper.configModelStore(), MockHelper.languageContext())
val operations = listOf<Operation>(LoginUserOperation(appId, localOneSignalId, null, null))
LoginUserOperationExecutor(mockIdentityOperationExecutor, AndroidMockHelper.applicationService(), MockHelper.deviceService(), mockUserBackendService, mockIdentityModelStore, mockPropertiesModelStore, mockSubscriptionsModelStore, MockHelper.configModelStore(), MockHelper.languageContext())
val operations =
listOf<Operation>(
LoginUserOperation(appId, localOneSignalId, null, null),
createSubscriptionOperation,
)

// When
val response = loginUserOperationExecutor.execute(operations)
Expand Down Expand Up @@ -679,4 +701,42 @@ class LoginUserOperationExecutorTests : FunSpec({
)
}
}

test("ensure anonymous login with no other operations will fail with FAIL_NORETRY") {
// Given
val mockUserBackendService = mockk<IUserBackendService>()
coEvery { mockUserBackendService.createUser(any(), any(), any(), any()) } returns
CreateUserResponse(mapOf(IdentityConstants.ONESIGNAL_ID to remoteOneSignalId), PropertiesObject(), listOf())

val mockIdentityOperationExecutor = mockk<IdentityOperationExecutor>()

val mockIdentityModelStore = MockHelper.identityModelStore()
val mockPropertiesModelStore = MockHelper.propertiesModelStore()
val mockSubscriptionsModelStore = mockk<SubscriptionModelStore>()

val loginUserOperationExecutor =
LoginUserOperationExecutor(
mockIdentityOperationExecutor,
MockHelper.applicationService(),
MockHelper.deviceService(),
mockUserBackendService,
mockIdentityModelStore,
mockPropertiesModelStore,
mockSubscriptionsModelStore,
MockHelper.configModelStore(),
MockHelper.languageContext(),
)
// anonymous Login request
val operations = listOf<Operation>(LoginUserOperation(appId, localOneSignalId, null, null))

// When
val response = loginUserOperationExecutor.execute(operations)

// Then
response.result shouldBe ExecutionResult.FAIL_NORETRY
// ensure user is not created by the bad request
coVerify(
exactly = 0,
) { mockUserBackendService.createUser(appId, any(), any(), any()) }
}
})
Loading