Skip to content

Commit 6cf7979

Browse files
committed
538: Added transform context used for handling the last successful page size and transform lock
Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
1 parent 63984b2 commit 6cf7979

File tree

6 files changed

+234
-45
lines changed

6 files changed

+234
-45
lines changed

src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,11 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
4848
import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate
4949
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
5050
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
51+
import org.opensearch.indexmanagement.transform.util.TransformLockManager
5152
import org.opensearch.indexmanagement.util.NO_ID
5253
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.DEFAULT_INJECT_ROLES
5354
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.INTERNAL_REQUEST
55+
import org.opensearch.jobscheduler.spi.LockModel
5456
import org.opensearch.jobscheduler.spi.utils.LockService
5557
import org.opensearch.rest.RestStatus
5658
import org.opensearch.transport.RemoteTransportException
@@ -62,6 +64,7 @@ import kotlin.coroutines.resumeWithException
6264
import kotlin.coroutines.suspendCoroutine
6365

6466
const val OPENDISTRO_SECURITY_PROTECTED_INDICES_CONF_REQUEST = "_opendistro_security_protected_indices_conf_request"
67+
// Exception message that, together with a 500 status, is treated as a request timeout by isTimedOut().
private const val TIME_EXCEED_MESSAGE = "Time exceeded"
6568

6669
fun contentParser(bytesReference: BytesReference): XContentParser {
6770
return XContentHelper.createParser(
@@ -184,6 +187,48 @@ suspend fun <T> BackoffPolicy.retry(
184187
} while (true)
185188
}
186189

190+
/**
 * Runs [block], retrying it as specified by the receiver [BackoffPolicy] when it throws a retriable
 * [OpenSearchException].
 *
 * A failure is retried only while the policy still has delays left and at least one of the following holds:
 * [isRetryable] is true (502, 503 or 504), [isTimedOut] is true (500 with the message "Time exceeded"),
 * or the exception status is contained in [retryOn]. Any other failure, or a failure once the policy is
 * exhausted, is rethrown to the caller.
 *
 * Every retried failure is logged as a warning to [logger], after which the coroutine is suspended for the
 * next backoff delay. If the failure was a timeout, the lock is renewed through [transformLockManager]
 * once that delay has elapsed and before [block] is invoked again.
 *
 * @param logger - logger used to log intermediate failures
 * @param transformLockManager - lock manager asked to renew the lock after an attempt that timed out
 * @param retryOn - any additional [RestStatus] values that should be retried (429 is only retried when listed here)
 * @param block - the suspending code to retry; it receives the backoff that preceded the attempt,
 * which is [TimeValue.ZERO] for the first attempt
 * @return the value produced by the first invocation of [block] that completes without throwing
 */
suspend fun <T> BackoffPolicy.retry(
    logger: Logger,
    transformLockManager: TransformLockManager,
    retryOn: List<RestStatus> = emptyList(),
    block: suspend (backoff: TimeValue) -> T
): T {
    val remainingDelays = iterator()
    var backoff: TimeValue = TimeValue.ZERO
    do {
        try {
            return block(backoff)
        } catch (e: OpenSearchException) {
            // Give up as soon as the policy is exhausted or the failure is not one we retry on.
            val canRetry = remainingDelays.hasNext() &&
                (e.isRetryable() || e.isTimedOut() || e.status() in retryOn)
            if (!canRetry) {
                throw e
            }

            backoff = remainingDelays.next()
            logger.warn("Operation failed. Retrying in $backoff.", e)
            delay(backoff.millis)

            // In the case of time out, renew the lock before the next attempt
            if (e.isTimedOut()) {
                transformLockManager.renewLockForScheduledJob()
            }
        }
    } while (true)
}
229+
230+
/**
 * Returns the number of seconds between now and the moment this lock expires, computed as
 * lock time + lock duration - current time (all in epoch seconds).
 *
 * The result is zero or negative once the expiration moment has been reached or passed.
 */
fun LockModel.lockExpirationInSeconds(): Long {
    val expiresAtEpochSecond = lockTime.epochSecond + lockDurationSeconds
    return expiresAtEpochSecond - Instant.now().epochSecond
}
231+
187232
/**
188233
* Retries on 502, 503 and 504 per elastic client's behavior: https://github.com/elastic/elasticsearch-net/issues/2061
189234
* 429 must be retried manually as it's not clear if it's ok to retry for requests other than Bulk requests.
@@ -192,6 +237,14 @@ fun OpenSearchException.isRetryable(): Boolean {
192237
return (status() in listOf(RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE, RestStatus.GATEWAY_TIMEOUT))
193238
}
194239

240+
/**
 * Tells whether this exception represents a timed out request: the status is 500 and the exception
 * message is exactly "Time exceeded".
 *
 * Callers use this to decide whether the request should be retried with a reduced size param and a
 * timeout param set based on the lock expiration.
 */
fun OpenSearchException.isTimedOut(): Boolean =
    status() == RestStatus.INTERNAL_SERVER_ERROR && message == TIME_EXCEED_MESSAGE
247+
195248
/**
196249
* Extension function for OpenSearch 6.3 and above that duplicates the OpenSearch 6.2 XContentBuilder.string() method.
197250
*/

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,11 @@ import kotlinx.coroutines.Dispatchers
1111
import kotlinx.coroutines.SupervisorJob
1212
import kotlinx.coroutines.launch
1313
import org.apache.logging.log4j.LogManager
14-
import org.opensearch.action.bulk.BackoffPolicy
1514
import org.opensearch.action.support.WriteRequest
1615
import org.opensearch.client.Client
1716
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
1817
import org.opensearch.cluster.service.ClusterService
1918
import org.opensearch.common.settings.Settings
20-
import org.opensearch.common.unit.TimeValue
2119
import org.opensearch.common.xcontent.NamedXContentRegistry
2220
import org.opensearch.index.shard.ShardId
2321
import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext
@@ -31,10 +29,8 @@ import org.opensearch.indexmanagement.transform.model.ContinuousTransformStats
3129
import org.opensearch.indexmanagement.transform.model.Transform
3230
import org.opensearch.indexmanagement.transform.model.TransformMetadata
3331
import org.opensearch.indexmanagement.transform.model.initializeShardsToSearch
34-
import org.opensearch.indexmanagement.transform.settings.TransformSettings
35-
import org.opensearch.indexmanagement.util.acquireLockForScheduledJob
36-
import org.opensearch.indexmanagement.util.releaseLockForScheduledJob
37-
import org.opensearch.indexmanagement.util.renewLockForScheduledJob
32+
import org.opensearch.indexmanagement.transform.util.TransformContext
33+
import org.opensearch.indexmanagement.transform.util.TransformLockManager
3834
import org.opensearch.jobscheduler.spi.JobExecutionContext
3935
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
4036
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
@@ -108,18 +104,18 @@ object TransformRunner :
108104
var newGlobalCheckpoints: Map<ShardId, Long>? = null
109105
var newGlobalCheckpointTime: Instant? = null
110106
var currentMetadata = metadata
111-
val backoffPolicy = BackoffPolicy.exponentialBackoff(
112-
TimeValue.timeValueMillis(TransformSettings.DEFAULT_RENEW_LOCK_RETRY_DELAY),
113-
TransformSettings.DEFAULT_RENEW_LOCK_RETRY_COUNT
114-
)
107+
115108
val transformProcessedBucketLog = TransformProcessedBucketLog()
116109
var bucketsToTransform = BucketsToTransform(HashSet(), metadata)
117-
var lock = acquireLockForScheduledJob(transform, context, backoffPolicy)
110+
111+
val transformContext = TransformContext.initTransformContext(transform, context)
112+
val transformLockManager = transformContext.transformLockManager
118113
try {
119114
do {
120115
when {
121-
lock == null -> {
122-
logger.warn("Cannot acquire lock for transform job ${transform.id}")
116+
transformLockManager.getLock() == null -> {
117+
val thread = Thread.currentThread().id
118+
logger.warn("Cannot acquire lock for transform job ${transform.id} and thread $thread")
123119
// If we fail to get the lock we won't fail the job, instead we return early
124120
return
125121
}
@@ -128,6 +124,9 @@ object TransformRunner :
128124
return
129125
}
130126
else -> {
127+
val thread = Thread.currentThread().id
128+
logger.warn("Thread $thread acquired a lock")
129+
131130
val validatedMetadata = validateTransform(transform, currentMetadata)
132131
if (validatedMetadata.status == TransformMetadata.Status.FAILED) {
133132
currentMetadata = validatedMetadata
@@ -147,15 +146,15 @@ object TransformRunner :
147146
// If there are shards to search do it here
148147
if (bucketsToTransform.currentShard != null) {
149148
// Computes aggregation on modified documents for current shard to get modified buckets
150-
bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform).also {
149+
bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform, transformContext).also {
151150
currentMetadata = it.metadata
152151
}
153152
// Filter out already processed buckets
154153
val modifiedBuckets = bucketsToTransform.modifiedBuckets.filter {
155154
transformProcessedBucketLog.isNotProcessed(it)
156155
}.toMutableSet()
157156
// Recompute modified buckets and update them in targetIndex
158-
currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets)
157+
currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets, transformContext)
159158
// Add processed buckets to 'processed set' so that we don't try to reprocess them again
160159
transformProcessedBucketLog.addBuckets(modifiedBuckets.toList())
161160
// Update TransformMetadata
@@ -164,16 +163,12 @@ object TransformRunner :
164163
}
165164
} else {
166165
// Computes buckets from source index and stores them in targetIndex as transform docs
167-
currentMetadata = computeBucketsIteration(transform, currentMetadata)
166+
currentMetadata = computeBucketsIteration(transform, currentMetadata, transformContext)
168167
// Update TransformMetadata
169168
currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true)
170169
}
171170
// we attempt to renew lock for every loop of transform
172-
val renewedLock = renewLockForScheduledJob(context, lock, backoffPolicy)
173-
if (renewedLock == null) {
174-
releaseLockForScheduledJob(context, lock)
175-
}
176-
lock = renewedLock
171+
transformLockManager.renewLockForScheduledJob()
177172
}
178173
}
179174
} while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null)
@@ -185,7 +180,7 @@ object TransformRunner :
185180
failureReason = e.localizedMessage
186181
)
187182
} finally {
188-
lock?.let {
183+
if(transformLockManager.getLock() != null){
189184
// Update the global checkpoints only after execution finishes successfully
190185
if (transform.continuous && currentMetadata.status != TransformMetadata.Status.FAILED) {
191186
currentMetadata = currentMetadata.copy(
@@ -198,20 +193,20 @@ object TransformRunner :
198193
logger.info("Disabling the transform job ${transform.id}")
199194
updateTransform(transform.copy(enabled = false, enabledAt = null))
200195
}
201-
releaseLockForScheduledJob(context, it)
196+
transformLockManager.releaseLockForScheduledJob()
202197
}
203198
}
204199
}
205200

206-
private suspend fun getBucketsToTransformIteration(transform: Transform, bucketsToTransform: BucketsToTransform): BucketsToTransform {
201+
private suspend fun getBucketsToTransformIteration(transform: Transform, bucketsToTransform: BucketsToTransform, transformContext: TransformContext): BucketsToTransform {
207202
var currentBucketsToTransform = bucketsToTransform
208203
val currentShard = bucketsToTransform.currentShard
209204
// Clear modified buckets from previous iteration
210205
currentBucketsToTransform.modifiedBuckets.clear()
211206

212207
if (currentShard != null) {
213208
val shardLevelModifiedBuckets = withTransformSecurityContext(transform) {
214-
transformSearchService.getShardLevelModifiedBuckets(transform, currentBucketsToTransform.metadata.afterKey, currentShard)
209+
transformSearchService.getShardLevelModifiedBuckets(transform, currentBucketsToTransform.metadata.afterKey, currentShard, transformContext)
215210
}
216211
currentBucketsToTransform.modifiedBuckets.addAll(shardLevelModifiedBuckets.modifiedBuckets)
217212
val mergedSearchTime = currentBucketsToTransform.metadata.stats.searchTimeInMillis +
@@ -258,13 +253,15 @@ object TransformRunner :
258253
private suspend fun computeBucketsIteration(
259254
transform: Transform,
260255
metadata: TransformMetadata,
256+
transformContext: TransformContext
261257
): TransformMetadata {
262258

263259
val transformSearchResult = withTransformSecurityContext(transform) {
264260
transformSearchService.executeCompositeSearch(
265261
transform,
266262
metadata.afterKey,
267-
null
263+
null,
264+
transformContext
268265
)
269266
}
270267
val indexTimeInMillis = withTransformSecurityContext(transform) {
@@ -287,11 +284,15 @@ object TransformRunner :
287284
private suspend fun recomputeModifiedBuckets(
288285
transform: Transform,
289286
metadata: TransformMetadata,
290-
modifiedBuckets: MutableSet<Map<String, Any>>
287+
modifiedBuckets: MutableSet<Map<String, Any>>,
288+
transformContext: TransformContext
291289
): TransformMetadata {
292290
val updatedMetadata = if (modifiedBuckets.isNotEmpty()) {
291+
292+
Thread.sleep(61000)
293+
293294
val transformSearchResult = withTransformSecurityContext(transform) {
294-
transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets)
295+
transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets, transformContext)
295296
}
296297
val indexTimeInMillis = withTransformSecurityContext(transform) {
297298
transformIndexer.index(transformSearchResult.docsToIndex)

0 commit comments

Comments
 (0)