Skip to content

[ML] Add an ingest pipeline definition to structure finder #34350

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions docs/reference/ml/apis/find-file-structure.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,20 @@ If the request does not encounter errors, you receive the following result:
"type" : "double"
}
},
"ingest_pipeline" : {
"description" : "Ingest pipeline created by file structure finder",
"processors" : [
{
"date" : {
"field" : "tpep_pickup_datetime",
"timezone" : "{{ beat.timezone }}",
"formats" : [
"YYYY-MM-dd HH:mm:ss"
]
}
}
]
},
"field_stats" : {
"DOLocationID" : {
"count" : 19998,
Expand Down Expand Up @@ -1366,6 +1380,33 @@ this:
"type" : "text"
}
},
"ingest_pipeline" : {
"description" : "Ingest pipeline created by file structure finder",
"processors" : [
{
"grok" : {
"field" : "message",
"patterns" : [
"\\[%{TIMESTAMP_ISO8601:timestamp}\\]\\[%{LOGLEVEL:loglevel}.*"
]
}
},
{
"date" : {
"field" : "timestamp",
"timezone" : "{{ beat.timezone }}",
"formats" : [
"ISO8601"
]
}
},
{
"remove" : {
"field" : "timestamp"
}
}
]
},
"field_stats" : {
"loglevel" : {
"count" : 53,
Expand Down Expand Up @@ -1499,6 +1540,33 @@ this:
"type" : "keyword"
}
},
"ingest_pipeline" : {
"description" : "Ingest pipeline created by file structure finder",
"processors" : [
{
"grok" : {
"field" : "message",
"patterns" : [
"\\[%{TIMESTAMP_ISO8601:timestamp}\\]\\[%{LOGLEVEL:loglevel} *\\]\\[%{JAVACLASS:class} *\\] \\[%{HOSTNAME:node}\\] %{JAVALOGMESSAGE:message}"
]
}
},
{
"date" : {
"field" : "timestamp",
"timezone" : "{{ beat.timezone }}",
"formats" : [
"ISO8601"
]
}
},
{
"remove" : {
"field" : "timestamp"
}
}
]
},
"field_stats" : { <2>
"class" : {
"count" : 53,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -103,6 +104,7 @@ public String toString() {
public static final ParseField JAVA_TIMESTAMP_FORMATS = new ParseField("java_timestamp_formats");
public static final ParseField NEED_CLIENT_TIMEZONE = new ParseField("need_client_timezone");
public static final ParseField MAPPINGS = new ParseField("mappings");
public static final ParseField INGEST_PIPELINE = new ParseField("ingest_pipeline");
public static final ParseField FIELD_STATS = new ParseField("field_stats");
public static final ParseField EXPLANATION = new ParseField("explanation");

Expand All @@ -128,6 +130,7 @@ public String toString() {
PARSER.declareStringArray(Builder::setJavaTimestampFormats, JAVA_TIMESTAMP_FORMATS);
PARSER.declareBoolean(Builder::setNeedClientTimezone, NEED_CLIENT_TIMEZONE);
PARSER.declareObject(Builder::setMappings, (p, c) -> new TreeMap<>(p.map()), MAPPINGS);
PARSER.declareObject(Builder::setIngestPipeline, (p, c) -> p.mapOrdered(), INGEST_PIPELINE);
PARSER.declareObject(Builder::setFieldStats, (p, c) -> {
Map<String, FieldStats> fieldStats = new TreeMap<>();
while (p.nextToken() == XContentParser.Token.FIELD_NAME) {
Expand Down Expand Up @@ -157,15 +160,16 @@ public String toString() {
private final String timestampField;
private final boolean needClientTimezone;
private final SortedMap<String, Object> mappings;
private final Map<String, Object> ingestPipeline;
private final SortedMap<String, FieldStats> fieldStats;
private final List<String> explanation;

public FileStructure(int numLinesAnalyzed, int numMessagesAnalyzed, String sampleStart, String charset, Boolean hasByteOrderMarker,
Format format, String multilineStartPattern, String excludeLinesPattern, List<String> columnNames,
Boolean hasHeaderRow, Character delimiter, Character quote, Boolean shouldTrimFields, String grokPattern,
String timestampField, List<String> jodaTimestampFormats, List<String> javaTimestampFormats,
boolean needClientTimezone, Map<String, Object> mappings, Map<String, FieldStats> fieldStats,
List<String> explanation) {
boolean needClientTimezone, Map<String, Object> mappings, Map<String, Object> ingestPipeline,
Map<String, FieldStats> fieldStats, List<String> explanation) {

this.numLinesAnalyzed = numLinesAnalyzed;
this.numMessagesAnalyzed = numMessagesAnalyzed;
Expand All @@ -188,6 +192,7 @@ public FileStructure(int numLinesAnalyzed, int numMessagesAnalyzed, String sampl
(javaTimestampFormats == null) ? null : Collections.unmodifiableList(new ArrayList<>(javaTimestampFormats));
this.needClientTimezone = needClientTimezone;
this.mappings = Collections.unmodifiableSortedMap(new TreeMap<>(mappings));
this.ingestPipeline = (ingestPipeline == null) ? null : Collections.unmodifiableMap(new LinkedHashMap<>(ingestPipeline));
this.fieldStats = Collections.unmodifiableSortedMap(new TreeMap<>(fieldStats));
this.explanation = Collections.unmodifiableList(new ArrayList<>(explanation));
}
Expand All @@ -212,6 +217,7 @@ public FileStructure(StreamInput in) throws IOException {
timestampField = in.readOptionalString();
needClientTimezone = in.readBoolean();
mappings = Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap()));
ingestPipeline = in.readBoolean() ? Collections.unmodifiableMap(in.readMap()) : null;
fieldStats = Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap(StreamInput::readString, FieldStats::new)));
explanation = Collections.unmodifiableList(in.readList(StreamInput::readString));
}
Expand Down Expand Up @@ -262,6 +268,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(timestampField);
out.writeBoolean(needClientTimezone);
out.writeMap(mappings);
if (ingestPipeline == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeMap(ingestPipeline);
}
out.writeMap(fieldStats, StreamOutput::writeString, (out1, value) -> value.writeTo(out1));
out.writeCollection(explanation, StreamOutput::writeString);
}
Expand Down Expand Up @@ -342,6 +354,10 @@ public SortedMap<String, Object> getMappings() {
return mappings;
}

public Map<String, Object> getIngestPipeline() {
return ingestPipeline;
}

public SortedMap<String, FieldStats> getFieldStats() {
return fieldStats;
}
Expand Down Expand Up @@ -397,6 +413,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.field(NEED_CLIENT_TIMEZONE.getPreferredName(), needClientTimezone);
builder.field(MAPPINGS.getPreferredName(), mappings);
if (ingestPipeline != null) {
builder.field(INGEST_PIPELINE.getPreferredName(), ingestPipeline);
}
if (fieldStats.isEmpty() == false) {
builder.startObject(FIELD_STATS.getPreferredName());
for (Map.Entry<String, FieldStats> entry : fieldStats.entrySet()) {
Expand Down Expand Up @@ -476,6 +495,7 @@ public static class Builder {
private List<String> javaTimestampFormats;
private boolean needClientTimezone;
private Map<String, Object> mappings;
private Map<String, Object> ingestPipeline;
private Map<String, FieldStats> fieldStats = Collections.emptyMap();
private List<String> explanation;

Expand Down Expand Up @@ -582,6 +602,11 @@ public Builder setMappings(Map<String, Object> mappings) {
return this;
}

public Builder setIngestPipeline(Map<String, Object> ingestPipeline) {
this.ingestPipeline = ingestPipeline;
return this;
}

public Builder setFieldStats(Map<String, FieldStats> fieldStats) {
this.fieldStats = Objects.requireNonNull(fieldStats);
return this;
Expand Down Expand Up @@ -708,7 +733,8 @@ public FileStructure build() {

return new FileStructure(numLinesAnalyzed, numMessagesAnalyzed, sampleStart, charset, hasByteOrderMarker, format,
multilineStartPattern, excludeLinesPattern, columnNames, hasHeaderRow, delimiter, quote, shouldTrimFields, grokPattern,
timestampField, jodaTimestampFormats, javaTimestampFormats, needClientTimezone, mappings, fieldStats, explanation);
timestampField, jodaTimestampFormats, javaTimestampFormats, needClientTimezone, mappings, ingestPipeline, fieldStats,
explanation);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
Expand Down Expand Up @@ -74,6 +75,14 @@ public static FileStructure createTestFileStructure() {
}
builder.setMappings(mappings);

if (randomBoolean()) {
Map<String, Object> ingestPipeline = new LinkedHashMap<>();
for (String field : generateRandomStringArray(5, 20, false, false)) {
ingestPipeline.put(field, Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(10)));
}
builder.setMappings(ingestPipeline);
}

if (randomBoolean()) {
Map<String, FieldStats> fieldStats = new TreeMap<>();
for (String field : generateRandomStringArray(5, 20, false, false)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,14 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
.collect(Collectors.joining(",")));
}

boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();

structureBuilder.setTimestampField(timeField.v1())
.setJodaTimestampFormats(timeField.v2().jodaTimestampFormats)
.setJavaTimestampFormats(timeField.v2().javaTimestampFormats)
.setNeedClientTimezone(timeField.v2().hasTimezoneDependentParsing())
.setNeedClientTimezone(needClientTimeZone)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, timeField.v1(),
timeField.v2().jodaTimestampFormats, needClientTimeZone))
.setMultilineStartPattern(timeLineRegex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.grok.Grok;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.xpack.core.ml.filestructurefinder.FieldStats;
import org.elasticsearch.xpack.ml.filestructurefinder.TimestampFormatFinder.TimestampMatch;

Expand All @@ -15,6 +16,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -37,6 +39,8 @@ public final class FileStructureUtils {
private static final int KEYWORD_MAX_LEN = 256;
private static final int KEYWORD_MAX_SPACES = 5;

private static final String BEAT_TIMEZONE_FIELD = "beat.timezone";

private FileStructureUtils() {
}

Expand Down Expand Up @@ -306,4 +310,53 @@ static boolean isMoreLikelyTextThanKeyword(String str) {
int length = str.length();
return length > KEYWORD_MAX_LEN || length - str.replaceAll("\\s", "").length() > KEYWORD_MAX_SPACES;
}

/**
* Create an ingest pipeline definition appropriate for the file structure.
* @param grokPattern The Grok pattern used for parsing semi-structured text formats. <code>null</code> for
* fully structured formats.
* @param timestampField The input field containing the timestamp to be parsed into <code>@timestamp</code>.
* <code>null</code> if there is no timestamp.
* @param timestampFormats Timestamp formats to be used for parsing {@code timestampField}.
* May be <code>null</code> if {@code timestampField} is also <code>null</code>.
* @param needClientTimezone Is the timezone of the client supplying data to ingest required to uniquely parse the timestamp?
* @return The ingest pipeline definition, or <code>null</code> if none is required.
*/
public static Map<String, Object> makeIngestPipelineDefinition(String grokPattern, String timestampField, List<String> timestampFormats,
boolean needClientTimezone) {

if (grokPattern == null && timestampField == null) {
return null;
}

Map<String, Object> pipeline = new LinkedHashMap<>();
pipeline.put(Pipeline.DESCRIPTION_KEY, "Ingest pipeline created by file structure finder");

List<Map<String, Object>> processors = new ArrayList<>();

if (grokPattern != null) {
Map<String, Object> grokProcessorSettings = new LinkedHashMap<>();
grokProcessorSettings.put("field", "message");
grokProcessorSettings.put("patterns", Collections.singletonList(grokPattern));
processors.add(Collections.singletonMap("grok", grokProcessorSettings));
}

if (timestampField != null) {
Map<String, Object> dateProcessorSettings = new LinkedHashMap<>();
dateProcessorSettings.put("field", timestampField);
if (needClientTimezone) {
dateProcessorSettings.put("timezone", "{{ " + BEAT_TIMEZONE_FIELD + " }}");
}
dateProcessorSettings.put("formats", timestampFormats);
processors.add(Collections.singletonMap("date", dateProcessorSettings));
}

// This removes the interim timestamp field used for semi-structured text formats
if (grokPattern != null && timestampField != null) {
processors.add(Collections.singletonMap("remove", Collections.singletonMap("field", timestampField)));
}

pipeline.put(Pipeline.PROCESSORS_KEY, processors);
return pipeline;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,14 @@ static JsonFileStructureFinder makeJsonFileStructureFinder(List<String> explanat
Tuple<String, TimestampMatch> timeField =
FileStructureUtils.guessTimestampField(explanation, sampleRecords, overrides, timeoutChecker);
if (timeField != null) {
boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();

structureBuilder.setTimestampField(timeField.v1())
.setJodaTimestampFormats(timeField.v2().jodaTimestampFormats)
.setJavaTimestampFormats(timeField.v2().javaTimestampFormats)
.setNeedClientTimezone(timeField.v2().hasTimezoneDependentParsing());
.setNeedClientTimezone(needClientTimeZone)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, timeField.v1(),
timeField.v2().jodaTimestampFormats, needClientTimeZone));
}

Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,16 @@ static TextLogFileStructureFinder makeTextLogFileStructureFinder(List<String> ex
}
}

boolean needClientTimeZone = bestTimestamp.v1().hasTimezoneDependentParsing();

FileStructure structure = structureBuilder
.setTimestampField(interimTimestampField)
.setJodaTimestampFormats(bestTimestamp.v1().jodaTimestampFormats)
.setJavaTimestampFormats(bestTimestamp.v1().javaTimestampFormats)
.setNeedClientTimezone(bestTimestamp.v1().hasTimezoneDependentParsing())
.setNeedClientTimezone(needClientTimeZone)
.setGrokPattern(grokPattern)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(grokPattern, interimTimestampField,
bestTimestamp.v1().jodaTimestampFormats, needClientTimeZone))
.setMappings(mappings)
.setFieldStats(fieldStats)
.setExplanation(explanation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,14 @@ static XmlFileStructureFinder makeXmlFileStructureFinder(List<String> explanatio
Tuple<String, TimestampMatch> timeField =
FileStructureUtils.guessTimestampField(explanation, sampleRecords, overrides, timeoutChecker);
if (timeField != null) {
boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();

structureBuilder.setTimestampField(timeField.v1())
.setJodaTimestampFormats(timeField.v2().jodaTimestampFormats)
.setJavaTimestampFormats(timeField.v2().javaTimestampFormats)
.setNeedClientTimezone(timeField.v2().hasTimezoneDependentParsing());
.setNeedClientTimezone(needClientTimeZone)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, topLevelTag + "." + timeField.v1(),
timeField.v2().jodaTimestampFormats, needClientTimeZone));
}

Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
Expand Down
Loading