Commit 7475cfd

Transform maxClauses fix (#477)
* transform maxClauses fix
* added bucket log to track processed buckets
* various renames/changes
* fixed detekt issues
* added comments to test
* removed debug logging
* empty commit to trigger checks
* reduced pageSize to 1 in a few ITs to avoid flaky tests; fixed bug where pagesProcessed was calculated incorrectly
* reverted pagesProcessed change; fixed a few ITs

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
1 parent 307ed55 commit 7475cfd

File tree

5 files changed: +215 −20 lines changed

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt (new)
src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt
src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt
src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt
src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt
src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt

Lines changed: 49 additions & 0 deletions

@@ -0,0 +1,49 @@ (new file)
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.indexmanagement.transform

import java.math.BigInteger
import java.security.MessageDigest

class TransformProcessedBucketLog {

    companion object {
        const val MAX_SIZE = 100_000_000
        const val HEX_RADIX = 16
    }

    private var processedBuckets: MutableSet<String> = HashSet()

    fun addBuckets(buckets: List<Map<String, Any>>) {
        buckets.forEach {
            addBucket(it)
        }
    }

    fun addBucket(bucket: Map<String, Any>) {
        if (processedBuckets.size >= MAX_SIZE) return
        processedBuckets.add(computeBucketHash(bucket))
    }

    fun isProcessed(bucket: Map<String, Any>): Boolean {
        return processedBuckets.contains(computeBucketHash(bucket))
    }

    fun isNotProcessed(bucket: Map<String, Any>) = !isProcessed(bucket)

    fun computeBucketHash(bucket: Map<String, Any>): String {
        val md5Crypt = MessageDigest.getInstance("MD5")
        bucket.entries.sortedBy { it.key }.also {
            it.forEach { entry ->
                md5Crypt.update(
                    if (entry.value == null) "null".toByteArray()
                    else entry.value.toString().toByteArray()
                )
            }
        }
        return BigInteger(1, md5Crypt.digest()).toString(HEX_RADIX)
    }
}
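The bucket log deduplicates work across loop iterations by storing an MD5 hash of each bucket's entries (sorted by key, so logically equal buckets hash identically regardless of entry order) rather than the bucket itself. A minimal usage sketch — hypothetical, not part of the commit; `newlySeenBuckets` stands in for the modified buckets returned by a shard-level search:

    fun dedupeExample(newlySeenBuckets: List<Map<String, Any>>) {
        val bucketLog = TransformProcessedBucketLog()
        // Only keep buckets we have not recomputed in an earlier iteration
        val toRecompute = newlySeenBuckets.filter { bucketLog.isNotProcessed(it) }
        // ... recompute and index `toRecompute` ...
        bucketLog.addBuckets(toRecompute)
        // A second pass over the same buckets now filters down to nothing
        check(toRecompute.all { bucketLog.isProcessed(it) })
    }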

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt

Lines changed: 61 additions & 15 deletions
@@ -112,8 +112,7 @@ object TransformRunner :
             TimeValue.timeValueMillis(TransformSettings.DEFAULT_RENEW_LOCK_RETRY_DELAY),
             TransformSettings.DEFAULT_RENEW_LOCK_RETRY_COUNT
         )
-
-        var attemptedToIndex = false
+        val transformProcessedBucketLog = TransformProcessedBucketLog()
         var bucketsToTransform = BucketsToTransform(HashSet(), metadata)
         var lock = acquireLockForScheduledJob(transform, context, backoffPolicy)
         try {
@@ -134,7 +133,7 @@
                     currentMetadata = validatedMetadata
                     return
                 }
-                if (transform.continuous && (bucketsToTransform.shardsToSearch == null || bucketsToTransform.currentShard != null)) {
+                if (transform.continuous) {
                     // If we have not populated the list of shards to search, do so now
                     if (bucketsToTransform.shardsToSearch == null) {
                         // Note the timestamp when we got the shard global checkpoints to the user may know what data is included
@@ -145,11 +144,29 @@
                             newGlobalCheckpoints
                         )
                     }
-                    bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform)
-                    currentMetadata = bucketsToTransform.metadata
+                    // If there are shards to search do it here
+                    if (bucketsToTransform.currentShard != null) {
+                        // Computes aggregation on modified documents for current shard to get modified buckets
+                        bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform).also {
+                            currentMetadata = it.metadata
+                        }
+                        // Filter out already processed buckets
+                        val modifiedBuckets = bucketsToTransform.modifiedBuckets.filter {
+                            transformProcessedBucketLog.isNotProcessed(it)
+                        }.toMutableSet()
+                        // Recompute modified buckets and update them in targetIndex
+                        currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets)
+                        // Add processed buckets to 'processed set' so that we don't try to reprocess them again
+                        transformProcessedBucketLog.addBuckets(modifiedBuckets.toList())
+                        // Update TransformMetadata
+                        currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true)
+                        bucketsToTransform = bucketsToTransform.copy(metadata = currentMetadata)
+                    }
                 } else {
-                    currentMetadata = executeTransformIteration(transform, currentMetadata, bucketsToTransform.modifiedBuckets)
-                    attemptedToIndex = true
+                    // Computes buckets from source index and stores them in targetIndex as transform docs
+                    currentMetadata = computeBucketsIteration(transform, currentMetadata)
+                    // Update TransformMetadata
+                    currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true)
                 }
                 // we attempt to renew lock for every loop of transform
                 val renewedLock = renewLockForScheduledJob(context, lock, backoffPolicy)
@@ -159,7 +176,7 @@
                        lock = renewedLock
                    }
                }
-            } while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null || !attemptedToIndex)
+            } while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null)
        } catch (e: Exception) {
            logger.error("Failed to execute the transform job [${transform.id}] because of exception [${e.localizedMessage}]", e)
            currentMetadata = currentMetadata.copy(
@@ -189,6 +206,8 @@
    private suspend fun getBucketsToTransformIteration(transform: Transform, bucketsToTransform: BucketsToTransform): BucketsToTransform {
        var currentBucketsToTransform = bucketsToTransform
        val currentShard = bucketsToTransform.currentShard
+        // Clear modified buckets from previous iteration
+        currentBucketsToTransform.modifiedBuckets.clear()

        if (currentShard != null) {
            val shardLevelModifiedBuckets = withTransformSecurityContext(transform) {
@@ -236,32 +255,59 @@
     * the range query will not precisely specify the modified buckets. As a result, we increase the range for the query and then filter out
     * the unintended buckets as part of the composite search step.
     */
-    private suspend fun executeTransformIteration(
+    private suspend fun computeBucketsIteration(
+        transform: Transform,
+        metadata: TransformMetadata,
+    ): TransformMetadata {
+        val transformSearchResult = withTransformSecurityContext(transform) {
+            transformSearchService.executeCompositeSearch(
+                transform,
+                metadata.afterKey,
+                null
+            )
+        }
+        val indexTimeInMillis = withTransformSecurityContext(transform) {
+            transformIndexer.index(transformSearchResult.docsToIndex)
+        }
+        val afterKey = transformSearchResult.afterKey
+        val stats = transformSearchResult.stats
+        val updatedStats = stats.copy(
+            pagesProcessed = stats.pagesProcessed,
+            indexTimeInMillis = stats.indexTimeInMillis + indexTimeInMillis,
+            documentsIndexed = transformSearchResult.docsToIndex.size.toLong()
+        )
+        return metadata.mergeStats(updatedStats).copy(
+            afterKey = afterKey,
+            lastUpdatedAt = Instant.now(),
+            status = if (afterKey == null) TransformMetadata.Status.FINISHED else TransformMetadata.Status.STARTED
+        )
+    }
+
+    private suspend fun recomputeModifiedBuckets(
        transform: Transform,
        metadata: TransformMetadata,
        modifiedBuckets: MutableSet<Map<String, Any>>
    ): TransformMetadata {
-        val updatedMetadata = if (!transform.continuous || modifiedBuckets.isNotEmpty()) {
+        val updatedMetadata = if (modifiedBuckets.isNotEmpty()) {
            val transformSearchResult = withTransformSecurityContext(transform) {
-                transformSearchService.executeCompositeSearch(transform, metadata.afterKey, if (transform.continuous) modifiedBuckets else null)
+                transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets)
            }
            val indexTimeInMillis = withTransformSecurityContext(transform) {
                transformIndexer.index(transformSearchResult.docsToIndex)
            }
-            val afterKey = transformSearchResult.afterKey
            val stats = transformSearchResult.stats
            val updatedStats = stats.copy(
                pagesProcessed = if (transform.continuous) 0 else stats.pagesProcessed,
                indexTimeInMillis = stats.indexTimeInMillis + indexTimeInMillis,
                documentsIndexed = transformSearchResult.docsToIndex.size.toLong()
            )
            metadata.mergeStats(updatedStats).copy(
-                afterKey = afterKey,
                lastUpdatedAt = Instant.now(),
-                status = if (afterKey == null && !transform.continuous) TransformMetadata.Status.FINISHED else TransformMetadata.Status.STARTED
+                status = TransformMetadata.Status.STARTED
            )
        } else metadata.copy(lastUpdatedAt = Instant.now(), status = TransformMetadata.Status.STARTED)
-        return transformMetadataService.writeMetadata(updatedMetadata, true)
+        return updatedMetadata
    }

    private suspend fun <T> withTransformSecurityContext(transform: Transform, block: suspend CoroutineScope.() -> T): T {
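Taken together, the runner now splits the old executeTransformIteration into two paths: continuous jobs walk shards and recompute only the modified buckets the bucket log has not seen, while non-continuous jobs page through the source index via afterKey. A simplified restatement of the new loop — a sketch only; `populateShardsToSearch` is a stand-in name for the checkpoint logic elided above:

    do {
        if (transform.continuous) {
            if (bucketsToTransform.shardsToSearch == null) populateShardsToSearch()
            if (bucketsToTransform.currentShard != null) {
                // Shard-level search, then drop buckets already recomputed this run
                bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform)
                val modified = bucketsToTransform.modifiedBuckets
                    .filter { transformProcessedBucketLog.isNotProcessed(it) }.toMutableSet()
                currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modified)
                transformProcessedBucketLog.addBuckets(modified.toList())
            }
        } else {
            // Non-continuous: page through the source index, afterKey drives the loop
            currentMetadata = computeBucketsIteration(transform, currentMetadata)
        }
        currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true)
    } while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null)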

src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt

Lines changed: 18 additions & 2 deletions
@@ -40,6 +40,7 @@ import org.opensearch.indexmanagement.transform.model.TransformSearchResult
 import org.opensearch.indexmanagement.transform.model.TransformStats
 import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT
 import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS
+import org.opensearch.indexmanagement.util.IndexUtils.Companion.LUCENE_MAX_CLAUSES
 import org.opensearch.indexmanagement.util.IndexUtils.Companion.ODFE_MAGIC_NULL
 import org.opensearch.indexmanagement.util.IndexUtils.Companion.hashToFixedSize
 import org.opensearch.rest.RestStatus
@@ -112,10 +113,11 @@
    suspend fun getShardLevelModifiedBuckets(transform: Transform, afterKey: Map<String, Any>?, currentShard: ShardNewDocuments): BucketSearchResult {
        try {
            var retryAttempt = 0
+            var pageSize = calculateMaxPageSize(transform)
            val searchResponse = backoffPolicy.retry(logger) {
                val pageSizeDecay = 2f.pow(retryAttempt++)
                client.suspendUntil { listener: ActionListener<SearchResponse> ->
-                    val pageSize = max(1, transform.pageSize.div(pageSizeDecay.toInt()))
+                    pageSize = max(1, pageSize.div(pageSizeDecay.toInt()))
                    if (retryAttempt > 1) {
                        logger.debug(
                            "Attempt [${retryAttempt - 1}] to get modified buckets for transform [${transform.id}]. Attempting " +
@@ -139,19 +141,33 @@
        }
    }

+    /**
+     * Apache Lucene has a maxClauses limit which we could trip while recomputing modified buckets
+     * (continuous transform) by trying to match too many bucket fields. To avoid this, we control
+     * how many buckets we recompute at a time (pageSize).
+     */
+    private fun calculateMaxPageSize(transform: Transform): Int {
+        return minOf(transform.pageSize, LUCENE_MAX_CLAUSES / (transform.groups.size + 1))
+    }
+
    @Suppress("RethrowCaughtException")
    suspend fun executeCompositeSearch(
        transform: Transform,
        afterKey: Map<String, Any>? = null,
        modifiedBuckets: MutableSet<Map<String, Any>>? = null
    ): TransformSearchResult {
        try {
+            var pageSize: Int =
+                if (modifiedBuckets.isNullOrEmpty())
+                    transform.pageSize
+                else
+                    modifiedBuckets.size
+
            var retryAttempt = 0
            val searchResponse = backoffPolicy.retry(logger) {
                // TODO: Should we store the value of the past successful page size (?)
                val pageSizeDecay = 2f.pow(retryAttempt++)
                client.suspendUntil { listener: ActionListener<SearchResponse> ->
-                    val pageSize = max(1, transform.pageSize.div(pageSizeDecay.toInt()))
+                    pageSize = max(1, pageSize.div(pageSizeDecay.toInt()))
                    if (retryAttempt > 1) {
                        logger.debug(
                            "Attempt [${retryAttempt - 1}] of composite search failed for transform [${transform.id}]. Attempting " +

src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt

Lines changed: 1 addition & 0 deletions
@@ -33,6 +33,7 @@ class IndexUtils {
         const val SCHEMA_VERSION = "schema_version"
         const val DEFAULT_SCHEMA_VERSION = 1L
         const val ODFE_MAGIC_NULL = "#ODFE-MAGIC-NULL-MAGIC-ODFE#"
+        const val LUCENE_MAX_CLAUSES = 1024
         private const val BYTE_ARRAY_SIZE = 16
         private const val DOCUMENT_ID_SEED = 72390L
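For context, 1024 is Lucene's default boolean-query clause limit (surfaced in OpenSearch as indices.query.bool.max_clause_count); queries that exceed it fail with a too_many_clauses error, which is the failure mode this commit guards against.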

src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt

Lines changed: 86 additions & 3 deletions
@@ -7,6 +7,8 @@ package org.opensearch.indexmanagement.transform

 import org.apache.http.entity.ContentType
 import org.apache.http.entity.StringEntity
+import org.opensearch.client.Request
+import org.opensearch.client.RequestOptions
 import org.opensearch.common.settings.Settings
 import org.opensearch.index.query.TermQueryBuilder
 import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
@@ -44,7 +46,7 @@
            sourceIndex = "transform-source-index",
            targetIndex = "transform-target-index",
            roles = emptyList(),
-            pageSize = 10,
+            pageSize = 1,
            groups = listOf(
                Terms(sourceField = "store_and_fwd_flag", targetField = "flag")
            )
@@ -62,7 +64,7 @@
            transformMetadata
        }

-        assertEquals("More than expected pages processed", 2L, metadata.stats.pagesProcessed)
+        assertEquals("More than expected pages processed", 3L, metadata.stats.pagesProcessed)
        assertEquals("More than expected documents indexed", 2L, metadata.stats.documentsIndexed)
        assertEquals("More than expected documents processed", 5000L, metadata.stats.documentsProcessed)
        assertTrue("Doesn't capture indexed time", metadata.stats.indexTimeInMillis > 0)
@@ -84,7 +86,7 @@
            sourceIndex = "transform-source-index",
            targetIndex = "transform-target-index",
            roles = emptyList(),
-            pageSize = 10,
+            pageSize = 1,
            groups = listOf(
                Terms(sourceField = "store_and_fwd_flag", targetField = "flag")
            ),
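A note on the assertion change: with pageSize lowered to 1 and two distinct store_and_fwd_flag values, the composite aggregation pages through one bucket at a time and needs a final empty page to learn that no afterKey remains, so three pages are processed instead of two. (This reading follows standard composite-aggregation paging; the commit itself does not spell it out.)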
@@ -950,6 +952,70 @@
        }
    }

+    fun `test continuous transform with a lot of buckets`() {
+
+        // Create index with high cardinality fields
+        val sourceIndex = "index_with_lots_of_buckets"
+
+        val requestBody: StringBuilder = StringBuilder(100000)
+        for (i in 1..2000) {
+            val docPayload: String = """
+                {
+                  "id1": "$i",
+                  "id2": "${i + 1}"
+                }
+            """.trimIndent().replace(Regex("[\n\r\\s]"), "")
+
+            requestBody.append("{\"create\":{}}\n").append(docPayload).append('\n')
+        }
+
+        createIndexAndBulkInsert(sourceIndex, Settings.EMPTY, null, null, requestBody.toString())
+        // Source index will have total of 2000 buckets
+        val transform = Transform(
+            id = "transform_index_with_lots_of_buckets",
+            schemaVersion = 1L,
+            enabled = true,
+            enabledAt = Instant.now(),
+            updatedAt = Instant.now(),
+            jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
+            description = "test transform",
+            metadataId = null,
+            sourceIndex = "index_with_lots_of_buckets",
+            targetIndex = "index_with_lots_of_buckets_transformed",
+            roles = emptyList(),
+            pageSize = 1000,
+            groups = listOf(
+                Terms(sourceField = "id1.keyword", targetField = "id1"),
+                Terms(sourceField = "id2.keyword", targetField = "id2")
+            ),
+            continuous = true
+        ).let { createTransform(it, it.id) }
+
+        updateTransformStartTime(transform)
+
+        waitFor { assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) }
+
+        val firstIterationMetadata = waitFor {
+            val job = getTransform(transformId = transform.id)
+            assertNotNull("Transform job doesn't have metadata set", job.metadataId)
+            val transformMetadata = getTransformMetadata(job.metadataId!!)
+            assertEquals("Transform did not complete iteration or had incorrect number of documents processed", 2000, transformMetadata.stats.documentsProcessed)
+            assertEquals("Transform did not complete iteration", null, transformMetadata.afterKey)
+            assertNotNull("Continuous stats were not updated", transformMetadata.continuousStats)
+            assertNotNull("Continuous stats were set, but lastTimestamp was not", transformMetadata.continuousStats!!.lastTimestamp)
+            transformMetadata
+        }
+
+        assertEquals("Not the expected transform status", TransformMetadata.Status.STARTED, firstIterationMetadata.status)
+        assertEquals("Not the expected pages processed", 7, firstIterationMetadata.stats.pagesProcessed)
+        assertEquals("Not the expected documents indexed", 2000L, firstIterationMetadata.stats.documentsIndexed)
+        assertEquals("Not the expected documents processed", 2000L, firstIterationMetadata.stats.documentsProcessed)
+        assertTrue("Doesn't capture indexed time", firstIterationMetadata.stats.indexTimeInMillis > 0)
+        assertTrue("Didn't capture search time", firstIterationMetadata.stats.searchTimeInMillis > 0)
+
+        disableTransform(transform.id)
+    }
+
    private fun getStrictMappings(): String {
        return """
            "dynamic": "strict",
@@ -967,4 +1033,21 @@
            assertIndexExists(indexName)
        }
    }
+
+    private fun createIndexAndBulkInsert(name: String, settings: Settings?, mapping: String?, aliases: String?, bulkData: String) {
+        if (settings != null || mapping != null || aliases != null) {
+            createIndex(name, settings, mapping, aliases)
+        }
+
+        val request = Request("POST", "/$name/_bulk/?refresh=true")
+        request.setJsonEntity(bulkData)
+        request.options = RequestOptions.DEFAULT.toBuilder().addHeader("content-type", "application/x-ndjson").build()
+        var res = client().performRequest(request)
+        assertEquals(RestStatus.OK, res.restStatus())
+
+        val refreshRequest = Request("POST", "/$name/_refresh")
+        res = client().performRequest(refreshRequest)
+        assertEquals(RestStatus.OK, res.restStatus())
+    }
 }
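One small design note: the bulk request already passes refresh=true, so the follow-up _refresh call is redundant in the common case; presumably it is kept as a belt-and-braces step to make the integration test deterministic.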
