
[core] Support to make up the sequence number to nanosecond precision #1247

Merged · 6 commits into apache:master · Jun 19, 2023

Conversation

@schnappi17 (Contributor)

Purpose

fix #1050

Taking System.nanoTime() as a high-precision time source, pad the provided sequence.field up to nanosecond precision if it is of Timestamp data type with only second or millisecond precision.

Tests

ChangelogWithKeyFileStoreTableTest#testNanosSequenceNumberOnTimestampSecond
ChangelogWithKeyFileStoreTableTest#testNanosSequenceNumberOnTimestampMilliSecond
ChangelogWithKeyFileStoreTableTest#testNanosSequenceNumberOnTimestampMicroSecond
ChangelogWithKeyFileStoreTableTest#testNanosSequenceNumberOnNonTimestampField

@schnappi17 force-pushed the PAIMON-1050 branch 3 times, most recently from 115f5cb to 38ef728, on May 27, 2023 18:06
@schnappi17 changed the title from "Paimon 1050" to "[core] Support to make up the sequence number to nanosecond precision" on May 28, 2023
@JingsongLi (Contributor) left a comment:

Can we just introduce something like:

  1. sequence.auto-gen.second-micro: the user provides a long with second precision, and we pad it up to microseconds.
  2. sequence.auto-gen.millis-micro: the user provides a long with millisecond precision, and we pad it up to microseconds.

This has two advantages:

  1. it can be applied to all types;
  2. a long value can only hold precision up to microseconds, not nanoseconds (see the quick range check below).
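
For scale, the range argument presumably behind the second point can be checked directly: a signed 64-bit long holding nanoseconds since the epoch runs out in the year 2262, while microseconds last for roughly another 292,000 years. A quick self-contained check:

import java.time.Instant;

// Quick check of how far Long.MAX_VALUE reaches at each precision.
public class PrecisionHeadroom {
    public static void main(String[] args) {
        long maxNanoSeconds = Long.MAX_VALUE / 1_000_000_000L;
        System.out.println(Instant.ofEpochSecond(maxNanoSeconds));  // 2262-04-11T...
        long maxMicroSeconds = Long.MAX_VALUE / 1_000_000L;
        System.out.println(Instant.ofEpochSecond(maxMicroSeconds)); // ~ year 294247
    }
}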

@JingsongLi (Contributor) left another comment:

And please add some unit test cases for SequenceGenerator.

@JingsongLi (Contributor):

Or we can introduce a 'sequence.auto-padding' key; valid values are 'second-to-micro', 'millis-to-micro', and 'none'. The default is 'none'.
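
To make the shape concrete, a minimal sketch of how the proposed key might be set, treating it as a plain string table property rather than any specific Paimon API; the column name op_time is hypothetical:

import java.util.HashMap;
import java.util.Map;

// Sketch only, not the Paimon API surface: the proposed key expressed as
// plain string table properties. "op_time" is a hypothetical sequence column.
public class SequencePaddingOptions {
    public static Map<String, String> example() {
        Map<String, String> options = new HashMap<>();
        options.put("sequence.field", "op_time");
        // valid values per this proposal: second-to-micro, millis-to-micro, none
        options.put("sequence.auto-padding", "second-to-micro");
        return options;
    }
}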

@schnappi17 (Contributor, Author):

@JingsongLi Updated, thanks for your suggestions!

@@ -68,6 +73,32 @@ public long generate(InternalRow row) {
return generator.generate(row, index);
}

public long generateWithPadding(InternalRow row, CoreOptions.SequenceAutoPadding autoPadding) {
@JingsongLi (Contributor) commented on this diff:

I think we can try to finish this in a simpler way:

public long generateWithPadding(InternalRow row, CoreOptions.SequenceAutoPadding autoPadding) {
    switch (autoPadding) {
        case SECOND_TO_MICRO:
            long value = generate(row);
            // for timestamps, generate(row) returns millis
            long second = fieldType.is(DataTypeFamily.TIMESTAMP) ? value / 1000 : value;
            return secondToMicro(second);
        case MILLIS_TO_MICRO:
            return millsToMicro(generate(row));
        default:
            throw new UnsupportedOperationException(
                    "Unknown sequence padding mode " + autoPadding.name());
    }
}
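
The helper bodies are not shown in the diff; a plausible sketch, assuming (per the PR purpose) that System.nanoTime() supplies the missing low-order digits while the user-provided coarse value keeps the high-order ones:

// Illustrative bodies for the helpers used above, not the merged code.
private static long secondToMicro(long second) {
    // microseconds elapsed within the current second on the nano clock
    long microOfSecond = (System.nanoTime() / 1_000) % 1_000_000;
    return second * 1_000_000 + microOfSecond;
}

private static long millsToMicro(long millis) {
    // microseconds elapsed within the current millisecond
    long microOfMilli = (System.nanoTime() / 1_000) % 1_000;
    return millis * 1_000 + microOfMilli;
}

Note that any wall-clock-derived padding like this is not monotonic across records; that is exactly the problem @hzjhjjyy raises after the merge, below.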

@@ -102,6 +103,20 @@ public SinkRecord writeAndReturn(InternalRow row) throws Exception {
return record;
}

@VisibleForTesting
public T writeAndReturnData(InternalRow row) throws Exception {
@JingsongLi (Contributor) commented on this diff:

Why not use writeAndReturn and get the row from the SinkRecord?

@@ -260,6 +260,8 @@ there will be some cases that lead to data disorder. At this time, you can use a
{{< hint info >}}
When the record is updated or deleted, the `sequence.field` must become larger and cannot remain unchanged. For example,
you can use [Mysql Binlog operation time](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#available-metadata) as `sequence.field`.
If the provided `sequence.field` doesn't meet the required precision, such as a rough second or millisecond, you can set `sequence.auto-padding` to `second-to-micro` or `millis-to-micro` so that the system pads the sequence number up to microsecond precision.
Note that only fields of data type `Timestamp`, `Integer` and `BigInt` which represent a `second` or `millisecond` value can be padded to microseconds.
@JingsongLi (Contributor) commented on this diff:

We can apply this padding to all types.

@schnappi17 (Contributor, Author):

@JingsongLi Thanks, your suggestion simplifies the implementation for sure; I updated it in the latest commit. The reason we need writeAndReturnData is that we can only get the generated sequence number from the original KeyValue, not from the SinkRecord. We need the generated sequence number to do comparisons and make the assertions.

@JingsongLi (Contributor):

@schnappi17 Please make sure the tests pass first.

@VisibleForTesting
public T writeAndReturnData(InternalRow row) throws Exception {
keyAndBucketExtractor.setRecord(row);
SinkRecord record =
@JingsongLi (Contributor) commented on this diff:

You can create a toSinkRecord method to reuse code.

@schnappi17 (Contributor, Author):

@JingsongLi Updated. And I didn't see any test fail; maybe it was confused with another PR of mine where a UT failed. Please help approve the workflows and let's check it, thanks a lot!

expectedResult =
"1|10|101|1685530987|1685530987123|2023-05-23T11:22:33|2023-05-23T11:22:33.123|2023-05-23T11:22:33.123456|a2";
} else {
expectedResult =
@lppsuixn commented on Jun 16, 2023:

Based on the situation mentioned in #1050, a2 should always be the final result.
I mean, the last record should always be the final result.

@schnappi17 (Contributor, Author):

> Based on the situation mentioned in #1050, a2 should always be the final result.

@lppsuixn In this situation, if you just set the sequence.field without auto-padding, the last record is the final result as expected.

@lppsuixn:

Currently, when two records have the same sequence number, the latter one is not the final result. #1050 is aimed at resolving this issue.

@JingsongLi (Contributor):

@lppsuixn In the real world, the current implementation is acceptable, because it is difficult for two records with the same key to arrive within the same microsecond.

@JingsongLi (Contributor) left a comment:

+1

@JingsongLi merged commit cdf911b into apache:master on Jun 19, 2023
@hzjhjjyy (Contributor) commented on Aug 9, 2023:

In the current implementation, it's possible for the "padding" of a later record's sequence to be smaller than that of an earlier record, resulting in data errors.
For example (seconds -> micros):

currentNanoTime1:    1062154,766334,200  ->  currentSecondsTime1: 1062154
currentNanoTime2:    1062155,676231,100  ->  currentSecondsTime2: 1062155
sourceMillis:        1691482327000
result_sequence:     1691482327,766334 and then 1691482327,676231 (smaller, although later)

A possible solution: set an incremental sequence for the sequence.field, incrementing by one for each incoming record?
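
A minimal sketch of that suggestion (illustrative only, not part of this PR): derive the padding from a per-writer counter instead of the clock, so a later record with the same coarse value always gets a larger sequence number.

import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the suggested fix: pad with a monotonically
// increasing counter rather than System.nanoTime().
public class IncrementalSecondToMicro {
    private final AtomicLong counter = new AtomicLong();

    public long pad(long second) {
        // the seconds term dominates across seconds; within one second the
        // counter keeps results strictly increasing (until it wraps at 1e6)
        return second * 1_000_000 + (counter.getAndIncrement() % 1_000_000);
    }
}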

@lppsuixn commented on Aug 9, 2023:


You are right. This PR does not solve the problem.


Successfully merging this pull request may close this issue:

[Feature] When records have the same sequence number, the latter one is used as the final result. (#1050)