Skip to content

Commit c63f95e

Browse files
authored
[ML] [Data Frame] adding and modifying auditor messages (#42722)
* [ML] [Data Frame] adding and modifying auditor messages * Update DataFrameTransformTask.java
1 parent 7970526 commit c63f95e

File tree

4 files changed

+28
-14
lines changed

4 files changed

+28
-14
lines changed

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,24 @@
2323
import org.elasticsearch.transport.TransportService;
2424
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
2525
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Request;
26+
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
2627
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
2728

2829
import java.io.IOException;
2930

3031
public class TransportDeleteDataFrameTransformAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
3132

3233
private final DataFrameTransformsConfigManager transformsConfigManager;
34+
private final DataFrameAuditor auditor;
3335

3436
@Inject
3537
public TransportDeleteDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool,
3638
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
37-
DataFrameTransformsConfigManager transformsConfigManager) {
39+
DataFrameTransformsConfigManager transformsConfigManager, DataFrameAuditor auditor) {
3840
super(DeleteDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters,
3941
Request::new, indexNameExpressionResolver);
4042
this.transformsConfigManager = transformsConfigManager;
43+
this.auditor = auditor;
4144
}
4245

4346
@Override
@@ -65,7 +68,10 @@ protected void masterOperation(Request request, ClusterState state, ActionListen
6568
} else {
6669
// Task is not running, delete the configuration document
6770
transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(
68-
r -> listener.onResponse(new AcknowledgedResponse(r)),
71+
r -> {
72+
auditor.info(request.getId(), "Deleted data frame transform.");
73+
listener.onResponse(new AcknowledgedResponse(r));
74+
},
6975
listener::onFailure));
7076
}
7177
}

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
4747
import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges;
4848
import org.elasticsearch.xpack.core.security.support.Exceptions;
49+
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
4950
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
5051
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
5152

@@ -65,19 +66,22 @@ public class TransportPutDataFrameTransformAction
6566
private final Client client;
6667
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
6768
private final SecurityContext securityContext;
69+
private final DataFrameAuditor auditor;
6870

6971
@Inject
7072
public TransportPutDataFrameTransformAction(Settings settings, TransportService transportService, ThreadPool threadPool,
7173
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
7274
ClusterService clusterService, XPackLicenseState licenseState,
73-
DataFrameTransformsConfigManager dataFrameTransformsConfigManager, Client client) {
75+
DataFrameTransformsConfigManager dataFrameTransformsConfigManager, Client client,
76+
DataFrameAuditor auditor) {
7477
super(PutDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters,
7578
PutDataFrameTransformAction.Request::new, indexNameExpressionResolver);
7679
this.licenseState = licenseState;
7780
this.client = client;
7881
this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
7982
this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ?
8083
new SecurityContext(settings, threadPool.getThreadContext()) : null;
84+
this.auditor = auditor;
8185
}
8286

8387
@Override
@@ -234,7 +238,10 @@ private void putDataFrame(DataFrameTransformConfig config, ActionListener<Acknow
234238

235239
// <5> Return the listener, or clean up destination index on failure.
236240
ActionListener<Boolean> putTransformConfigurationListener = ActionListener.wrap(
237-
putTransformConfigurationResult -> listener.onResponse(new AcknowledgedResponse(true)),
241+
putTransformConfigurationResult -> {
242+
auditor.info(config.getId(), "Created data frame transform.");
243+
listener.onResponse(new AcknowledgedResponse(true));
244+
},
238245
listener::onFailure
239246
);
240247

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,11 +184,10 @@ protected void masterOperation(StartDataFrameTransformAction.Request request,
184184

185185
if(dest.length == 0) {
186186
auditor.info(request.getId(),
187-
"Could not find destination index [" + destinationIndex + "]." +
188-
" Creating index with deduced mappings.");
187+
"Creating destination index [" + destinationIndex + "] with deduced mappings.");
189188
createDestinationIndex(config, createOrGetIndexListener);
190189
} else {
191-
auditor.info(request.getId(), "Destination index [" + destinationIndex + "] already exists.");
190+
auditor.info(request.getId(), "Using existing destination index [" + destinationIndex + "].");
192191
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
193192
ClientHelper.DATA_FRAME_ORIGIN,
194193
client.admin()

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,8 @@ public synchronized void start(Long startingCheckpoint, ActionListener<Response>
213213
logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString());
214214
persistStateToClusterState(state, ActionListener.wrap(
215215
task -> {
216-
auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]");
216+
auditor.info(transform.getId(),
217+
"Updated data frame transform state to [" + state.getTaskState() + "].");
217218
long now = System.currentTimeMillis();
218219
// kick off the indexer
219220
triggered(new Event(schedulerJobName(), now, now));
@@ -293,10 +294,9 @@ void persistStateToClusterState(DataFrameTransformState state,
293294
synchronized void markAsFailed(String reason, ActionListener<Void> listener) {
294295
taskState.set(DataFrameTransformTaskState.FAILED);
295296
stateReason.set(reason);
297+
auditor.error(transform.getId(), reason);
296298
persistStateToClusterState(getState(), ActionListener.wrap(
297-
r -> {
298-
listener.onResponse(null);
299-
},
299+
r -> listener.onResponse(null),
300300
listener::onFailure
301301
));
302302
}
@@ -560,6 +560,8 @@ protected void doSaveState(IndexerState indexerState, Map<String, Object> positi
560560
},
561561
statsExc -> {
562562
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
563+
auditor.warning(getJobId(),
564+
"Failure updating stats of transform: " + statsExc.getMessage());
563565
next.run();
564566
}
565567
));
@@ -588,7 +590,7 @@ protected void onFinish(ActionListener<Void> listener) {
588590
try {
589591
super.onFinish(listener);
590592
long checkpoint = transformTask.currentCheckpoint.incrementAndGet();
591-
auditor.info(transformTask.getTransformId(), "Finished indexing for data frame transform checkpoint [" + checkpoint + "]");
593+
auditor.info(transformTask.getTransformId(), "Finished indexing for data frame transform checkpoint [" + checkpoint + "].");
592594
logger.info(
593595
"Finished indexing for data frame transform [" + transformTask.getTransformId() + "] checkpoint [" + checkpoint + "]");
594596
listener.onResponse(null);
@@ -599,14 +601,14 @@ protected void onFinish(ActionListener<Void> listener) {
599601

600602
@Override
601603
protected void onStop() {
602-
auditor.info(transformConfig.getId(), "Indexer has stopped");
604+
auditor.info(transformConfig.getId(), "Data frame transform has stopped.");
603605
logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId());
604606
transformTask.shutdown();
605607
}
606608

607609
@Override
608610
protected void onAbort() {
609-
auditor.info(transformConfig.getId(), "Received abort request, stopping indexer");
611+
auditor.info(transformConfig.getId(), "Received abort request, stopping data frame transform.");
610612
logger.info("Data frame transform [" + transformConfig.getId() + "] received abort request, stopping indexer");
611613
transformTask.shutdown();
612614
}

0 commit comments

Comments
 (0)