Skip to content

Commit c82793f

Browse files
committed
Adding IM SPI
Signed-off-by: Ravi Thaluru <thalurur@users.noreply.github.com>
1 parent d718d7c commit c82793f

21 files changed

+1433
-2
lines changed

build.gradle

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ task ktlint(type: JavaExec, group: "verification") {
101101
description = "Check Kotlin code style."
102102
main = "com.pinterest.ktlint.Main"
103103
classpath = configurations.ktlint
104-
args "src/**/*.kt"
104+
args "src/**/*.kt", "spi/src/main/**/*.kt"
105105
}
106106

107107
check.dependsOn ktlint
@@ -110,7 +110,7 @@ task ktlintFormat(type: JavaExec, group: "formatting") {
110110
description = "Fix Kotlin code style deviations."
111111
main = "com.pinterest.ktlint.Main"
112112
classpath = configurations.ktlint
113-
args "-F", "src/**/*.kt"
113+
args "-F", "src/**/*.kt", "spi/src/main/**/*.kt"
114114
}
115115

116116
detekt {
@@ -145,6 +145,7 @@ dependencies {
145145
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}"
146146
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9'
147147
compile "org.jetbrains:annotations:13.0"
148+
compile project(path: ":${rootProject.name}-spi", configuration: 'shadow')
148149
compile "org.opensearch:notification:${notification_version}"
149150
compile "org.opensearch:common-utils:${common_utils_version}"
150151
compile "com.github.seancfoley:ipaddress:5.3.3"

settings.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,6 @@
44
*/
55

66
rootProject.name = 'opensearch-index-management'
7+
8+
include "spi"
9+
project(":spi").name = rootProject.name + "-spi"

spi/build.gradle

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
import org.opensearch.gradle.test.RestIntegTestTask
6+
7+
plugins {
8+
id 'com.github.johnrengelman.shadow'
9+
id 'jacoco'
10+
}
11+
12+
apply plugin: 'opensearch.java'
13+
apply plugin: 'opensearch.testclusters'
14+
apply plugin: 'opensearch.java-rest-test'
15+
apply plugin: 'kotlin'
16+
apply plugin: 'org.jetbrains.kotlin.jvm'
17+
apply plugin: 'org.jetbrains.kotlin.plugin.allopen'
18+
19+
ext {
20+
projectSubstitutions = [:]
21+
licenseFile = rootProject.file('LICENSE.txt')
22+
noticeFile = rootProject.file('NOTICE')
23+
}
24+
25+
jacoco {
26+
toolVersion = '0.8.5'
27+
reportsDir = file("$buildDir/JacocoReport")
28+
}
29+
30+
jacocoTestReport {
31+
reports {
32+
xml.enabled false
33+
csv.enabled false
34+
html.destination file("${buildDir}/jacoco/")
35+
}
36+
}
37+
check.dependsOn jacocoTestReport
38+
39+
repositories {
40+
mavenLocal()
41+
mavenCentral()
42+
maven { url "https://plugins.gradle.org/m2/" }
43+
maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" }
44+
}
45+
46+
configurations.all {
47+
if (it.state != Configuration.State.UNRESOLVED) return
48+
resolutionStrategy {
49+
force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
50+
force "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
51+
}
52+
}
53+
54+
dependencies {
55+
compileOnly "org.opensearch:opensearch:${opensearch_version}"
56+
compileOnly "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
57+
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
58+
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}"
59+
compileOnly "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9"
60+
compileOnly "org.opensearch:common-utils:${common_utils_version}"
61+
62+
testImplementation "org.opensearch.test:framework:${opensearch_version}"
63+
testImplementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"
64+
}
65+
66+
test {
67+
doFirst {
68+
test.classpath -= project.files(project.tasks.named('shadowJar'))
69+
test.classpath -= project.configurations.getByName(ShadowBasePlugin.CONFIGURATION_NAME)
70+
test.classpath += project.extensions.getByType(SourceSetContainer).getByName(SourceSet.MAIN_SOURCE_SET_NAME).runtimeClasspath
71+
}
72+
systemProperty 'tests.security.manager', 'false'
73+
}
74+
75+
task integTest(type: RestIntegTestTask) {
76+
description 'Run integ test with opensearch test framework'
77+
group 'verification'
78+
systemProperty 'tests.security.manager', 'false'
79+
dependsOn test
80+
}
81+
check.dependsOn integTest
82+
83+
testClusters.javaRestTest {
84+
testDistribution = 'INTEG_TEST'
85+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.spi
7+
8+
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
9+
import org.opensearch.indexmanagement.spi.indexstatemanagement.ClusterEventHandler
10+
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
11+
12+
/**
13+
* SPI for IndexManagement
14+
*/
15+
interface IndexManagementExtension {
16+
17+
/**
18+
* List of action parsers that are supported by the extension, each of the action parser will parse the policy action into the defined action.
19+
* The ActionParser provides the ability to parse the action
20+
*/
21+
fun getISMActionParsers(): List<ActionParser>
22+
23+
/**
24+
* Not Required to override but if extension is introducing a new index type and special handling is needed to handle this type
25+
* use this to provide the metadata service for the new index types
26+
*/
27+
fun getIndexMetadataService(): Map<String, IndexMetadataService> {
28+
return mapOf()
29+
}
30+
31+
/**
32+
* Not required to override but if extension wants to evaluate the cluster events before deciding whether to auto manage indices
33+
* on index creation or should/not clean up managed indices when indices are deleted - add new handlers for the sepcific event type
34+
*/
35+
fun getClusterEventHandlers(): Map<ClusterEventType, ClusterEventHandler> {
36+
return mapOf()
37+
}
38+
}
39+
40+
enum class ClusterEventType(val type: String) {
41+
CREATE("create"),
42+
DELETE("delete");
43+
44+
override fun toString(): String {
45+
return type
46+
}
47+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.spi.indexstatemanagement
7+
8+
import org.opensearch.common.io.stream.StreamOutput
9+
import org.opensearch.common.io.stream.Writeable
10+
import org.opensearch.common.xcontent.ToXContent
11+
import org.opensearch.common.xcontent.ToXContentObject
12+
import org.opensearch.common.xcontent.XContentBuilder
13+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
14+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout
15+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
16+
17+
abstract class Action(
18+
val type: String,
19+
val actionIndex: Int
20+
) : ToXContentObject, Writeable {
21+
22+
var configTimeout: ActionTimeout? = null
23+
var configRetry: ActionRetry? = null
24+
25+
final override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
26+
builder.startObject()
27+
configTimeout?.toXContent(builder, params)
28+
configRetry?.toXContent(builder, params)
29+
// TODO: We should add "custom" object wrapper based on the params
30+
populateAction(builder, params)
31+
return builder.endObject()
32+
}
33+
34+
/**
35+
* The implementer of Action can change this method to correctly serialize the internals of the action
36+
* when stored internally or returned as response
37+
*/
38+
open fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
39+
builder.startObject(type).endObject()
40+
}
41+
42+
final override fun writeTo(out: StreamOutput) {
43+
out.writeString(type)
44+
out.writeOptionalWriteable(configTimeout)
45+
out.writeOptionalWriteable(configRetry)
46+
populateAction(out)
47+
}
48+
49+
/**
50+
* The implementer of Action can change this method to correctly serialize the internals of the action
51+
* when data is shared between nodes
52+
*/
53+
open fun populateAction(out: StreamOutput) {
54+
out.writeInt(actionIndex)
55+
}
56+
57+
/**
58+
* Get all the steps associated with the action
59+
*/
60+
abstract fun getSteps(): List<Step>
61+
62+
/**
63+
* Get the current step to execute in the action
64+
*/
65+
abstract fun getStepToExecute(context: StepContext): Step
66+
67+
final fun isLastStep(stepName: String): Boolean = getSteps().last().name == stepName
68+
69+
final fun isFirstStep(stepName: String): Boolean = getSteps().first().name == stepName
70+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.spi.indexstatemanagement
7+
8+
import org.opensearch.common.io.stream.StreamInput
9+
import org.opensearch.common.xcontent.XContentParser
10+
11+
abstract class ActionParser(var customAction: Boolean = false) {
12+
13+
/**
14+
* The action type parser will parse
15+
*/
16+
abstract fun getActionType(): String
17+
18+
/**
19+
* Deserialize Action from stream input
20+
*/
21+
abstract fun fromStreamInput(sin: StreamInput): Action
22+
23+
/**
24+
* Deserialize Action from xContent
25+
*/
26+
abstract fun fromXContent(xcp: XContentParser, index: Int): Action
27+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.spi.indexstatemanagement
7+
8+
import org.opensearch.client.Client
9+
import org.opensearch.cluster.ClusterChangedEvent
10+
import org.opensearch.cluster.service.ClusterService
11+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.Decision
12+
13+
abstract class ClusterEventHandler {
14+
15+
abstract fun processEvent(client: Client, clusterService: ClusterService, event: ClusterChangedEvent): Decision
16+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.spi.indexstatemanagement
7+
8+
import org.opensearch.client.Client
9+
import org.opensearch.cluster.service.ClusterService
10+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata
11+
12+
interface IndexMetadataService {
13+
14+
/**
15+
* Returns the index metadata needed for ISM
16+
*/
17+
fun getMetadata(indices: List<String>, client: Client, clusterService: ClusterService): Map<String, ISMIndexMetadata>
18+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.spi.indexstatemanagement
7+
8+
import org.apache.logging.log4j.Logger
9+
import org.opensearch.common.io.stream.StreamInput
10+
import org.opensearch.common.io.stream.StreamOutput
11+
import org.opensearch.common.io.stream.Writeable
12+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
13+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
14+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
15+
import java.time.Instant
16+
import java.util.Locale
17+
18+
abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {
19+
20+
var context: StepContext? = null
21+
private set
22+
23+
fun preExecute(logger: Logger, context: StepContext): Step {
24+
logger.info("Executing $name for ${context.metadata.index}")
25+
this.context = context
26+
return this
27+
}
28+
29+
abstract suspend fun execute(): Step
30+
31+
fun postExecute(logger: Logger): Step {
32+
logger.info("Finished executing $name for ${context?.metadata?.index}")
33+
this.context = null
34+
return this
35+
}
36+
37+
abstract fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData
38+
39+
abstract fun isIdempotent(): Boolean
40+
41+
final fun getStepStartTime(metadata: ManagedIndexMetaData): Instant {
42+
return when {
43+
metadata.stepMetaData == null -> Instant.now()
44+
metadata.stepMetaData.name != this.name -> Instant.now()
45+
// The managed index metadata is a historical snapshot of the metadata and refers to what has happened from the previous
46+
// execution, so if we ever see it as COMPLETED it means we are always going to be in a new step, this specifically
47+
// helps with the Transition -> Transition (empty state) sequence which the above do not capture
48+
metadata.stepMetaData.stepStatus == StepStatus.COMPLETED -> Instant.now()
49+
else -> Instant.ofEpochMilli(metadata.stepMetaData.startTime)
50+
}
51+
}
52+
53+
final fun getStartingStepMetaData(metadata: ManagedIndexMetaData): StepMetaData = StepMetaData(name, getStepStartTime(metadata).toEpochMilli(), StepStatus.STARTING)
54+
55+
enum class StepStatus(val status: String) : Writeable {
56+
STARTING("starting"),
57+
CONDITION_NOT_MET("condition_not_met"),
58+
FAILED("failed"),
59+
COMPLETED("completed");
60+
61+
override fun toString(): String {
62+
return status
63+
}
64+
65+
override fun writeTo(out: StreamOutput) {
66+
out.writeString(status)
67+
}
68+
69+
companion object {
70+
fun read(streamInput: StreamInput): StepStatus {
71+
return valueOf(streamInput.readString().toUpperCase(Locale.ROOT))
72+
}
73+
}
74+
}
75+
}

0 commit comments

Comments
 (0)