Skip to content

Commit 2d3fd97

Browse files
downsrobthalurur
andauthored
Refactor IndexManagement to support custom actions (#288)
* Adding IM SPI (#216) Signed-off-by: Robert Downs <downsrob@amazon.com> Co-authored-by: Ravi <6005951+thalurur@users.noreply.github.com>
1 parent 20ff74e commit 2d3fd97

File tree

182 files changed

+4494
-3493
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

182 files changed

+4494
-3493
lines changed

build.gradle

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ task ktlint(type: JavaExec, group: "verification") {
104104
description = "Check Kotlin code style."
105105
main = "com.pinterest.ktlint.Main"
106106
classpath = configurations.ktlint
107-
args "src/**/*.kt"
107+
args "src/**/*.kt", "spi/src/main/**/*.kt"
108108
}
109109

110110
check.dependsOn ktlint
@@ -113,7 +113,7 @@ task ktlintFormat(type: JavaExec, group: "formatting") {
113113
description = "Fix Kotlin code style deviations."
114114
main = "com.pinterest.ktlint.Main"
115115
classpath = configurations.ktlint
116-
args "-F", "src/**/*.kt"
116+
args "-F", "src/**/*.kt", "spi/src/main/**/*.kt"
117117
}
118118

119119
detekt {
@@ -148,6 +148,7 @@ dependencies {
148148
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}"
149149
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9'
150150
compile "org.jetbrains:annotations:13.0"
151+
compile project(path: ":${rootProject.name}-spi", configuration: 'shadow')
151152
compile "org.opensearch:notification:${notification_version}"
152153
compile "org.opensearch:common-utils:${common_utils_version}"
153154
compile "com.github.seancfoley:ipaddress:5.3.3"

settings.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,6 @@
44
*/
55

66
rootProject.name = 'opensearch-index-management'
7+
8+
include "spi"
9+
project(":spi").name = rootProject.name + "-spi"

spi/build.gradle

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
import org.opensearch.gradle.test.RestIntegTestTask
6+
7+
plugins {
8+
id 'com.github.johnrengelman.shadow'
9+
id 'jacoco'
10+
}
11+
12+
apply plugin: 'opensearch.java'
13+
apply plugin: 'opensearch.testclusters'
14+
apply plugin: 'opensearch.java-rest-test'
15+
apply plugin: 'kotlin'
16+
apply plugin: 'org.jetbrains.kotlin.jvm'
17+
apply plugin: 'org.jetbrains.kotlin.plugin.allopen'
18+
19+
ext {
20+
projectSubstitutions = [:]
21+
licenseFile = rootProject.file('LICENSE.txt')
22+
noticeFile = rootProject.file('NOTICE')
23+
}
24+
25+
jacoco {
26+
toolVersion = '0.8.5'
27+
reportsDir = file("$buildDir/JacocoReport")
28+
}
29+
30+
jacocoTestReport {
31+
reports {
32+
xml.enabled false
33+
csv.enabled false
34+
html.destination file("${buildDir}/jacoco/")
35+
}
36+
}
37+
check.dependsOn jacocoTestReport
38+
39+
repositories {
40+
mavenLocal()
41+
mavenCentral()
42+
maven { url "https://plugins.gradle.org/m2/" }
43+
maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" }
44+
}
45+
46+
configurations.all {
47+
if (it.state != Configuration.State.UNRESOLVED) return
48+
resolutionStrategy {
49+
force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
50+
force "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
51+
}
52+
}
53+
54+
dependencies {
55+
compileOnly "org.opensearch:opensearch:${opensearch_version}"
56+
compileOnly "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
57+
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
58+
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}"
59+
compileOnly "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9"
60+
compileOnly "org.opensearch:common-utils:${common_utils_version}"
61+
62+
testImplementation "org.opensearch.test:framework:${opensearch_version}"
63+
testImplementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"
64+
}
65+
66+
test {
67+
doFirst {
68+
test.classpath -= project.files(project.tasks.named('shadowJar'))
69+
test.classpath -= project.configurations.getByName(ShadowBasePlugin.CONFIGURATION_NAME)
70+
test.classpath += project.extensions.getByType(SourceSetContainer).getByName(SourceSet.MAIN_SOURCE_SET_NAME).runtimeClasspath
71+
}
72+
systemProperty 'tests.security.manager', 'false'
73+
}
74+
75+
task integTest(type: RestIntegTestTask) {
76+
description 'Run integ test with opensearch test framework'
77+
group 'verification'
78+
systemProperty 'tests.security.manager', 'false'
79+
dependsOn test
80+
}
81+
check.dependsOn integTest
82+
83+
testClusters.javaRestTest {
84+
testDistribution = 'INTEG_TEST'
85+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.spi
7+
8+
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
9+
import org.opensearch.indexmanagement.spi.indexstatemanagement.DefaultStatusChecker
10+
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
11+
import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker
12+
13+
/**
14+
* SPI for IndexManagement
15+
*/
16+
interface IndexManagementExtension {
17+
18+
/**
19+
* List of action parsers that are supported by the extension, each of the action parser will parse the policy action into the defined action.
20+
* The ActionParser provides the ability to parse the action
21+
*/
22+
fun getISMActionParsers(): List<ActionParser>
23+
24+
/**
25+
* Status checker is used by IndexManagement to check the status of the extension before executing the actions registered by the extension.
26+
* Actions registered by the plugin can only be executed if in enabled, otherwise the action fails without retries. The status returned
27+
* should represent if the extension is enabled or disabled, and should not represent extension health or the availability of some extension
28+
* dependency.
29+
*/
30+
fun statusChecker(): StatusChecker {
31+
return DefaultStatusChecker()
32+
}
33+
34+
/**
35+
* Name of the extension
36+
*/
37+
fun getExtensionName(): String
38+
39+
/**
40+
* Not Required to override but if extension moves the index metadata outside of cluster state and requires IndexManagement to manage these
41+
* indices provide the metadata service that can provide the index metadata for these indices. An extension need to label the metadata service
42+
* with a type string which is used to distinguish indices in IndexManagement plugin
43+
*/
44+
fun getIndexMetadataService(): Map<String, IndexMetadataService> {
45+
return mapOf()
46+
}
47+
48+
/**
49+
* Caution: Experimental and can be removed in future
50+
*
51+
* If extension wants IndexManagement to determine cluster state indices UUID based on custom index setting if
52+
* present of cluster state override this method.
53+
*/
54+
fun overrideClusterStateIndexUuidSetting(): String? {
55+
return null
56+
}
57+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.spi.indexstatemanagement
7+
8+
import org.opensearch.common.io.stream.StreamOutput
9+
import org.opensearch.common.io.stream.Writeable
10+
import org.opensearch.common.xcontent.ToXContent
11+
import org.opensearch.common.xcontent.ToXContentObject
12+
import org.opensearch.common.xcontent.XContentBuilder
13+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
14+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
15+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout
16+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
17+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
18+
import java.time.Instant
19+
20+
abstract class Action(
21+
val type: String,
22+
val actionIndex: Int
23+
) : ToXContentObject, Writeable {
24+
25+
var configTimeout: ActionTimeout? = null
26+
var configRetry: ActionRetry? = ActionRetry(DEFAULT_RETRIES)
27+
var customAction: Boolean = false
28+
29+
final override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
30+
builder.startObject()
31+
configTimeout?.toXContent(builder, params)
32+
configRetry?.toXContent(builder, params)
33+
// Include a "custom" object wrapper for custom actions to allow extensions to put arbitrary action configs in the config
34+
// index. The EXCLUDE_CUSTOM_FIELD_PARAM is used to not include this wrapper in api responses
35+
if (customAction && !params.paramAsBoolean(EXCLUDE_CUSTOM_FIELD_PARAM, false)) builder.startObject(CUSTOM_ACTION_FIELD)
36+
populateAction(builder, params)
37+
if (customAction && !params.paramAsBoolean(EXCLUDE_CUSTOM_FIELD_PARAM, false)) builder.endObject()
38+
return builder.endObject()
39+
}
40+
41+
/**
42+
* The implementer of Action can change this method to correctly serialize the internals of the action
43+
* when stored internally or returned as response
44+
*/
45+
open fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
46+
builder.startObject(type).endObject()
47+
}
48+
49+
final override fun writeTo(out: StreamOutput) {
50+
out.writeString(type)
51+
out.writeOptionalWriteable(configTimeout)
52+
out.writeOptionalWriteable(configRetry)
53+
populateAction(out)
54+
}
55+
56+
fun getUpdatedActionMetadata(managedIndexMetaData: ManagedIndexMetaData, stateName: String): ActionMetaData {
57+
val stateMetaData = managedIndexMetaData.stateMetaData
58+
val actionMetaData = managedIndexMetaData.actionMetaData
59+
60+
return when {
61+
// start a new action
62+
stateMetaData?.name != stateName ->
63+
ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null)
64+
actionMetaData?.index != this.actionIndex ->
65+
ActionMetaData(this.type, Instant.now().toEpochMilli(), this.actionIndex, false, 0, 0, null)
66+
// RetryAPI will reset startTime to null for actionMetaData and we'll reset it to "now" here
67+
else -> actionMetaData.copy(startTime = actionMetaData.startTime ?: Instant.now().toEpochMilli())
68+
}
69+
}
70+
71+
/**
72+
* The implementer of Action can change this method to correctly serialize the internals of the action
73+
* when data is shared between nodes
74+
*/
75+
open fun populateAction(out: StreamOutput) {
76+
out.writeInt(actionIndex)
77+
}
78+
79+
/**
80+
* Get all the steps associated with the action
81+
*/
82+
abstract fun getSteps(): List<Step>
83+
84+
/**
85+
* Get the current step to execute in the action
86+
*/
87+
abstract fun getStepToExecute(context: StepContext): Step
88+
89+
final fun isLastStep(stepName: String): Boolean = getSteps().last().name == stepName
90+
91+
final fun isFirstStep(stepName: String): Boolean = getSteps().first().name == stepName
92+
93+
/*
94+
* Gets if the managedIndexMetaData reflects a state in which this action has completed successfully. Used in the
95+
* runner when determining if the index metadata should be deleted. If the action isFinishedSuccessfully and
96+
* deleteIndexMetadataAfterFinish is set to true, then we issue a request to delete the managedIndexConfig and its
97+
* managedIndexMetadata.
98+
*/
99+
final fun isFinishedSuccessfully(managedIndexMetaData: ManagedIndexMetaData): Boolean {
100+
val policyRetryInfo = managedIndexMetaData.policyRetryInfo
101+
if (policyRetryInfo?.failed == true) return false
102+
val actionMetaData = managedIndexMetaData.actionMetaData
103+
if (actionMetaData == null || actionMetaData.failed || actionMetaData.name != this.type) return false
104+
val stepMetaData = managedIndexMetaData.stepMetaData
105+
if (stepMetaData == null || !isLastStep(stepMetaData.name) || stepMetaData.stepStatus != Step.StepStatus.COMPLETED) return false
106+
return true
107+
}
108+
109+
/*
110+
* Denotes if the index metadata in the config index should be deleted for the index this action has just
111+
* successfully finished running on. This may be used by custom actions which delete some off-cluster index,
112+
* and following the action's success, the managed index config and metadata need to be deleted.
113+
*/
114+
open fun deleteIndexMetadataAfterFinish(): Boolean = false
115+
116+
companion object {
117+
const val DEFAULT_RETRIES = 3L
118+
const val CUSTOM_ACTION_FIELD = "custom"
119+
const val EXCLUDE_CUSTOM_FIELD_PARAM = "exclude_custom"
120+
}
121+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.spi.indexstatemanagement
7+
8+
import org.opensearch.common.io.stream.StreamInput
9+
import org.opensearch.common.xcontent.XContentParser
10+
11+
abstract class ActionParser(var customAction: Boolean = false) {
12+
13+
/**
14+
* The action type parser will parse
15+
*/
16+
abstract fun getActionType(): String
17+
18+
/**
19+
* Deserialize Action from stream input
20+
*/
21+
abstract fun fromStreamInput(sin: StreamInput): Action
22+
23+
/**
24+
* Deserialize Action from xContent
25+
*/
26+
abstract fun fromXContent(xcp: XContentParser, index: Int): Action
27+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.spi.indexstatemanagement
7+
8+
import org.opensearch.client.Client
9+
import org.opensearch.cluster.service.ClusterService
10+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata
11+
12+
/**
13+
* ISM by default considers all the index metadata to be part of the cluster state,
14+
* if that doesn't hold true and indices metadata is present in some other place and
15+
* ISM still need to manage these indices the following interface provides a mechanism
16+
* for ISM extensions to register the metadata service for the type so ISM can get the
17+
* index metadata for these special type of indices.
18+
*
19+
* ISM Rest APIs allows support for type param which determines the type of index, if there
20+
* is a registered metadata service for the type - ISM will use the service to get the metadata
21+
* else uses the default i.e cluster state
22+
*/
23+
interface IndexMetadataService {
24+
25+
/**
26+
* Returns the index metadata needed for ISM
27+
*/
28+
suspend fun getMetadata(indices: List<String>, client: Client, clusterService: ClusterService): Map<String, ISMIndexMetadata>
29+
30+
/**
31+
* Returns all the indices metadata
32+
*/
33+
suspend fun getMetadataForAllIndices(client: Client, clusterService: ClusterService): Map<String, ISMIndexMetadata>
34+
35+
/**
36+
* Returns an optional setting path which, when set to true in the index settings, overrides a cluster level metadata write block.
37+
*/
38+
fun getIndexMetadataWriteOverrideSetting(): String? = null
39+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.spi.indexstatemanagement
7+
8+
import org.opensearch.cluster.ClusterState
9+
10+
interface StatusChecker {
11+
12+
/**
13+
* checks and returns the status of the extension
14+
*/
15+
fun check(clusterState: ClusterState): Status {
16+
return Status.ENABLED
17+
}
18+
}
19+
20+
enum class Status(private val value: String) {
21+
ENABLED("enabled"),
22+
DISABLED("disabled");
23+
24+
override fun toString(): String {
25+
return value
26+
}
27+
}
28+
29+
class DefaultStatusChecker : StatusChecker

0 commit comments

Comments
 (0)