-
Notifications
You must be signed in to change notification settings - Fork 14.9k
KAFKA-20127: Add new methods to StateSerdes #21411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
| * @param rawKey the key as raw bytes | ||
| * @return the key as typed object | ||
| */ | ||
| @Deprecated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does that make sense to replace this deprecated method with the newly introduced one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose that we should add @SupressWarning and replace them in follow-up PRs.
82f39db to
847d808
Compare
# Conflicts: # streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
847d808 to
d5fdfb2
Compare
| @@ -149,26 +151,49 @@ public String topic() { | |||
| * @param rawKey the key as raw bytes | |||
| * @return the key as typed object | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add:
@deprecated Since 4.3. Use {@link #keyFrom(byte[], Headers} instead.
Similar elsewhere.
| * @param rawValue the value as raw bytes | ||
| * @return the value as typed object | ||
| */ | ||
| public V valueFrom(final byte[] rawValue, Headers headers) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing final
| * @return the value as typed object | ||
| */ | ||
| public V valueFrom(final byte[] rawValue, Headers headers) { | ||
| return valueSerde.deserializer().deserialize(topic, headers, Utils.wrapNullable(rawValue)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to add Utils.wrapNullable -- we don't use it in existing valueFrom(...) either?
| * @return the serialized key | ||
| */ | ||
| public byte[] rawKey(final K key, final Headers headers) { | ||
| try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a lot of code duplication -- should we update existing rawKey(...) to just call this new one, passing in new RecordHeaders() object instead?
| * @return the serialized value | ||
| */ | ||
| @SuppressWarnings("rawtypes") | ||
| public byte[] rawValue(final V value, final Headers headers) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question.
| } | ||
|
|
||
| @Test | ||
| public void shouldDeserializeValueWithNullHeaders() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same
| } | ||
|
|
||
| @Test | ||
| public void shouldSerializeKeyWithEmptyHeaders() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This case should be covered implicitly (if we let rawKey(key) call rawKey(key, new RecordHeaders())) via the existing test for rawKey(key). So might be redundant?
| } | ||
|
|
||
| @Test | ||
| public void shouldSerializeValueWithEmptyHeaders() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redundant as above?
| } | ||
|
|
||
| @Test | ||
| public void shouldThrowIfIncompatibleSerdeForKeyWithHeaders() throws ClassNotFoundException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same
| } | ||
|
|
||
| @Test | ||
| public void shouldThrowIfIncompatibleSerdeForValueWithHeaders() throws ClassNotFoundException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same
JIRA: KAFKA-20127
This PR is part of implementation of KIP-1271, adding new methods to
enable storing headers in state store.