@@ -11,13 +11,11 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
-import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.support.WriteRequest
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
-import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.index.shard.ShardId
import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext
@@ -31,10 +29,8 @@ import org.opensearch.indexmanagement.transform.model.ContinuousTransformStats
import org.opensearch.indexmanagement.transform.model.Transform
import org.opensearch.indexmanagement.transform.model.TransformMetadata
import org.opensearch.indexmanagement.transform.model.initializeShardsToSearch
-import org.opensearch.indexmanagement.transform.settings.TransformSettings
-import org.opensearch.indexmanagement.util.acquireLockForScheduledJob
-import org.opensearch.indexmanagement.util.releaseLockForScheduledJob
-import org.opensearch.indexmanagement.util.renewLockForScheduledJob
+import org.opensearch.indexmanagement.transform.util.TransformContext
+import org.opensearch.indexmanagement.transform.util.TransformLockManager
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
@@ -108,19 +104,19 @@ object TransformRunner :
var newGlobalCheckpoints: Map<ShardId, Long>? = null
var newGlobalCheckpointTime: Instant? = null
var currentMetadata = metadata
-val backoffPolicy = BackoffPolicy.exponentialBackoff(
-TimeValue.timeValueMillis(TransformSettings.DEFAULT_RENEW_LOCK_RETRY_DELAY),
-TransformSettings.DEFAULT_RENEW_LOCK_RETRY_COUNT
-)

val transformProcessedBucketLog = TransformProcessedBucketLog()
var bucketsToTransform = BucketsToTransform(HashSet(), metadata)
-var lock = acquireLockForScheduledJob(transform, context, backoffPolicy)

+val transformContext = TransformContext(TransformLockManager(transform, context))
+// Acquires the lock if there is no running job execution for the given transform; Lock is acquired per transform
+val transformLockManager = transformContext.transformLockManager
+transformLockManager.acquireLockForScheduledJob()
try {
do {
when {
-lock == null -> {
+transformLockManager.lock == null -> {
logger.warn("Cannot acquire lock for transform job ${transform.id}")
// If we fail to get the lock we won't fail the job, instead we return early
return
}
listOf(TransformMetadata.Status.STOPPED, TransformMetadata.Status.FINISHED).contains(metadata.status) -> {
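Note on the hunk above: the free-standing `acquireLockForScheduledJob` / `renewLockForScheduledJob` / `releaseLockForScheduledJob` helpers and the runner-local `BackoffPolicy` are folded into a `TransformLockManager`, created once per execution and handed around inside a `TransformContext`. The manager's implementation is not in this file; the sketch below is only a guess at its shape, reconstructed from the call sites in this diff (class name, retry delay, and retry count here are placeholders, not the real class or settings):

```kotlin
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.transform.model.Transform
import org.opensearch.indexmanagement.util.acquireLockForScheduledJob
import org.opensearch.indexmanagement.util.releaseLockForScheduledJob
import org.opensearch.indexmanagement.util.renewLockForScheduledJob
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.LockModel

// Sketch only: the real TransformLockManager lives elsewhere in this PR.
class TransformLockManagerSketch(
    private val transform: Transform,
    private val context: JobExecutionContext
) {
    // Exposed so the runner can test `lock == null` after acquisition.
    var lock: LockModel? = null
        private set

    // Placeholder backoff values; the real settings are not shown in this diff.
    private val backoffPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3)

    // Acquire at most one lock per transform, retrying with backoff.
    suspend fun acquireLockForScheduledJob() {
        lock = acquireLockForScheduledJob(transform, context, backoffPolicy)
    }

    // Renew in place; on failure release and drop the lock, mirroring the
    // inlined renew/release logic this PR removes from the runner loop.
    suspend fun renewLockForScheduledJob() {
        val current = lock ?: return
        val renewed = renewLockForScheduledJob(context, current, backoffPolicy)
        if (renewed == null) {
            releaseLockForScheduledJob(context, current)
        }
        lock = renewed
    }

    suspend fun releaseLockForScheduledJob() {
        lock?.let { releaseLockForScheduledJob(context, it) }
        lock = null
    }
}
```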
@@ -147,15 +143,15 @@
// If there are shards to search do it here
if (bucketsToTransform.currentShard != null) {
// Computes aggregation on modified documents for current shard to get modified buckets
-bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform).also {
+bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform, transformContext).also {
currentMetadata = it.metadata
}
// Filter out already processed buckets
val modifiedBuckets = bucketsToTransform.modifiedBuckets.filter {
transformProcessedBucketLog.isNotProcessed(it)
}.toMutableSet()
// Recompute modified buckets and update them in targetIndex
-currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets)
+currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets, transformContext)
// Add processed buckets to 'processed set' so that we don't try to reprocess them again
transformProcessedBucketLog.addBuckets(modifiedBuckets.toList())
// Update TransformMetadata
@@ -164,16 +160,12 @@
}
} else {
// Computes buckets from source index and stores them in targetIndex as transform docs
-currentMetadata = computeBucketsIteration(transform, currentMetadata)
+currentMetadata = computeBucketsIteration(transform, currentMetadata, transformContext)
// Update TransformMetadata
currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true)
}
// we attempt to renew lock for every loop of transform
-val renewedLock = renewLockForScheduledJob(context, lock, backoffPolicy)
-if (renewedLock == null) {
-releaseLockForScheduledJob(context, lock)
-}
-lock = renewedLock
+transformLockManager.renewLockForScheduledJob()
}
}
} while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null)
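Taken together with the `finally` block in the next hunk, the runner's lock lifecycle after this refactor reduces to the skeleton below. All class and method names are the ones this diff actually introduces; the metadata bookkeeping, security context, and transform work are elided, and `moreWorkRemains` is a stand-in for the real loop condition:

```kotlin
import org.opensearch.indexmanagement.transform.model.Transform
import org.opensearch.indexmanagement.transform.util.TransformContext
import org.opensearch.indexmanagement.transform.util.TransformLockManager
import org.opensearch.jobscheduler.spi.JobExecutionContext

// Skeleton of the runner's lock lifecycle after this PR; a sketch, not the full method.
suspend fun lockLifecycleSketch(transform: Transform, context: JobExecutionContext) {
    val transformContext = TransformContext(TransformLockManager(transform, context))
    val lockManager = transformContext.transformLockManager
    lockManager.acquireLockForScheduledJob()
    try {
        do {
            // No lock means another execution holds this transform: exit early.
            if (lockManager.lock == null) return
            // ... one iteration of transform work ...
            lockManager.renewLockForScheduledJob() // renewed once per loop
        } while (moreWorkRemains())
    } finally {
        // Released on every path, whether the run finished or threw.
        lockManager.releaseLockForScheduledJob()
    }
}

private fun moreWorkRemains(): Boolean = false // stub for the sketch
```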
@@ -185,7 +177,7 @@
failureReason = e.localizedMessage
)
} finally {
-lock?.let {
+transformLockManager.lock?.let {
// Update the global checkpoints only after execution finishes successfully
if (transform.continuous && currentMetadata.status != TransformMetadata.Status.FAILED) {
currentMetadata = currentMetadata.copy(
@@ -198,20 +190,29 @@
logger.info("Disabling the transform job ${transform.id}")
updateTransform(transform.copy(enabled = false, enabledAt = null))
}
-releaseLockForScheduledJob(context, it)
+transformLockManager.releaseLockForScheduledJob()
}
}
}

-private suspend fun getBucketsToTransformIteration(transform: Transform, bucketsToTransform: BucketsToTransform): BucketsToTransform {
+private suspend fun getBucketsToTransformIteration(
+transform: Transform,
+bucketsToTransform: BucketsToTransform,
+transformContext: TransformContext
+): BucketsToTransform {
var currentBucketsToTransform = bucketsToTransform
val currentShard = bucketsToTransform.currentShard
// Clear modified buckets from previous iteration
currentBucketsToTransform.modifiedBuckets.clear()

if (currentShard != null) {
val shardLevelModifiedBuckets = withTransformSecurityContext(transform) {
-transformSearchService.getShardLevelModifiedBuckets(transform, currentBucketsToTransform.metadata.afterKey, currentShard)
+transformSearchService.getShardLevelModifiedBuckets(
+transform,
+currentBucketsToTransform.metadata.afterKey,
+currentShard,
+transformContext
+)
}
currentBucketsToTransform.modifiedBuckets.addAll(shardLevelModifiedBuckets.modifiedBuckets)
val mergedSearchTime = currentBucketsToTransform.metadata.stats.searchTimeInMillis +
@@ -258,13 +259,15 @@
private suspend fun computeBucketsIteration(
transform: Transform,
metadata: TransformMetadata,
+transformContext: TransformContext
): TransformMetadata {

val transformSearchResult = withTransformSecurityContext(transform) {
transformSearchService.executeCompositeSearch(
transform,
metadata.afterKey,
-null
+null,
+transformContext
)
}
val indexTimeInMillis = withTransformSecurityContext(transform) {
@@ -287,11 +290,12 @@
private suspend fun recomputeModifiedBuckets(
transform: Transform,
metadata: TransformMetadata,
-modifiedBuckets: MutableSet<Map<String, Any>>
+modifiedBuckets: MutableSet<Map<String, Any>>,
+transformContext: TransformContext
): TransformMetadata {
val updatedMetadata = if (modifiedBuckets.isNotEmpty()) {
val transformSearchResult = withTransformSecurityContext(transform) {
-transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets)
+transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets, transformContext)
}
val indexTimeInMillis = withTransformSecurityContext(transform) {
transformIndexer.index(transformSearchResult.docsToIndex)
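On why every search entry point (`getShardLevelModifiedBuckets`, both `executeCompositeSearch` call sites) now takes the `TransformContext`: this file does not show what `TransformSearchService` does with it, but since the context carries the lock manager, a plausible reading is that searches can be bounded by the time remaining on the job lock rather than running unbounded. A hypothetical illustration; how the remaining seconds would be read off `TransformLockManager` is an assumption, not something this diff shows:

```kotlin
import org.opensearch.common.unit.TimeValue

// Hypothetical: derive a per-request search timeout from the seconds left
// on the job lock. The accessor that yields `remainingLockSeconds` is an
// assumed part of TransformLockManager's API.
fun searchTimeoutSketch(remainingLockSeconds: Long?): TimeValue? =
    remainingLockSeconds?.let { TimeValue.timeValueSeconds(it) }
```

If that reading is right, a slow renewal naturally shrinks the window the next search iteration is allowed to run in.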