Skip to content

Commit 3250ba3

Browse files
committed
Handle ntz in other formats as well
1 parent eafaf5e commit 3250ba3

File tree

4 files changed

+32
-9
lines changed

4 files changed

+32
-9
lines changed

xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,13 @@ private InternalSchema toInternalSchema(
160160
metadata.put(
161161
InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
162162
} else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
163-
newDataType = InternalType.TIMESTAMP_NTZ;
163+
// TODO: Hudi 0.x writes INT64 in parquet, TimestampNTZType support added in 1.x
164+
newDataType = InternalType.LONG;
164165
metadata.put(
165166
InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS);
166167
} else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
167-
newDataType = InternalType.TIMESTAMP_NTZ;
168+
// TODO: Hudi 0.x writes INT64 in parquet, TimestampNTZType support added in 1.x
169+
newDataType = InternalType.LONG;
168170
metadata.put(
169171
InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
170172
} else {
@@ -350,6 +352,22 @@ private Schema fromInternalSchema(InternalSchema internalSchema, String currentP
350352
case INT:
351353
return finalizeSchema(Schema.create(Schema.Type.INT), internalSchema);
352354
case LONG:
355+
if (internalSchema.getMetadata() != null
356+
&& internalSchema
357+
.getMetadata()
358+
.containsKey(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)) {
359+
if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
360+
== InternalSchema.MetadataValue.MILLIS) {
361+
return finalizeSchema(
362+
LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG)),
363+
internalSchema);
364+
}
365+
{
366+
return finalizeSchema(
367+
LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)),
368+
internalSchema);
369+
}
370+
}
353371
return finalizeSchema(Schema.create(Schema.Type.LONG), internalSchema);
354372
case STRING:
355373
return finalizeSchema(Schema.create(Schema.Type.STRING), internalSchema);

xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,6 @@ Type toIcebergType(InternalField field, AtomicInteger fieldIdTracker) {
173173
case INT:
174174
return Types.IntegerType.get();
175175
case LONG:
176-
case TIMESTAMP_NTZ: // TODO - revisit this
177176
return Types.LongType.get();
178177
case BYTES:
179178
return Types.BinaryType.get();
@@ -189,6 +188,8 @@ Type toIcebergType(InternalField field, AtomicInteger fieldIdTracker) {
189188
return Types.DateType.get();
190189
case TIMESTAMP:
191190
return Types.TimestampType.withZone();
191+
case TIMESTAMP_NTZ:
192+
return Types.TimestampType.withoutZone();
192193
case DOUBLE:
193194
return Types.DoubleType.get();
194195
case DECIMAL:

xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ public void testAvroLogicalTypes() {
576576
.schema(
577577
InternalSchema.builder()
578578
.name("long")
579-
.dataType(InternalType.TIMESTAMP_NTZ)
579+
.dataType(InternalType.LONG)
580580
.isNullable(false)
581581
.metadata(millisMetadata)
582582
.build())
@@ -586,7 +586,7 @@ public void testAvroLogicalTypes() {
586586
.schema(
587587
InternalSchema.builder()
588588
.name("long")
589-
.dataType(InternalType.TIMESTAMP_NTZ)
589+
.dataType(InternalType.LONG)
590590
.isNullable(false)
591591
.metadata(microsMetadata)
592592
.build())

xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -501,14 +501,18 @@ public void testTimestamps() {
501501
1, "requiredTimestampMillis", Types.TimestampType.withZone()),
502502
Types.NestedField.optional(
503503
2, "optionalTimestampMillis", Types.TimestampType.withZone()),
504-
Types.NestedField.required(3, "requiredTimestampNtzMillis", Types.LongType.get()),
505-
Types.NestedField.optional(4, "optionalTimestampNtzMillis", Types.LongType.get()),
504+
Types.NestedField.required(
505+
3, "requiredTimestampNtzMillis", Types.TimestampType.withoutZone()),
506+
Types.NestedField.optional(
507+
4, "optionalTimestampNtzMillis", Types.TimestampType.withoutZone()),
506508
Types.NestedField.required(
507509
5, "requiredTimestampMicros", Types.TimestampType.withZone()),
508510
Types.NestedField.optional(
509511
6, "optionalTimestampMicros", Types.TimestampType.withZone()),
510-
Types.NestedField.required(7, "requiredTimestampNtzMicros", Types.LongType.get()),
511-
Types.NestedField.optional(8, "optionalTimestampNtzMicros", Types.LongType.get()));
512+
Types.NestedField.required(
513+
7, "requiredTimestampNtzMicros", Types.TimestampType.withoutZone()),
514+
Types.NestedField.optional(
515+
8, "optionalTimestampNtzMicros", Types.TimestampType.withoutZone()));
512516
assertTrue(expectedTargetSchema.sameSchema(SCHEMA_EXTRACTOR.toIceberg(irSchema)));
513517

514518
Schema sourceSchema =

0 commit comments

Comments
 (0)