Skip to content

[ML] Improve multiline_start_pattern for CSV in find_file_structure #51737

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,20 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
}
}

int maxLinesPerMessage = 1;
List<String> sampleLines = Arrays.asList(sample.split("\n"));
List<String> sampleMessages = new ArrayList<>();
List<Map<String, ?>> sampleRecords = new ArrayList<>();
int prevMessageEndLineNumber = isHeaderInFile ? lineNumbers.get(0) : -1;
int prevMessageEndLineNumber = isHeaderInFile ? lineNumbers.get(0) : 0; // This is an exclusive end
for (int index = isHeaderInFile ? 1 : 0; index < rows.size(); ++index) {
List<String> row = rows.get(index);
int lineNumber = lineNumbers.get(index);
Map<String, String> sampleRecord = new LinkedHashMap<>();
Util.filterListToMap(sampleRecord, columnNames,
trimFields ? row.stream().map(field -> (field == null) ? null : field.trim()).collect(Collectors.toList()) : row);
sampleRecords.add(sampleRecord);
sampleMessages.add(
String.join("\n", sampleLines.subList(prevMessageEndLineNumber + 1, lineNumbers.get(index))));
sampleMessages.add(String.join("\n", sampleLines.subList(prevMessageEndLineNumber, lineNumber)));
maxLinesPerMessage = Math.max(maxLinesPerMessage, lineNumber - prevMessageEndLineNumber);
prevMessageEndLineNumber = lineNumber;
}

Expand All @@ -102,8 +103,7 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
char delimiter = (char) csvPreference.getDelimiterChar();
char quoteChar = csvPreference.getQuoteChar();

Map<String, Object> csvProcessorSettings = makeCsvProcessorSettings("message", columnNamesList, delimiter, quoteChar,
trimFields);
Map<String, Object> csvProcessorSettings = makeCsvProcessorSettings("message", columnNamesList, delimiter, quoteChar, trimFields);

FileStructure.Builder structureBuilder = new FileStructure.Builder(FileStructure.Format.DELIMITED)
.setCharset(charsetName)
Expand All @@ -116,15 +116,17 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
.setQuote(quoteChar)
.setColumnNames(columnNamesList);

String quote = String.valueOf(quoteChar);
String twoQuotes = quote + quote;
String quotePattern = quote.replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1");
String optQuotePattern = quotePattern + "?";
String delimiterPattern =
(delimiter == '\t') ? "\\t" : String.valueOf(delimiter).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1");
if (isHeaderInFile) {
String quote = String.valueOf(quoteChar);
String twoQuotes = quote + quote;
String optQuote = quote.replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + "?";
String delimiterMatcher =
(delimiter == '\t') ? "\\t" : String.valueOf(delimiter).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1");
structureBuilder.setExcludeLinesPattern("^" + Arrays.stream(header)
.map(column -> optQuote + column.replace(quote, twoQuotes).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + optQuote)
.collect(Collectors.joining(delimiterMatcher)));
.map(column ->
optQuotePattern + column.replace(quote, twoQuotes).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + optQuotePattern)
.collect(Collectors.joining(delimiterPattern)));
}

if (trimFields) {
Expand All @@ -134,28 +136,6 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
Tuple<String, TimestampFormatFinder> timeField = FileStructureUtils.guessTimestampField(explanation, sampleRecords, overrides,
timeoutChecker);
if (timeField != null) {
String timeLineRegex = null;
StringBuilder builder = new StringBuilder("^");
// We make the assumption that the timestamp will be on the first line of each record. Therefore, if the
// timestamp is the last column then either our assumption is wrong (and the approach will completely
// break down) or else every record is on a single line and there's no point creating a multiline config.
// This is why the loop excludes the last column.
for (String column : Arrays.asList(columnNames).subList(0, columnNames.length - 1)) {
if (timeField.v1().equals(column)) {
builder.append("\"?");
String simpleTimePattern = timeField.v2().getSimplePattern().pattern();
builder.append(simpleTimePattern.startsWith("\\b") ? simpleTimePattern.substring(2) : simpleTimePattern);
timeLineRegex = builder.toString();
break;
} else {
builder.append(".*?");
if (delimiter == '\t') {
builder.append("\\t");
} else {
builder.append(delimiter);
}
}
}

boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();

Expand All @@ -165,14 +145,15 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
.setNeedClientTimezone(needClientTimeZone)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings,
mappings, timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone))
.setMultilineStartPattern(timeLineRegex);
.setMultilineStartPattern(makeMultilineStartPattern(explanation, columnNamesList, maxLinesPerMessage, delimiterPattern,
quotePattern, mappings, timeField.v1(), timeField.v2()));

mappings.put(FileStructureUtils.DEFAULT_TIMESTAMP_FIELD, FileStructureUtils.DATE_MAPPING_WITHOUT_FORMAT);
} else {
structureBuilder.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(),
csvProcessorSettings, mappings, null, null, false));
}

if (timeField != null) {
mappings.put(FileStructureUtils.DEFAULT_TIMESTAMP_FIELD, FileStructureUtils.DATE_MAPPING_WITHOUT_FORMAT);
structureBuilder.setMultilineStartPattern(makeMultilineStartPattern(explanation, columnNamesList, maxLinesPerMessage,
delimiterPattern, quotePattern, mappings, null, null));
}

if (mappingsAndFieldStats.v2() != null) {
Expand Down Expand Up @@ -609,4 +590,84 @@ static Map<String, Object> makeCsvProcessorSettings(String field, List<String> t
}
return Collections.unmodifiableMap(csvProcessorSettings);
}

/**
* The multi-line start pattern is based on the first field in the line that is boolean, numeric
* or the detected timestamp, and consists of a pattern matching that field, preceded by wildcards
* to match any prior fields and to match the delimiters in between them.
*
* This is based on the observation that a boolean, numeric or timestamp field will not contain a
* newline.
*
* The approach works best when the chosen field is early in each record, ideally the very first
* field. It doesn't work when fields prior to the chosen field contain newlines in some of the
* records.
*/
static String makeMultilineStartPattern(List<String> explanation, List<String> columnNames, int maxLinesPerMessage,
String delimiterPattern, String quotePattern, Map<String, Object> mappings,
String timeFieldName, TimestampFormatFinder timeFieldFormat) {

assert columnNames.isEmpty() == false;
assert maxLinesPerMessage > 0;
assert (timeFieldName == null) == (timeFieldFormat == null);

// This is the easy case: a file where there are no multi-line fields
if (maxLinesPerMessage == 1) {
explanation.add("Not creating a multi-line start pattern as no sampled message spanned multiple lines");
return null;
}

StringBuilder builder = new StringBuilder("^");
// Look for a field early in the line that cannot be a multi-line field based on the type we've determined for
// it, and create a pattern that matches this field with the appropriate number of delimiters before it.
// There is no point doing this for the last field on the line, so this is why the loop excludes the last column.
for (String columnName : columnNames.subList(0, columnNames.size() - 1)) {
if (columnName.equals(timeFieldName)) {
builder.append(quotePattern).append("?");
String simpleTimePattern = timeFieldFormat.getSimplePattern().pattern();
builder.append(simpleTimePattern.startsWith("\\b") ? simpleTimePattern.substring(2) : simpleTimePattern);
explanation.add("Created a multi-line start pattern based on timestamp column [" + columnName + "]");
return builder.toString();
}
Object columnMapping = mappings.get(columnName);
if (columnMapping instanceof Map) {
String type = (String) ((Map<?, ?>) columnMapping).get(FileStructureUtils.MAPPING_TYPE_SETTING);
if (type != null) {
String columnPattern;
switch (type) {
case "boolean":
columnPattern = "(?:true|false)";
break;
case "byte":
case "short":
case "integer":
case "long":
columnPattern = "[+-]?\\d+";
break;
case "half_float":
case "float":
case "double":
columnPattern = "[+-]?(?:\\d+(?:\\.\\d+)?|\\.\\d+)(?:[eE][+-]?\\d+)?";
break;
default:
columnPattern = null;
break;
}
if (columnPattern != null) {
builder.append("(?:").append(columnPattern).append("|")
.append(quotePattern).append(columnPattern).append(quotePattern).append(")")
.append(delimiterPattern);
explanation.add("Created a multi-line start pattern based on [" + type + "] column [" + columnName + "]");
return builder.toString();
}
}
}
builder.append(".*?").append(delimiterPattern);
}
// TODO: if this happens a lot then we should try looking for the a multi-line END pattern instead of a start pattern.
// But this would require changing the find_file_structure response, and the file upload UI, and would make creating Filebeat
// configs from the find_file_structure response more complex, so let's wait to see if there's significant demand.
explanation.add("Failed to create a suitable multi-line start pattern");
return null;
}
}
Loading