Skip to content

[ML-FIB] This is a temporary (squashed) commit of https://github.com/elastic/e… #32776

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions x-pack/docs/en/rest-api/rollup/get-job.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ Which will yield the following response:
"stats" : {
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"documents_indexed" : 0,
"trigger_count" : 0
}
}
Expand Down Expand Up @@ -219,7 +219,7 @@ Which will yield the following response:
"stats" : {
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"documents_indexed" : 0,
"trigger_count" : 0
}
},
Expand Down Expand Up @@ -268,7 +268,7 @@ Which will yield the following response:
"stats" : {
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"documents_indexed" : 0,
"trigger_count" : 0
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.rollup.job;
package org.elasticsearch.xpack.core.indexing;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -13,7 +13,6 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

Expand All @@ -24,45 +23,46 @@
* and are only for external monitoring/reference. Statistics are not persisted with the job, so if the
* allocated task is shutdown/restarted on a different node all the stats will reset.
*/
public class RollupJobStats implements ToXContentObject, Writeable {
public class IndexerJobStats implements ToXContentObject, Writeable {

public static final ParseField NAME = new ParseField("job_stats");

private static ParseField NUM_PAGES = new ParseField("pages_processed");
private static ParseField NUM_DOCUMENTS = new ParseField("documents_processed");
private static ParseField NUM_ROLLUPS = new ParseField("rollups_indexed");
private static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed");
// BWC for RollupJobStats
private static ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("documents_indexed").withDeprecation("rollups_indexed");
private static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");

private long numPages = 0;
private long numDocuments = 0;
private long numRollups = 0;
private long numInputDocuments = 0;
private long numOuputDocuments = 0;
private long numInvocations = 0;

public static final ConstructingObjectParser<RollupJobStats, Void> PARSER =
public static final ConstructingObjectParser<IndexerJobStats, Void> PARSER =
new ConstructingObjectParser<>(NAME.getPreferredName(),
args -> new RollupJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3]));
args -> new IndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3]));

static {
PARSER.declareLong(constructorArg(), NUM_PAGES);
PARSER.declareLong(constructorArg(), NUM_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_ROLLUPS);
PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
}

public RollupJobStats() {
public IndexerJobStats() {
}

public RollupJobStats(long numPages, long numDocuments, long numRollups, long numInvocations) {
public IndexerJobStats(long numPages, long numDocuments, long numOuputDocuments, long numInvocations) {
this.numPages = numPages;
this.numDocuments = numDocuments;
this.numRollups = numRollups;
this.numInputDocuments = numDocuments;
this.numOuputDocuments = numOuputDocuments;
this.numInvocations = numInvocations;
}

public RollupJobStats(StreamInput in) throws IOException {
public IndexerJobStats(StreamInput in) throws IOException {
this.numPages = in.readVLong();
this.numDocuments = in.readVLong();
this.numRollups = in.readVLong();
this.numInputDocuments = in.readVLong();
this.numOuputDocuments = in.readVLong();
this.numInvocations = in.readVLong();
}

Expand All @@ -71,15 +71,15 @@ public long getNumPages() {
}

public long getNumDocuments() {
return numDocuments;
return numInputDocuments;
}

public long getNumInvocations() {
return numInvocations;
}

public long getNumRollups() {
return numRollups;
public long getOutputDocuments() {
return numOuputDocuments;
}

public void incrementNumPages(long n) {
Expand All @@ -89,28 +89,28 @@ public void incrementNumPages(long n) {

public void incrementNumDocuments(long n) {
assert(n >= 0);
numDocuments += n;
numInputDocuments += n;
}

public void incrementNumInvocations(long n) {
assert(n >= 0);
numInvocations += n;
}

public void incrementNumRollups(long n) {
public void incrementNumOutputDocuments(long n) {
assert(n >= 0);
numRollups += n;
numOuputDocuments += n;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(numPages);
out.writeVLong(numDocuments);
out.writeVLong(numRollups);
out.writeVLong(numInputDocuments);
out.writeVLong(numOuputDocuments);
out.writeVLong(numInvocations);
}

public static RollupJobStats fromXContent(XContentParser parser) {
public static IndexerJobStats fromXContent(XContentParser parser) {
try {
return PARSER.parse(parser, null);
} catch (IOException e) {
Expand All @@ -122,8 +122,8 @@ public static RollupJobStats fromXContent(XContentParser parser) {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(NUM_PAGES.getPreferredName(), numPages);
builder.field(NUM_DOCUMENTS.getPreferredName(), numDocuments);
builder.field(NUM_ROLLUPS.getPreferredName(), numRollups);
builder.field(NUM_INPUT_DOCUMENTS.getPreferredName(), numInputDocuments);
builder.field(NUM_OUTPUT_DOCUMENTS.getPreferredName(), numOuputDocuments);
builder.field(NUM_INVOCATIONS.getPreferredName(), numInvocations);
builder.endObject();
return builder;
Expand All @@ -139,18 +139,16 @@ public boolean equals(Object other) {
return false;
}

RollupJobStats that = (RollupJobStats) other;
IndexerJobStats that = (IndexerJobStats) other;

return Objects.equals(this.numPages, that.numPages)
&& Objects.equals(this.numDocuments, that.numDocuments)
&& Objects.equals(this.numRollups, that.numRollups)
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
&& Objects.equals(this.numInvocations, that.numInvocations);
}

@Override
public int hashCode() {
return Objects.hash(numPages, numDocuments, numRollups, numInvocations);
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations);
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.rollup.job;
package org.elasticsearch.xpack.core.indexing;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.indexing;

import org.elasticsearch.action.index.IndexRequest;

import java.util.List;

/**
* Result object to hold the result of 1 iteration of iterative indexing.
* Acts as an interface between the implementation and the generic indexer.
*/
public class Iteration<JobPosition> {

private final boolean isDone;
private final JobPosition position;
private final List<IndexRequest> toIndex;

/**
* Constructor for the result of 1 iteration.
*
* @param toIndex the list of requests to be indexed
* @param position the extracted, persistable position of the job required for the search phase
* @param isDone true if source is exhausted and job should go to sleep
*
* Note: toIndex.empty() != isDone due to possible filtering in the specific implementation
*/
public Iteration(List<IndexRequest> toIndex, JobPosition position, boolean isDone) {
this.toIndex = toIndex;
this.position = position;
this.isDone = isDone;
}

/**
* Returns true if this indexing iteration is done and job should go into sleep mode.
*/
public boolean isDone() {
return isDone;
}

/**
* Return the position of the job, a generic to be passed to the next query construction.
*
* @return the position
*/
public JobPosition getPosition() {
return position;
}

/**
* List of requests to be passed to bulk indexing.
*
* @return List of index requests.
*/
public List<IndexRequest> getToIndex() {
return toIndex;
}
}
Loading