Skip to content

Commit 92ac22f

Browse files
committed
538: Created transfromContext class responsible for holding the context of the current transform job execution
Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
1 parent 6cf7979 commit 92ac22f

File tree

10 files changed

+309
-133
lines changed

10 files changed

+309
-133
lines changed

src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,9 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
4848
import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate
4949
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
5050
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
51-
import org.opensearch.indexmanagement.transform.util.TransformLockManager
5251
import org.opensearch.indexmanagement.util.NO_ID
5352
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.DEFAULT_INJECT_ROLES
5453
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.INTERNAL_REQUEST
55-
import org.opensearch.jobscheduler.spi.LockModel
5654
import org.opensearch.jobscheduler.spi.utils.LockService
5755
import org.opensearch.rest.RestStatus
5856
import org.opensearch.transport.RemoteTransportException
@@ -64,7 +62,6 @@ import kotlin.coroutines.resumeWithException
6462
import kotlin.coroutines.suspendCoroutine
6563

6664
const val OPENDISTRO_SECURITY_PROTECTED_INDICES_CONF_REQUEST = "_opendistro_security_protected_indices_conf_request"
67-
private const val TIME_EXCEED_MESSAGE = "Time exceeded"
6865

6966
fun contentParser(bytesReference: BytesReference): XContentParser {
7067
return XContentHelper.createParser(
@@ -187,48 +184,6 @@ suspend fun <T> BackoffPolicy.retry(
187184
} while (true)
188185
}
189186

190-
/**
191-
* Retries the given [block] of code as specified by the receiver [BackoffPolicy],
192-
* if [block] throws an [OpenSearchException] that is retriable (502, 503, 504 or 500 with message Time exceeded).
193-
*
194-
* If all retries fail the final exception will be rethrown. Exceptions caught during intermediate retries are
195-
* logged as warnings to [logger]. Similar to [org.opensearch.action.bulk.Retry], except these retries on
196-
* 502, 503, 504, 429 error codes as well as 500 with Time exceeded. If the request is timeout, lock will be renewed
197-
*
198-
* @param logger - logger used to log intermediate failures
199-
* @param transformLockManager - lock manager that stores current lock used in order to renew the lock if the request timed out
200-
* @param retryOn - any additional [RestStatus] values that should be retried
201-
* @param block - the block of code to retry. This should be a suspend function.
202-
*/
203-
suspend fun <T> BackoffPolicy.retry(
204-
logger: Logger,
205-
transformLockManager: TransformLockManager,
206-
retryOn: List<RestStatus> = emptyList(),
207-
block: suspend (backoff: TimeValue) -> T
208-
): T {
209-
val iter = iterator()
210-
var backoff: TimeValue = TimeValue.ZERO
211-
do {
212-
try {
213-
return block(backoff)
214-
} catch (e: OpenSearchException) {
215-
if (iter.hasNext() && (e.isRetryable() || e.isTimedOut() || retryOn.contains(e.status()))) {
216-
backoff = iter.next()
217-
logger.warn("Operation failed. Retrying in $backoff.", e)
218-
delay(backoff.millis)
219-
// In the case of time out, renew the lock
220-
if(e.isTimedOut()) {
221-
transformLockManager.renewLockForScheduledJob()
222-
}
223-
} else {
224-
throw e
225-
}
226-
}
227-
} while (true)
228-
}
229-
230-
fun LockModel.lockExpirationInSeconds() = lockTime.epochSecond + lockDurationSeconds - Instant.now().epochSecond
231-
232187
/**
233188
* Retries on 502, 503 and 504 per elastic client's behavior: https://github.com/elastic/elasticsearch-net/issues/2061
234189
* 429 must be retried manually as it's not clear if it's ok to retry for requests other than Bulk requests.
@@ -237,14 +192,6 @@ fun OpenSearchException.isRetryable(): Boolean {
237192
return (status() in listOf(RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE, RestStatus.GATEWAY_TIMEOUT))
238193
}
239194

240-
/**
241-
* Retries on 500 and Time exceeded message which means that the timeout occurred. In that case
242-
* retry request with reduced size param and timeout param set based on the lock expiration
243-
*/
244-
fun OpenSearchException.isTimedOut(): Boolean {
245-
return status() == RestStatus.INTERNAL_SERVER_ERROR && TIME_EXCEED_MESSAGE == message
246-
}
247-
248195
/**
249196
* Extension function for OpenSearch 6.3 and above that duplicates the OpenSearch 6.2 XContentBuilder.string() method.
250197
*/

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -108,25 +108,22 @@ object TransformRunner :
108108
val transformProcessedBucketLog = TransformProcessedBucketLog()
109109
var bucketsToTransform = BucketsToTransform(HashSet(), metadata)
110110

111-
val transformContext = TransformContext.initTransformContext(transform, context)
111+
val transformContext = TransformContext(TransformLockManager(transform, context))
112+
// Acquires the lock if there is no running job execution for the given transform; Lock is acquired per transform
112113
val transformLockManager = transformContext.transformLockManager
114+
transformLockManager.acquireLockForScheduledJob()
113115
try {
114116
do {
115117
when {
116-
transformLockManager.getLock() == null -> {
117-
val thread = Thread.currentThread().id
118-
logger.warn("Cannot acquire lock for transform job ${transform.id} and thread $thread")
119-
// If we fail to get the lock we won't fail the job, instead we return early
118+
transformLockManager.lock == null -> {
119+
logger.warn("Cannot acquire lock for transform job ${transform.id}")
120120
return
121121
}
122122
listOf(TransformMetadata.Status.STOPPED, TransformMetadata.Status.FINISHED).contains(metadata.status) -> {
123123
logger.warn("Transform job ${transform.id} is in ${metadata.status} status. Skipping execution")
124124
return
125125
}
126126
else -> {
127-
val thread = Thread.currentThread().id
128-
logger.warn("Thread $thread acquired a lock")
129-
130127
val validatedMetadata = validateTransform(transform, currentMetadata)
131128
if (validatedMetadata.status == TransformMetadata.Status.FAILED) {
132129
currentMetadata = validatedMetadata
@@ -168,7 +165,7 @@ object TransformRunner :
168165
currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true)
169166
}
170167
// we attempt to renew lock for every loop of transform
171-
transformLockManager.renewLockForScheduledJob()
168+
transformLockManager.renewLockForScheduledJob()
172169
}
173170
}
174171
} while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null)
@@ -180,7 +177,7 @@ object TransformRunner :
180177
failureReason = e.localizedMessage
181178
)
182179
} finally {
183-
if(transformLockManager.getLock() != null){
180+
transformLockManager.lock?.let {
184181
// Update the global checkpoints only after execution finishes successfully
185182
if (transform.continuous && currentMetadata.status != TransformMetadata.Status.FAILED) {
186183
currentMetadata = currentMetadata.copy(
@@ -198,15 +195,24 @@ object TransformRunner :
198195
}
199196
}
200197

201-
private suspend fun getBucketsToTransformIteration(transform: Transform, bucketsToTransform: BucketsToTransform, transformContext: TransformContext): BucketsToTransform {
198+
private suspend fun getBucketsToTransformIteration(
199+
transform: Transform,
200+
bucketsToTransform: BucketsToTransform,
201+
transformContext: TransformContext
202+
): BucketsToTransform {
202203
var currentBucketsToTransform = bucketsToTransform
203204
val currentShard = bucketsToTransform.currentShard
204205
// Clear modified buckets from previous iteration
205206
currentBucketsToTransform.modifiedBuckets.clear()
206207

207208
if (currentShard != null) {
208209
val shardLevelModifiedBuckets = withTransformSecurityContext(transform) {
209-
transformSearchService.getShardLevelModifiedBuckets(transform, currentBucketsToTransform.metadata.afterKey, currentShard, transformContext)
210+
transformSearchService.getShardLevelModifiedBuckets(
211+
transform,
212+
currentBucketsToTransform.metadata.afterKey,
213+
currentShard,
214+
transformContext
215+
)
210216
}
211217
currentBucketsToTransform.modifiedBuckets.addAll(shardLevelModifiedBuckets.modifiedBuckets)
212218
val mergedSearchTime = currentBucketsToTransform.metadata.stats.searchTimeInMillis +
@@ -288,9 +294,6 @@ object TransformRunner :
288294
transformContext: TransformContext
289295
): TransformMetadata {
290296
val updatedMetadata = if (modifiedBuckets.isNotEmpty()) {
291-
292-
Thread.sleep(61000)
293-
294297
val transformSearchResult = withTransformSecurityContext(transform) {
295298
transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets, transformContext)
296299
}

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ import org.opensearch.indexmanagement.transform.model.ShardNewDocuments
3939
import org.opensearch.indexmanagement.transform.model.Transform
4040
import org.opensearch.indexmanagement.transform.model.TransformSearchResult
4141
import org.opensearch.indexmanagement.transform.model.TransformStats
42+
import org.opensearch.indexmanagement.transform.opensearchapi.retryTransformSearch
4243
import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT
4344
import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS
4445
import org.opensearch.indexmanagement.transform.util.TransformContext
45-
import org.opensearch.indexmanagement.transform.util.TransformLockManager
4646
import org.opensearch.indexmanagement.util.IndexUtils.Companion.LUCENE_MAX_CLAUSES
4747
import org.opensearch.indexmanagement.util.IndexUtils.Companion.ODFE_MAGIC_NULL
4848
import org.opensearch.indexmanagement.util.IndexUtils.Companion.hashToFixedSize
@@ -61,6 +61,7 @@ import org.opensearch.search.aggregations.metrics.Percentiles
6161
import org.opensearch.search.aggregations.metrics.ScriptedMetric
6262
import org.opensearch.search.builder.SearchSourceBuilder
6363
import org.opensearch.transport.RemoteTransportException
64+
import java.time.Instant
6465
import java.util.concurrent.TimeUnit
6566
import kotlin.math.max
6667
import kotlin.math.pow
@@ -114,25 +115,39 @@ class TransformSearchService(
114115
}
115116

116117
@Suppress("RethrowCaughtException")
117-
suspend fun getShardLevelModifiedBuckets(transform: Transform, afterKey: Map<String, Any>?, currentShard: ShardNewDocuments, transformContext: TransformContext): BucketSearchResult {
118+
suspend fun getShardLevelModifiedBuckets(
119+
transform: Transform,
120+
afterKey: Map<String, Any>?,
121+
currentShard: ShardNewDocuments,
122+
transformContext: TransformContext
123+
): BucketSearchResult {
118124
try {
119125
var retryAttempt = 0
120126
var pageSize = calculateMaxPageSize(transform)
121-
val searchResponse = backoffPolicy.retry(logger, transformContext.transformLockManager) {
127+
val searchStart = Instant.now().epochSecond
128+
val searchResponse = backoffPolicy.retryTransformSearch(logger, transformContext.transformLockManager) {
122129
val pageSizeDecay = 2f.pow(retryAttempt++)
130+
val searchRequestTimeoutInSeconds = transformContext.getMaxRequestTimeoutInSeconds()
123131
client.suspendUntil { listener: ActionListener<SearchResponse> ->
124-
pageSize = transformContext.pageSize ?: max(1, pageSize.div(pageSizeDecay.toInt()))
132+
// If the previous request of the current transform job execution was successful, take the page size of previous request.
133+
// If not, calculate the page size.
134+
pageSize = transformContext.lastSuccessfulPageSize ?: max(1, pageSize.div(pageSizeDecay.toInt()))
125135
if (retryAttempt > 1) {
126136
logger.debug(
127137
"Attempt [${retryAttempt - 1}] to get modified buckets for transform [${transform.id}]. Attempting " +
128138
"again with reduced page size [$pageSize]"
129139
)
130140
}
131-
val request = getShardLevelBucketsSearchRequest(transform, afterKey, pageSize, currentShard, transformContext.getMaxRequestTimeout())
141+
if (searchRequestTimeoutInSeconds == null) {
142+
return@suspendUntil
143+
}
144+
val request = getShardLevelBucketsSearchRequest(transform, afterKey, pageSize, currentShard, searchRequestTimeoutInSeconds)
132145
search(request, listener)
133146
}
134147
}
135-
transformContext.pageSize = pageSize
148+
// If the request was successful, update page size
149+
transformContext.lastSuccessfulPageSize = pageSize
150+
transformContext.renewLockForLongSearch(Instant.now().epochSecond - searchStart)
136151
return convertBucketSearchResponse(transform, searchResponse)
137152
} catch (e: TransformSearchServiceException) {
138153
throw e
@@ -169,22 +184,28 @@ class TransformSearchService(
169184
modifiedBuckets.size
170185

171186
var retryAttempt = 0
172-
val searchResponse = backoffPolicy.retry(logger, transformContext.transformLockManager) {
173-
// TODO: Should we store the value of the past successful page size (?)
174-
val pageSizeDecay = 2f.pow(retryAttempt++)
187+
val searchStart = Instant.now().epochSecond
188+
val searchResponse = backoffPolicy.retryTransformSearch(logger, transformContext.transformLockManager) {
189+
val pageSizeDecay = 2f.pow(retryAttempt++)
190+
val searchRequestTimeoutInSeconds = transformContext.getMaxRequestTimeoutInSeconds()
191+
175192
client.suspendUntil { listener: ActionListener<SearchResponse> ->
176-
pageSize = transformContext.pageSize ?: max(1, pageSize.div(pageSizeDecay.toInt()))
193+
// If the previous request of the current transform job execution was successful, take the page size of previous request.
194+
// If not, calculate the page size.
195+
pageSize = transformContext.lastSuccessfulPageSize ?: max(1, pageSize.div(pageSizeDecay.toInt()))
177196
if (retryAttempt > 1) {
178197
logger.debug(
179198
"Attempt [${retryAttempt - 1}] of composite search failed for transform [${transform.id}]. Attempting " +
180199
"again with reduced page size [$pageSize]"
181200
)
182201
}
183-
val request = getSearchServiceRequest(transform, afterKey, pageSize, modifiedBuckets, transformContext.getMaxRequestTimeout())
202+
val request = getSearchServiceRequest(transform, afterKey, pageSize, modifiedBuckets, searchRequestTimeoutInSeconds)
184203
search(request, listener)
185204
}
186205
}
187-
transformContext.pageSize = pageSize
206+
// If the request was successful, update page size
207+
transformContext.lastSuccessfulPageSize = pageSize
208+
transformContext.renewLockForLongSearch(Instant.now().epochSecond - searchStart)
188209
return convertResponse(transform, searchResponse, modifiedBuckets = modifiedBuckets)
189210
} catch (e: TransformSearchServiceException) {
190211
throw e
@@ -210,7 +231,7 @@ class TransformSearchService(
210231
afterKey: Map<String, Any>? = null,
211232
pageSize: Int,
212233
modifiedBuckets: MutableSet<Map<String, Any>>? = null,
213-
timeout: Long? = null
234+
timeoutInSeconds: Long? = null
214235
): SearchRequest {
215236
val sources = mutableListOf<CompositeValuesSourceBuilder<*>>()
216237
transform.groups.forEach { group -> sources.add(group.toSourceBuilder().missingBucket(true)) }
@@ -223,7 +244,7 @@ class TransformSearchService(
223244
} else {
224245
getQueryWithModifiedBuckets(transform.dataSelectionQuery, modifiedBuckets, transform.groups)
225246
}
226-
return getSearchServiceRequest(transform.sourceIndex, query, aggregationBuilder, timeout)
247+
return getSearchServiceRequest(transform.sourceIndex, query, aggregationBuilder, timeoutInSeconds)
227248
}
228249

229250
private fun getQueryWithModifiedBuckets(
@@ -251,7 +272,24 @@ class TransformSearchService(
251272
return query
252273
}
253274

254-
private fun getSearchServiceRequest(index: String, query: QueryBuilder, aggregationBuilder: CompositeAggregationBuilder, timeout: Long? = null): SearchRequest {
275+
/**
276+
* Creates transform search request and sets timeout if it is provided
277+
* Referring on: https://github.com/opensearch-project/OpenSearch/pull/1085
278+
* https://github.com/opensearch-project/documentation-website/blob/main/_opensearch/rest-api/search.md#url-parameters
279+
* cancel_after_time_interval property is used in order to set timeout of transform search request has not been ported to version 1.0
280+
* thus we can't use it for version 1.0 support
281+
*
282+
* @param index - index that will be searched
283+
* @param query - any additional [RestStatus] values that should be retried
284+
* @param aggregationBuilder - search aggregations
285+
* @param timeoutInSeconds - timeout period used for transform search request
286+
*/
287+
private fun getSearchServiceRequest(
288+
index: String,
289+
query: QueryBuilder,
290+
aggregationBuilder: CompositeAggregationBuilder,
291+
timeoutInSeconds: Long? = null
292+
): SearchRequest {
255293
val searchSourceBuilder = SearchSourceBuilder()
256294
.trackTotalHits(false)
257295
.size(0)
@@ -260,10 +298,9 @@ class TransformSearchService(
260298
val request = SearchRequest(index)
261299
.source(searchSourceBuilder)
262300
.allowPartialSearchResults(false)
263-
264-
if(timeout != null){
265-
request.cancelAfterTimeInterval = TimeValue(timeout, TimeUnit.SECONDS)
266-
}
301+
// The time after which the search request will be canceled.
302+
// Request-level parameter takes precedence over cancel_after_time_interval cluster setting. Default is -1.
303+
request.cancelAfterTimeInterval = timeoutInSeconds?.let { TimeValue(timeoutInSeconds, TimeUnit.SECONDS) }
267304
return request
268305
}
269306

@@ -272,15 +309,15 @@ class TransformSearchService(
272309
afterKey: Map<String, Any>? = null,
273310
pageSize: Int,
274311
currentShard: ShardNewDocuments,
275-
timeout: Long?
312+
timeoutInSeconds: Long?
276313
): SearchRequest {
277314
val rangeQuery = getSeqNoRangeQuery(currentShard.from, currentShard.to)
278315
val query = QueryBuilders.boolQuery().filter(rangeQuery).must(transform.dataSelectionQuery)
279316
val sources = transform.groups.map { it.toSourceBuilder().missingBucket(true) }
280317
val aggregationBuilder = CompositeAggregationBuilder(transform.id, sources)
281318
.size(pageSize)
282319
.apply { afterKey?.let { this.aggregateAfter(it) } }
283-
return getSearchServiceRequest(currentShard.shardId.indexName, query, aggregationBuilder, timeout)
320+
return getSearchServiceRequest(currentShard.shardId.indexName, query, aggregationBuilder, timeoutInSeconds)
284321
.preference("_shards:" + currentShard.shardId.id.toString())
285322
}
286323

src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class TransportExplainTransformAction @Inject constructor(
6464

6565
private val log = LogManager.getLogger(javaClass)
6666

67-
@Suppress("SpreadOperator", "NestedBlockDepth")
67+
@Suppress("SpreadOperator", "NestedBlockDepth", "LongMethod")
6868
override fun doExecute(task: Task, request: ExplainTransformRequest, actionListener: ActionListener<ExplainTransformResponse>) {
6969
log.debug(
7070
"User and roles string from thread context: ${client.threadPool().threadContext.getTransient<String>(

0 commit comments

Comments
 (0)