Skip to content

Commit 0d9058a

Browse files
committed
Core: add types timestamp_ns and timestamptz_ns
Helps #8657
1 parent 3c77542 commit 0d9058a

31 files changed

+410
-100
lines changed

api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java

+42
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ public static LocalDateTime timestampFromMicros(long microsFromEpoch) {
6464
return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch).toLocalDateTime();
6565
}
6666

67+
public static LocalDateTime timestampFromNanos(long nanosFromEpoch) {
68+
return ChronoUnit.NANOS.addTo(EPOCH, nanosFromEpoch).toLocalDateTime();
69+
}
70+
6771
public static long microsFromInstant(Instant instant) {
6872
return ChronoUnit.MICROS.between(EPOCH, instant.atOffset(ZoneOffset.UTC));
6973
}
@@ -72,6 +76,10 @@ public static long microsFromTimestamp(LocalDateTime dateTime) {
7276
return ChronoUnit.MICROS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC));
7377
}
7478

79+
public static long nanosFromTimestamp(LocalDateTime dateTime) {
80+
return ChronoUnit.NANOS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC));
81+
}
82+
7583
public static long microsToMillis(long micros) {
7684
// When the timestamp is negative, i.e before 1970, we need to adjust the milliseconds portion.
7785
// Example - 1965-01-01 10:11:12.123456 is represented as (-157700927876544) in micro precision.
@@ -99,10 +107,18 @@ public static OffsetDateTime timestamptzFromMicros(long microsFromEpoch) {
99107
return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch);
100108
}
101109

110+
public static OffsetDateTime timestamptzFromNanos(long nanosFromEpoch) {
111+
return ChronoUnit.NANOS.addTo(EPOCH, nanosFromEpoch);
112+
}
113+
102114
public static long microsFromTimestamptz(OffsetDateTime dateTime) {
103115
return ChronoUnit.MICROS.between(EPOCH, dateTime);
104116
}
105117

118+
public static long nanosFromTimestamptz(OffsetDateTime dateTime) {
119+
return ChronoUnit.NANOS.between(EPOCH, dateTime);
120+
}
121+
106122
public static String formatTimestampMillis(long millis) {
107123
return Instant.ofEpochMilli(millis).toString().replace("Z", "+00:00");
108124
}
@@ -126,11 +142,27 @@ public static String microsToIsoTimestamptz(long micros) {
126142
return localDateTime.atOffset(ZoneOffset.UTC).format(zeroOffsetFormatter);
127143
}
128144

145+
public static String nanosToIsoTimestamptz(long nanos) {
146+
LocalDateTime localDateTime = timestampFromNanos(nanos);
147+
DateTimeFormatter zeroOffsetFormatter =
148+
new DateTimeFormatterBuilder()
149+
.parseCaseInsensitive()
150+
.append(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
151+
.appendOffset("+HH:MM:ss", "+00:00")
152+
.toFormatter();
153+
return localDateTime.atOffset(ZoneOffset.UTC).format(zeroOffsetFormatter);
154+
}
155+
129156
public static String microsToIsoTimestamp(long micros) {
130157
LocalDateTime localDateTime = timestampFromMicros(micros);
131158
return localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
132159
}
133160

161+
public static String nanosToIsoTimestamp(long nanos) {
162+
LocalDateTime localDateTime = timestampFromNanos(nanos);
163+
return localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
164+
}
165+
134166
public static int isoDateToDays(String dateString) {
135167
return daysFromDate(LocalDate.parse(dateString, DateTimeFormatter.ISO_LOCAL_DATE));
136168
}
@@ -144,6 +176,11 @@ public static long isoTimestamptzToMicros(String timestampString) {
144176
OffsetDateTime.parse(timestampString, DateTimeFormatter.ISO_DATE_TIME));
145177
}
146178

179+
public static long isoTimestamptzToNanos(String timestampString) {
180+
return nanosFromTimestamptz(
181+
OffsetDateTime.parse(timestampString, DateTimeFormatter.ISO_DATE_TIME));
182+
}
183+
147184
public static boolean isUTCTimestamptz(String timestampString) {
148185
OffsetDateTime offsetDateTime =
149186
OffsetDateTime.parse(timestampString, DateTimeFormatter.ISO_DATE_TIME);
@@ -155,6 +192,11 @@ public static long isoTimestampToMicros(String timestampString) {
155192
LocalDateTime.parse(timestampString, DateTimeFormatter.ISO_LOCAL_DATE_TIME));
156193
}
157194

195+
public static long isoTimestampToNanos(String timestampString) {
196+
return nanosFromTimestamp(
197+
LocalDateTime.parse(timestampString, DateTimeFormatter.ISO_LOCAL_DATE_TIME));
198+
}
199+
158200
public static int daysToYears(int days) {
159201
return convertDays(days, ChronoUnit.YEARS);
160202
}

core/src/main/java/org/apache/iceberg/HistoryTable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
public class HistoryTable extends BaseMetadataTable {
3737
private static final Schema HISTORY_SCHEMA =
3838
new Schema(
39-
Types.NestedField.required(1, "made_current_at", Types.TimestampType.withZone()),
39+
Types.NestedField.required(1, "made_current_at", Types.TimestampType.microsWithZone()),
4040
Types.NestedField.required(2, "snapshot_id", Types.LongType.get()),
4141
Types.NestedField.optional(3, "parent_id", Types.LongType.get()),
4242
Types.NestedField.required(4, "is_current_ancestor", Types.BooleanType.get()));

core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class MetadataLogEntriesTable extends BaseMetadataTable {
2828

2929
private static final Schema METADATA_LOG_ENTRIES_SCHEMA =
3030
new Schema(
31-
Types.NestedField.required(1, "timestamp", Types.TimestampType.withZone()),
31+
Types.NestedField.required(1, "timestamp", Types.TimestampType.microsWithZone()),
3232
Types.NestedField.required(2, "file", Types.StringType.get()),
3333
Types.NestedField.optional(3, "latest_snapshot_id", Types.LongType.get()),
3434
Types.NestedField.optional(4, "latest_schema_id", Types.IntegerType.get()),

core/src/main/java/org/apache/iceberg/PartitionsTable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public class PartitionsTable extends BaseMetadataTable {
8181
Types.NestedField.optional(
8282
9,
8383
"last_updated_at",
84-
Types.TimestampType.withZone(),
84+
Types.TimestampType.microsWithZone(),
8585
"Commit time of snapshot that last updated this partition"),
8686
Types.NestedField.optional(
8787
10,

core/src/main/java/org/apache/iceberg/ScanSummary.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ private void addTimestampFilter(UnboundPredicate<Long> filter) {
9191
}
9292

9393
public Builder after(String timestamp) {
94-
Literal<Long> tsLiteral = Literal.of(timestamp).to(Types.TimestampType.withoutZone());
94+
Literal<Long> tsLiteral = Literal.of(timestamp).to(Types.TimestampType.microsWithoutZone());
9595
return after(tsLiteral.value() / 1000);
9696
}
9797

@@ -101,7 +101,7 @@ public Builder after(long timestampMillis) {
101101
}
102102

103103
public Builder before(String timestamp) {
104-
Literal<Long> tsLiteral = Literal.of(timestamp).to(Types.TimestampType.withoutZone());
104+
Literal<Long> tsLiteral = Literal.of(timestamp).to(Types.TimestampType.microsWithoutZone());
105105
return before(tsLiteral.value() / 1000);
106106
}
107107

@@ -133,7 +133,7 @@ private void removeTimeFilters(List<Expression> expressions, Expression expressi
133133
NamedReference<?> ref = (NamedReference<?>) pred.term();
134134
Literal<?> lit = pred.literal();
135135
if (TIMESTAMP_NAMES.contains(ref.name())) {
136-
Literal<Long> tsLiteral = lit.to(Types.TimestampType.withoutZone());
136+
Literal<Long> tsLiteral = lit.to(Types.TimestampType.microsWithoutZone());
137137
long millis = toMillis(tsLiteral.value());
138138
addTimestampFilter(Expressions.predicate(pred.op(), "timestamp_ms", millis));
139139
return;

core/src/main/java/org/apache/iceberg/SingleValueParser.java

+39-11
Original file line numberDiff line numberDiff line change
@@ -129,16 +129,29 @@ public static Object fromJson(Type type, JsonNode defaultValue) {
129129
case TIMESTAMP:
130130
Preconditions.checkArgument(
131131
defaultValue.isTextual(), "Cannot parse default as a %s value: %s", type, defaultValue);
132-
if (((Types.TimestampType) type).shouldAdjustToUTC()) {
133-
String timestampTz = defaultValue.textValue();
132+
Types.TimestampType timestamp = (Types.TimestampType) type;
133+
String timestampText = defaultValue.textValue();
134+
if (timestamp.shouldAdjustToUTC()) {
134135
Preconditions.checkArgument(
135-
DateTimeUtil.isUTCTimestamptz(timestampTz),
136+
DateTimeUtil.isUTCTimestamptz(timestampText),
136137
"Cannot parse default as a %s value: %s, offset must be +00:00",
137-
type,
138+
timestamp,
138139
defaultValue);
139-
return DateTimeUtil.isoTimestamptzToMicros(timestampTz);
140-
} else {
141-
return DateTimeUtil.isoTimestampToMicros(defaultValue.textValue());
140+
}
141+
switch (timestamp.unit()) {
142+
case MICROS:
143+
if (timestamp.shouldAdjustToUTC()) {
144+
return DateTimeUtil.isoTimestamptzToMicros(timestampText);
145+
}
146+
return DateTimeUtil.isoTimestampToMicros(timestampText);
147+
case NANOS:
148+
if (timestamp.shouldAdjustToUTC()) {
149+
return DateTimeUtil.isoTimestamptzToNanos(timestampText);
150+
}
151+
return DateTimeUtil.isoTimestampToNanos(timestampText);
152+
default:
153+
throw new UnsupportedOperationException(
154+
"Unsupported timestamp unit: " + timestamp.unit());
142155
}
143156
case FIXED:
144157
Preconditions.checkArgument(
@@ -283,10 +296,25 @@ public static void toJson(Type type, Object defaultValue, JsonGenerator generato
283296
case TIMESTAMP:
284297
Preconditions.checkArgument(
285298
defaultValue instanceof Long, "Invalid default %s value: %s", type, defaultValue);
286-
if (((Types.TimestampType) type).shouldAdjustToUTC()) {
287-
generator.writeString(DateTimeUtil.microsToIsoTimestamptz((Long) defaultValue));
288-
} else {
289-
generator.writeString(DateTimeUtil.microsToIsoTimestamp((Long) defaultValue));
299+
Types.TimestampType timestamp = (Types.TimestampType) type;
300+
switch (timestamp.unit()) {
301+
case MICROS:
302+
if (timestamp.shouldAdjustToUTC()) {
303+
generator.writeString(DateTimeUtil.microsToIsoTimestamptz((Long) defaultValue));
304+
} else {
305+
generator.writeString(DateTimeUtil.microsToIsoTimestamp((Long) defaultValue));
306+
}
307+
break;
308+
case NANOS:
309+
if (timestamp.shouldAdjustToUTC()) {
310+
generator.writeString(DateTimeUtil.nanosToIsoTimestamptz((Long) defaultValue));
311+
} else {
312+
generator.writeString(DateTimeUtil.nanosToIsoTimestamp((Long) defaultValue));
313+
}
314+
break;
315+
default:
316+
throw new UnsupportedOperationException(
317+
"Unsupported timestamp unit: " + timestamp.unit());
290318
}
291319
break;
292320
case STRING:

core/src/main/java/org/apache/iceberg/SnapshotsTable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
public class SnapshotsTable extends BaseMetadataTable {
3030
private static final Schema SNAPSHOT_SCHEMA =
3131
new Schema(
32-
Types.NestedField.required(1, "committed_at", Types.TimestampType.withZone()),
32+
Types.NestedField.required(1, "committed_at", Types.TimestampType.microsWithZone()),
3333
Types.NestedField.required(2, "snapshot_id", Types.LongType.get()),
3434
Types.NestedField.optional(3, "parent_id", Types.LongType.get()),
3535
Types.NestedField.optional(4, "operation", Types.StringType.get()),

core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ public static Schema buildAvroProjection(
129129
public static boolean isTimestamptz(Schema schema) {
130130
LogicalType logicalType = schema.getLogicalType();
131131
if (logicalType instanceof LogicalTypes.TimestampMillis
132-
|| logicalType instanceof LogicalTypes.TimestampMicros) {
132+
|| logicalType instanceof LogicalTypes.TimestampMicros
133+
|| logicalType instanceof IcebergLogicalTypes.TimestampNanos) {
133134
// timestamptz is adjusted to UTC
134135
Object value = schema.getObjectProp(ADJUST_TO_UTC_PROP);
135136

core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java

+3
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ public ValueReader<?> primitive(Schema primitive) {
144144
// Spark uses the same representation
145145
return ValueReaders.longs();
146146

147+
case "timestamp-nanos":
148+
return ValueReaders.longs();
149+
147150
case "decimal":
148151
return ValueReaders.decimal(
149152
ValueReaders.decimalBytesReader(primitive),

core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java

+3
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,9 @@ public ValueWriter<?> primitive(Schema primitive) {
107107
case "timestamp-micros":
108108
return ValueWriters.longs();
109109

110+
case "timestamp-nanos":
111+
return ValueWriters.longs();
112+
110113
case "decimal":
111114
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
112115
return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.avro;
20+
21+
import org.apache.avro.LogicalType;
22+
import org.apache.avro.Schema;
23+
24+
public class IcebergLogicalTypes {
25+
26+
private static final String TIMESTAMP_NANOS = "timestamp-nanos";
27+
28+
private static final TimestampNanos TIMESTAMP_NANOS_TYPE = new TimestampNanos();
29+
30+
public static TimestampNanos timestampNanos() {
31+
return TIMESTAMP_NANOS_TYPE;
32+
}
33+
34+
/** TimestampNanos represents a date and time in nanoseconds */
35+
public static class TimestampNanos extends LogicalType {
36+
private TimestampNanos() {
37+
super(TIMESTAMP_NANOS);
38+
}
39+
40+
@Override
41+
public void validate(Schema schema) {
42+
super.validate(schema);
43+
if (schema.getType() != Schema.Type.LONG) {
44+
throw new IllegalArgumentException(
45+
"Timestamp (nanos) can only be used with an underlying long type");
46+
}
47+
}
48+
}
49+
}

core/src/main/java/org/apache/iceberg/avro/SchemaToType.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,16 @@ public Type primitive(Schema primitive) {
182182
} else if (logical instanceof LogicalTypes.TimestampMillis
183183
|| logical instanceof LogicalTypes.TimestampMicros) {
184184
if (AvroSchemaUtil.isTimestamptz(primitive)) {
185-
return Types.TimestampType.withZone();
185+
return Types.TimestampType.microsWithZone();
186186
} else {
187-
return Types.TimestampType.withoutZone();
187+
return Types.TimestampType.microsWithoutZone();
188+
}
189+
190+
} else if (logical instanceof IcebergLogicalTypes.TimestampNanos) {
191+
if (AvroSchemaUtil.isTimestamptz(primitive)) {
192+
return Types.TimestampType.nanosWithZone();
193+
} else {
194+
return Types.TimestampType.nanosWithoutZone();
188195
}
189196

190197
} else if (LogicalTypes.uuid().getName().equals(name)) {

core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java

+25-4
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
4444
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
4545
private static final Schema TIMESTAMPTZ_SCHEMA =
4646
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
47+
private static final Schema TIMESTAMPNS_SCHEMA =
48+
IcebergLogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG));
49+
private static final Schema TIMESTAMPTZNS_SCHEMA =
50+
IcebergLogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG));
4751
private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
4852
private static final Schema UUID_SCHEMA =
4953
LogicalTypes.uuid().addToSchema(Schema.createFixed("uuid_fixed", null, null, 16));
@@ -52,6 +56,8 @@ class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
5256
static {
5357
TIMESTAMP_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, false);
5458
TIMESTAMPTZ_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, true);
59+
TIMESTAMPNS_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, false);
60+
TIMESTAMPTZNS_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, true);
5561
}
5662

5763
private final Deque<Integer> fieldIds = Lists.newLinkedList();
@@ -204,10 +210,25 @@ public Schema primitive(Type.PrimitiveType primitive) {
204210
primitiveSchema = TIME_SCHEMA;
205211
break;
206212
case TIMESTAMP:
207-
if (((Types.TimestampType) primitive).shouldAdjustToUTC()) {
208-
primitiveSchema = TIMESTAMPTZ_SCHEMA;
209-
} else {
210-
primitiveSchema = TIMESTAMP_SCHEMA;
213+
Types.TimestampType timestamp = (Types.TimestampType) primitive;
214+
switch (timestamp.unit()) {
215+
case MICROS:
216+
if (timestamp.shouldAdjustToUTC()) {
217+
primitiveSchema = TIMESTAMPTZ_SCHEMA;
218+
} else {
219+
primitiveSchema = TIMESTAMP_SCHEMA;
220+
}
221+
break;
222+
case NANOS:
223+
if (timestamp.shouldAdjustToUTC()) {
224+
primitiveSchema = TIMESTAMPTZNS_SCHEMA;
225+
} else {
226+
primitiveSchema = TIMESTAMPNS_SCHEMA;
227+
}
228+
break;
229+
default:
230+
throw new UnsupportedOperationException(
231+
"Unsupported timestamp unit: " + timestamp.unit());
211232
}
212233
break;
213234
case STRING:

core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java

+15-4
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,21 @@ public static Object convertConstant(Type type, Object value) {
4040
case DATE:
4141
return DateTimeUtil.dateFromDays((Integer) value);
4242
case TIMESTAMP:
43-
if (((Types.TimestampType) type).shouldAdjustToUTC()) {
44-
return DateTimeUtil.timestamptzFromMicros((Long) value);
45-
} else {
46-
return DateTimeUtil.timestampFromMicros((Long) value);
43+
Types.TimestampType timestamp = (Types.TimestampType) type;
44+
switch (timestamp.unit()) {
45+
case MICROS:
46+
if (timestamp.shouldAdjustToUTC()) {
47+
return DateTimeUtil.timestamptzFromMicros((Long) value);
48+
}
49+
return DateTimeUtil.timestampFromMicros((Long) value);
50+
case NANOS:
51+
if (timestamp.shouldAdjustToUTC()) {
52+
return DateTimeUtil.timestamptzFromNanos((Long) value);
53+
}
54+
return DateTimeUtil.timestampFromNanos((Long) value);
55+
default:
56+
throw new UnsupportedOperationException(
57+
"Cannot convert timestamp with unit: " + timestamp.unit());
4758
}
4859
case FIXED:
4960
if (value instanceof GenericData.Fixed) {

0 commit comments

Comments
 (0)