[core] Support to make up the sequence number to nanosecond precision #1247
Can we just introduce something like:
- sequence.auto-gen.second-micro: the user provides a long with second precision, and we pad it to microsecond precision.
- sequence.auto-gen.mills-micro: the user provides a long with millisecond precision, and we pad it to microsecond precision.
This has the advantages that:
- it can be applied to all types
- a long value can only hold up to microsecond precision, not nanosecond precision
And please add some unit test cases for SequenceGenerator.
Or we can introduce a 'sequence.auto-padding' key whose valid values are 'second-to-micro', 'mills-to-micro', and 'none'. The default is 'none'.
@JingsongLi Updated, thanks for your suggestions.
@@ -68,6 +73,32 @@ public long generate(InternalRow row) {
        return generator.generate(row, index);
    }

    public long generateWithPadding(InternalRow row, CoreOptions.SequenceAutoPadding autoPadding) {
I think we can try to finish this in a simpler way:
    public long generateWithPadding(InternalRow row, CoreOptions.SequenceAutoPadding autoPadding) {
        switch (autoPadding) {
            case SECOND_TO_MICRO:
                long value = generate(row);
                // a timestamp field yields milliseconds, so reduce it to seconds first
                long second = fieldType.is(DataTypeFamily.TIMESTAMP) ? value / 1000 : value;
                return secondToMicro(second);
            case MILLIS_TO_MICRO:
                return millsToMicro(generate(row));
            default:
                throw new UnsupportedOperationException(
                        "Unknown sequence padding mode " + autoPadding.name());
        }
    }
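The helpers secondToMicro and millsToMicro are not shown in the suggestion above. A minimal sketch of what they might look like follows, assuming the sub-second digits are taken from a nanosecond clock such as System.nanoTime(), as the PR description proposes. The class name and the explicit nanoClock parameter are hypothetical and exist only to keep the example deterministic; this is not the code from the PR.

```java
// Hypothetical sketch; names and signatures are illustrative, not taken from the PR.
final class SequencePaddingSketch {
    private static final long MICROS_PER_SECOND = 1_000_000L;
    private static final long MICROS_PER_MILLI = 1_000L;

    // Keeps the user-provided seconds and fills the six sub-second digits
    // from a nanosecond clock reading.
    static long secondToMicro(long seconds, long nanoClock) {
        // floorMod keeps the result non-negative even if the clock reading is negative
        long microsOfSecond = Math.floorMod(nanoClock / 1_000L, MICROS_PER_SECOND);
        return seconds * MICROS_PER_SECOND + microsOfSecond;
    }

    // Keeps the user-provided milliseconds and fills the three sub-millisecond
    // digits from a nanosecond clock reading.
    static long millsToMicro(long millis, long nanoClock) {
        long microsOfMilli = Math.floorMod(nanoClock / 1_000L, MICROS_PER_MILLI);
        return millis * MICROS_PER_MILLI + microsOfMilli;
    }

    public static void main(String[] args) {
        long nanoClock = System.nanoTime();
        System.out.println(secondToMicro(1685530987L, nanoClock));
        System.out.println(millsToMicro(1685530987123L, nanoClock));
    }
}
```

Passing the clock reading as a parameter is a design choice for testability; a production version would more likely read the clock internally.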
@@ -102,6 +103,20 @@ public SinkRecord writeAndReturn(InternalRow row) throws Exception {
        return record;
    }

    @VisibleForTesting
    public T writeAndReturnData(InternalRow row) throws Exception {
Why not use writeAndReturn and get the row from SinkRecord?
@@ -260,6 +260,8 @@ there will be some cases that lead to data disorder. At this time, you can use a
{{< hint info >}}
When the record is updated or deleted, the `sequence.field` must become larger and cannot remain unchanged. For example,
you can use [Mysql Binlog operation time](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#available-metadata) as `sequence.field`.
If the provided `sequence.field` doesn't meet the precision, like a rough second or millisecond, you can set `sequence.auto-padding` to `second-to-micro` or `millis-to-micro` so that the precision of sequence number will be made up to microsecond by system.
Note that only fields of data type `Timestamp`, `Integer` and `BigInt` which indicates a `second` or `millisecond` can be padded by microseconds.
We can apply this padding to all types.
@JingsongLi Thanks, your suggestion certainly simplifies the implementation; I updated it in the latest commit. Why we need the
@schnappi17 Please make sure the tests pass first.
    @VisibleForTesting
    public T writeAndReturnData(InternalRow row) throws Exception {
        keyAndBucketExtractor.setRecord(row);
        SinkRecord record =
You can create a method, toSinkRecord, to reuse the code.
@JingsongLi Updated. I didn't see any failed tests; maybe this was confused with another PR of mine whose unit tests failed. Please help approve the workflows so we can check. Thanks a lot!
            expectedResult =
                    "1|10|101|1685530987|1685530987123|2023-05-23T11:22:33|2023-05-23T11:22:33.123|2023-05-23T11:22:33.123456|a2";
        } else {
            expectedResult =
Based on the situation mentioned in #1050, a2 should always be the final result.
I mean, the last record should always be the final result.
Currently, when two records have the same sequence number, the latter one is not the final result. #1050 is aimed at resolving this issue.
@lppsuixn In the real world, the current implementation is workable because it is rare for two records with the same key to arrive within the same microsecond.
+1
In the current implementation, it's possible for the "padding" of a later record's sequence number to be smaller than that of an earlier record, resulting in data errors. Possible solutions?
You are right. This PR does not solve the problem.
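The ordering concern raised above can be illustrated with a small sketch. It assumes the padding is the microsecond-of-second fraction of a nanosecond clock reading, which wraps around once per second; the class name, method name, and clock values are hypothetical and chosen only to make the wrap-around visible.

```java
// Hypothetical sketch of the ordering problem; not the code from this PR.
final class PaddingOrderSketch {
    // Pads a second-precision value with the microsecond fraction of a clock reading.
    static long padSecondToMicro(long seconds, long nanoClock) {
        long microsOfSecond = Math.floorMod(nanoClock / 1_000L, 1_000_000L);
        return seconds * 1_000_000L + microsOfSecond;
    }

    public static void main(String[] args) {
        long sameSecond = 1685530987L; // both records carry the same user-provided second

        // The earlier record is padded just before the clock fraction wraps around.
        long earlier = padSecondToMicro(sameSecond, 999_999_000L);
        // The later record is padded 6 microseconds afterwards, just after the wrap.
        long later = padSecondToMicro(sameSecond, 1_000_005_000L);

        // The later record ends up with the smaller sequence number.
        System.out.println(later < earlier); // prints "true"
    }
}
```

Because the user-provided second and the clock fraction come from unrelated sources, nothing ties the padded values to arrival order, which is why the last record is not guaranteed to win.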
Purpose
Fix #1050.
Taking System.nanoTime() as a high-precision time source, pad the provided sequence.field to nanosecond precision if it is of the Timestamp data type with second or millisecond precision.
Tests
ChangelogWithKeyFileStoreTableTest#testNanosSequenceNumberOnTimestampSecond
ChangelogWithKeyFileStoreTableTest#testNanosSequenceNumberOnTimestampMilliSecond
ChangelogWithKeyFileStoreTableTest#testNanosSequenceNumberOnTimestampMicroSecond
ChangelogWithKeyFileStoreTableTest#testNanosSequenceNumberOnNonTimestampField