Skip to content

Commit

Permalink
Integrate notifications backend (#129)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuali925 authored Oct 7, 2021
1 parent 9b1ecab commit 6d2607b
Show file tree
Hide file tree
Showing 10 changed files with 355 additions and 31 deletions.
5 changes: 4 additions & 1 deletion reports-scheduler/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ integTest {

if (System.getProperty("tests.clustername") != null) {
exclude 'org/opensearch/reportsscheduler/ReportsSchedulerPluginIT.class'
} else {
// assuming that opensearch-notifications will be installed when running integTest against a remote cluster
exclude 'org/opensearch/reportsscheduler/rest/ReportWithNotificationIT.class'
}
}

Expand All @@ -247,7 +250,7 @@ integTest.dependsOn(bundle)
integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile()))}

testClusters.integTest {
testDistribution = "ARCHIVE"
testDistribution = "INTEG_TEST"
// need to install job-scheduler first, need to assemble job-scheduler first
plugin(provider(new Callable<RegularFile>(){
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/
package org.opensearch.reportsscheduler

import org.opensearch.OpenSearchStatusException
import org.opensearch.jobscheduler.spi.JobSchedulerExtension
import org.opensearch.jobscheduler.spi.ScheduledJobParser
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
Expand Down Expand Up @@ -55,6 +56,7 @@ import org.opensearch.reportsscheduler.settings.PluginSettings
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.node.DiscoveryNodes
import org.opensearch.cluster.service.ClusterService
Expand All @@ -69,9 +71,11 @@ import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.Plugin
import org.opensearch.reportsscheduler.notifications.NotificationsActions
import org.opensearch.repositories.RepositoriesService
import org.opensearch.rest.RestController
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestStatus
import org.opensearch.script.ScriptService
import org.opensearch.threadpool.ThreadPool
import org.opensearch.watcher.ResourceWatcherService
Expand Down Expand Up @@ -118,6 +122,12 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, JobSchedulerExtension {
PluginSettings.addSettingsUpdateConsumer(clusterService)
ReportDefinitionsIndex.initialize(client, clusterService)
ReportInstancesIndex.initialize(client, clusterService)
(client as? NodeClient)?.let { NotificationsActions.initialize(it) } ?: run {
throw OpenSearchStatusException(
"Unable to cast client to NodeClient for Notifications call",
RestStatus.INTERNAL_SERVER_ERROR
)
}
return emptyList()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.client.Client
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.util.concurrent.ThreadContext
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.engine.VersionConflictEngineException
import org.opensearch.indices.InvalidIndexNameException
Expand Down Expand Up @@ -73,20 +74,29 @@ abstract class PluginBaseAction<Request : ActionRequest, Response : ActionRespon
request: Request,
listener: ActionListener<Response>
) {
val userStr: String? = client.threadPool().threadContext.getTransient<String>(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)
val userStr: String? =
client.threadPool().threadContext.getTransient<String>(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)
val user: User? = User.parse(userStr)
val storedThreadContext = client.threadPool().threadContext.newStoredContext(false)
scope.launch {
try {
listener.onResponse(executeRequest(request, user))
client.threadPool().threadContext.stashContext().use {
storedThreadContext.restore()
listener.onResponse(executeRequest(request, user))
}
} catch (exception: OpenSearchStatusException) {
Metrics.REPORT_EXCEPTIONS_ES_STATUS_EXCEPTION.counter.increment()
log.warn("$LOG_PREFIX:OpenSearchStatusException: message:${exception.message}")
listener.onFailure(exception)
} catch (exception: OpenSearchSecurityException) {
Metrics.REPORT_EXCEPTIONS_ES_SECURITY_EXCEPTION.counter.increment()
log.warn("$LOG_PREFIX:OpenSearchSecurityException:", exception)
listener.onFailure(OpenSearchStatusException("Permissions denied: ${exception.message} - Contact administrator",
RestStatus.FORBIDDEN))
listener.onFailure(
OpenSearchStatusException(
"Permissions denied: ${exception.message} - Contact administrator",
RestStatus.FORBIDDEN
)
)
} catch (exception: VersionConflictEngineException) {
Metrics.REPORT_EXCEPTIONS_VERSION_CONFLICT_ENGINE_EXCEPTION.counter.increment()
log.warn("$LOG_PREFIX:VersionConflictEngineException:", exception)
Expand Down Expand Up @@ -125,4 +135,43 @@ abstract class PluginBaseAction<Request : ActionRequest, Response : ActionRespon
* @return the response to return.
*/
abstract fun executeRequest(request: Request, user: User?): Response

/**
* Executes the given [block] function on this resource and then closes it down correctly whether an exception
* is thrown or not.
*
* In case if the resource is being closed due to an exception occurred in [block], and the closing also fails with an exception,
* the latter is added to the [suppressed][java.lang.Throwable.addSuppressed] exceptions of the former.
*
* @param block a function to process this [AutoCloseable] resource.
* @return the result of [block] function invoked on this resource.
*/
@Suppress("TooGenericExceptionCaught")
private inline fun <T : ThreadContext.StoredContext, R> T.use(block: (T) -> R): R {
var exception: Throwable? = null
try {
return block(this)
} catch (e: Throwable) {
exception = e
throw e
} finally {
closeFinally(exception)
}
}

/**
* Closes this [AutoCloseable], suppressing possible exception or error thrown by [AutoCloseable.close] function when
* it's being closed due to some other [cause] exception occurred.
*
* The suppressed exception is added to the list of suppressed exceptions of [cause] exception.
*/
@Suppress("TooGenericExceptionCaught")
private fun ThreadContext.StoredContext.closeFinally(cause: Throwable?) = when (cause) {
null -> close()
else -> try {
close()
} catch (closeException: Throwable) {
cause.addSuppressed(closeException)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

package org.opensearch.reportsscheduler.action

import org.opensearch.OpenSearchStatusException
import org.opensearch.commons.authuser.User
import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import org.opensearch.reportsscheduler.index.ReportDefinitionsIndex
Expand All @@ -44,9 +45,9 @@ import org.opensearch.reportsscheduler.model.ReportInstance
import org.opensearch.reportsscheduler.model.ReportInstance.Status
import org.opensearch.reportsscheduler.model.UpdateReportInstanceStatusRequest
import org.opensearch.reportsscheduler.model.UpdateReportInstanceStatusResponse
import org.opensearch.reportsscheduler.notifications.NotificationsActions
import org.opensearch.reportsscheduler.security.UserAccessManager
import org.opensearch.reportsscheduler.util.logger
import org.opensearch.OpenSearchStatusException
import org.opensearch.rest.RestStatus
import java.time.Instant

Expand Down Expand Up @@ -122,6 +123,8 @@ internal object ReportInstanceActions {
Metrics.REPORT_FROM_DEFINITION_ID_SYSTEM_ERROR.counter.increment()
throw OpenSearchStatusException("Report Instance Creation failed", RestStatus.INTERNAL_SERVER_ERROR)
}
if (reportDefinitionDetails.reportDefinition.delivery != null)
NotificationsActions.send(reportDefinitionDetails.reportDefinition.delivery, docId)
val reportInstanceCopy = reportInstance.copy(id = docId)
return OnDemandReportCreateResponse(reportInstanceCopy, true)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.reportsscheduler.notifications

import org.opensearch.action.ActionListener
import org.opensearch.client.node.NodeClient
import org.opensearch.common.util.concurrent.ThreadContext
import org.opensearch.commons.ConfigConstants
import org.opensearch.commons.notifications.NotificationConstants.FEATURE_REPORTS
import org.opensearch.commons.notifications.NotificationsPluginInterface
import org.opensearch.commons.notifications.action.SendNotificationResponse
import org.opensearch.commons.notifications.model.ChannelMessage
import org.opensearch.commons.notifications.model.EventSource
import org.opensearch.commons.notifications.model.SeverityType
import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import org.opensearch.reportsscheduler.model.CreateReportDefinitionResponse
import org.opensearch.reportsscheduler.model.ReportDefinition
import org.opensearch.reportsscheduler.util.logger

/**
* Report definitions index operation actions.
*/
internal object NotificationsActions {
private val log by logger(NotificationsActions::class.java)

private lateinit var client: NodeClient

/**
* Initialize the class
* @param client NodeClient for transport call
*/
fun initialize(client: NodeClient) {
this.client = client
}

/**
* Send notifications based on delivery parameter
* @param delivery [ReportDefinition.Delivery] object
* @param referenceId [String] object
* @return [CreateReportDefinitionResponse]
*/
fun send(delivery: ReportDefinition.Delivery, referenceId: String): SendNotificationResponse? {
return send(delivery, referenceId, "")
}

/**
* Send notifications based on delivery parameter
* @param delivery [ReportDefinition.Delivery] object
* @param referenceId [String] object
* @param userStr [String] object,
* @return [CreateReportDefinitionResponse]
*/
fun send(delivery: ReportDefinition.Delivery, referenceId: String, userStr: String?): SendNotificationResponse? {
if (userStr.isNullOrEmpty()) {
return sendNotificationHelper(delivery, referenceId)
}

var sendNotificationResponse: SendNotificationResponse? = null
client.threadPool().threadContext.stashContext().use {
client.threadPool().threadContext.putTransient(
ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT,
userStr
)
sendNotificationResponse = sendNotificationHelper(delivery, referenceId)
}
return sendNotificationResponse
}

private fun sendNotificationHelper(
delivery: ReportDefinition.Delivery,
referenceId: String
): SendNotificationResponse? {
log.info("$LOG_PREFIX:NotificationsActions-send")
var sendNotificationResponse: SendNotificationResponse? = null
NotificationsPluginInterface.sendNotification(
client,
EventSource(delivery.title, referenceId, FEATURE_REPORTS, SeverityType.INFO),
ChannelMessage(delivery.textDescription, delivery.htmlDescription, null),
delivery.configIds,
object : ActionListener<SendNotificationResponse> {
override fun onResponse(response: SendNotificationResponse) {
sendNotificationResponse = response
log.info("$LOG_PREFIX:NotificationsActions-send:$sendNotificationResponse")
}

override fun onFailure(exception: Exception) {
log.error("$LOG_PREFIX:NotificationsActions-send Error:$exception")
}
}
)
return sendNotificationResponse
}

/**
* Executes the given [block] function on this resource and then closes it down correctly whether an exception
* is thrown or not.
*
* In case if the resource is being closed due to an exception occurred in [block], and the closing also fails with an exception,
* the latter is added to the [suppressed][java.lang.Throwable.addSuppressed] exceptions of the former.
*
* @param block a function to process this [AutoCloseable] resource.
* @return the result of [block] function invoked on this resource.
*/
@Suppress("TooGenericExceptionCaught")
private inline fun <T : ThreadContext.StoredContext, R> T.use(block: (T) -> R): R {
var exception: Throwable? = null
try {
return block(this)
} catch (e: Throwable) {
exception = e
throw e
} finally {
closeFinally(exception)
}
}

/**
* Closes this [AutoCloseable], suppressing possible exception or error thrown by [AutoCloseable.close] function when
* it's being closed due to some other [cause] exception occurred.
*
* The suppressed exception is added to the list of suppressed exceptions of [cause] exception.
*/
@Suppress("TooGenericExceptionCaught")
private fun ThreadContext.StoredContext.closeFinally(cause: Throwable?) = when (cause) {
null -> close()
else -> try {
close()
} catch (closeException: Throwable) {
cause.addSuppressed(closeException)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@

package org.opensearch.reportsscheduler.scheduler

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import org.opensearch.reportsscheduler.index.ReportInstancesIndex
import org.opensearch.reportsscheduler.model.ReportDefinitionDetails
import org.opensearch.reportsscheduler.model.ReportInstance
import org.opensearch.reportsscheduler.notifications.NotificationsActions
import org.opensearch.reportsscheduler.security.UserAccessManager
import org.opensearch.reportsscheduler.util.logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import java.time.Instant

internal object ReportDefinitionJobRunner : ScheduledJobRunner {
Expand Down Expand Up @@ -68,6 +70,11 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner {
log.warn("$LOG_PREFIX:runJob-job creation failed for $reportInstance")
} else {
log.info("$LOG_PREFIX:runJob-created job:$id")
if (reportDefinitionDetails.reportDefinition.delivery != null) {
val user = UserAccessManager.getUserFromAccess(job.access)
val userStr = user?.let { it.toString() } ?: ""
NotificationsActions.send(reportDefinitionDetails.reportDefinition.delivery, id, userStr)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,19 @@ internal object UserAccessManager {
fun validateUser(user: User?) {
if (isUserPrivateTenant(user) && user?.name == null) {
Metrics.REPORT_PERMISSION_USER_ERROR.counter.increment()
throw OpenSearchStatusException("User name not provided for private tenant access",
RestStatus.FORBIDDEN)
throw OpenSearchStatusException(
"User name not provided for private tenant access",
RestStatus.FORBIDDEN
)
}
if (PluginSettings.isRbacEnabled()) {
// backend roles must be present
if (user?.backendRoles.isNullOrEmpty()) {
Metrics.REPORT_PERMISSION_USER_ERROR.counter.increment()
throw OpenSearchStatusException("User doesn't have backend roles configured. Contact administrator.",
RestStatus.FORBIDDEN)
throw OpenSearchStatusException(
"User doesn't have backend roles configured. Contact administrator.",
RestStatus.FORBIDDEN
)
}
}
}
Expand Down Expand Up @@ -96,6 +100,19 @@ internal object UserAccessManager {
return retList
}

/**
* Get user object from all user access info.
*/
fun getUserFromAccess(access: List<String>): User? {
if (access.isNullOrEmpty()) {
return null
}
val name = access.find { it.startsWith(USER_TAG) }?.substring(USER_TAG.length)
val backendRoles = access.filter { it.startsWith(ROLE_TAG) }.map { it.substring(ROLE_TAG.length) }
val roles = access.filter { it.startsWith(BACKEND_ROLE_TAG) }.map { it.substring(BACKEND_ROLE_TAG.length) }
return User(name, backendRoles, roles, listOf())
}

/**
* Get access info for search filtering
*/
Expand Down
Loading

0 comments on commit 6d2607b

Please sign in to comment.