Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinat
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner
import org.opensearch.indexmanagement.indexstatemanagement.MetadataService
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
Expand Down Expand Up @@ -63,6 +61,8 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.remo
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.TransportRemovePolicyAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.retryfailedmanagedindex.RetryFailedManagedIndexAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.retryfailedmanagedindex.TransportRetryFailedManagedIndexAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import org.opensearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction
import org.opensearch.indexmanagement.refreshanalyzer.RestRefreshSearchAnalyzerAction
import org.opensearch.indexmanagement.refreshanalyzer.TransportRefreshSearchAnalyzerAction
Expand All @@ -73,20 +73,20 @@ import org.opensearch.indexmanagement.rollup.RollupRunner
import org.opensearch.indexmanagement.rollup.RollupSearchService
import org.opensearch.indexmanagement.rollup.action.delete.DeleteRollupAction
import org.opensearch.indexmanagement.rollup.action.delete.TransportDeleteRollupAction
import org.opensearch.indexmanagement.rollup.action.explain.ExplainRollupAction
import org.opensearch.indexmanagement.rollup.action.explain.TransportExplainRollupAction
import org.opensearch.indexmanagement.rollup.action.get.GetRollupAction
import org.opensearch.indexmanagement.rollup.action.get.GetRollupsAction
import org.opensearch.indexmanagement.rollup.action.get.TransportGetRollupAction
import org.opensearch.indexmanagement.rollup.action.get.TransportGetRollupsAction
import org.opensearch.indexmanagement.rollup.action.index.IndexRollupAction
import org.opensearch.indexmanagement.rollup.action.index.TransportIndexRollupAction
import org.opensearch.indexmanagement.rollup.action.mapping.TransportUpdateRollupMappingAction
import org.opensearch.indexmanagement.rollup.action.mapping.UpdateRollupMappingAction
import org.opensearch.indexmanagement.rollup.action.start.StartRollupAction
import org.opensearch.indexmanagement.rollup.action.start.TransportStartRollupAction
import org.opensearch.indexmanagement.rollup.action.stop.StopRollupAction
import org.opensearch.indexmanagement.rollup.action.stop.TransportStopRollupAction
import org.opensearch.indexmanagement.rollup.action.explain.ExplainRollupAction
import org.opensearch.indexmanagement.rollup.action.explain.TransportExplainRollupAction
import org.opensearch.indexmanagement.rollup.action.get.GetRollupsAction
import org.opensearch.indexmanagement.rollup.action.get.TransportGetRollupsAction
import org.opensearch.indexmanagement.rollup.action.mapping.TransportUpdateRollupMappingAction
import org.opensearch.indexmanagement.rollup.action.mapping.UpdateRollupMappingAction
import org.opensearch.indexmanagement.rollup.actionfilter.FieldCapsFilter
import org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor
import org.opensearch.indexmanagement.rollup.model.Rollup
Expand All @@ -98,6 +98,33 @@ import org.opensearch.indexmanagement.rollup.resthandler.RestIndexRollupAction
import org.opensearch.indexmanagement.rollup.resthandler.RestStartRollupAction
import org.opensearch.indexmanagement.rollup.resthandler.RestStopRollupAction
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.transform.TransformRunner
import org.opensearch.indexmanagement.transform.action.delete.DeleteTransformsAction
import org.opensearch.indexmanagement.transform.action.delete.TransportDeleteTransformsAction
import org.opensearch.indexmanagement.transform.action.explain.ExplainTransformAction
import org.opensearch.indexmanagement.transform.action.explain.TransportExplainTransformAction
import org.opensearch.indexmanagement.transform.action.get.GetTransformAction
import org.opensearch.indexmanagement.transform.action.get.GetTransformsAction
import org.opensearch.indexmanagement.transform.action.get.TransportGetTransformAction
import org.opensearch.indexmanagement.transform.action.get.TransportGetTransformsAction
import org.opensearch.indexmanagement.transform.action.index.IndexTransformAction
import org.opensearch.indexmanagement.transform.action.index.TransportIndexTransformAction
import org.opensearch.indexmanagement.transform.action.preview.PreviewTransformAction
import org.opensearch.indexmanagement.transform.action.preview.TransportPreviewTransformAction
import org.opensearch.indexmanagement.transform.action.start.StartTransformAction
import org.opensearch.indexmanagement.transform.action.start.TransportStartTransformAction
import org.opensearch.indexmanagement.transform.action.stop.StopTransformAction
import org.opensearch.indexmanagement.transform.action.stop.TransportStopTransformAction
import org.opensearch.indexmanagement.transform.model.Transform
import org.opensearch.indexmanagement.transform.model.TransformMetadata
import org.opensearch.indexmanagement.transform.resthandler.RestDeleteTransformAction
import org.opensearch.indexmanagement.transform.resthandler.RestExplainTransformAction
import org.opensearch.indexmanagement.transform.resthandler.RestGetTransformAction
import org.opensearch.indexmanagement.transform.resthandler.RestIndexTransformAction
import org.opensearch.indexmanagement.transform.resthandler.RestPreviewTransformAction
import org.opensearch.indexmanagement.transform.resthandler.RestStartTransformAction
import org.opensearch.indexmanagement.transform.resthandler.RestStopTransformAction
import org.opensearch.indexmanagement.transform.settings.TransformSettings
import org.opensearch.jobscheduler.spi.JobSchedulerExtension
import org.opensearch.jobscheduler.spi.ScheduledJobParser
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
Expand Down Expand Up @@ -132,6 +159,7 @@ import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportInterceptor
import org.opensearch.watcher.ResourceWatcherService
import java.util.function.Supplier
import org.opensearch.monitor.jvm.JvmService
import org.opensearch.common.component.Lifecycle
import org.opensearch.common.component.LifecycleComponent
import org.opensearch.common.component.LifecycleListener
Expand All @@ -155,6 +183,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
const val PLUGINS_BASE_URI = "/_plugins"
const val ISM_BASE_URI = "$PLUGINS_BASE_URI/_ism"
const val ROLLUP_BASE_URI = "$PLUGINS_BASE_URI/_rollup"
const val TRANSFORM_BASE_URI = "$PLUGINS_BASE_URI/_transform"
const val POLICY_BASE_URI = "$ISM_BASE_URI/policies"
const val ROLLUP_JOBS_BASE_URI = "$ROLLUP_BASE_URI/jobs"
const val INDEX_MANAGEMENT_INDEX = ".opendistro-ism-config"
Expand Down Expand Up @@ -198,6 +227,12 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
RollupMetadata.ROLLUP_METADATA_TYPE -> {
return@ScheduledJobParser null
}
Transform.TRANSFORM_TYPE -> {
return@ScheduledJobParser Transform.parse(xcp, id, jobDocVersion.seqNo, jobDocVersion.primaryTerm)
}
TransformMetadata.TRANSFORM_METADATA_TYPE -> {
return@ScheduledJobParser null
}
ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE -> {
return@ScheduledJobParser null
}
Expand Down Expand Up @@ -235,7 +270,14 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
RestIndexRollupAction(),
RestStartRollupAction(),
RestStopRollupAction(),
RestExplainRollupAction()
RestExplainRollupAction(),
RestIndexTransformAction(),
RestGetTransformAction(),
RestPreviewTransformAction(),
RestDeleteTransformAction(),
RestExplainTransformAction(),
RestStartTransformAction(),
RestStopTransformAction()
)
}

Expand Down Expand Up @@ -267,6 +309,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
.registerMetadataServices(RollupMetadataService(client, xContentRegistry))
.registerConsumers()
rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver)
val jvmService = JvmService(environment.settings())
val transformRunner = TransformRunner.initialize(client, clusterService, xContentRegistry, settings, indexNameExpressionResolver, jvmService)
fieldCapsFilter = FieldCapsFilter(clusterService, settings, indexNameExpressionResolver)
this.indexNameExpressionResolver = indexNameExpressionResolver

Expand Down Expand Up @@ -296,7 +340,9 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
val managedIndexCoordinator = ManagedIndexCoordinator(environment.settings(),
client, clusterService, threadPool, indexManagementIndices, metadataService)

return listOf(managedIndexRunner, rollupRunner, indexManagementIndices, managedIndexCoordinator, indexStateManagementHistory)
return listOf(
managedIndexRunner, rollupRunner, transformRunner, indexManagementIndices, managedIndexCoordinator, indexStateManagementHistory
)
}

override fun getSettings(): List<Setting<*>> {
Expand Down Expand Up @@ -326,6 +372,12 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
RollupSettings.ROLLUP_ENABLED,
RollupSettings.ROLLUP_SEARCH_ENABLED,
RollupSettings.ROLLUP_DASHBOARDS,
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT,
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS,
TransformSettings.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT,
TransformSettings.TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS,
TransformSettings.TRANSFORM_CIRCUIT_BREAKER_ENABLED,
TransformSettings.TRANSFORM_CIRCUIT_BREAKER_JVM_THRESHOLD,
LegacyOpenDistroManagedIndexSettings.HISTORY_ENABLED,
LegacyOpenDistroManagedIndexSettings.HISTORY_INDEX_MAX_AGE,
LegacyOpenDistroManagedIndexSettings.HISTORY_MAX_DOCS,
Expand Down Expand Up @@ -374,7 +426,15 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ActionPlugin.ActionHandler(StartRollupAction.INSTANCE, TransportStartRollupAction::class.java),
ActionPlugin.ActionHandler(StopRollupAction.INSTANCE, TransportStopRollupAction::class.java),
ActionPlugin.ActionHandler(ExplainRollupAction.INSTANCE, TransportExplainRollupAction::class.java),
ActionPlugin.ActionHandler(UpdateRollupMappingAction.INSTANCE, TransportUpdateRollupMappingAction::class.java)
ActionPlugin.ActionHandler(UpdateRollupMappingAction.INSTANCE, TransportUpdateRollupMappingAction::class.java),
ActionPlugin.ActionHandler(IndexTransformAction.INSTANCE, TransportIndexTransformAction::class.java),
ActionPlugin.ActionHandler(GetTransformAction.INSTANCE, TransportGetTransformAction::class.java),
ActionPlugin.ActionHandler(GetTransformsAction.INSTANCE, TransportGetTransformsAction::class.java),
ActionPlugin.ActionHandler(PreviewTransformAction.INSTANCE, TransportPreviewTransformAction::class.java),
ActionPlugin.ActionHandler(DeleteTransformsAction.INSTANCE, TransportDeleteTransformsAction::class.java),
ActionPlugin.ActionHandler(ExplainTransformAction.INSTANCE, TransportExplainTransformAction::class.java),
ActionPlugin.ActionHandler(StartTransformAction.INSTANCE, TransportStartTransformAction::class.java),
ActionPlugin.ActionHandler(StopTransformAction.INSTANCE, TransportStopTransformAction::class.java)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.rollup.RollupRunner
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.transform.TransformRunner
import org.opensearch.indexmanagement.transform.model.Transform
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
Expand All @@ -43,6 +45,7 @@ object IndexManagementRunner : ScheduledJobRunner {
when (job) {
is ManagedIndexConfig -> ManagedIndexRunner.runJob(job, context)
is Rollup -> RollupRunner.runJob(job, context)
is Transform -> TransformRunner.runJob(job, context)
else -> {
val errorMessage = "Invalid job type, found ${job.javaClass.simpleName} with id: ${context.jobId}"
logger.error(errorMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
* permissions and limitations under the License.
*/

package org.opensearch.indexmanagement.rollup.model.dimension
package org.opensearch.indexmanagement.common.model.dimension

import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
Expand All @@ -37,6 +38,9 @@ import org.opensearch.search.aggregations.AggregatorFactories
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder
import java.io.IOException
import java.time.ZoneId
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder
import org.opensearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval

data class DateHistogram(
override val sourceField: String,
Expand Down Expand Up @@ -81,6 +85,30 @@ data class DateHistogram(
out.writeZoneId(timezone)
}

override fun toSourceBuilder(appendType: Boolean): CompositeValuesSourceBuilder<*> {
val calendarInterval = this.calendarInterval
val fixedInterval = this.fixedInterval
val name = if (appendType) "${this.targetField}.${Type.DATE_HISTOGRAM.type}" else this.targetField

return DateHistogramValuesSourceBuilder(name)
.missingBucket(true)
.field(this.sourceField)
.timeZone(this.timezone)
.apply {
calendarInterval?.let {
this.calendarInterval(DateHistogramInterval(it))
}
fixedInterval?.let {
this.fixedInterval(DateHistogramInterval(it))
}
}
}

override fun canBeRealizedInMappings(mappings: Map<String, Any>): Boolean {
val fieldType = getFieldFromMappings(sourceField, mappings)?.get("type") ?: return false
return "date" == fieldType
}

fun getRewrittenAggregation(
aggregationBuilder: DateHistogramAggregationBuilder,
subAggregations: AggregatorFactories.Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@
* permissions and limitations under the License.
*/

package org.opensearch.indexmanagement.rollup.model.dimension
package org.opensearch.indexmanagement.common.model.dimension

import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import java.io.IOException
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder

abstract class Dimension(
val type: Type,
Expand All @@ -48,6 +49,15 @@ abstract class Dimension(
}
}

abstract fun toSourceBuilder(appendType: Boolean = false): CompositeValuesSourceBuilder<*>

/**
* Helper method that evaluates if the dimension can be realized using mappings provided.
*
* e.g. A date_histogram dimension on source_field "a" can only be possible in mappings that contain "date" type field "a".
*/
abstract fun canBeRealizedInMappings(mappings: Map<String, Any>): Boolean

companion object {
const val DIMENSION_SOURCE_FIELD_FIELD = "source_field"
const val DIMENSION_TARGET_FIELD_FIELD = "target_field"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
* permissions and limitations under the License.
*/

package org.opensearch.indexmanagement.rollup.model.dimension
package org.opensearch.indexmanagement.common.model.dimension

import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
Expand All @@ -36,6 +37,9 @@ import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.search.aggregations.AggregatorFactories
import org.opensearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder
import java.io.IOException
import org.opensearch.index.mapper.NumberFieldMapper
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder
import org.opensearch.search.aggregations.bucket.composite.HistogramValuesSourceBuilder

// TODO: Verify if offset, missing value, min_doc_count, extended_bounds are usable in Composite histogram source
data class Histogram(
Expand Down Expand Up @@ -72,6 +76,25 @@ data class Histogram(
out.writeDouble(interval)
}

override fun toSourceBuilder(appendType: Boolean): CompositeValuesSourceBuilder<*> {
val name = if (appendType) "${this.targetField}.${Type.HISTOGRAM.type}" else this.targetField
return HistogramValuesSourceBuilder(name)
.missingBucket(true)
.field(this.sourceField)
.interval(this.interval)
}

override fun canBeRealizedInMappings(mappings: Map<String, Any>): Boolean {
val fieldType = getFieldFromMappings(sourceField, mappings)?.get("type") ?: return false

val numberTypes = mutableSetOf<String>()
NumberFieldMapper.NumberType.values().forEach {
numberTypes.add(it.typeName())
}

return fieldType in numberTypes
}

fun getRewrittenAggregation(
aggregationBuilder: HistogramAggregationBuilder,
subAggregations: AggregatorFactories.Builder
Expand Down
Loading