@@ -11,13 +11,11 @@ import kotlinx.coroutines.Dispatchers
1111import kotlinx.coroutines.SupervisorJob
1212import kotlinx.coroutines.launch
1313import org.apache.logging.log4j.LogManager
14- import org.opensearch.action.bulk.BackoffPolicy
1514import org.opensearch.action.support.WriteRequest
1615import org.opensearch.client.Client
1716import org.opensearch.cluster.metadata.IndexNameExpressionResolver
1817import org.opensearch.cluster.service.ClusterService
1918import org.opensearch.common.settings.Settings
20- import org.opensearch.common.unit.TimeValue
2119import org.opensearch.common.xcontent.NamedXContentRegistry
2220import org.opensearch.index.shard.ShardId
2321import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext
@@ -31,10 +29,8 @@ import org.opensearch.indexmanagement.transform.model.ContinuousTransformStats
3129import org.opensearch.indexmanagement.transform.model.Transform
3230import org.opensearch.indexmanagement.transform.model.TransformMetadata
3331import org.opensearch.indexmanagement.transform.model.initializeShardsToSearch
34- import org.opensearch.indexmanagement.transform.settings.TransformSettings
35- import org.opensearch.indexmanagement.util.acquireLockForScheduledJob
36- import org.opensearch.indexmanagement.util.releaseLockForScheduledJob
37- import org.opensearch.indexmanagement.util.renewLockForScheduledJob
32+ import org.opensearch.indexmanagement.transform.util.TransformContext
33+ import org.opensearch.indexmanagement.transform.util.TransformLockManager
3834import org.opensearch.jobscheduler.spi.JobExecutionContext
3935import org.opensearch.jobscheduler.spi.ScheduledJobParameter
4036import org.opensearch.jobscheduler.spi.ScheduledJobRunner
@@ -108,19 +104,19 @@ object TransformRunner :
108104 var newGlobalCheckpoints: Map <ShardId , Long >? = null
109105 var newGlobalCheckpointTime: Instant ? = null
110106 var currentMetadata = metadata
111- val backoffPolicy = BackoffPolicy .exponentialBackoff(
112- TimeValue .timeValueMillis(TransformSettings .DEFAULT_RENEW_LOCK_RETRY_DELAY ),
113- TransformSettings .DEFAULT_RENEW_LOCK_RETRY_COUNT
114- )
107+
115108 val transformProcessedBucketLog = TransformProcessedBucketLog ()
116109 var bucketsToTransform = BucketsToTransform (HashSet (), metadata)
117- var lock = acquireLockForScheduledJob(transform, context, backoffPolicy)
110+
111+ val transformContext = TransformContext (TransformLockManager (transform, context))
112+ // Acquires the lock if there is no running job execution for the given transform; Lock is acquired per transform
113+ val transformLockManager = transformContext.transformLockManager
114+ transformLockManager.acquireLockForScheduledJob()
118115 try {
119116 do {
120117 when {
121- lock == null -> {
118+ transformLockManager. lock == null -> {
122119 logger.warn(" Cannot acquire lock for transform job ${transform.id} " )
123- // If we fail to get the lock we won't fail the job, instead we return early
124120 return
125121 }
126122 listOf (TransformMetadata .Status .STOPPED , TransformMetadata .Status .FINISHED ).contains(metadata.status) -> {
@@ -147,15 +143,15 @@ object TransformRunner :
147143 // If there are shards to search do it here
148144 if (bucketsToTransform.currentShard != null ) {
149145 // Computes aggregation on modified documents for current shard to get modified buckets
150- bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform).also {
146+ bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform, transformContext ).also {
151147 currentMetadata = it.metadata
152148 }
153149 // Filter out already processed buckets
154150 val modifiedBuckets = bucketsToTransform.modifiedBuckets.filter {
155151 transformProcessedBucketLog.isNotProcessed(it)
156152 }.toMutableSet()
157153 // Recompute modified buckets and update them in targetIndex
158- currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets)
154+ currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets, transformContext )
159155 // Add processed buckets to 'processed set' so that we don't try to reprocess them again
160156 transformProcessedBucketLog.addBuckets(modifiedBuckets.toList())
161157 // Update TransformMetadata
@@ -164,16 +160,12 @@ object TransformRunner :
164160 }
165161 } else {
166162 // Computes buckets from source index and stores them in targetIndex as transform docs
167- currentMetadata = computeBucketsIteration(transform, currentMetadata)
163+ currentMetadata = computeBucketsIteration(transform, currentMetadata, transformContext )
168164 // Update TransformMetadata
169165 currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true )
170166 }
171167 // we attempt to renew lock for every loop of transform
172- val renewedLock = renewLockForScheduledJob(context, lock, backoffPolicy)
173- if (renewedLock == null ) {
174- releaseLockForScheduledJob(context, lock)
175- }
176- lock = renewedLock
168+ transformLockManager.renewLockForScheduledJob()
177169 }
178170 }
179171 } while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null )
@@ -185,7 +177,7 @@ object TransformRunner :
185177 failureReason = e.localizedMessage
186178 )
187179 } finally {
188- lock?.let {
180+ transformLockManager. lock?.let {
189181 // Update the global checkpoints only after execution finishes successfully
190182 if (transform.continuous && currentMetadata.status != TransformMetadata .Status .FAILED ) {
191183 currentMetadata = currentMetadata.copy(
@@ -198,20 +190,29 @@ object TransformRunner :
198190 logger.info(" Disabling the transform job ${transform.id} " )
199191 updateTransform(transform.copy(enabled = false , enabledAt = null ))
200192 }
201- releaseLockForScheduledJob(context, it )
193+ transformLockManager. releaseLockForScheduledJob()
202194 }
203195 }
204196 }
205197
206- private suspend fun getBucketsToTransformIteration (transform : Transform , bucketsToTransform : BucketsToTransform ): BucketsToTransform {
198+ private suspend fun getBucketsToTransformIteration (
199+ transform : Transform ,
200+ bucketsToTransform : BucketsToTransform ,
201+ transformContext : TransformContext
202+ ): BucketsToTransform {
207203 var currentBucketsToTransform = bucketsToTransform
208204 val currentShard = bucketsToTransform.currentShard
209205 // Clear modified buckets from previous iteration
210206 currentBucketsToTransform.modifiedBuckets.clear()
211207
212208 if (currentShard != null ) {
213209 val shardLevelModifiedBuckets = withTransformSecurityContext(transform) {
214- transformSearchService.getShardLevelModifiedBuckets(transform, currentBucketsToTransform.metadata.afterKey, currentShard)
210+ transformSearchService.getShardLevelModifiedBuckets(
211+ transform,
212+ currentBucketsToTransform.metadata.afterKey,
213+ currentShard,
214+ transformContext
215+ )
215216 }
216217 currentBucketsToTransform.modifiedBuckets.addAll(shardLevelModifiedBuckets.modifiedBuckets)
217218 val mergedSearchTime = currentBucketsToTransform.metadata.stats.searchTimeInMillis +
@@ -258,13 +259,15 @@ object TransformRunner :
258259 private suspend fun computeBucketsIteration (
259260 transform : Transform ,
260261 metadata : TransformMetadata ,
262+ transformContext : TransformContext
261263 ): TransformMetadata {
262264
263265 val transformSearchResult = withTransformSecurityContext(transform) {
264266 transformSearchService.executeCompositeSearch(
265267 transform,
266268 metadata.afterKey,
267- null
269+ null ,
270+ transformContext
268271 )
269272 }
270273 val indexTimeInMillis = withTransformSecurityContext(transform) {
@@ -287,11 +290,12 @@ object TransformRunner :
287290 private suspend fun recomputeModifiedBuckets (
288291 transform : Transform ,
289292 metadata : TransformMetadata ,
290- modifiedBuckets : MutableSet <Map <String , Any >>
293+ modifiedBuckets : MutableSet <Map <String , Any >>,
294+ transformContext : TransformContext
291295 ): TransformMetadata {
292296 val updatedMetadata = if (modifiedBuckets.isNotEmpty()) {
293297 val transformSearchResult = withTransformSecurityContext(transform) {
294- transformSearchService.executeCompositeSearch(transform, null , modifiedBuckets)
298+ transformSearchService.executeCompositeSearch(transform, null , modifiedBuckets, transformContext )
295299 }
296300 val indexTimeInMillis = withTransformSecurityContext(transform) {
297301 transformIndexer.index(transformSearchResult.docsToIndex)
0 commit comments