-
Notifications
You must be signed in to change notification settings - Fork 1.5k
[multistage] Fix Leaks in Mailbox #10322
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
339feb6
32cf0e8
f3d7009
37f91fc
1abaa53
7eeec9f
d452189
cd6c8d3
0bfdf70
384ca5b
bb59e27
82d73cc
93da2a8
35b3bad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,44 +18,53 @@ | |
| */ | ||
| package org.apache.pinot.query.mailbox; | ||
|
|
||
| import io.grpc.Status; | ||
| import io.grpc.stub.StreamObserver; | ||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.function.Consumer; | ||
| import javax.annotation.Nullable; | ||
| import org.apache.pinot.common.datablock.DataBlock; | ||
| import org.apache.pinot.common.datablock.DataBlockUtils; | ||
| import org.apache.pinot.common.datablock.MetadataBlock; | ||
| import org.apache.pinot.common.proto.Mailbox; | ||
| import org.apache.pinot.common.proto.Mailbox.MailboxContent; | ||
| import org.apache.pinot.query.mailbox.channel.ChannelUtils; | ||
| import org.apache.pinot.query.mailbox.channel.MailboxContentStreamObserver; | ||
| import org.apache.pinot.query.runtime.blocks.TransferableBlock; | ||
| import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
||
| /** | ||
| * GRPC implementation of the {@link ReceivingMailbox}. | ||
| * GRPC implementation of the {@link ReceivingMailbox}. This mailbox doesn't hold any resources upon creation. | ||
| * Instead an explicit {@link #init} call is made when the sender sends the first data-block which attaches | ||
| * references to the {@link StreamObserver} to this mailbox. | ||
| */ | ||
| public class GrpcReceivingMailbox implements ReceivingMailbox<TransferableBlock> { | ||
| private static final Logger LOGGER = LoggerFactory.getLogger(GrpcReceivingMailbox.class); | ||
| private static final long DEFAULT_MAILBOX_INIT_TIMEOUT = 100L; | ||
| private final String _mailboxId; | ||
| private Consumer<MailboxIdentifier> _gotMailCallback; | ||
| private final Consumer<MailboxIdentifier> _gotMailCallback; | ||
| private final CountDownLatch _initializationLatch; | ||
| private final AtomicInteger _totalMsgReceived = new AtomicInteger(0); | ||
|
|
||
| private MailboxContentStreamObserver _contentStreamObserver; | ||
| private StreamObserver<Mailbox.MailboxStatus> _statusStreamObserver; | ||
|
|
||
| public GrpcReceivingMailbox(String mailboxId, Consumer<MailboxIdentifier> gotMailCallback) { | ||
| _mailboxId = mailboxId; | ||
| _gotMailCallback = gotMailCallback; | ||
| _initializationLatch = new CountDownLatch(1); | ||
| } | ||
|
|
||
| public Consumer<MailboxIdentifier> init(MailboxContentStreamObserver streamObserver) { | ||
| public Consumer<MailboxIdentifier> init(MailboxContentStreamObserver streamObserver, | ||
| StreamObserver<Mailbox.MailboxStatus> statusStreamObserver) { | ||
| if (_initializationLatch.getCount() > 0) { | ||
| _contentStreamObserver = streamObserver; | ||
| _statusStreamObserver = statusStreamObserver; | ||
| _initializationLatch.countDown(); | ||
| } | ||
| return _gotMailCallback; | ||
|
|
@@ -70,29 +79,37 @@ public Consumer<MailboxIdentifier> init(MailboxContentStreamObserver streamObser | |
| * 2. If the received block from the sender is a data-block with 0 rows. | ||
| * </p> | ||
| */ | ||
| @Nullable | ||
| @Override | ||
| public TransferableBlock receive() | ||
| throws Exception { | ||
| public TransferableBlock receive() throws Exception { | ||
| if (!waitForInitialize()) { | ||
| return null; | ||
| } | ||
| MailboxContent mailboxContent = _contentStreamObserver.poll(); | ||
| _totalMsgReceived.incrementAndGet(); | ||
| return mailboxContent == null ? null : fromMailboxContent(mailboxContent); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isInitialized() { | ||
| return _initializationLatch.getCount() <= 0; | ||
| return _initializationLatch.getCount() == 0; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isClosed() { | ||
| return isInitialized() && _contentStreamObserver.isCompleted(); | ||
| return isInitialized() && _contentStreamObserver.hasConsumedAllData(); | ||
| } | ||
|
|
||
| @Override | ||
| public void cancel(Throwable e) { | ||
| public void cancel() { | ||
| if (isInitialized()) { | ||
| try { | ||
| _statusStreamObserver.onError(Status.CANCELLED.asRuntimeException()); | ||
| } catch (Exception e) { | ||
| // TODO: This can happen if the call is already closed. Consider removing this log altogether or find a way | ||
| // to check if the stream is already closed. | ||
| LOGGER.info("Tried to cancel receiving mailbox", e); | ||
|
Comment on lines
+108
to
+110
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's search for a better way to deal with already closed issue.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not related to gRPC but our business logic. There can be concurrent cancellations/terminations of the stream so this can always happen. |
||
| } | ||
| } | ||
| } | ||
|
|
||
| private boolean waitForInitialize() | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.