Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/multi-node-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
with:
repository: 'opensearch-project/common-utils'
path: common-utils
ref: '1.0'
ref: 'main'
- name: Build common-utils
working-directory: ./common-utils
run: ./gradlew publishToMavenLocal -Dopensearch.version=1.0.0
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
with:
repository: 'opensearch-project/common-utils'
path: common-utils
ref: '1.0'
ref: 'main'
- name: Build common-utils
working-directory: ./common-utils
run: ./gradlew publishToMavenLocal -Dopensearch.version=1.0.0
Expand Down
23 changes: 19 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import java.util.function.Predicate
buildscript {
ext {
opensearch_version = System.getProperty("opensearch.version", "1.0.0")
kotlin_version = System.getProperty("kotlin.version", "1.3.72")
kotlin_version = System.getProperty("kotlin.version", "1.4.0")
}

repositories {
Expand Down Expand Up @@ -151,12 +151,12 @@ version = "${opensearchVersion}.0"
dependencies {
compileOnly "org.opensearch:opensearch:${opensearch_version}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:1.0.0.0"
compile group: 'commons-codec', name: 'commons-codec', version: '1.13'
compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compile "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7'
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9'
compile "org.jetbrains:annotations:13.0"
compile "org.opensearch:notification:1.0.0.0"
compile "org.opensearch:common-utils:1.0.0.0"
compile "org.opensearch:common-utils:1.1.0.0"
compile "com.github.seancfoley:ipaddress:5.3.3"

testCompile "org.opensearch.test:framework:${opensearch_version}"
Expand Down Expand Up @@ -261,6 +261,14 @@ testClusters.integTest {
File getAsFile() { fileTree("src/test/resources/job-scheduler").getSingleFile() }
}
}))

plugin(provider({
new RegularFile() {
@Override
File getAsFile() { fileTree("src/test/resources/notifications").getSingleFile() }
}
}))

if (securityEnabled) {
plugin(provider({
new RegularFile() {
Expand Down Expand Up @@ -353,6 +361,13 @@ testClusters.mixedCluster {
}
}))

plugin(provider({
new RegularFile() {
@Override
File getAsFile() { fileTree("src/test/resources/notifications").getSingleFile() }
}
}))

if (mixedClusterFlag && node.name == "mixedCluster-1") {
node.plugin(provider({
new RegularFile() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.action.Action
import org.opensearch.indexmanagement.indexstatemanagement.model.ErrorNotification
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
Expand Down Expand Up @@ -93,6 +94,8 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.isSafeToChange
import org.opensearch.indexmanagement.indexstatemanagement.util.isSuccessfulDelete
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexMetadataIndexRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.publishLegacyNotification
import org.opensearch.indexmanagement.indexstatemanagement.util.sendNotification
import org.opensearch.indexmanagement.indexstatemanagement.util.shouldBackoff
import org.opensearch.indexmanagement.indexstatemanagement.util.shouldChangePolicy
import org.opensearch.indexmanagement.indexstatemanagement.util.updateDisableManagedIndexRequest
Expand Down Expand Up @@ -734,9 +737,9 @@ object ManagedIndexRunner :
private suspend fun publishErrorNotification(policy: Policy, managedIndexMetaData: ManagedIndexMetaData) {
policy.errorNotification?.run {
errorNotificationRetryPolicy.retry(logger) {
withContext(Dispatchers.IO) {
destination.publish(null, compileTemplate(messageTemplate, managedIndexMetaData), hostDenyList)
}
val compiledMessage = compileTemplate(messageTemplate, managedIndexMetaData)
destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(client)
channel?.sendNotification(client, ErrorNotification.CHANNEL_TITLE, managedIndexMetaData, compiledMessage)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,47 +35,58 @@ import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination
import org.opensearch.script.Script
import java.io.IOException

data class ErrorNotification(
val destination: Destination,
val destination: Destination?,
val channel: Channel?,
val messageTemplate: Script
) : ToXContentObject, Writeable {

init {
require(destination != null || channel != null) { "ErrorNotification must contain a destination or channel" }
require(destination == null || channel == null) { "ErrorNotification can only contain a single destination or channel" }
require(messageTemplate.lang == MUSTACHE) { "ErrorNotification message template must be a mustache script" }
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field(DESTINATION_FIELD, destination)
builder.startObject()
if (destination != null) builder.field(DESTINATION_FIELD, destination)
if (channel != null) builder.field(CHANNEL_FIELD, channel)
return builder
.field(MESSAGE_TEMPLATE_FIELD, messageTemplate)
.endObject()
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
Destination(sin),
sin.readOptionalWriteable(::Destination),
sin.readOptionalWriteable(::Channel),
Script(sin)
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
destination.writeTo(out)
out.writeOptionalWriteable(destination)
out.writeOptionalWriteable(channel)
messageTemplate.writeTo(out)
}

companion object {
const val DESTINATION_FIELD = "destination"
const val CHANNEL_FIELD = "channel"
const val MESSAGE_TEMPLATE_FIELD = "message_template"
const val MUSTACHE = "mustache"
const val CHANNEL_TITLE = "Index Management-ISM-Error Notification"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): ErrorNotification {
var destination: Destination? = null
var channel: Channel? = null
var messageTemplate: Script? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
Expand All @@ -84,14 +95,16 @@ data class ErrorNotification(
xcp.nextToken()

when (fieldName) {
DESTINATION_FIELD -> destination = Destination.parse(xcp)
DESTINATION_FIELD -> destination = if (xcp.currentToken() == Token.VALUE_NULL) null else Destination.parse(xcp)
CHANNEL_FIELD -> channel = if (xcp.currentToken() == Token.VALUE_NULL) null else Channel.parse(xcp)
MESSAGE_TEMPLATE_FIELD -> messageTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ErrorNotification.")
}
}

return ErrorNotification(
destination = requireNotNull(destination) { "ErrorNotification destination is null" },
destination = destination,
channel = channel,
messageTemplate = requireNotNull(messageTemplate) { "ErrorNotification message template is null" }
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,31 @@ import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.indexstatemanagement.action.Action
import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import java.io.IOException

data class NotificationActionConfig(
val destination: Destination,
val destination: Destination?,
val channel: Channel?,
val messageTemplate: Script,
val index: Int
) : ToXContentObject, ActionConfig(ActionType.NOTIFICATION, index) {

init {
require(destination != null || channel != null) { "Notification must contain a destination or channel" }
require(destination == null || channel == null) { "Notification can only contain a single destination or channel" }
require(messageTemplate.lang == MUSTACHE) { "Notification message template must be a mustache script" }
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
super.toXContent(builder, params).startObject(ActionType.NOTIFICATION.type)
builder.field(DESTINATION_FIELD, destination)
.field(MESSAGE_TEMPLATE_FIELD, messageTemplate)
if (destination != null) builder.field(DESTINATION_FIELD, destination)
if (channel != null) builder.field(CHANNEL_FIELD, channel)
builder.field(MESSAGE_TEMPLATE_FIELD, messageTemplate)
.endObject()
.endObject()
return builder
Expand All @@ -77,28 +82,32 @@ data class NotificationActionConfig(

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
destination = Destination(sin),
destination = sin.readOptionalWriteable(::Destination),
channel = sin.readOptionalWriteable(::Channel),
messageTemplate = Script(sin),
index = sin.readInt()
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
destination.writeTo(out)
out.writeOptionalWriteable(destination)
out.writeOptionalWriteable(channel)
messageTemplate.writeTo(out)
out.writeInt(index)
}

companion object {
const val DESTINATION_FIELD = "destination"
const val CHANNEL_FIELD = "channel"
const val MESSAGE_TEMPLATE_FIELD = "message_template"
const val MUSTACHE = "mustache"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser, index: Int): NotificationActionConfig {
var destination: Destination? = null
var channel: Channel? = null
var messageTemplate: Script? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
Expand All @@ -108,13 +117,15 @@ data class NotificationActionConfig(

when (fieldName) {
DESTINATION_FIELD -> destination = Destination.parse(xcp)
CHANNEL_FIELD -> channel = Channel.parse(xcp)
MESSAGE_TEMPLATE_FIELD -> messageTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in NotificationActionConfig.")
}
}

return NotificationActionConfig(
destination = requireNotNull(destination) { "NotificationActionConfig destination is null" },
destination = destination,
channel = channel,
messageTemplate = requireNotNull(messageTemplate) { "NotificationActionConfig message template is null" },
index = index
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.indexmanagement.indexstatemanagement.model.destination

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import java.io.IOException

data class Channel(val id: String) : ToXContent, Writeable {

init {
require(id.isNotEmpty()) { "Channel ID cannot be empty" }
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field(ID, id)
.endObject()
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString()
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(id)
}

companion object {
const val ID = "id"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): Channel {
var id: String? = null

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
when (fieldName) {
ID -> id = xcp.text()
else -> {
throw IllegalStateException("Unexpected field: $fieldName, while parsing Channel destination")
}
}
}

return Channel(requireNotNull(id) { "Channel ID is null" })
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,9 @@ import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.opensearchapi.string
import java.io.IOException
import java.lang.IllegalStateException

Expand Down Expand Up @@ -90,12 +87,8 @@ data class Chime(val url: String) : ToXContent, Writeable {
}
}

fun constructMessageContent(subject: String?, message: String?): String {
val messageContent: String? = if (Strings.isNullOrEmpty(subject)) message else "$subject \n\n $message"
val builder = XContentFactory.contentBuilder(XContentType.JSON)
builder.startObject()
.field("Content", messageContent)
.endObject()
return builder.string()
// Complete JSON structure is now constructed in the notification plugin
fun constructMessageContent(subject: String?, message: String): String {
return if (Strings.isNullOrEmpty(subject)) message else "$subject \n\n $message"
}
}
Loading