-
Notifications
You must be signed in to change notification settings - Fork 429
RATIS-2018. Zero-copy buffers are not released - 2nd chunk #1032
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@duongkame , thanks for working on this! Please see the comments inlined.
@@ -117,7 +117,13 @@ public V retain() { | |||
|
|||
@Override | |||
public boolean release() { | |||
return fromRefs.stream().map(ReferenceCountedObject::release).allMatch(r -> r); | |||
boolean allReleased = true; | |||
for (ReferenceCountedObject ref : fromRefs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add type parameter:
for (ReferenceCountedObject<T> ref : fromRefs) {
final boolean isRetry = requests.putIfAbsent(request); | ||
LOG.debug("Received seq={}, isRetry? {}, {}", seqNum, isRetry, this); | ||
if (isRetry || request.getSeqNum() < nextToProcess) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If request.getSeqNum() < nextToProcess
, the request should not be added then.
public synchronized void receivedRequest(REQUEST request, Consumer<REQUEST> processingMethod) {
final long seqNum = request.getSeqNum();
final boolean accepted;
if (nextToProcess == -1 && (request.isFirstRequest() || seqNum == 0)) {
nextToProcess = seqNum;
requests.putNewRequest(request);
LOG.debug("Received seq={} (first request), {}", seqNum, this);
accepted = true;
} else if (request.getSeqNum() < nextToProcess) {
LOG.debug("Received seq={} < nextToProcess {}, {}", seqNum, nextToProcess, this);
accepted = false;
} else {
final boolean isRetry = requests.putIfAbsent(request);
LOG.debug("Received seq={}, isRetry? {}, {}", seqNum, isRetry, this);
accepted = !isRetry;
}
if (accepted) {
processRequestsFromHead(processingMethod);
} else {
request.release();
}
}
@@ -320,11 +320,13 @@ void processClientRequest(ReferenceCountedObject<RaftClientRequest> requestRef) | |||
|
|||
final CompletableFuture<Void> f = processClientRequest(requestRef, reply -> { | |||
if (!reply.isSuccess()) { | |||
LOG.info("Failed {}, reply={}", request, reply); | |||
LOG.info("Failed {}, reply={}", request.toStringShort(), reply); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We cannot use request
here
final SlidingWindowEntry slidingWindowEntry = request.getSlidingWindowEntry();
final CompletableFuture<Void> f = processClientRequest(requestRef, reply -> {
if (!reply.isSuccess()) {
LOG.info("Failed request cid={}, {}, reply={}", callId, slidingWindowEntry, reply);
}
final RaftClientReplyProto proto = ClientProtoUtils.toRaftClientReplyProto(reply);
responseNext(proto);
});
private final REQUEST request; | ||
private final ReferenceCountedObject<REQUEST> requestRef; | ||
private AtomicBoolean released = new AtomicBoolean(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's combine these three fields:
static class PendingServerRequest<REQUEST> {
private final AtomicReference<ReferenceCountedObject<REQUEST>> requestRef;
private final CompletableFuture<Void> future = new CompletableFuture<>();
PendingServerRequest(ReferenceCountedObject<REQUEST> requestRef) {
requestRef.retain();
this.requestRef = new AtomicReference<>(requestRef);
}
REQUEST getRequest() {
return Optional.ofNullable(requestRef.get())
.map(ReferenceCountedObject::get)
.orElse(null);
}
CompletableFuture<Void> getFuture() {
return future;
}
void release() {
Optional.ofNullable(requestRef.getAndSet(null))
.ifPresent(ReferenceCountedObject::release);
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice
ratis/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java Line 175 in 66d7d39
One more comments: The line above should not use request since it may be released. We should pass |
Thanks for the detailed review, @szetszwo. We should completely avoid using requests in the asynchronous contexts (reply, error handler) as you suggest. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 the change looks good.
What changes were proposed in this pull request?
Zero-copy buffers are not released - 2nd chunk, including fixes in:
will NEVER be processed
, requests coming with seqNum < nextToProcess.whenComplete
.What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/RATIS-2018
How was this patch tested?
CI