Skip to content

RATIS-2018. Zero-copy buffers are not released - 2nd chunk #1032

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 30, 2024

Conversation

duongkame
Copy link
Contributor

@duongkame duongkame commented Jan 28, 2024

What changes were proposed in this pull request?

Zero-copy buffers are not released - 2nd chunk, including fixes in:

  1. Sliding window: requests being replied as will NEVER be processed, requests coming with seqNum < nextToProcess.
  2. appendEntries: avoid releasing at whenComplete.
  3. Streaming request: pending requests retained must be released after completing.
  4. Log entries being purged (after snapshot) must be released.

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/RATIS-2018

How was this patch tested?

CI

@duongkame duongkame marked this pull request as ready for review January 28, 2024 21:47
@duongkame
Copy link
Contributor Author

@szetszwo

Copy link
Contributor

@szetszwo szetszwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@duongkame , thanks for working on this! Please see the comments inlined.

@@ -117,7 +117,13 @@ public V retain() {

@Override
public boolean release() {
return fromRefs.stream().map(ReferenceCountedObject::release).allMatch(r -> r);
boolean allReleased = true;
for (ReferenceCountedObject ref : fromRefs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add type parameter:

        for (ReferenceCountedObject<T> ref : fromRefs) {

Comment on lines 464 to 466
final boolean isRetry = requests.putIfAbsent(request);
LOG.debug("Received seq={}, isRetry? {}, {}", seqNum, isRetry, this);
if (isRetry || request.getSeqNum() < nextToProcess) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If request.getSeqNum() < nextToProcess, the request should not be added then.

    public synchronized void receivedRequest(REQUEST request, Consumer<REQUEST> processingMethod) {
      final long seqNum = request.getSeqNum();
      final boolean accepted;
      if (nextToProcess == -1 && (request.isFirstRequest() || seqNum == 0)) {
        nextToProcess = seqNum;
        requests.putNewRequest(request);
        LOG.debug("Received seq={} (first request), {}", seqNum, this);
        accepted = true;
      } else if (request.getSeqNum() < nextToProcess) {
        LOG.debug("Received seq={} < nextToProcess {}, {}", seqNum, nextToProcess, this);
        accepted = false;
      } else {
        final boolean isRetry = requests.putIfAbsent(request);
        LOG.debug("Received seq={}, isRetry? {}, {}", seqNum, isRetry, this);
        accepted = !isRetry;
      }

      if (accepted) {
        processRequestsFromHead(processingMethod);
      } else {
        request.release();
      }
    }

@@ -320,11 +320,13 @@ void processClientRequest(ReferenceCountedObject<RaftClientRequest> requestRef)

final CompletableFuture<Void> f = processClientRequest(requestRef, reply -> {
if (!reply.isSuccess()) {
LOG.info("Failed {}, reply={}", request, reply);
LOG.info("Failed {}, reply={}", request.toStringShort(), reply);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot use request here

      final SlidingWindowEntry slidingWindowEntry = request.getSlidingWindowEntry();
      final CompletableFuture<Void> f = processClientRequest(requestRef, reply -> {
        if (!reply.isSuccess()) {
          LOG.info("Failed request cid={}, {}, reply={}", callId, slidingWindowEntry, reply);
        }
        final RaftClientReplyProto proto = ClientProtoUtils.toRaftClientReplyProto(reply);
        responseNext(proto);
      });

Comment on lines 54 to 56
private final REQUEST request;
private final ReferenceCountedObject<REQUEST> requestRef;
private AtomicBoolean released = new AtomicBoolean(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's combine these three fields:

  static class PendingServerRequest<REQUEST> {
    private final AtomicReference<ReferenceCountedObject<REQUEST>> requestRef;
    private final CompletableFuture<Void> future = new CompletableFuture<>();

    PendingServerRequest(ReferenceCountedObject<REQUEST> requestRef) {
      requestRef.retain();
      this.requestRef = new AtomicReference<>(requestRef);
    }

    REQUEST getRequest() {
      return Optional.ofNullable(requestRef.get())
          .map(ReferenceCountedObject::get)
          .orElse(null);
    }

    CompletableFuture<Void> getFuture() {
      return future;
    }

    void release() {
      Optional.ofNullable(requestRef.getAndSet(null))
          .ifPresent(ReferenceCountedObject::release);
    }
  }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

@szetszwo
Copy link
Contributor

One more comments: The line above should not use request since it may be released. We should pass callId and isHeartbeat; see https://issues.apache.org/jira/secure/attachment/13066333/1032_review.patch

@duongkame
Copy link
Contributor Author

Thanks for the detailed review, @szetszwo. We should completely avoid using requests in the asynchronous contexts (reply, error handler) as you suggest.

Copy link
Contributor

@szetszwo szetszwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 the change looks good.

@szetszwo szetszwo merged commit c4b10fa into apache:master Jan 30, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants