-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values() #9007
KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values() #9007
Conversation
@cmccabe, @omkreddy, @mimaison, @dajac, @dongjinleekr since you all voted on the KIP feel free to review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tombentley Thanks for the PR. It looks good overall. I have left few comments.
return "LogDirDescription{" + | ||
"replicaInfos=" + replicaInfos + | ||
", error=" + error + | ||
'}'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: We usually use parenthesis instead of curly braces.
return "ReplicaInfo{" + | ||
"size=" + size + | ||
", offsetLag=" + offsetLag + | ||
", isFuture=" + isFuture + | ||
'}'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: curly braces instead of parenthesis.
@@ -64,7 +63,7 @@ object LogDirsCommand { | |||
"logDirs" -> logDirInfos.map { case (logDir, logDirInfo) => | |||
Map( | |||
"logDir" -> logDir, | |||
"error" -> logDirInfo.error.exceptionName(), | |||
"error" -> Option(logDirInfo.error).flatMap(ex => Some(ex.getClass.getName)).orNull, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Can't we use map
instead of flatMap
and remove the Some
?
HashMap<String, LogDirDescription> result = new HashMap<>(response.data().results().size()); | ||
for (DescribeLogDirsResponseData.DescribeLogDirsResult logDirResult : response.data().results()) { | ||
Map<TopicPartition, ReplicaInfo> replicaInfoMap = new HashMap<>(); | ||
if (logDirResult.topics() != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: topics
is not nullable in the protocol so it should never be null
, does it?
TopicPartition tp = replicaInfoEntry.getKey(); | ||
DescribeLogDirsResponse.ReplicaInfo replicaInfo = replicaInfoEntry.getValue(); | ||
ReplicaInfo replicaInfo = replicaInfoEntry.getValue(); | ||
ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp); | ||
if (replicaLogDirInfo == null) { | ||
handleFailure(new IllegalStateException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not related to your PR but this look weird. It seems that we fail all the futures if an unexpected replica is provided by the broker in the response. I think that we should log a warning when this happen like we do in the other methods (e.g. createTopics). What do you think?
Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions(); | ||
assertEquals(Collections.singleton(0), descriptions.keySet()); | ||
assertNotNull(descriptions.get(0)); | ||
assertEquals(Collections.singleton("/var/data/kafka"), descriptions.get(0).get().keySet()); | ||
assertNull(descriptions.get(0).get().get("/var/data/kafka").error()); | ||
assertEquals(Collections.singleton(tp), descriptions.get(0).get().get("/var/data/kafka").replicaInfos().keySet()); | ||
assertEquals(1234567890, descriptions.get(0).get().get("/var/data/kafka").replicaInfos().get(tp).size()); | ||
assertEquals(0, descriptions.get(0).get().get("/var/data/kafka").replicaInfos().get(tp).offsetLag()); | ||
assertFalse(descriptions.get(0).get().get("/var/data/kafka").replicaInfos().get(tp).isFuture()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These blocks of assertions are quite hard to read. Can we try to make them more digestable? We could perhaps extract temporary variable to reduce the number of .get()
. We could also define an verifyDescription
helper that verify a LogDirDescription
for instance. It may be worth having dedicated unit tests for the new and the old APIs as well.
case class ReplicaInfo(size: Long, offsetLag: Long, isFuture: Boolean) | ||
case class LogDirInfo(error: Errors, replicaInfos: Map[TopicPartition, ReplicaInfo]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a pity that we have to redefine there classes here. Couldn't we update the test to work with the plain response instead?
Thats for the review @dajac. I've addressed your comments. |
@@ -64,7 +63,7 @@ object LogDirsCommand { | |||
"logDirs" -> logDirInfos.map { case (logDir, logDirInfo) => | |||
Map( | |||
"logDir" -> logDir, | |||
"error" -> logDirInfo.error.exceptionName(), | |||
"error" -> Option(logDirInfo.error).map(ex => ex.getClass.getName).orNull, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for the Option
way. 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. One minor change! 😃
* possibly some other exception if there were problems describing the log directory | ||
* or null if the directory is online. | ||
*/ | ||
public ApiException error() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about this? (in consistency with current DescribeLogDirsResponse.LogDirInfo
):
Returns `ApiException` if the log directory is offline or an error occurred. If not, returns null.
<p><ul>
<li> KafkaStorageException - The log directory is offline.
<li> UnknownServerException - The server experienced an unexpected error when processing the request.
</ul><p>
(Description of UnknownServerException
was from Errors.UNKNOWN_SERVER_ERROR
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency with the other Javadoc it seems like we should use a single <p>
only when between the paragraphs. Please remove the <p>
tags. (Sorry, I was also confused.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
…alues() As per KIP-621. Also added some tests in KafkaAdminClientTest
33a9413
to
ba2160c
Compare
@dongjinleekr thanks for the review, amended. Also rebased for conflict. |
private final Map<TopicPartition, ReplicaInfo> replicaInfos; | ||
private final ApiException error; | ||
|
||
public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it better to have package-private visibility?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I initially thought the same, but TopicDescription
, for example (as well as other classes accessible from *Results
classes) have a public constructors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's great point. At the moment, I think that we are not consistent about this. Some are package private and some are not. The advantage of keeping it public is that it allows to use the class in unit tests which resides in other packages.
@@ -95,6 +76,7 @@ public static DescribeLogDirsResponse parse(ByteBuffer buffer, short version) { | |||
* KAFKA_STORAGE_ERROR (56) | |||
* UNKNOWN (-1) | |||
*/ | |||
@Deprecated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it need comment to describe the replacement? for example
@deprecated Deprecated Since Kafka 2.7. Use {@link LogDirDescription}.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tombentley Thanks for the updated PR. I have left some more comments.
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Show resolved
Hide resolved
private final Map<TopicPartition, ReplicaInfo> replicaInfos; | ||
private final ApiException error; | ||
|
||
public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's great point. At the moment, I think that we are not consistent about this. Some are package private and some are not. The advantage of keeping it public is that it allows to use the class in unit tests which resides in other packages.
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Outdated
Show resolved
Hide resolved
assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet()); | ||
assertNull(descriptionsMap.get("/var/data/kafka").error()); | ||
Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = descriptionsMap.get("/var/data/kafka").replicaInfos(); | ||
assertEquals(Collections.singleton(tp), descriptionsReplicaInfos.keySet()); | ||
assertEquals(1234567890, descriptionsReplicaInfos.get(tp).size()); | ||
assertEquals(0, descriptionsReplicaInfos.get(tp).offsetLag()); | ||
assertFalse(descriptionsReplicaInfos.get(tp).isFuture()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This block of assertions is used multiple times. Would it make sense to extract it in a helper method, say assertDescriptions
, that verifies a descriptions map contains the information about a single log dir/topic partition?
Something like assertDescriptionContains(descriptionsMap, logDir, tp, size, offsetLag, isFuture)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might have less mileage than you expected because the different types mean we need two methods each with two call sites, rather than 4 call sites for a single method, but I've done it anyway.
} | ||
|
||
@Test | ||
public void testDescribeReplicaLogDirs() throws ExecutionException, InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not due to your PR but shall we add a unit test which uses multiple brokers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added it to the existing test. Due to the new helper methods I felt this didn't really complicate the test very much and is also allows us to cover the case where the RPC returns STORAGE_ERROR
.
* possibly some other exception if there were problems describing the log directory | ||
* or null if the directory is online. | ||
*/ | ||
public ApiException error() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency with the other Javadoc it seems like we should use a single <p>
only when between the paragraphs. Please remove the <p>
tags. (Sorry, I was also confused.)
@dongjinleekr ah, thank you, I hadn't noticed that that was the norm. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tombentley Thanks for the update. LGTM pending jenkins.
ok to test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @tombentley. I left a few comments
@@ -2306,6 +2310,22 @@ void handleFailure(Throwable throwable) { | |||
return new DescribeLogDirsResult(new HashMap<>(futures)); | |||
} | |||
|
|||
private Map<String, LogDirDescription> logDirDescriptions(DescribeLogDirsResponse response) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be static. Also should we keep it in DescribeLogDirsResponse
?
@@ -2306,6 +2310,22 @@ void handleFailure(Throwable throwable) { | |||
return new DescribeLogDirsResult(new HashMap<>(futures)); | |||
} | |||
|
|||
private Map<String, LogDirDescription> logDirDescriptions(DescribeLogDirsResponse response) { | |||
HashMap<String, LogDirDescription> result = new HashMap<>(response.data().results().size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The left side can be Map
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws Exception { | |||
} | |||
} | |||
|
|||
private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be static
prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false)); | ||
} | ||
|
||
private List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be static
.setOffsetLag(offsetLag)))); | ||
} | ||
|
||
private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be static
} | ||
} | ||
|
||
private void assertDescriptionContains(Map<String, LogDirDescription> descriptionsMap, String logDir, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be static
} | ||
|
||
@SuppressWarnings("deprecation") | ||
private void assertDescriptionContains(Map<String, DescribeLogDirsResponse.LogDirInfo> descriptionsMap, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be static
@mimaison done, thanks. |
retest this please |
ok to test |
Failures look unrelated:
|
@tombentley Congratulations! ㊗️ @omkreddy @mimaison Thanks again for the detailed review, as usual! 😃 |
As per KIP-621. Also added some tests in KafkaAdminClientTest