
Adding support for the date_nanos field type (#1803) #1830

Merged: 1 commit, Dec 20, 2021
@@ -71,6 +71,7 @@ public Object createArray(FieldType type) {
arrayType = BooleanWritable.class;
break;
case DATE:
case DATE_NANOS:
arrayType = dateType();
break;
case BINARY:
@@ -39,6 +39,7 @@ public enum FieldType {
DOUBLE,
STRING,
DATE,
DATE_NANOS,
BINARY,
TOKEN_COUNT,
// ES 5.x
@@ -75,6 +76,7 @@ public enum FieldType {
CAST_HIERARCHY.put(FLOAT, new LinkedHashSet<FieldType>(Arrays.asList(DOUBLE, KEYWORD)));
CAST_HIERARCHY.put(STRING, new LinkedHashSet<FieldType>(Collections.singletonList(KEYWORD)));
CAST_HIERARCHY.put(DATE, new LinkedHashSet<FieldType>(Collections.singletonList(KEYWORD)));
CAST_HIERARCHY.put(DATE_NANOS, new LinkedHashSet<FieldType>(Collections.singletonList(KEYWORD)));
CAST_HIERARCHY.put(BINARY, new LinkedHashSet<FieldType>(Collections.singletonList(KEYWORD)));
CAST_HIERARCHY.put(TOKEN_COUNT, new LinkedHashSet<FieldType>(Arrays.asList(LONG, KEYWORD)));
CAST_HIERARCHY.put(TEXT, new LinkedHashSet<FieldType>(Collections.singletonList(KEYWORD)));
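
The effect of the `CAST_HIERARCHY` entries above: `DATE_NANOS`, like `DATE`, can only be widened to `KEYWORD`. A standalone sketch of that lookup (the enum and map below are illustrative stand-ins, not the real `FieldType`):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class CastHierarchySketch {
    enum Type { DATE, DATE_NANOS, KEYWORD }

    // Mirrors the diff: both date types declare KEYWORD as their only safe cast.
    static final Map<Type, Set<Type>> CAST_HIERARCHY = new HashMap<>();
    static {
        CAST_HIERARCHY.put(Type.DATE, new LinkedHashSet<>(Collections.singletonList(Type.KEYWORD)));
        CAST_HIERARCHY.put(Type.DATE_NANOS, new LinkedHashSet<>(Collections.singletonList(Type.KEYWORD)));
    }

    public static void main(String[] args) {
        System.out.println(CAST_HIERARCHY.get(Type.DATE_NANOS)); // [KEYWORD]
    }
}
```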
@@ -18,14 +18,6 @@
*/
package org.elasticsearch.hadoop.serialization.builder;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.serialization.FieldType;
import org.elasticsearch.hadoop.serialization.Parser;
@@ -39,6 +31,14 @@
import org.elasticsearch.hadoop.util.StringUtils;
import org.elasticsearch.hadoop.util.unit.Booleans;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;


/**
* Basic value reader handling using the implied JSON type.
@@ -86,6 +86,8 @@ public Object readValue(Parser parser, String value, FieldType esType) {
return binaryValue(binValue);
case DATE:
return date(value, parser);
case DATE_NANOS:
return dateNanos(value, parser);
case JOIN:
// In the case of a join field reaching this point it is because it is the short-hand form for a parent.
// construct a container and place the short form name into the name subfield.
@@ -416,6 +418,27 @@ protected Object date(String value, Parser parser) {
return processDate(val);
}

protected Object dateNanos(String value, Parser parser) {
Object val = null;

if (value == null || isEmpty(value)) {
return nullValue();
}
else {
Token tk = parser.currentToken();

// UNIX time format
if (tk == Token.VALUE_NUMBER) {
val = parseDate(parser.longValue(), richDate);
}
else {
val = parseDateNanos(value, richDate);
}
}

return processDate(val);
}

protected Object parseDate(Long value, boolean richDate) {
return (richDate ? createDate(value) : value);
}
@@ -424,6 +447,10 @@ protected Object parseDate(String value, boolean richDate) {
return (richDate ? createDate(DateUtils.parseDate(value).getTimeInMillis()) : parseString(value));
}

protected Object parseDateNanos(String value, boolean richDate) {
return (richDate ? DateUtils.parseDateNanos(value) : parseString(value));
}

protected Object createDate(long timestamp) {
return new Date(timestamp);
}
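
In short, the new `dateNanos` branch mirrors `date`: numeric tokens are treated as UNIX millisecond epochs, while strings are routed to the nanosecond-aware parser. A hedged, self-contained sketch of that dispatch (the helpers here are simplified stand-ins for the real `nullValue()` and `DateUtils.parseDateNanos`):

```java
import java.sql.Timestamp;
import java.util.Date;

public class DateNanosReadSketch {
    static Object read(String value, boolean numericToken) {
        if (value == null || value.isEmpty()) {
            return null; // nullValue() in the real reader
        }
        if (numericToken) {
            return new Date(Long.parseLong(value)); // UNIX time, millisecond precision
        }
        // Simplified stand-in for DateUtils.parseDateNanos (assumes a UTC wall-clock string);
        // Timestamp keeps the full nine-digit fraction.
        return Timestamp.valueOf(value.replace('T', ' ').replace("Z", ""));
    }

    public static void main(String[] args) {
        Timestamp ts = (Timestamp) read("2015-01-01T12:10:30.123456789Z", false);
        System.out.println(ts.getNanos());               // 123456789
        System.out.println(read("1420114230123", true)); // plain java.util.Date
    }
}
```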
@@ -18,16 +18,19 @@
*/
package org.elasticsearch.hadoop.serialization.builder;

import org.elasticsearch.hadoop.serialization.Generator;
import org.elasticsearch.hadoop.util.ObjectUtils;

import javax.xml.bind.DatatypeConverter;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Calendar;
import java.util.Date;
import java.util.Map;
import java.util.Map.Entry;

import javax.xml.bind.DatatypeConverter;

import org.elasticsearch.hadoop.serialization.Generator;
import org.elasticsearch.hadoop.util.ObjectUtils;

/**
* Value writer for JDK types.
*/
@@ -126,7 +129,12 @@ else if (value instanceof Iterable) {
}
generator.writeEndArray();
}
// handles Timestamp also
else if (value instanceof Timestamp) {
Timestamp timestamp = (Timestamp) value;
LocalDateTime localDateTime = timestamp.toLocalDateTime();
OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, OffsetDateTime.now().getOffset());
generator.writeString(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(offsetDateTime));
}
else if (value instanceof Date) {
Calendar cal = Calendar.getInstance();
cal.setTime((Date) value);
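
The new `Timestamp` branch serializes with full nanosecond precision, where the plain `Date` branch below it stays at milliseconds. A minimal sketch of the same conversion (standalone; only `java.time` and `java.sql`):

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampWriteSketch {
    public static void main(String[] args) {
        Timestamp ts = new Timestamp(1420114230123L);
        ts.setNanos(123456789);

        // Same steps as the Timestamp branch above: local date-time plus the JVM's current offset.
        LocalDateTime ldt = ts.toLocalDateTime();
        OffsetDateTime odt = OffsetDateTime.of(ldt, OffsetDateTime.now().getOffset());

        // Prints "2015-01-01T12:10:30.123456789Z" when the JVM default zone is UTC.
        System.out.println(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(odt));
    }
}
```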
mr/src/main/java/org/elasticsearch/hadoop/util/DateUtils.java (42 additions, 5 deletions)
@@ -18,14 +18,22 @@
*/
package org.elasticsearch.hadoop.util;

import java.lang.reflect.Method;
import java.util.Calendar;

import javax.xml.bind.DatatypeConverter;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import javax.xml.bind.DatatypeConverter;
import java.lang.reflect.Method;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoField;
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalField;
import java.time.temporal.TemporalQueries;
import java.util.Calendar;

/**
 * Utility for parsing ISO8601 dates.
 * Serves as a runtime bridge over the possible ISO8601 parsers (the spec is large, especially once its various optional formats are considered).
@@ -36,6 +44,9 @@ public abstract class DateUtils {

private final static boolean jodaTimeAvailable = ObjectUtils.isClassPresent("org.joda.time.format.ISODateTimeFormat", DateUtils.class.getClassLoader());

static final DateTimeFormatter DATE_OPTIONAL_TIME_OFFSET =
DateTimeFormatter.ofPattern("uuuu-MM-dd['T'HH:mm:ss][.SSSSSSSSS][.SSSSSS][.SSS][XXX]");

private static abstract class Jdk6 {
// Parses ISO date through the JDK XML bind class. However the spec doesn't support all ISO8601 formats which this class tries to address
// in particular Time offsets from UTC are available in 3 forms:
@@ -121,4 +132,30 @@ public static Calendar parseDate(String value) {

return (jodaTimeAvailable && JodaTime.INITIALIZED) ? JodaTime.parseDate(value) : Jdk6.parseDate(value);
}

public static Timestamp parseDateNanos(String value) {
return DATE_OPTIONAL_TIME_OFFSET.parse(value, temporal -> {
int year = temporal.get(ChronoField.YEAR);
int month = temporal.get(ChronoField.MONTH_OF_YEAR);
int dayOfMonth = temporal.get(ChronoField.DAY_OF_MONTH);
int hour = getOrDefault(temporal, ChronoField.HOUR_OF_DAY, 0);
int minute = getOrDefault(temporal, ChronoField.MINUTE_OF_HOUR, 0);
int second = getOrDefault(temporal, ChronoField.SECOND_OF_MINUTE, 0);
int nanoOfSecond = getOrDefault(temporal, ChronoField.NANO_OF_SECOND, 0);
ZoneId zone = temporal.query(TemporalQueries.zone());
if (zone == null) {
zone = ZoneId.of("UTC");
}
ZonedDateTime zonedDateTime = ZonedDateTime.of(year, month, dayOfMonth, hour, minute, second, nanoOfSecond, zone);
return Timestamp.from(Instant.from(zonedDateTime));
});
}

private static int getOrDefault(TemporalAccessor temporal, TemporalField field, int defaultValue) {
if (temporal.isSupported(field)) {
return temporal.get(field);
} else {
return defaultValue;
}
}
}
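
`DATE_OPTIONAL_TIME_OFFSET` accepts a date with optional time, an optional 9-, 6-, or 3-digit fraction, and an optional UTC offset; missing zones default to UTC, and the result lands in a `java.sql.Timestamp`, whose `getTime()` is millisecond-truncated while `getNanos()` keeps the full fraction. A self-contained sketch of that round trip using the same pattern (illustrative; not the shipped class):

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class ParseDateNanosSketch {
    // Same pattern as DateUtils.DATE_OPTIONAL_TIME_OFFSET in the diff above.
    static final DateTimeFormatter F =
            DateTimeFormatter.ofPattern("uuuu-MM-dd['T'HH:mm:ss][.SSSSSSSSS][.SSSSSS][.SSS][XXX]");

    public static void main(String[] args) {
        // The offset ("Z") is present here, so ZonedDateTime can resolve directly;
        // the real parseDateNanos uses a TemporalQuery to default missing zones to UTC.
        ZonedDateTime zdt = ZonedDateTime.parse("2015-01-01T12:10:30.123456789Z", F);
        Timestamp ts = Timestamp.from(Instant.from(zdt));
        System.out.println(ts.getTime());  // 1420114230123 (milliseconds since epoch)
        System.out.println(ts.getNanos()); // 123456789 (full fraction preserved)
    }
}
```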
@@ -0,0 +1,42 @@
package org.elasticsearch.hadoop.serialization.builder;

import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.serialization.FieldType;
import org.elasticsearch.hadoop.serialization.Parser;
import org.junit.Test;
import org.mockito.Mockito;

import java.sql.Timestamp;
import java.util.Date;

import static org.junit.Assert.assertEquals;

public class JdkValueReaderTest {
@Test
public void testReadValue() {
JdkValueReader reader = new JdkValueReader();
Parser parser = Mockito.mock(Parser.class);

Mockito.when(parser.currentToken()).thenReturn(Parser.Token.VALUE_STRING);
Timestamp timestamp = (Timestamp) reader.readValue(parser, "2015-01-01T12:10:30.123456789Z", FieldType.DATE_NANOS);
assertEquals(1420114230123L, timestamp.getTime());
assertEquals(123456789, timestamp.getNanos());

Mockito.when(parser.currentToken()).thenReturn(Parser.Token.VALUE_NUMBER);
Mockito.when(parser.longValue()).thenReturn(1420114230123L);
Date date = (Date) reader.readValue(parser, "1420114230123", FieldType.DATE_NANOS);
assertEquals(1420114230123L, date.getTime());

Settings settings = Mockito.mock(Settings.class);
Mockito.when(settings.getMappingDateRich()).thenReturn(false);
reader.setSettings(settings);
Mockito.when(parser.currentToken()).thenReturn(Parser.Token.VALUE_STRING);
String stringValue = (String) reader.readValue(parser, "2015-01-01T12:10:30.123456789Z", FieldType.DATE_NANOS);
assertEquals("2015-01-01T12:10:30.123456789Z", stringValue);

Mockito.when(parser.currentToken()).thenReturn(Parser.Token.VALUE_NUMBER);
Mockito.when(parser.longValue()).thenReturn(1420114230123L);
Long dateLong = (Long) reader.readValue(parser, "1420114230123", FieldType.DATE_NANOS);
assertEquals(1420114230123L, dateLong.longValue());
}
}
@@ -0,0 +1,47 @@
package org.elasticsearch.hadoop.serialization.builder;

import org.elasticsearch.hadoop.serialization.Generator;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.sql.Timestamp;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;

import static org.junit.Assert.assertEquals;

public class JdkValueWriterTest {
@Test
public void testWriteDate() {
JdkValueWriter jdkValueWriter = new JdkValueWriter();
Date date = new Date(1420114230123L);
Generator generator = Mockito.mock(Generator.class);
ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
jdkValueWriter.doWrite(date, generator, "");
Mockito.verify(generator).writeString(argument.capture());
String expected = date.toInstant().atZone(ZoneId.systemDefault()).toOffsetDateTime().toString();
String actual = argument.getValue();
assertEquals(expected, actual);
OffsetDateTime parsedDate = DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(actual, OffsetDateTime::from);
assertEquals(123000000, parsedDate.getNano()); // nothing beyond milliseconds
}

@Test
public void testWriteDateWithNanos() {
JdkValueWriter jdkValueWriter = new JdkValueWriter();
Timestamp timestamp = new Timestamp(1420114230123L);
timestamp.setNanos(123456789);
Generator generator = Mockito.mock(Generator.class);
ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
jdkValueWriter.doWrite(timestamp, generator, "");
Mockito.verify(generator).writeString(argument.capture());
String expected = timestamp.toInstant().atZone(ZoneId.systemDefault()).toOffsetDateTime().toString();
String actual = argument.getValue();
assertEquals(expected, actual);
OffsetDateTime parsedDate = DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(actual, OffsetDateTime::from);
assertEquals(123456789, parsedDate.getNano());
}
}
@@ -39,6 +39,7 @@
import static org.elasticsearch.hadoop.serialization.FieldType.BOOLEAN;
import static org.elasticsearch.hadoop.serialization.FieldType.BYTE;
import static org.elasticsearch.hadoop.serialization.FieldType.DATE;
import static org.elasticsearch.hadoop.serialization.FieldType.DATE_NANOS;
import static org.elasticsearch.hadoop.serialization.FieldType.DOUBLE;
import static org.elasticsearch.hadoop.serialization.FieldType.FLOAT;
import static org.elasticsearch.hadoop.serialization.FieldType.GEO_POINT;
@@ -53,6 +54,7 @@
import static org.elasticsearch.hadoop.serialization.FieldType.SHORT;
import static org.elasticsearch.hadoop.serialization.FieldType.STRING;
import static org.elasticsearch.hadoop.serialization.FieldType.TEXT;

import static org.elasticsearch.hadoop.serialization.dto.mapping.MappingUtils.findTypos;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -134,7 +136,7 @@ public void testPrimitivesParsing() throws Exception {
MappingSet mappings = getMappingsForResource("primitives.json");
Mapping mapping = ensureAndGet("index", "primitives", mappings);
Field[] props = mapping.getFields();
assertEquals(14, props.length);
assertEquals(15, props.length);
assertEquals("field01", props[0].name());
assertEquals(BOOLEAN, props[0].type());
assertEquals("field02", props[1].name());
@@ -163,6 +165,8 @@
assertEquals(HALF_FLOAT, props[12].type());
assertEquals("field14", props[13].name());
assertEquals(SCALED_FLOAT, props[13].type());
assertEquals("field15", props[14].name());
assertEquals(DATE_NANOS, props[14].type());
}

@Test
mr/src/test/java/org/elasticsearch/hadoop/util/DateUtilsTest.java (38 additions)
@@ -0,0 +1,38 @@
package org.elasticsearch.hadoop.util;

import org.junit.Test;

import java.sql.Timestamp;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

public class DateUtilsTest {
@Test
public void parseDateNanos() {
Timestamp timestamp = DateUtils.parseDateNanos("2015-01-01");
assertNotNull(timestamp);
assertEquals(1420070400000L, timestamp.getTime());
assertEquals(0, timestamp.getNanos());

timestamp = DateUtils.parseDateNanos("2015-01-01T12:10:30.123456789Z");
assertNotNull(timestamp);
assertEquals(1420114230123L, timestamp.getTime());
assertEquals(123456789, timestamp.getNanos());

timestamp = DateUtils.parseDateNanos("2015-01-01T00:00:00.000Z");
assertNotNull(timestamp);
assertEquals(1420070400000L, timestamp.getTime());
assertEquals(0, timestamp.getNanos());

timestamp = DateUtils.parseDateNanos("2015-01-01T12:10:30.123456Z");
assertNotNull(timestamp);
assertEquals(1420114230123L, timestamp.getTime());
assertEquals(123456000, timestamp.getNanos());

timestamp = DateUtils.parseDateNanos("2015-01-01T12:10:30.123Z");
assertNotNull(timestamp);
assertEquals(1420114230123L, timestamp.getTime());
assertEquals(123000000, timestamp.getNanos());
}
}
@@ -45,6 +45,9 @@
"field14" : {
"type" : "scaled_float",
"scaling_factor" : 100.0
},
"field15" : {
"type" : "date_nanos"
}
}
}
@@ -44,6 +44,9 @@
"field14" : {
"type" : "scaled_float",
"scaling_factor" : 100.0
},
"field15" : {
"type" : "date_nanos"
}
}
}