@@ -112,8 +112,7 @@ object TransformRunner :
112112 TimeValue .timeValueMillis(TransformSettings .DEFAULT_RENEW_LOCK_RETRY_DELAY ),
113113 TransformSettings .DEFAULT_RENEW_LOCK_RETRY_COUNT
114114 )
115-
116- var attemptedToIndex = false
115+ val transformProcessedBucketLog = TransformProcessedBucketLog ()
117116 var bucketsToTransform = BucketsToTransform (HashSet (), metadata)
118117 var lock = acquireLockForScheduledJob(transform, context, backoffPolicy)
119118 try {
@@ -134,7 +133,7 @@ object TransformRunner :
134133 currentMetadata = validatedMetadata
135134 return
136135 }
137- if (transform.continuous && (bucketsToTransform.shardsToSearch == null || bucketsToTransform.currentShard != null ) ) {
136+ if (transform.continuous) {
138137 // If we have not populated the list of shards to search, do so now
139138 if (bucketsToTransform.shardsToSearch == null ) {
140139 // Note the timestamp when we got the shard global checkpoints so the user may know what data is included
@@ -145,11 +144,29 @@ object TransformRunner :
145144 newGlobalCheckpoints
146145 )
147146 }
148- bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform)
149- currentMetadata = bucketsToTransform.metadata
147+ // If there is a current shard left to search, process it here
148+ if (bucketsToTransform.currentShard != null ) {
149+ // Computes aggregation on modified documents for current shard to get modified buckets
150+ bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform).also {
151+ currentMetadata = it.metadata
152+ }
153+ // Filter out already processed buckets
154+ val modifiedBuckets = bucketsToTransform.modifiedBuckets.filter {
155+ transformProcessedBucketLog.isNotProcessed(it)
156+ }.toMutableSet()
157+ // Recompute modified buckets and update them in targetIndex
158+ currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets)
159+ // Add processed buckets to 'processed set' so that we don't try to process them again
160+ transformProcessedBucketLog.addBuckets(modifiedBuckets.toList())
161+ // Update TransformMetadata
162+ currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true )
163+ bucketsToTransform = bucketsToTransform.copy(metadata = currentMetadata)
164+ }
150165 } else {
151- currentMetadata = executeTransformIteration(transform, currentMetadata, bucketsToTransform.modifiedBuckets)
152- attemptedToIndex = true
166+ // Computes buckets from source index and stores them in targetIndex as transform docs
167+ currentMetadata = computeBucketsIteration(transform, currentMetadata)
168+ // Update TransformMetadata
169+ currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true )
153170 }
154171 // we attempt to renew lock for every loop of transform
155172 val renewedLock = renewLockForScheduledJob(context, lock, backoffPolicy)
@@ -159,7 +176,7 @@ object TransformRunner :
159176 lock = renewedLock
160177 }
161178 }
162- } while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null || ! attemptedToIndex )
179+ } while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null )
163180 } catch (e: Exception ) {
164181 logger.error(" Failed to execute the transform job [${transform.id} ] because of exception [${e.localizedMessage} ]" , e)
165182 currentMetadata = currentMetadata.copy(
@@ -189,6 +206,8 @@ object TransformRunner :
189206 private suspend fun getBucketsToTransformIteration (transform : Transform , bucketsToTransform : BucketsToTransform ): BucketsToTransform {
190207 var currentBucketsToTransform = bucketsToTransform
191208 val currentShard = bucketsToTransform.currentShard
209+ // Clear modified buckets from previous iteration
210+ currentBucketsToTransform.modifiedBuckets.clear()
192211
193212 if (currentShard != null ) {
194213 val shardLevelModifiedBuckets = withTransformSecurityContext(transform) {
@@ -236,32 +255,59 @@ object TransformRunner :
236255 * the range query will not precisely specify the modified buckets. As a result, we increase the range for the query and then filter out
237256 * the unintended buckets as part of the composite search step.
238257 */
239- private suspend fun executeTransformIteration (
258+ private suspend fun computeBucketsIteration (
259+ transform : Transform ,
260+ metadata : TransformMetadata ,
261+ ): TransformMetadata {
262+
263+ val transformSearchResult = withTransformSecurityContext(transform) {
264+ transformSearchService.executeCompositeSearch(
265+ transform,
266+ metadata.afterKey,
267+ null
268+ )
269+ }
270+ val indexTimeInMillis = withTransformSecurityContext(transform) {
271+ transformIndexer.index(transformSearchResult.docsToIndex)
272+ }
273+ val afterKey = transformSearchResult.afterKey
274+ val stats = transformSearchResult.stats
275+ val updatedStats = stats.copy(
276+ pagesProcessed = stats.pagesProcessed,
277+ indexTimeInMillis = stats.indexTimeInMillis + indexTimeInMillis,
278+ documentsIndexed = transformSearchResult.docsToIndex.size.toLong()
279+ )
280+ return metadata.mergeStats(updatedStats).copy(
281+ afterKey = afterKey,
282+ lastUpdatedAt = Instant .now(),
283+ status = if (afterKey == null ) TransformMetadata .Status .FINISHED else TransformMetadata .Status .STARTED
284+ )
285+ }
286+
287+ private suspend fun recomputeModifiedBuckets (
240288 transform : Transform ,
241289 metadata : TransformMetadata ,
242290 modifiedBuckets : MutableSet <Map <String , Any >>
243291 ): TransformMetadata {
244- val updatedMetadata = if (! transform.continuous || modifiedBuckets.isNotEmpty()) {
292+ val updatedMetadata = if (modifiedBuckets.isNotEmpty()) {
245293 val transformSearchResult = withTransformSecurityContext(transform) {
246- transformSearchService.executeCompositeSearch(transform, metadata.afterKey, if (transform.continuous) modifiedBuckets else null )
294+ transformSearchService.executeCompositeSearch(transform, null , modifiedBuckets)
247295 }
248296 val indexTimeInMillis = withTransformSecurityContext(transform) {
249297 transformIndexer.index(transformSearchResult.docsToIndex)
250298 }
251- val afterKey = transformSearchResult.afterKey
252299 val stats = transformSearchResult.stats
253300 val updatedStats = stats.copy(
254301 pagesProcessed = if (transform.continuous) 0 else stats.pagesProcessed,
255302 indexTimeInMillis = stats.indexTimeInMillis + indexTimeInMillis,
256303 documentsIndexed = transformSearchResult.docsToIndex.size.toLong()
257304 )
258305 metadata.mergeStats(updatedStats).copy(
259- afterKey = afterKey,
260306 lastUpdatedAt = Instant .now(),
261- status = if (afterKey == null && ! transform.continuous) TransformMetadata . Status . FINISHED else TransformMetadata .Status .STARTED
307+ status = TransformMetadata .Status .STARTED
262308 )
263309 } else metadata.copy(lastUpdatedAt = Instant .now(), status = TransformMetadata .Status .STARTED )
264- return transformMetadataService.writeMetadata( updatedMetadata, true )
310+ return updatedMetadata
265311 }
266312
267313 private suspend fun <T > withTransformSecurityContext (transform : Transform , block : suspend CoroutineScope .() -> T ): T {
0 commit comments