Skip to content

Commit

Permalink
remove ingest.new_date_format (#25583)
Browse files Browse the repository at this point in the history
  • Loading branch information
talevy authored Jul 10, 2017
1 parent b22bbf9 commit e04be73
Show file tree
Hide file tree
Showing 15 changed files with 66 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,18 @@ static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config,
if (pipeline == null) {
throw new IllegalArgumentException("pipeline [" + pipelineId + "] does not exist");
}
List<IngestDocument> ingestDocumentList = parseDocs(config, pipelineStore.isNewIngestDateFormat());
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose);
}

static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories());
List<IngestDocument> ingestDocumentList = parseDocs(config, pipelineStore.isNewIngestDateFormat());
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose);
}

private static List<IngestDocument> parseDocs(Map<String, Object> config, boolean newDateFormat) {
private static List<IngestDocument> parseDocs(Map<String, Object> config) {
List<Map<String, Object>> docs = ConfigurationUtils.readList(null, null, config, Fields.DOCS);
List<IngestDocument> ingestDocumentList = new ArrayList<>();
for (Map<String, Object> dataMap : docs) {
Expand All @@ -183,7 +183,7 @@ private static List<IngestDocument> parseDocs(Map<String, Object> config, boolea
ConfigurationUtils.readStringProperty(null, null, dataMap, MetaData.ID.getFieldName(), "_id"),
ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.ROUTING.getFieldName()),
ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.PARENT.getFieldName()),
document, newDateFormat);
document);
ingestDocumentList.add(ingestDocument);
}
return ingestDocumentList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.ingest;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -27,6 +28,8 @@
import org.elasticsearch.ingest.IngestDocument;

import java.io.IOException;
import java.time.ZoneId;
import java.util.Date;
import java.util.Map;
import java.util.Objects;

Expand All @@ -42,6 +45,12 @@ final class WriteableIngestDocument implements Writeable, ToXContent {
WriteableIngestDocument(StreamInput in) throws IOException {
Map<String, Object> sourceAndMetadata = in.readMap();
Map<String, Object> ingestMetadata = in.readMap();
if (in.getVersion().before(Version.V_6_0_0_beta1)) {
ingestMetadata.computeIfPresent("timestamp", (k, o) -> {
Date date = (Date) o;
return date.toInstant().atZone(ZoneId.systemDefault());
});
}
this.ingestDocument = new IngestDocument(sourceAndMetadata, ingestMetadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
import java.nio.file.FileSystemLoopException;
import java.nio.file.NoSuchFileException;
import java.nio.file.NotDirectoryException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
Expand Down Expand Up @@ -542,6 +545,8 @@ public Object readGenericValue() throws IOException {
return readBytesRef();
case 22:
return readGeoPoint();
case 23:
return readZonedDateTime();
default:
throw new IOException("Can't read unknown type [" + type + "]");
}
Expand All @@ -562,6 +567,11 @@ private DateTime readDateTime() throws IOException {
return new DateTime(readLong(), DateTimeZone.forID(timeZoneId));
}

private ZonedDateTime readZonedDateTime() throws IOException {
final String timeZoneId = readString();
return ZonedDateTime.ofInstant(Instant.ofEpochMilli(readLong()), ZoneId.of(timeZoneId));
}

private Object[] readArray() throws IOException {
int size8 = readArraySize();
Object[] list8 = new Object[size8];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.nio.file.FileSystemLoopException;
import java.nio.file.NoSuchFileException;
import java.nio.file.NotDirectoryException;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -620,6 +621,13 @@ public final <K, V> void writeMap(final Map<K, V> map, final Writer<K> keyWriter
o.writeByte((byte) 22);
o.writeGeoPoint((GeoPoint) v);
});
writers.put(ZonedDateTime.class, (o, v) -> {
o.writeByte((byte) 23);
final ZonedDateTime zonedDateTime = (ZonedDateTime) v;
zonedDateTime.getZone().getId();
o.writeString(zonedDateTime.getZone().getId());
o.writeLong(zonedDateTime.toInstant().toEpochMilli());
});
WRITERS = Collections.unmodifiableMap(writers);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,6 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING,
ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING,
FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE,
Node.BREAKER_TYPE_KEY,
IngestService.NEW_INGEST_DATE_FORMAT
Node.BREAKER_TYPE_KEY
)));
}
11 changes: 1 addition & 10 deletions core/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ public final class IngestDocument {
private final Map<String, Object> ingestMetadata;

public IngestDocument(String index, String type, String id, String routing, String parent, Map<String, Object> source) {
this(index, type, id, routing, parent, source, false);
}

public IngestDocument(String index, String type, String id, String routing, String parent, Map<String, Object> source,
boolean newDateFormat) {
this.sourceAndMetadata = new HashMap<>();
this.sourceAndMetadata.putAll(source);
this.sourceAndMetadata.put(MetaData.INDEX.getFieldName(), index);
Expand All @@ -75,11 +70,7 @@ public IngestDocument(String index, String type, String id, String routing, Stri
}

this.ingestMetadata = new HashMap<>();
if (newDateFormat) {
this.ingestMetadata.put(TIMESTAMP, ZonedDateTime.now(ZoneOffset.UTC));
} else {
this.ingestMetadata.put(TIMESTAMP, new Date());
}
this.ingestMetadata.put(TIMESTAMP, ZonedDateTime.now(ZoneOffset.UTC));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,10 @@
* Holder class for several ingest related services.
*/
public class IngestService {
public static final Setting<Boolean> NEW_INGEST_DATE_FORMAT =
Setting.boolSetting("ingest.new_date_format", false, Property.NodeScope, Property.Dynamic, Property.Deprecated);

private final PipelineStore pipelineStore;
private final PipelineExecutionService pipelineExecutionService;

public IngestService(ClusterSettings clusterSettings, Settings settings, ThreadPool threadPool,
public IngestService(Settings settings, ThreadPool threadPool,
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
List<IngestPlugin> ingestPlugins) {
Processor.Parameters parameters = new Processor.Parameters(env, scriptService,
Expand All @@ -60,7 +57,7 @@ public IngestService(ClusterSettings clusterSettings, Settings settings, ThreadP
}
}
}
this.pipelineStore = new PipelineStore(clusterSettings, settings, Collections.unmodifiableMap(processorFactories));
this.pipelineStore = new PipelineStore(settings, Collections.unmodifiableMap(processorFactories));
this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws E
String routing = indexRequest.routing();
String parent = indexRequest.parent();
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent,
sourceAsMap, store.isNewIngestDateFormat());
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, sourceAsMap);
pipeline.execute(ingestDocument);

Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
Expand Down
13 changes: 1 addition & 12 deletions core/src/main/java/org/elasticsearch/ingest/PipelineStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,23 +52,16 @@ public class PipelineStore extends AbstractComponent implements ClusterStateAppl

private final Pipeline.Factory factory = new Pipeline.Factory();
private final Map<String, Processor.Factory> processorFactories;
private volatile boolean newIngestDateFormat;

// Ideally this should be in IngestMetadata class, but we don't have the processor factories around there.
// We know of all the processor factories when a node with all its plugin have been initialized. Also some
// processor factories rely on other node services. Custom metadata is statically registered when classes
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
volatile Map<String, Pipeline> pipelines = new HashMap<>();

public PipelineStore(ClusterSettings clusterSettings, Settings settings, Map<String, Processor.Factory> processorFactories) {
public PipelineStore(Settings settings, Map<String, Processor.Factory> processorFactories) {
super(settings);
this.processorFactories = processorFactories;
this.newIngestDateFormat = IngestService.NEW_INGEST_DATE_FORMAT.get(settings);
clusterSettings.addSettingsUpdateConsumer(IngestService.NEW_INGEST_DATE_FORMAT, this::setNewIngestDateFormat);
}

private void setNewIngestDateFormat(Boolean newIngestDateFormat) {
this.newIngestDateFormat = newIngestDateFormat;
}

@Override
Expand Down Expand Up @@ -212,10 +205,6 @@ public Map<String, Processor.Factory> getProcessorFactories() {
return processorFactories;
}

public boolean isNewIngestDateFormat() {
return newIngestDateFormat;
}

/**
* @return pipeline configuration specified by id. If multiple ids or wildcards are specified multiple pipelines
* may be returned
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
clusterService.addListener(scriptModule.getScriptService());
resourcesToClose.add(clusterService);
final IngestService ingestService = new IngestService(clusterService.getClusterSettings(), settings, threadPool, this.environment,
final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state,
clusterService.getClusterSettings(), client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,14 @@

public class IngestDocumentTests extends ESTestCase {

private static final Date BOGUS_TIMESTAMP = new Date(0L);
private static final ZonedDateTime BOGUS_TIMESTAMP_NEW_DATE_FORMAT = ZonedDateTime.of(2016, 10, 23, 0, 0, 0, 0, ZoneOffset.UTC);
private static final ZonedDateTime BOGUS_TIMESTAMP = ZonedDateTime.of(2016, 10, 23, 0, 0, 0, 0, ZoneOffset.UTC);
private IngestDocument ingestDocument;
private IngestDocument ingestDocumentWithNewDateFormat;

public IngestDocument getTestIngestDocument(boolean newDateFormat) {
@Before
public void setTestIngestDocument() {
Map<String, Object> document = new HashMap<>();
Map<String, Object> ingestMap = new HashMap<>();
if (newDateFormat) {
ingestMap.put("timestamp", BOGUS_TIMESTAMP_NEW_DATE_FORMAT);
} else {
ingestMap.put("timestamp", BOGUS_TIMESTAMP);
}
ingestMap.put("timestamp", BOGUS_TIMESTAMP);
document.put("_ingest", ingestMap);
document.put("foo", "bar");
document.put("int", 123);
Expand All @@ -79,18 +74,7 @@ public IngestDocument getTestIngestDocument(boolean newDateFormat) {
list.add(null);

document.put("list", list);
return new IngestDocument("index", "type", "id", null, null, document, newDateFormat);
}

@Before
public void setIngestDocuments() {
ingestDocument = getTestIngestDocument(false);
ingestDocumentWithNewDateFormat = getTestIngestDocument(true);
}

public void testDefaultConstructorUsesDateClass() {
IngestDocument ingestDocument = new IngestDocument("foo", "bar", "baz", "fuzz", "buzz", Collections.emptyMap());
assertThat(ingestDocument.getFieldValue("_ingest.timestamp", Object.class).getClass(), equalTo(Date.class));
ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
}

public void testSimpleGetFieldValue() {
Expand All @@ -101,16 +85,10 @@ public void testSimpleGetFieldValue() {
assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("index"));
assertThat(ingestDocument.getFieldValue("_type", String.class), equalTo("type"));
assertThat(ingestDocument.getFieldValue("_id", String.class), equalTo("id"));
assertThat(ingestDocument.getFieldValue("_ingest.timestamp", Date.class),
both(notNullValue()).and(not(equalTo(BOGUS_TIMESTAMP))));
assertThat(ingestDocument.getFieldValue("_source._ingest.timestamp", Date.class), equalTo(BOGUS_TIMESTAMP));
}

public void testNewDateFormat() {
assertThat(ingestDocumentWithNewDateFormat.getFieldValue("_ingest.timestamp", ZonedDateTime.class),
both(notNullValue()).and(not(equalTo(BOGUS_TIMESTAMP_NEW_DATE_FORMAT))));
assertThat(ingestDocumentWithNewDateFormat.getFieldValue("_source._ingest.timestamp", ZonedDateTime.class),
equalTo(BOGUS_TIMESTAMP_NEW_DATE_FORMAT));
assertThat(ingestDocument.getFieldValue("_ingest.timestamp", ZonedDateTime.class),
both(notNullValue()).and(not(equalTo(BOGUS_TIMESTAMP))));
assertThat(ingestDocument.getFieldValue("_source._ingest.timestamp", ZonedDateTime.class),
equalTo(BOGUS_TIMESTAMP));
}

public void testGetSourceObject() {
Expand Down Expand Up @@ -994,10 +972,11 @@ public void testIngestMetadataTimestamp() throws Exception {
long before = System.currentTimeMillis();
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
long after = System.currentTimeMillis();
Date timestamp = (Date) ingestDocument.getIngestMetadata().get(IngestDocument.TIMESTAMP);
ZonedDateTime timestamp = (ZonedDateTime) ingestDocument.getIngestMetadata().get(IngestDocument.TIMESTAMP);
long actualMillis = timestamp.toInstant().toEpochMilli();
assertThat(timestamp, notNullValue());
assertThat(timestamp.getTime(), greaterThanOrEqualTo(before));
assertThat(timestamp.getTime(), lessThanOrEqualTo(after));
assertThat(actualMillis, greaterThanOrEqualTo(before));
assertThat(actualMillis, lessThanOrEqualTo(after));
}

public void testCopyConstructor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet

public void testIngestPlugin() {
ThreadPool tp = Mockito.mock(ThreadPool.class);
IngestService ingestService = new IngestService(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
Settings.EMPTY, tp, null, null, null, Collections.singletonList(DUMMY_PLUGIN));
IngestService ingestService = new IngestService(Settings.EMPTY, tp, null, null,
null, Collections.singletonList(DUMMY_PLUGIN));
Map<String, Processor.Factory> factories = ingestService.getPipelineStore().getProcessorFactories();
assertTrue(factories.containsKey("foo"));
assertEquals(1, factories.size());
Expand All @@ -50,9 +50,8 @@ public void testIngestPlugin() {
public void testIngestPluginDuplicate() {
ThreadPool tp = Mockito.mock(ThreadPool.class);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
new IngestService(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
Settings.EMPTY, tp, null, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN))
);
new IngestService(Settings.EMPTY, tp, null, null,
null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN)));
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@

public class PipelineStoreTests extends ESTestCase {

private ClusterSettings clusterSettings;
private PipelineStore store;

@Before
Expand Down Expand Up @@ -95,8 +94,7 @@ public String getTag() {
}
};
});
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
store = new PipelineStore(clusterSettings, Settings.EMPTY, processorFactories);
store = new PipelineStore(Settings.EMPTY, processorFactories);
}

public void testUpdatePipelines() {
Expand Down Expand Up @@ -371,12 +369,4 @@ public void testValidateNoIngestInfo() throws Exception {
IngestInfo ingestInfo = new IngestInfo(Collections.singletonList(new ProcessorInfo("set")));
store.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest);
}

public void testUpdateIngestNewDateFormatSetting() throws Exception {
assertFalse(store.isNewIngestDateFormat());
clusterSettings.applySettings(Settings.builder().put(IngestService.NEW_INGEST_DATE_FORMAT.getKey(), true).build());
assertTrue(store.isNewIngestDateFormat());
assertWarnings("[ingest.new_date_format] setting was deprecated in Elasticsearch and will be " +
"removed in a future release! See the breaking changes documentation for the next major version.");
}
}
10 changes: 9 additions & 1 deletion docs/reference/migration/migrate_6_0/ingest.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,12 @@

==== Timestamp meta-data field type has changed

The type of the "timestamp" meta-data field has changed from `java.lang.String` to `java.util.Date`.
The type of the "timestamp" meta-data field has changed from `java.lang.String` to `java.util.Date`.

==== The format of the string-formatted ingest.timestamp field has changed

Previously, since Elasticsearch 5.4.0, you needed to use `ingest.new_date_format` to have the
`ingest.timestamp` metadata field be formatted in such a way that ES can coerce it to a field
of type `date` without further transformation. This is not necessary anymore and this setting was
removed. You can now simply set a field to `{{ingest.timestamp}}` in a pipeline, and have that
field be of type `date` without any mapping errors.
Loading

0 comments on commit e04be73

Please sign in to comment.