From fc026edce4b6073a539baf74277a5f14ebe8d445 Mon Sep 17 00:00:00 2001 From: Joshua152 <59714582+Joshua152@users.noreply.github.com> Date: Tue, 12 Dec 2023 12:25:42 -0500 Subject: [PATCH] Added minimum timeout for transforms search of 10 minutes (#1033) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Added minimum timeout for transforms search of 10 minutes Signed-off-by: Joshua Au * Extracted cancel minimum code to function Signed-off-by: Joshua Au * Fixed transform code to use cluster setting Signed-off-by: Joshua Au * Removed log statements Signed-off-by: Joshua Au * Changed timeout logic Signed-off-by: Joshua Au * Switched to basing off seconds Signed-off-by: Joshua Au * [Feature] Support Transform as an ISM action (#760) * Initial impl Signed-off-by: Tanqiu Liu * fix style Signed-off-by: Tanqiu Liu * end to end functional Signed-off-by: Tanqiu Liu * ISM transform unit tests & integ tests Signed-off-by: Tanqiu Liu * Fix after core #8157 (#857) Signed-off-by: bowenlan-amzn * Upgrade the backport workflow (#862) Signed-off-by: Ashish Agrawal Signed-off-by: Tanqiu Liu * Added 2.9 release notes. (#851) * Added 2.9 release notes. Signed-off-by: AWSHurneyt * Added 2.9 release notes. Signed-off-by: AWSHurneyt * Added 2.9 release notes. Signed-off-by: AWSHurneyt --------- Signed-off-by: AWSHurneyt Signed-off-by: Tanqiu Liu * Handle NPE in isRollupIndex (#855) * Handle NPE in isRollupIndex `metadata.index()` can return `null`, so handle that case by returning `false`. Signed-off-by: Bryce Lampe * unit test Signed-off-by: Bryce Lampe --------- Signed-off-by: Bryce Lampe Co-authored-by: bowenlan-amzn Signed-off-by: Tanqiu Liu * Fix core XcontentType refactor (#873) Signed-off-by: Hailong Cui Signed-off-by: Tanqiu Liu * fix for max & min aggregations when no metric property exist (#870) Signed-off-by: Subhobrata Dey Signed-off-by: Tanqiu Liu * core refactor change (#884) Signed-off-by: Hailong Cui Signed-off-by: Tanqiu Liu * update backport branch name (#885) Signed-off-by: Hailong Cui Signed-off-by: Tanqiu Liu * core refactor change (#887) Signed-off-by: Hailong Cui Signed-off-by: Tanqiu Liu * Fix breaking change by core refactor (#888) Signed-off-by: Hailong Cui Signed-off-by: Tanqiu Liu * fix core breaking (#906) Signed-off-by: bowenlan-amzn Signed-off-by: Tanqiu Liu * Support copy alias in rollover (#907) * Support copy alias in rollover Signed-off-by: bowenlan-amzn * 2.10 Signed-off-by: bowenlan-amzn --------- Signed-off-by: bowenlan-amzn Signed-off-by: Tanqiu Liu * Set preference to _primary when searching control-center index (#911) * Set preference to _primary when searching control-center index Signed-off-by: gaobinlong * Use _primary_first instead Signed-off-by: gaobinlong --------- Signed-off-by: gaobinlong Signed-off-by: Tanqiu Liu * Add primary first preference to all search requests (#912) Signed-off-by: Tanqiu Liu * fix intelliJ IDEA gradle sync error (#916) Signed-off-by: Hailong Cui Signed-off-by: Tanqiu Liu * make control center index as system index (#919) Signed-off-by: Hailong Cui Signed-off-by: Tanqiu Liu * Updates demo certs used in integ tests (#921) Signed-off-by: Darshit Chanpura Signed-off-by: Tanqiu Liu * Added 2.10 release notes (#925) Signed-off-by: Ashish Agrawal Signed-off-by: Tanqiu Liu * Bump bwc version (#930) Signed-off-by: bowenlan-amzn Signed-off-by: Tanqiu Liu * fix integ tests; upgrade mappings versions Signed-off-by: Tanqiu Liu * Fix DCO Signed-off-by: Tanqiu Liu * Addressed pr comments; Add integ test case for re-execute the same transform action Signed-off-by: Tanqiu Liu * Addressed detekt error Signed-off-by: Tanqiu Liu * Added ISMTransform writeable test Signed-off-by: Tanqiu Liu * Addressed comments; Moved updateTransformStartTime to IndexManagementRestTestCase Signed-off-by: Tanqiu Liu --------- Signed-off-by: Tanqiu Liu Signed-off-by: bowenlan-amzn Signed-off-by: Ashish Agrawal Signed-off-by: AWSHurneyt Signed-off-by: Bryce Lampe Signed-off-by: Hailong Cui Signed-off-by: Subhobrata Dey Signed-off-by: gaobinlong Signed-off-by: Darshit Chanpura Co-authored-by: bowenlan-amzn Co-authored-by: Ashish Agrawal Co-authored-by: AWSHurneyt Co-authored-by: Bryce Lampe Co-authored-by: Hailong Cui Co-authored-by: Subhobrata Dey Co-authored-by: gaobinlong Co-authored-by: Darshit Chanpura <35282393+DarshitChanpura@users.noreply.github.com> Signed-off-by: Joshua Au * [Test] increase the wait time after transform job triggered (#999) Signed-off-by: bowenlan-amzn Signed-off-by: Joshua Au * Drafted 2.11 release notes. (#1004) * Drafted 2.11 release notes. Signed-off-by: AWSHurneyt * Drafted 2.11 release notes. Signed-off-by: AWSHurneyt --------- Signed-off-by: AWSHurneyt Signed-off-by: Joshua Au * Refactor change policy API and the policy in managed index to be non-null (#967) * Refactor the policy to be non null in managed index config Signed-off-by: bowenlan-amzn * Update Signed-off-by: bowenlan-amzn * fix bug Signed-off-by: bowenlan-amzn --------- Signed-off-by: bowenlan-amzn Signed-off-by: Joshua Au * Add more error notification at fail points (#1000) * Add more error notification at fail points Signed-off-by: bowenlan-amzn * Handle exception gracefully Signed-off-by: bowenlan-amzn * small fix Signed-off-by: bowenlan-amzn --------- Signed-off-by: bowenlan-amzn Signed-off-by: Joshua Au * fix the race condition in test reset action start time (#1007) Signed-off-by: bowenlan-amzn Signed-off-by: Joshua Au * Bump bwc version after 2.11 release (#1015) Signed-off-by: bowenlan-amzn Signed-off-by: Joshua Au * added type check for pipeline aggregator types in Transform initialization (#1014) Signed-off-by: n-dohrmann Co-authored-by: Joanne Wang <109310487+jowg-amazon@users.noreply.github.com> Signed-off-by: Joshua Au * Improve security plugin enabling check (#1017) Signed-off-by: Hailong Cui Signed-off-by: Joshua Au * Onboard jenkins prod docker images to github actions (#1025) * Onboard jenkins prod docker images to github actions Signed-off-by: Peter Zhu * Add more Signed-off-by: Peter Zhu * Add more Signed-off-by: Peter Zhu --------- Signed-off-by: Peter Zhu Signed-off-by: Joshua Au * Support switch aliases in shrink action. (#987) Signed-off-by: Oleg Kravchuk Co-authored-by: ikibo Signed-off-by: Joshua Au * Transform pipeline aggr test (#1027) * tester code: pipeline aggr. transform job Signed-off-by: n-dohrmann * made test case for pipeline aggregator in transform job Signed-off-by: n-dohrmann * removed unnec. test lines Signed-off-by: n-dohrmann * re-added method call on Transform obj Signed-off-by: n-dohrmann --------- Signed-off-by: n-dohrmann Signed-off-by: Joshua Au * Added unit test for switchAliases method. (#1035) * Added unit test for switchAliases method. Signed-off-by: Oleg Kravchuk * Added unit test for switchAliases method checking the flow when switchAliases is disabled. Signed-off-by: Oleg Kravchuk --------- Signed-off-by: Oleg Kravchuk Signed-off-by: Joshua Au * Interval schedule should take start time from the request, should not set it to the current time of request execution. (#1036) Signed-off-by: Oleg Kravchuk Signed-off-by: Joshua Au * Added minimum for search.cancel_after_time_interval setting for rollups (#1026) * Added minimum for search.cancel_after_time_interval setting for rollups Signed-off-by: Joshua Au * Added constant for cancel_after_time_interval for rollup search Signed-off-by: Joshua Au * Handled case of default value for cancel interval Signed-off-by: Joshua Au * Added comment explanation for default rollup cancel after time interval Signed-off-by: Joshua Au * Fixed github workflow checks Signed-off-by: Joshua Au --------- Signed-off-by: Joshua Au Co-authored-by: bowenlan-amzn Signed-off-by: Joshua Au * Update 2.11.1 release note (#1042) Signed-off-by: bowenlan-amzn Signed-off-by: Joshua Au * Interval schedule should take start time from the request, should not… (#1040) * Interval schedule should take start time from the request, should not set it to the current time of request execution. Signed-off-by: Oleg Kravchuk * Changed the "delayed continuous execution test" to be more expressive about what it should test. Signed-off-by: Oleg Kravchuk * fixed the NPE if schedule.startTime is NULL Signed-off-by: Oleg Kravchuk * fixed the NPE if schedule.startTime is NULL Signed-off-by: Oleg Kravchuk * fixed styling Signed-off-by: Oleg Kravchuk * - removed null checks from RollUp and Transforms - fixed comments in the "delayed execution" test Signed-off-by: Oleg Kravchuk --------- Signed-off-by: Oleg Kravchuk Signed-off-by: Joshua Au * Removed unused imports Signed-off-by: Joshua Au --------- Signed-off-by: Joshua Au Signed-off-by: Tanqiu Liu Signed-off-by: bowenlan-amzn Signed-off-by: Ashish Agrawal Signed-off-by: AWSHurneyt Signed-off-by: Bryce Lampe Signed-off-by: Hailong Cui Signed-off-by: Subhobrata Dey Signed-off-by: gaobinlong Signed-off-by: Darshit Chanpura Signed-off-by: n-dohrmann Signed-off-by: Peter Zhu Signed-off-by: Oleg Kravchuk Co-authored-by: Tanqiu Liu Co-authored-by: bowenlan-amzn Co-authored-by: Ashish Agrawal Co-authored-by: AWSHurneyt Co-authored-by: Bryce Lampe Co-authored-by: Hailong Cui Co-authored-by: Subhobrata Dey Co-authored-by: gaobinlong Co-authored-by: Darshit Chanpura <35282393+DarshitChanpura@users.noreply.github.com> Co-authored-by: n-dohrmann <87952011+n-dohrmann@users.noreply.github.com> Co-authored-by: Joanne Wang <109310487+jowg-amazon@users.noreply.github.com> Co-authored-by: Peter Zhu Co-authored-by: Oleg Kravchuk Co-authored-by: ikibo --- .../transform/TransformSearchService.kt | 24 ++++++++++++++++++- .../transform/settings/TransformSettings.kt | 1 + 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index 8709a6b98..58e050645 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -16,6 +16,7 @@ import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse +import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING import org.opensearch.client.Client import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.service.ClusterService @@ -47,6 +48,7 @@ import org.opensearch.indexmanagement.util.IndexUtils.Companion.LUCENE_MAX_CLAUS import org.opensearch.indexmanagement.util.IndexUtils.Companion.ODFE_MAGIC_NULL import org.opensearch.indexmanagement.util.IndexUtils.Companion.hashToFixedSize import org.opensearch.core.rest.RestStatus +import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.MINIMUM_CANCEL_AFTER_TIME_INTERVAL_SECONDS import org.opensearch.search.aggregations.Aggregation import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder @@ -78,11 +80,17 @@ class TransformSearchService( @Volatile private var backoffPolicy = BackoffPolicy.constantBackoff(TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS.get(settings), TRANSFORM_JOB_SEARCH_BACKOFF_COUNT.get(settings)) + @Volatile private var cancelAfterTimeInterval = SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING.get(settings) + init { clusterService.clusterSettings.addSettingsUpdateConsumer(TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS, TRANSFORM_JOB_SEARCH_BACKOFF_COUNT) { millis, count -> backoffPolicy = BackoffPolicy.constantBackoff(millis, count) } + + clusterService.clusterSettings.addSettingsUpdateConsumer(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING) { + cancelAfterTimeInterval = it + } } @Suppress("RethrowCaughtException") @@ -187,7 +195,11 @@ class TransformSearchService( val searchStart = Instant.now().epochSecond val searchResponse = backoffPolicy.retryTransformSearch(logger, transformContext.transformLockManager) { val pageSizeDecay = 2f.pow(retryAttempt++) - val searchRequestTimeoutInSeconds = transformContext.getMaxRequestTimeoutInSeconds() + + var searchRequestTimeoutInSeconds = transformContext.getMaxRequestTimeoutInSeconds() + if (searchRequestTimeoutInSeconds == null) { + searchRequestTimeoutInSeconds = getCancelAfterTimeIntervalSeconds(cancelAfterTimeInterval.seconds) + } client.suspendUntil { listener: ActionListener -> // If the previous request of the current transform job execution was successful, take the page size of previous request. @@ -224,6 +236,16 @@ class TransformSearchService( } } + private fun getCancelAfterTimeIntervalSeconds(givenIntervalSeconds: Long): Long { + // The default value for the cancelAfterTimeInterval is -1 and so, in this case + // we should ignore processing on the value + if (givenIntervalSeconds == -1L) { + return -1 + } + + return max(givenIntervalSeconds, MINIMUM_CANCEL_AFTER_TIME_INTERVAL_SECONDS) + } + companion object { const val failedSearchErrorMessage = "Failed to search data in source indices" const val modifiedBucketsErrorMessage = "Failed to get the modified buckets in source indices" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/settings/TransformSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/settings/TransformSettings.kt index 9ed375344..abfdc1b1f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/settings/TransformSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/settings/TransformSettings.kt @@ -14,6 +14,7 @@ class TransformSettings { companion object { const val DEFAULT_RENEW_LOCK_RETRY_COUNT = 3 const val DEFAULT_RENEW_LOCK_RETRY_DELAY = 1000L + const val MINIMUM_CANCEL_AFTER_TIME_INTERVAL_SECONDS = 600L val TRANSFORM_JOB_SEARCH_BACKOFF_COUNT: Setting = Setting.intSetting( "plugins.transform.internal.search.backoff_count",