Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial commit for remote monitor support #1547

Merged
merged 13 commits into from
May 31, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
import org.opensearch.core.common.io.stream.StreamInput
Expand Down Expand Up @@ -240,6 +241,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ClusterMetricsInput.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY,
ChainedAlertTrigger.XCONTENT_REGISTRY,
RemoteMonitorTrigger.XCONTENT_REGISTRY,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we also need RemoteMonitorInput xcontent registry?

if yes, can you add appropriate tests to serialize

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Workflow.XCONTENT_REGISTRY
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,35 @@
package org.opensearch.alerting;

import org.opensearch.action.support.WriteRequest;
import org.opensearch.alerting.monitor.runners.SampleRemoteDocLevelMonitorRunner;
import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput1;
import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput2;
import org.opensearch.alerting.monitor.triggers.SampleRemoteMonitorTrigger1;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.commons.alerting.action.AlertingActions;
import org.opensearch.commons.alerting.action.IndexMonitorRequest;
import org.opensearch.commons.alerting.action.IndexMonitorResponse;
import org.opensearch.commons.alerting.model.DataSources;
import org.opensearch.commons.alerting.model.DocLevelMonitorInput;
import org.opensearch.commons.alerting.model.DocLevelQuery;
import org.opensearch.commons.alerting.model.IntervalSchedule;
import org.opensearch.commons.alerting.model.Monitor;
import org.opensearch.commons.alerting.model.action.Action;
import org.opensearch.commons.alerting.model.action.Throttle;
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput;
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestResponse;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;

import java.io.IOException;
import java.time.Instant;
Expand Down Expand Up @@ -52,6 +63,17 @@ public List<Route> routes() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String runMonitorParam = restRequest.param("run_monitor");

SampleRemoteMonitorInput1 input1 = new SampleRemoteMonitorInput1("hello", Map.of("test", 1.0f), 1);
BytesStreamOutput out = new BytesStreamOutput();
input1.writeTo(out);
BytesReference input1Serialized = out.bytes();

SampleRemoteMonitorTrigger1 trigger1 = new SampleRemoteMonitorTrigger1("hello", Map.of("test", 1.0f), 1);
BytesStreamOutput outTrigger = new BytesStreamOutput();
trigger1.writeTo(outTrigger);
BytesReference trigger1Serialized = outTrigger.bytes();

Monitor monitor1 = new Monitor(
Monitor.NO_ID,
Monitor.NO_VERSION,
Expand All @@ -63,8 +85,11 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
SampleRemoteMonitorPlugin.SAMPLE_REMOTE_MONITOR1,
null,
0,
List.of(),
List.of(),
List.of(new RemoteMonitorInput(input1Serialized)),
List.of(new RemoteMonitorTrigger("id", "name", "1",
List.of(new Action("name", "destinationId", new Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "Hello World", Map.of()),
new Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "Hello World", Map.of()), false, new Throttle(60, ChronoUnit.MINUTES),
"id", null)), trigger1Serialized)),
Map.of(),
new DataSources(),
"sample-remote-monitor-plugin"
Expand Down Expand Up @@ -106,6 +131,12 @@ public void onFailure(Exception e) {
);
};
} else if (runMonitorParam.equals("multiple")) {
SampleRemoteMonitorInput2 input2 = new SampleRemoteMonitorInput2("hello",
new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of()))));
BytesStreamOutput out1 = new BytesStreamOutput();
input2.writeTo(out1);
BytesReference input1Serialized1 = out1.bytes();

Monitor monitor2 = new Monitor(
Monitor.NO_ID,
Monitor.NO_VERSION,
Expand All @@ -117,7 +148,7 @@ public void onFailure(Exception e) {
SampleRemoteMonitorPlugin.SAMPLE_REMOTE_MONITOR2,
null,
0,
List.of(),
List.of(new RemoteMonitorInput(input1Serialized1)),
List.of(),
Map.of(),
new DataSources(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.monitor.inputs;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;
import java.util.Map;

public class SampleRemoteMonitorInput1 implements Writeable {

private String a;

private Map<String, Object> b;

private int c;

public SampleRemoteMonitorInput1(String a, Map<String, Object> b, int c) {
this.a = a;
this.b = b;
this.c = c;
}

public SampleRemoteMonitorInput1(StreamInput sin) throws IOException {
this(
sin.readString(),
sin.readMap(),
sin.readInt()
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(a);
out.writeMap(b);
out.writeInt(c);
}

public int getC() {
return c;
}

public Map<String, Object> getB() {
return b;
}

public String getA() {
return a;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.monitor.inputs;

import org.opensearch.commons.alerting.model.DocLevelMonitorInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;

public class SampleRemoteMonitorInput2 implements Writeable {

private String a;

private DocLevelMonitorInput b;

public SampleRemoteMonitorInput2(String a, DocLevelMonitorInput b) {
this.a = a;
this.b = b;
}

public SampleRemoteMonitorInput2(StreamInput sin) throws IOException {
this(
sin.readString(),
DocLevelMonitorInput.readFrom(sin)
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(a);
b.writeTo(out);
}

public String getA() {
return a;
}

public DocLevelMonitorInput getB() {
return b;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,21 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.alerting.monitor.triggers.SampleRemoteMonitorTriggerRunResult;
import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput1;
import org.opensearch.alerting.monitor.trigger.results.SampleRemoteMonitorTriggerRunResult;
import org.opensearch.alerting.monitor.triggers.SampleRemoteMonitorTrigger1;
import org.opensearch.alerting.spi.RemoteMonitorRunner;
import org.opensearch.client.Client;
import org.opensearch.commons.alerting.model.Input;
import org.opensearch.commons.alerting.model.InputRunResults;
import org.opensearch.commons.alerting.model.Monitor;
import org.opensearch.commons.alerting.model.MonitorRunResult;
import org.opensearch.commons.alerting.model.Trigger;
import org.opensearch.commons.alerting.model.TriggerRunResult;
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput;
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.transport.TransportService;

import java.time.Instant;
Expand Down Expand Up @@ -57,17 +65,45 @@ public MonitorRunResult<TriggerRunResult> runMonitor(
String executionId,
TransportService transportService
) {
IndexRequest indexRequest = new IndexRequest(SAMPLE_MONITOR_RUNNER1_INDEX)
.source(Map.of("sample", "record")).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
this.client.index(indexRequest);
try {
BytesReference customInputSerialized = null;
Input input = monitor.getInputs().get(0);
if (input instanceof RemoteMonitorInput) {
customInputSerialized = ((RemoteMonitorInput) input).getInput();
}
StreamInput sin = StreamInput.wrap(customInputSerialized.toBytesRef().bytes);
SampleRemoteMonitorInput1 remoteMonitorInput = new SampleRemoteMonitorInput1(sin);

BytesReference customTriggerSerialized = null;
Trigger trigger = monitor.getTriggers().get(0);
if (trigger instanceof RemoteMonitorTrigger) {
customTriggerSerialized = ((RemoteMonitorTrigger) trigger).getTrigger();
}
StreamInput triggerSin = StreamInput.wrap(customTriggerSerialized.toBytesRef().bytes);
SampleRemoteMonitorTrigger1 remoteMonitorTrigger = new SampleRemoteMonitorTrigger1(triggerSin);

IndexRequest indexRequest = new IndexRequest(SAMPLE_MONITOR_RUNNER1_INDEX)
.source(Map.of(remoteMonitorInput.getA(), remoteMonitorTrigger.getC())).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
this.client.index(indexRequest);

return new MonitorRunResult<>(
monitor.getName(),
periodStart,
periodEnd,
null,
new InputRunResults(),
Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", null, Map.of()))
);
} catch (Exception ex) {
return new MonitorRunResult<>(
monitor.getName(),
periodStart,
periodEnd,
ex,
new InputRunResults(),
Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", ex, Map.of()))
);
}

return new MonitorRunResult<>(
monitor.getName(),
periodStart,
periodEnd,
null,
new InputRunResults(),
Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", null, Map.of()))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.alerting.monitor.triggers.SampleRemoteMonitorTriggerRunResult;
import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput2;
import org.opensearch.alerting.monitor.trigger.results.SampleRemoteMonitorTriggerRunResult;
import org.opensearch.alerting.spi.RemoteMonitorRunner;
import org.opensearch.client.Client;
import org.opensearch.commons.alerting.model.Input;
import org.opensearch.commons.alerting.model.InputRunResults;
import org.opensearch.commons.alerting.model.Monitor;
import org.opensearch.commons.alerting.model.MonitorRunResult;
import org.opensearch.commons.alerting.model.TriggerRunResult;
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.transport.TransportService;

import java.time.Instant;
Expand Down Expand Up @@ -57,17 +62,37 @@ public MonitorRunResult<TriggerRunResult> runMonitor(
String executionId,
TransportService transportService
) {
IndexRequest indexRequest = new IndexRequest(SAMPLE_MONITOR_RUNNER2_INDEX)
.source(Map.of("sample", "record")).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
this.client.index(indexRequest);
try {
BytesReference customInputSerialized = null;
Input input = monitor.getInputs().get(0);
if (input instanceof RemoteMonitorInput) {
customInputSerialized = ((RemoteMonitorInput) input).getInput();
}
StreamInput sin = StreamInput.wrap(customInputSerialized.toBytesRef().bytes);
SampleRemoteMonitorInput2 remoteMonitorInput = new SampleRemoteMonitorInput2(sin);

IndexRequest indexRequest = new IndexRequest(SAMPLE_MONITOR_RUNNER2_INDEX)
.source(Map.of(remoteMonitorInput.getB().name(), remoteMonitorInput.getB().getQueries().get(0).getQuery()))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
this.client.index(indexRequest);

return new MonitorRunResult<>(
monitor.getName(),
periodStart,
periodEnd,
null,
new InputRunResults(),
Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", null, Map.of()))
);
return new MonitorRunResult<>(
monitor.getName(),
periodStart,
periodEnd,
null,
new InputRunResults(),
Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", null, Map.of()))
);
} catch (Exception ex) {
return new MonitorRunResult<>(
monitor.getName(),
periodStart,
periodEnd,
ex,
new InputRunResults(),
Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", ex, Map.of()))
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.monitor.triggers;
package org.opensearch.alerting.monitor.trigger.results;

import org.opensearch.commons.alerting.model.ActionRunResult;
import org.opensearch.commons.alerting.model.TriggerRunResult;
Expand Down
Loading
Loading