Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Bug Fixes

1. [#148](https://github.com/InfluxCommunity/influxdb3-java/pull/148): InfluxDB Edge (OSS) error handling
1. [#153](https://github.com/InfluxCommunity/influxdb3-java/pull/153): Parsing timestamp columns

## 0.8.0 [2024-06-24]

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/influxdb/v3/client/Point.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public Point setMeasurement(@Nonnull final String measurement) {
}

/**
* Get timestamp. Can be null if not set.
* Get timestamp in nanoseconds. If the timestamp is not set, returns null.
*
* @return timestamp or null
*/
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/influxdb/v3/client/PointValues.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public PointValues setMeasurement(@Nonnull final String measurement) {
}

/**
* Get timestamp. Can be null if not set.
* Get timestamp in nanoseconds. If the timestamp is not set, returns null.
*
* @return timestamp or null
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import java.time.ZoneOffset;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;

import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.util.Text;

import com.influxdb.v3.client.PointValues;
Expand Down Expand Up @@ -79,7 +82,7 @@ PointValues toPointValues(final int rowNumber,

if (metaType == null) {
if (Objects.equals(name, "time") && (value instanceof Long || value instanceof LocalDateTime)) {
setTimestamp(value, p);
setTimestamp(value, schema, p);
} else {
// just push as field If you don't know what type is it
p.setField(name, value);
Expand All @@ -96,15 +99,39 @@ PointValues toPointValues(final int rowNumber,
} else if ("tag".equals(valueType) && value instanceof String) {
p.setTag(name, (String) value);
} else if ("timestamp".equals(valueType)) {
setTimestamp(value, p);
setTimestamp(value, schema, p);
}
}
return p;
}

private void setTimestamp(@Nonnull final Object value, @Nonnull final PointValues pointValues) {
private void setTimestamp(@Nonnull final Object value,
@Nonnull final Field schema,
@Nonnull final PointValues pointValues) {
if (value instanceof Long) {
pointValues.setTimestamp(Instant.ofEpochMilli((Long) value));
if (schema.getFieldType().getType() instanceof ArrowType.Timestamp) {
ArrowType.Timestamp type = (ArrowType.Timestamp) schema.getFieldType().getType();
TimeUnit timeUnit;
switch (type.getUnit()) {
case SECOND:
timeUnit = TimeUnit.SECONDS;
break;
case MILLISECOND:
timeUnit = TimeUnit.MILLISECONDS;
break;
case MICROSECOND:
timeUnit = TimeUnit.MICROSECONDS;
break;
default:
case NANOSECOND:
timeUnit = TimeUnit.NANOSECONDS;
break;
}
long nanoseconds = TimeUnit.NANOSECONDS.convert((Long) value, timeUnit);
pointValues.setTimestamp(Instant.ofEpochSecond(0, nanoseconds));
} else {
pointValues.setTimestamp(Instant.ofEpochMilli((Long) value));
}
} else if (value instanceof LocalDateTime) {
pointValues.setTimestamp(((LocalDateTime) value).toInstant(ZoneOffset.UTC));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BaseFixedWidthVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.TimeMilliVector;
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.VarCharVector;
Expand Down Expand Up @@ -59,7 +60,18 @@ void timestampAsArrowTime() {
}

@Test
void timestampAsArrowTimestamp() {
void timestampAsArrowTimestampSecond() {
try (VectorSchemaRoot root = createTimeVector(45_678, new ArrowType.Timestamp(TimeUnit.SECOND, "UTC"))) {

PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors());

BigInteger expected = BigInteger.valueOf(45_678L * 1_000_000_000);
Assertions.assertThat((BigInteger) pointValues.getTimestamp()).isEqualByComparingTo(expected);
}
}

@Test
void timestampAsArrowTimestampMillisecond() {
try (VectorSchemaRoot root = createTimeVector(45_678, new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"))) {

PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors());
Expand All @@ -69,6 +81,50 @@ void timestampAsArrowTimestamp() {
}
}

@Test
void timestampAsArrowTimestampMicrosecond() {
try (VectorSchemaRoot root = createTimeVector(45_678, new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"))) {

PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors());

BigInteger expected = BigInteger.valueOf(45_678L * 1_000);
Assertions.assertThat((BigInteger) pointValues.getTimestamp()).isEqualByComparingTo(expected);
}
}

@Test
void timestampAsArrowTimestampNanosecond() {
try (VectorSchemaRoot root = createTimeVector(45_678, new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"))) {

PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors());

BigInteger expected = BigInteger.valueOf(45_678L);
Assertions.assertThat((BigInteger) pointValues.getTimestamp()).isEqualByComparingTo(expected);
}
}

@Test
void timestampAsArrowTimestampNanosecondWithoutTimezone() {
try (VectorSchemaRoot root = createTimeVector(45_978, new ArrowType.Timestamp(TimeUnit.NANOSECOND, null))) {

PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors());

BigInteger expected = BigInteger.valueOf(45_978L);
Assertions.assertThat((BigInteger) pointValues.getTimestamp()).isEqualByComparingTo(expected);
}
}

@Test
void timestampAsArrowInt() {
try (VectorSchemaRoot root = createTimeVector(45_678, new ArrowType.Int(64, true))) {

PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors());

BigInteger expected = BigInteger.valueOf(45_678L * 1_000_000);
Assertions.assertThat((BigInteger) pointValues.getTimestamp()).isEqualByComparingTo(expected);
}
}

@Test
void timestampWithoutMetadataAndFieldWithoutMetadata() {
FieldType timeType = new FieldType(true, new ArrowType.Time(TimeUnit.MILLISECOND, 32), null);
Expand Down Expand Up @@ -144,6 +200,8 @@ private VectorSchemaRoot createTimeVector(final int timeValue, final ArrowType t
((TimeMilliVector) timeVector).setSafe(0, timeValue);
} else if (timeVector instanceof TimeStampVector) {
((TimeStampVector) timeVector).setSafe(0, timeValue);
} else if (timeVector instanceof BigIntVector) {
((BigIntVector) timeVector).setSafe(0, timeValue);
} else {
throw new RuntimeException("Unexpected vector type: " + timeVector.getClass().getName());
}
Expand Down