Skip to content

Commit b9ae73d

Browse files
committed
538: Created transfromContext class responsible for holding the context of the current transform job execution
Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
1 parent 6cf7979 commit b9ae73d

File tree

7 files changed

+169
-98
lines changed

7 files changed

+169
-98
lines changed

src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -52,19 +52,22 @@ import org.opensearch.indexmanagement.transform.util.TransformLockManager
5252
import org.opensearch.indexmanagement.util.NO_ID
5353
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.DEFAULT_INJECT_ROLES
5454
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.INTERNAL_REQUEST
55-
import org.opensearch.jobscheduler.spi.LockModel
5655
import org.opensearch.jobscheduler.spi.utils.LockService
5756
import org.opensearch.rest.RestStatus
57+
import org.opensearch.tasks.TaskCancelledException
5858
import org.opensearch.transport.RemoteTransportException
5959
import java.io.IOException
6060
import java.time.Instant
61+
import java.util.regex.Pattern
6162
import kotlin.coroutines.CoroutineContext
6263
import kotlin.coroutines.resume
6364
import kotlin.coroutines.resumeWithException
6465
import kotlin.coroutines.suspendCoroutine
6566

6667
const val OPENDISTRO_SECURITY_PROTECTED_INDICES_CONF_REQUEST = "_opendistro_security_protected_indices_conf_request"
67-
private const val TIME_EXCEED_MESSAGE = "Time exceeded"
68+
69+
// Timeout pattern used for checking the timeout message which is in unique format if the transform search timeout was set programmatically
70+
private val timeoutMessagePattern = Pattern.compile("cancelled task with reason: Cancellation timeout of (.*) is expired")
6871

6972
fun contentParser(bytesReference: BytesReference): XContentParser {
7073
return XContentHelper.createParser(
@@ -193,14 +196,14 @@ suspend fun <T> BackoffPolicy.retry(
193196
*
194197
* If all retries fail the final exception will be rethrown. Exceptions caught during intermediate retries are
195198
* logged as warnings to [logger]. Similar to [org.opensearch.action.bulk.Retry], except these retries on
196-
* 502, 503, 504, 429 error codes as well as 500 with Time exceeded. If the request is timeout, lock will be renewed
199+
* 502, 503, 504 error codes as well as when TaskCancelledException is being raised as cause. If the request is timeout, lock will be renewed
197200
*
198201
* @param logger - logger used to log intermediate failures
199202
* @param transformLockManager - lock manager that stores current lock used in order to renew the lock if the request timed out
200203
* @param retryOn - any additional [RestStatus] values that should be retried
201204
* @param block - the block of code to retry. This should be a suspend function.
202205
*/
203-
suspend fun <T> BackoffPolicy.retry(
206+
suspend fun <T> BackoffPolicy.retryTransformSearch(
204207
logger: Logger,
205208
transformLockManager: TransformLockManager,
206209
retryOn: List<RestStatus> = emptyList(),
@@ -212,22 +215,24 @@ suspend fun <T> BackoffPolicy.retry(
212215
try {
213216
return block(backoff)
214217
} catch (e: OpenSearchException) {
215-
if (iter.hasNext() && (e.isRetryable() || e.isTimedOut() || retryOn.contains(e.status()))) {
216-
backoff = iter.next()
217-
logger.warn("Operation failed. Retrying in $backoff.", e)
218-
delay(backoff.millis)
219-
// In the case of time out, renew the lock
220-
if(e.isTimedOut()) {
221-
transformLockManager.renewLockForScheduledJob()
222-
}
223-
} else {
218+
if (!iter.hasNext() || !e.isRetryable(retryOn, e)) {
224219
throw e
225220
}
221+
backoff = iter.next()
222+
logger.warn("Operation failed. Retrying in $backoff.", e)
223+
delay(backoff.millis)
224+
if (isTransformOperationTimedOut(e)) {
225+
// In the case of time out, renew the lock
226+
transformLockManager.renewLockForScheduledJob()
227+
}
226228
}
227229
} while (true)
228230
}
229231

230-
fun LockModel.lockExpirationInSeconds() = lockTime.epochSecond + lockDurationSeconds - Instant.now().epochSecond
232+
private fun OpenSearchException.isRetryable(
233+
retryOn: List<RestStatus>,
234+
ex: OpenSearchException
235+
) = this.isRetryable() || isTransformOperationTimedOut(ex) || retryOn.contains(this.status())
231236

232237
/**
233238
* Retries on 502, 503 and 504 per elastic client's behavior: https://github.com/elastic/elasticsearch-net/issues/2061
@@ -238,11 +243,17 @@ fun OpenSearchException.isRetryable(): Boolean {
238243
}
239244

240245
/**
241-
* Retries on 500 and Time exceeded message which means that the timeout occurred. In that case
242-
* retry request with reduced size param and timeout param set based on the lock expiration
246+
* Retries on 408 or on TaskCancelledException once the message matches the given pattern.
247+
* In that case retry request with reduced size param and timeout param set based on the lock expiration
243248
*/
244-
fun OpenSearchException.isTimedOut(): Boolean {
245-
return status() == RestStatus.INTERNAL_SERVER_ERROR && TIME_EXCEED_MESSAGE == message
249+
fun isTransformOperationTimedOut(ex: OpenSearchException): Boolean {
250+
if (RestStatus.REQUEST_TIMEOUT == ex.status()) {
251+
return true
252+
}
253+
if (ex.cause != null && ex.cause is TaskCancelledException) {
254+
return timeoutMessagePattern.matcher((ex.cause as TaskCancelledException).message).matches()
255+
}
256+
return false
246257
}
247258

248259
/**

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -108,25 +108,22 @@ object TransformRunner :
108108
val transformProcessedBucketLog = TransformProcessedBucketLog()
109109
var bucketsToTransform = BucketsToTransform(HashSet(), metadata)
110110

111-
val transformContext = TransformContext.initTransformContext(transform, context)
111+
val transformContext = TransformContext(TransformLockManager(transform, context))
112+
// Acquires the lock if there is no running job execution for the given transform; Lock is acquired per transform
112113
val transformLockManager = transformContext.transformLockManager
114+
transformLockManager.acquireLockForScheduledJob()
113115
try {
114116
do {
115117
when {
116-
transformLockManager.getLock() == null -> {
117-
val thread = Thread.currentThread().id
118-
logger.warn("Cannot acquire lock for transform job ${transform.id} and thread $thread")
119-
// If we fail to get the lock we won't fail the job, instead we return early
118+
transformLockManager.lock == null -> {
119+
logger.warn("Cannot acquire lock for transform job ${transform.id}")
120120
return
121121
}
122122
listOf(TransformMetadata.Status.STOPPED, TransformMetadata.Status.FINISHED).contains(metadata.status) -> {
123123
logger.warn("Transform job ${transform.id} is in ${metadata.status} status. Skipping execution")
124124
return
125125
}
126126
else -> {
127-
val thread = Thread.currentThread().id
128-
logger.warn("Thread $thread acquired a lock")
129-
130127
val validatedMetadata = validateTransform(transform, currentMetadata)
131128
if (validatedMetadata.status == TransformMetadata.Status.FAILED) {
132129
currentMetadata = validatedMetadata
@@ -168,7 +165,7 @@ object TransformRunner :
168165
currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true)
169166
}
170167
// we attempt to renew lock for every loop of transform
171-
transformLockManager.renewLockForScheduledJob()
168+
transformLockManager.renewLockForScheduledJob()
172169
}
173170
}
174171
} while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null)
@@ -180,7 +177,7 @@ object TransformRunner :
180177
failureReason = e.localizedMessage
181178
)
182179
} finally {
183-
if(transformLockManager.getLock() != null){
180+
transformLockManager.lock?.let {
184181
// Update the global checkpoints only after execution finishes successfully
185182
if (transform.continuous && currentMetadata.status != TransformMetadata.Status.FAILED) {
186183
currentMetadata = currentMetadata.copy(
@@ -198,15 +195,24 @@ object TransformRunner :
198195
}
199196
}
200197

201-
private suspend fun getBucketsToTransformIteration(transform: Transform, bucketsToTransform: BucketsToTransform, transformContext: TransformContext): BucketsToTransform {
198+
private suspend fun getBucketsToTransformIteration(
199+
transform: Transform,
200+
bucketsToTransform: BucketsToTransform,
201+
transformContext: TransformContext
202+
): BucketsToTransform {
202203
var currentBucketsToTransform = bucketsToTransform
203204
val currentShard = bucketsToTransform.currentShard
204205
// Clear modified buckets from previous iteration
205206
currentBucketsToTransform.modifiedBuckets.clear()
206207

207208
if (currentShard != null) {
208209
val shardLevelModifiedBuckets = withTransformSecurityContext(transform) {
209-
transformSearchService.getShardLevelModifiedBuckets(transform, currentBucketsToTransform.metadata.afterKey, currentShard, transformContext)
210+
transformSearchService.getShardLevelModifiedBuckets(
211+
transform,
212+
currentBucketsToTransform.metadata.afterKey,
213+
currentShard,
214+
transformContext
215+
)
210216
}
211217
currentBucketsToTransform.modifiedBuckets.addAll(shardLevelModifiedBuckets.modifiedBuckets)
212218
val mergedSearchTime = currentBucketsToTransform.metadata.stats.searchTimeInMillis +
@@ -288,9 +294,6 @@ object TransformRunner :
288294
transformContext: TransformContext
289295
): TransformMetadata {
290296
val updatedMetadata = if (modifiedBuckets.isNotEmpty()) {
291-
292-
Thread.sleep(61000)
293-
294297
val transformSearchResult = withTransformSecurityContext(transform) {
295298
transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets, transformContext)
296299
}

0 commit comments

Comments
 (0)