Skip to content

Commit

Permalink
[ML] Data Frame HLRC Get Stats API (#40443)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle authored Mar 26, 2019
1 parent cd4eac5 commit 1354696
Show file tree
Hide file tree
Showing 27 changed files with 1,310 additions and 247 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsRequest;
import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsResponse;
import org.elasticsearch.client.dataframe.PreviewDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PreviewDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
Expand Down Expand Up @@ -82,6 +84,46 @@ public void putDataFrameTransformAsync(PutDataFrameTransformRequest request, Req
Collections.emptySet());
}

/**
* Get the running statistics of a Data Frame Transform
* <p>
* For additional info
* see <a href="https://www.TODO.com">Get Data Frame transform stats documentation</a>
*
* @param request Specifies the which transforms to get the stats for
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return The Data Frame Transform stats
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public GetDataFrameTransformStatsResponse getDataFrameTransformStats(GetDataFrameTransformStatsRequest request, RequestOptions options)
throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
DataFrameRequestConverters::getDataFrameTransformStats,
options,
GetDataFrameTransformStatsResponse::fromXContent,
Collections.emptySet());
}

/**
* Get the running statistics of a Data Frame Transform asynchronously and notifies listener on completion
* <p>
* For additional info
* see <a href="https://www.TODO.com">Get Data Frame transform stats documentation</a>
*
* @param request Specifies the which transforms to get the stats for
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void getDataFrameTransformStatsAsync(GetDataFrameTransformStatsRequest request, RequestOptions options,
ActionListener<GetDataFrameTransformStatsResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
DataFrameRequestConverters::getDataFrameTransformStats,
options,
GetDataFrameTransformStatsResponse::fromXContent,
listener,
Collections.emptySet());
}

/**
* Delete a data frame transform
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
package org.elasticsearch.client;

import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsRequest;
import org.elasticsearch.client.dataframe.PreviewDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
Expand Down Expand Up @@ -94,4 +96,13 @@ static Request previewDataFrameTransform(PreviewDataFrameTransformRequest previe
request.setEntity(createEntity(previewRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request getDataFrameTransformStats(GetDataFrameTransformStatsRequest statsRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_data_frame", "transforms")
.addPathPart(statsRequest.getId())
.addPathPartAsIs("_stats")
.build();
return new Request(HttpGet.METHOD_NAME, endpoint);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.core;

import org.elasticsearch.common.ParseField;

import java.util.Objects;

public abstract class IndexerJobStats {
public static final String NAME = "data_frame_indexer_transform_stats";
public static ParseField NUM_PAGES = new ParseField("pages_processed");
public static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed");
public static ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("documents_indexed");
public static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");
public static ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
public static ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
public static ParseField INDEX_TOTAL = new ParseField("index_total");
public static ParseField SEARCH_TOTAL = new ParseField("search_total");
public static ParseField SEARCH_FAILURES = new ParseField("search_failures");
public static ParseField INDEX_FAILURES = new ParseField("index_failures");

private final long numPages;
private final long numInputDocuments;
private final long numOuputDocuments;
private final long numInvocations;
private final long indexTime;
private final long indexTotal;
private final long searchTime;
private final long searchTotal;
private final long indexFailures;
private final long searchFailures;

public IndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
long indexTime, long searchTime, long indexTotal, long searchTotal, long indexFailures, long searchFailures) {
this.numPages = numPages;
this.numInputDocuments = numInputDocuments;
this.numOuputDocuments = numOuputDocuments;
this.numInvocations = numInvocations;
this.indexTime = indexTime;
this.indexTotal = indexTotal;
this.searchTime = searchTime;
this.searchTotal = searchTotal;
this.indexFailures = indexFailures;
this.searchFailures = searchFailures;
}

/**
* The number of pages read from the input indices
*/
public long getNumPages() {
return numPages;
}

/**
* The number of documents read from the input indices
*/
public long getNumDocuments() {
return numInputDocuments;
}

/**
* Number of times that the job woke up to write documents
*/
public long getNumInvocations() {
return numInvocations;
}

/**
* Number of documents written
*/
public long getOutputDocuments() {
return numOuputDocuments;
}

/**
* Number of index failures that have occurred
*/
public long getIndexFailures() {
return indexFailures;
}

/**
* Number of failures that have occurred
*/
public long getSearchFailures() {
return searchFailures;
}

/**
* Returns the time spent indexing (cumulative) in milliseconds
*/
public long getIndexTime() {
return indexTime;
}

/**
* Returns the time spent searching (cumulative) in milliseconds
*/
public long getSearchTime() {
return searchTime;
}

/**
* Returns the total number of indexing requests that have been processed
* (Note: this is not the number of _documents_ that have been indexed)
*/
public long getIndexTotal() {
return indexTotal;
}

/**
* Returns the total number of search requests that have been made
*/
public long getSearchTotal() {
return searchTotal;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other instanceof IndexerJobStats == false) {
return false;
}

IndexerJobStats that = (IndexerJobStats) other;
return Objects.equals(this.numPages, that.numPages)
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
&& Objects.equals(this.numInvocations, that.numInvocations)
&& Objects.equals(this.indexTime, that.indexTime)
&& Objects.equals(this.searchTime, that.searchTime)
&& Objects.equals(this.indexFailures, that.indexFailures)
&& Objects.equals(this.searchFailures, that.searchFailures)
&& Objects.equals(this.searchTotal, that.searchTotal)
&& Objects.equals(this.indexTotal, that.indexTotal);
}

@Override
public int hashCode() {
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexFailures, searchFailures, searchTotal, indexTotal);
}

@Override
public final String toString() {
return "{pages=" + numPages
+ ", input_docs=" + numInputDocuments
+ ", output_docs=" + numOuputDocuments
+ ", invocations=" + numInvocations
+ ", index_failures=" + indexFailures
+ ", search_failures=" + searchFailures
+ ", index_time_in_ms=" + indexTime
+ ", index_total=" + indexTotal
+ ", search_time_in_ms=" + searchTime
+ ", search_total=" + searchTotal+ "}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.core;


import java.util.Locale;

/**
* IndexerState represents the internal state of the indexer. It
* is also persistent when changing from started/stopped in case the allocated
* task is restarted elsewhere.
*/
public enum IndexerState {
/** Indexer is running, but not actively indexing data (e.g. it's idle). */
STARTED,

/** Indexer is actively indexing data. */
INDEXING,

/**
* Transition state to where an indexer has acknowledged the stop
* but is still in process of halting.
*/
STOPPING,

/** Indexer is "paused" and ignoring scheduled triggers. */
STOPPED,

/**
* Something (internal or external) has requested the indexer abort
* and shutdown.
*/
ABORTING;

public static IndexerState fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));
}

public String value() {
return name().toLowerCase(Locale.ROOT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.client.core;
package org.elasticsearch.client.dataframe;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.TaskOperationFailure;
Expand All @@ -26,7 +26,6 @@
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand All @@ -36,8 +35,8 @@

public class AcknowledgedTasksResponse {

protected static final ParseField TASK_FAILURES = new ParseField("task_failures");
protected static final ParseField NODE_FAILURES = new ParseField("node_failures");
public static final ParseField TASK_FAILURES = new ParseField("task_failures");
public static final ParseField NODE_FAILURES = new ParseField("node_failures");

@SuppressWarnings("unchecked")
protected static <T extends AcknowledgedTasksResponse> ConstructingObjectParser<T, Void> generateParser(
Expand All @@ -60,8 +59,8 @@ protected static <T extends AcknowledgedTasksResponse> ConstructingObjectParser<
public AcknowledgedTasksResponse(boolean acknowledged, @Nullable List<TaskOperationFailure> taskFailures,
@Nullable List<? extends ElasticsearchException> nodeFailures) {
this.acknowledged = acknowledged;
this.taskFailures = taskFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(taskFailures));
this.nodeFailures = nodeFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(nodeFailures));
this.taskFailures = taskFailures == null ? Collections.emptyList() : Collections.unmodifiableList(taskFailures);
this.nodeFailures = nodeFailures == null ? Collections.emptyList() : Collections.unmodifiableList(nodeFailures);
}

public boolean isAcknowledged() {
Expand Down
Loading

0 comments on commit 1354696

Please sign in to comment.