Skip to content

Commit

Permalink
[core] Support to make up the sequence number to nanosecond precision (
Browse files Browse the repository at this point in the history
  • Loading branch information
schnappi17 authored Jun 19, 2023
1 parent c700db4 commit cdf911b
Show file tree
Hide file tree
Showing 9 changed files with 602 additions and 10 deletions.
1 change: 1 addition & 0 deletions docs/content/concepts/primary-key-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ there will be some cases that lead to data disorder. At this time, you can use a
{{< hint info >}}
When the record is updated or deleted, the `sequence.field` must become larger and cannot remain unchanged. For example,
you can use [Mysql Binlog operation time](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#available-metadata) as `sequence.field`.
If the provided `sequence.field` doesn't meet the precision, like a rough second or millisecond, you can set `sequence.auto-padding` to `second-to-micro` or `millis-to-micro` so that the precision of sequence number will be made up to microsecond by system.
{{< /hint >}}

{{< tabs "sequence.field" >}}
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,12 @@
<td>Long</td>
<td>Optional timestamp used in case of "from-timestamp" scan mode.</td>
</tr>
<tr>
<td><h5>sequence.auto-padding</h5></td>
<td style="word-wrap: break-word;">none</td>
<td><p>Enum</p></td>
<td>Specify the way of padding precision up to micro-second if the provided sequence field is used to indicate "time" but doesn't meet the precise.<br /><br />Possible values:<ul><li>"none": No padding for sequence field.</li><li>"second-to-micro": Pads the sequence field that indicates time with precision of seconds to micro-second.</li><li>"millis-to-micro": Pads the sequence field that indicates time with precision of milli-second to micro-second.</li></ul></td>
</tr>
<tr>
<td><h5>sequence.field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
41 changes: 41 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,14 @@ public class CoreOptions implements Serializable {
"The field that generates the sequence number for primary key table,"
+ " the sequence number determines which data is the most recent.");

public static final ConfigOption<SequenceAutoPadding> SEQUENCE_AUTO_PADDING =
key("sequence.auto-padding")
.enumType(SequenceAutoPadding.class)
.defaultValue(SequenceAutoPadding.NONE)
.withDescription(
"Specify the way of padding precision up to micro-second"
+ " if the provided sequence field is used to indicate \"time\" but doesn't meet the precise.");

public static final ConfigOption<StartupMode> SCAN_MODE =
key("scan.mode")
.enumType(StartupMode.class)
Expand Down Expand Up @@ -902,6 +910,10 @@ public Optional<String> sequenceField() {
return options.getOptional(SEQUENCE_FIELD);
}

public SequenceAutoPadding sequenceAutoPadding() {
return options.get(SEQUENCE_AUTO_PADDING);
}

public WriteMode writeMode() {
return options.get(WRITE_MODE);
}
Expand Down Expand Up @@ -1341,4 +1353,33 @@ public InlineElement getDescription() {
return text(description);
}
}

/** Specifies the way of making up time precision for sequence field. */
public enum SequenceAutoPadding implements DescribedEnum {
NONE("none", "No padding for sequence field."),
SECOND_TO_MICRO(
"second-to-micro",
"Pads the sequence field that indicates time with precision of seconds to micro-second."),
MILLIS_TO_MICRO(
"millis-to-micro",
"Pads the sequence field that indicates time with precision of milli-second to micro-second.");

private final String value;
private final String description;

SequenceAutoPadding(String value, String description) {
this.value = value;
this.description = description;
}

@Override
public String toString() {
return value;
}

@Override
public InlineElement getDescription() {
return text(description);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ public TableWriteImpl<KeyValue> newWrite(
.sequenceField()
.map(field -> new SequenceGenerator(field, schema().logicalRowType()))
.orElse(null);
final CoreOptions.SequenceAutoPadding sequenceAutoPadding =
store().options().sequenceAutoPadding();
final KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
store().newWrite(commitUser, manifestFilter),
Expand All @@ -230,7 +232,10 @@ record -> {
long sequenceNumber =
sequenceGenerator == null
? KeyValue.UNKNOWN_SEQUENCE
: sequenceGenerator.generate(record.row());
: sequenceAutoPadding == CoreOptions.SequenceAutoPadding.NONE
? sequenceGenerator.generate(record.row())
: sequenceGenerator.generateWithPadding(
record.row(), sequenceAutoPadding);
return kv.replace(
record.primaryKey(),
sequenceNumber,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

package org.apache.paimon.table.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeDefaultVisitor;
import org.apache.paimon.types.DataTypeFamily;
import org.apache.paimon.types.DateType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.DoubleType;
Expand All @@ -38,12 +40,14 @@

import javax.annotation.Nullable;

import java.util.concurrent.TimeUnit;

/** Generate sequence number. */
public class SequenceGenerator {

private final int index;

private final Generator generator;
private final DataType fieldType;

public SequenceGenerator(String field, RowType rowType) {
index = rowType.getFieldNames().indexOf(field);
Expand All @@ -52,7 +56,8 @@ public SequenceGenerator(String field, RowType rowType) {
String.format(
"Can not find sequence field %s in table schema: %s", field, rowType));
}
generator = rowType.getTypeAt(index).accept(new SequenceGeneratorVisitor());
fieldType = rowType.getTypeAt(index);
generator = fieldType.accept(new SequenceGeneratorVisitor());
}

public int index() {
Expand All @@ -68,6 +73,37 @@ public long generate(InternalRow row) {
return generator.generate(row, index);
}

public long generateWithPadding(InternalRow row, CoreOptions.SequenceAutoPadding autoPadding) {
switch (autoPadding) {
case SECOND_TO_MICRO:
long value = generate(row);
// timestamp returns millis
long second = fieldType.is(DataTypeFamily.TIMESTAMP) ? value / 1000 : value;
return second * 1_000_000 + getCurrentMicroOfSeconds();
case MILLIS_TO_MICRO:
// Generated value is millis
long millis = generate(row);
return millis * 1_000 + getCurrentMicroOfMillis();
default:
throw new UnsupportedOperationException(
"Unknown sequence padding mode " + autoPadding.name());
}
}

private static long getCurrentMicroOfMillis() {
long currentNanoTime = System.nanoTime();
long mills = TimeUnit.MILLISECONDS.convert(currentNanoTime, TimeUnit.NANOSECONDS);
long microOfMillis = (currentNanoTime - mills * 1_000_000) / 1000;
return microOfMillis;
}

private static long getCurrentMicroOfSeconds() {
long currentNanoTime = System.nanoTime();
long seconds = TimeUnit.SECONDS.convert(currentNanoTime, TimeUnit.NANOSECONDS);
long microOfSecs = (currentNanoTime - seconds * 1_000_000_000) / 1000;
return microOfSecs;
}

private interface Generator {
long generate(InternalRow row, int i);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.table.sink;

import org.apache.paimon.FileStore;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
Expand Down Expand Up @@ -91,17 +92,28 @@ public void write(InternalRow row) throws Exception {
}

public SinkRecord writeAndReturn(InternalRow row) throws Exception {
keyAndBucketExtractor.setRecord(row);
SinkRecord record =
new SinkRecord(
keyAndBucketExtractor.partition(),
keyAndBucketExtractor.bucket(),
keyAndBucketExtractor.trimmedPrimaryKey(),
row);
SinkRecord record = toSinkRecord(row);
write.write(record.partition(), record.bucket(), recordExtractor.extract(record));
return record;
}

@VisibleForTesting
public T writeAndReturnData(InternalRow row) throws Exception {
SinkRecord record = toSinkRecord(row);
T data = recordExtractor.extract(record);
write.write(record.partition(), record.bucket(), data);
return data;
}

private SinkRecord toSinkRecord(InternalRow row) throws Exception {
keyAndBucketExtractor.setRecord(row);
return new SinkRecord(
keyAndBucketExtractor.partition(),
keyAndBucketExtractor.bucket(),
keyAndBucketExtractor.trimmedPrimaryKey(),
row);
}

public SinkRecord toLogRecord(SinkRecord record) {
keyAndBucketExtractor.setRecord(record.row());
return new SinkRecord(
Expand Down
Loading

0 comments on commit cdf911b

Please sign in to comment.