Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ package org.opensearch.indexmanagement.snapshotmanagement.engine

import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.opensearch.ExceptionsHelper
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.ConfigConstants
import org.opensearch.index.engine.VersionConflictEngineException
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext
import org.opensearch.indexmanagement.opensearchapi.retry
Expand Down Expand Up @@ -215,6 +217,14 @@ class SMStateMachine(
metadata = md
}
} catch (ex: Exception) {
val unwrappedException = ExceptionsHelper.unwrapCause(ex) as Exception
if (unwrappedException is VersionConflictEngineException) {
// Don't throw the exception
// TODO: Extract seqNo on VersionConflictException and retry updateMetadata with updated seqNo.
log.error("Version conflict exception while updating metadata.", ex)
return
}

val smEx = SnapshotManagementException(ExceptionKey.METADATA_INDEXING_FAILURE, ex)
log.error(smEx.message, ex)
throw smEx
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,23 @@

package org.opensearch.indexmanagement.snapshotmanagement.engine

import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.argumentCaptor
import com.nhaarman.mockitokotlin2.doAnswer
import com.nhaarman.mockitokotlin2.spy
import com.nhaarman.mockitokotlin2.times
import com.nhaarman.mockitokotlin2.verify
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.runBlocking
import org.opensearch.OpenSearchException
import org.opensearch.common.unit.TimeValue
import org.opensearch.core.action.ActionListener
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.index.shard.ShardId
import org.opensearch.index.engine.VersionConflictEngineException
import org.opensearch.indexmanagement.MocksTestCase
import org.opensearch.indexmanagement.opensearchapi.retry
import org.opensearch.indexmanagement.snapshotmanagement.SnapshotManagementException
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.creationTransitions
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.deletionTransitions
Expand Down Expand Up @@ -230,4 +240,66 @@ open class SMStateMachineTests : MocksTestCase() {
assertEquals(1, firstValue.policyPrimaryTerm)
}
}

fun `test updateMetadata handles VersionConflictEngineException gracefully`() = runBlocking {
val initialMetadata = randomSMMetadata(
policySeqNo = 0,
policyPrimaryTerm = 0,
)
val smPolicy = randomSMPolicy(
seqNo = 1,
primaryTerm = 1,
)
val updatedMetadata = randomSMMetadata(
policySeqNo = 1,
policyPrimaryTerm = 1,
)

doAnswer {
val listener = it.getArgument<ActionListener<ActionResponse>>(1)
listener.onFailure(VersionConflictEngineException(ShardId("index", "_na_", 1), "test", "message"))
}.whenever(client).index(any(), any())

val stateMachineSpy = spy(SMStateMachine(client, smPolicy, initialMetadata, settings, threadPool, indicesManager))

// Verify VersionConflictEngineException is handled gracefully
try {
stateMachineSpy.updateMetadata(updatedMetadata)
} catch (e: Exception) {
fail("VersionConflictEngineException should be handled without throwing: ${e.message}")
}
}

fun `test updateMetadata throws SnapshotManagementException for other exceptions`() = runBlocking {
val initialMetadata = randomSMMetadata(
policySeqNo = 0,
policyPrimaryTerm = 0,
)
val smPolicy = randomSMPolicy(
seqNo = 1,
primaryTerm = 1,
)
val updatedMetadata = randomSMMetadata(
policySeqNo = 1,
policyPrimaryTerm = 1,
)

val stateMachineSpy = spy(SMStateMachine(client, smPolicy, initialMetadata, settings, threadPool, indicesManager))

val openSearchException = OpenSearchException("Test exception")
doAnswer {
val listener = it.getArgument<ActionListener<ActionResponse>>(1)
listener.onFailure(openSearchException)
}.whenever(client).index(any(), any())

// Verify OpenSearchException is wrapped in SnapshotManagementException
val thrownException = assertThrows(SnapshotManagementException::class.java) {
runBlocking {
stateMachineSpy.updateMetadata(updatedMetadata)
}
}

// Verify exception type and cause
assertTrue(thrownException.cause is OpenSearchException)
}
}
Loading