Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,61 @@
*/
package org.apache.pinot.query.mailbox;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.PinotMailboxGrpc;
import org.apache.pinot.query.mailbox.channel.ChannelManager;
import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* GRPC-based implementation of {@link MailboxService}.
* GRPC-based implementation of {@link MailboxService}. Note that there can be cases where the ReceivingMailbox
* and/or the underlying connection can be leaked:
*
* <p>It maintains a collection of connected mailbox servers and clients to remote hosts. All indexed by the
* mailboxID in the format of: <code>"jobId:partitionKey:senderHost:senderPort:receiverHost:receiverPort"</code>
* <ol>
* <li>When the OpChain corresponding to the receiver was never registered.</li>
* <li>When the receiving OpChain exited before data was sent for the first time by the sender.</li>
* </ol>
*
* <p>Connections are established/initiated from the sender side and only tier-down from the sender side as well.
* In the event of exception or timed out, the connection is cloased based on a mutually agreed upon timeout period
* after the last successful message sent/received.
*
* <p>Noted that:
* <ul>
* <li>the latter part of the mailboxID consist of the channelID.</li>
* <li>the job_id should be uniquely identifying a send/receving pair, for example if one bundle job requires
* to open 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to distinguish the 2 different mailbox.</li>
* </ul>
* To handle these cases, we store the {@link ReceivingMailbox} entries in a time-expiring cache. If there was a
* leak, the entry would be evicted, and in that case we also issue a cancel to ensure the underlying stream is also
* released.
*/
public class GrpcMailboxService implements MailboxService<TransferableBlock> {
private static final Logger LOGGER = LoggerFactory.getLogger(GrpcMailboxService.class);
// channel manager
private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY = Duration.ofMinutes(5);
private final ChannelManager _channelManager;
private final String _hostname;
private final int _mailboxPort;

// maintaining a list of registered mailboxes.
private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
new ConcurrentHashMap<>();
// We use a cache to ensure that the receiving mailbox and the underlying gRPC stream are not leaked in the cases
// where the corresponding OpChain is either never registered or died before the sender sent data for the first time.
private final Cache<String, GrpcReceivingMailbox> _receivingMailboxCache =
CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(), TimeUnit.MINUTES)
.removalListener(new RemovalListener<String, GrpcReceivingMailbox>() {
@Override
public void onRemoval(RemovalNotification<String, GrpcReceivingMailbox> notification) {
if (notification.wasEvicted()) {
// TODO: This should be tied with query deadline, but for that we need to know the query deadline
// when the GrpcReceivingMailbox is initialized in MailboxContentStreamObserver.
LOGGER.warn("Removing dangling GrpcReceivingMailbox: {}", notification.getKey());
notification.getValue().cancel();
}
}
})
.build();
private final Consumer<MailboxIdentifier> _gotMailCallback;

public GrpcMailboxService(String hostname, int mailboxPort, PinotConfiguration extraConfig,
Expand Down Expand Up @@ -88,29 +104,57 @@ public int getMailboxPort() {
}

/**
* Register a mailbox, mailbox needs to be registered before use.
* @param mailboxId the id of the mailbox.
* {@inheritDoc}
*/
public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId) {
ManagedChannel channel = getChannel(mailboxId.toString());
PinotMailboxGrpc.PinotMailboxStub stub = PinotMailboxGrpc.newStub(channel);
CountDownLatch latch = new CountDownLatch(1);
StreamObserver<Mailbox.MailboxContent> mailboxContentStreamObserver =
stub.open(new MailboxStatusStreamObserver(latch));
GrpcSendingMailbox mailbox = new GrpcSendingMailbox(mailboxId.toString(), mailboxContentStreamObserver, latch);
@Override
public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId, long deadlineMs) {
MailboxStatusStreamObserver statusStreamObserver = new MailboxStatusStreamObserver();

GrpcSendingMailbox mailbox = new GrpcSendingMailbox(mailboxId.toString(), statusStreamObserver, (deadline) -> {
ManagedChannel channel = getChannel(mailboxId.toString());
PinotMailboxGrpc.PinotMailboxStub stub =
PinotMailboxGrpc.newStub(channel)
.withDeadlineAfter(Math.max(0L, deadline - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
return stub.open(statusStreamObserver);
}, deadlineMs);
return mailbox;
}

/**
* Register a mailbox, mailbox needs to be registered before use.
* @param mailboxId the id of the mailbox.
* {@inheritDoc} See {@link GrpcMailboxService} for details on the design.
*/
@Override
public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
return _receivingMailboxMap.computeIfAbsent(mailboxId.toString(),
(mId) -> new GrpcReceivingMailbox(mId, _gotMailCallback));
try {
return _receivingMailboxCache.get(mailboxId.toString(),
() -> new GrpcReceivingMailbox(mailboxId.toString(), _gotMailCallback));
} catch (ExecutionException e) {
LOGGER.error(String.format("Error getting receiving mailbox: %s", mailboxId), e);
throw new RuntimeException(e);
}
}

/**
* If there's a cached receiving mailbox and it isn't closed (i.e. query didn't finish successfully), then this
* calls a cancel to ensure that the underlying gRPC stream is closed. After that the receiving mailbox is removed
* from the cache.
* <p>
* Also refer to the definition in the interface:
* </p>
* <p>
* {@inheritDoc}
* </p>
*/
@Override
public void releaseReceivingMailbox(MailboxIdentifier mailboxId) {
GrpcReceivingMailbox receivingMailbox = _receivingMailboxCache.getIfPresent(mailboxId.toString());
if (receivingMailbox != null && !receivingMailbox.isClosed()) {
receivingMailbox.cancel();
}
_receivingMailboxCache.invalidate(mailboxId.toString());
}

public ManagedChannel getChannel(String mailboxId) {
private ManagedChannel getChannel(String mailboxId) {
return _channelManager.getChannel(Utils.constructChannelId(mailboxId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,53 @@
*/
package org.apache.pinot.query.mailbox;

import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.Mailbox.MailboxContent;
import org.apache.pinot.query.mailbox.channel.ChannelUtils;
import org.apache.pinot.query.mailbox.channel.MailboxContentStreamObserver;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* GRPC implementation of the {@link ReceivingMailbox}.
* GRPC implementation of the {@link ReceivingMailbox}. This mailbox doesn't hold any resources upon creation.
* Instead an explicit {@link #init} call is made when the sender sends the first data-block which attaches
* references to the {@link StreamObserver} to this mailbox.
*/
public class GrpcReceivingMailbox implements ReceivingMailbox<TransferableBlock> {
private static final Logger LOGGER = LoggerFactory.getLogger(GrpcReceivingMailbox.class);
private static final long DEFAULT_MAILBOX_INIT_TIMEOUT = 100L;
private final String _mailboxId;
private Consumer<MailboxIdentifier> _gotMailCallback;
private final Consumer<MailboxIdentifier> _gotMailCallback;
private final CountDownLatch _initializationLatch;
private final AtomicInteger _totalMsgReceived = new AtomicInteger(0);

private MailboxContentStreamObserver _contentStreamObserver;
private StreamObserver<Mailbox.MailboxStatus> _statusStreamObserver;

public GrpcReceivingMailbox(String mailboxId, Consumer<MailboxIdentifier> gotMailCallback) {
_mailboxId = mailboxId;
_gotMailCallback = gotMailCallback;
_initializationLatch = new CountDownLatch(1);
}

public Consumer<MailboxIdentifier> init(MailboxContentStreamObserver streamObserver) {
public Consumer<MailboxIdentifier> init(MailboxContentStreamObserver streamObserver,
StreamObserver<Mailbox.MailboxStatus> statusStreamObserver) {
if (_initializationLatch.getCount() > 0) {
_contentStreamObserver = streamObserver;
_statusStreamObserver = statusStreamObserver;
_initializationLatch.countDown();
}
return _gotMailCallback;
Expand All @@ -70,29 +79,37 @@ public Consumer<MailboxIdentifier> init(MailboxContentStreamObserver streamObser
* 2. If the received block from the sender is a data-block with 0 rows.
* </p>
*/
@Nullable
@Override
public TransferableBlock receive()
throws Exception {
public TransferableBlock receive() throws Exception {
if (!waitForInitialize()) {
return null;
}
MailboxContent mailboxContent = _contentStreamObserver.poll();
_totalMsgReceived.incrementAndGet();
return mailboxContent == null ? null : fromMailboxContent(mailboxContent);
}

@Override
public boolean isInitialized() {
return _initializationLatch.getCount() <= 0;
return _initializationLatch.getCount() == 0;
}

@Override
public boolean isClosed() {
return isInitialized() && _contentStreamObserver.isCompleted();
return isInitialized() && _contentStreamObserver.hasConsumedAllData();
}

@Override
public void cancel(Throwable e) {
public void cancel() {
if (isInitialized()) {
try {
_statusStreamObserver.onError(Status.CANCELLED.asRuntimeException());
} catch (Exception e) {
// TODO: This can happen if the call is already closed. Consider removing this log altogether or find a way
// to check if the stream is already closed.
LOGGER.info("Tried to cancel receiving mailbox", e);
Comment on lines +108 to +110
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's search for a better way to deal with already closed issue.
I think i read some article regarding close/half-close conditions. but we can follow up later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not related to gRPC but our business logic. There can be concurrent cancellations/terminations of the stream so this can always happen.

}
}
}

private boolean waitForInitialize()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,79 +18,94 @@
*/
package org.apache.pinot.query.mailbox;

import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.Mailbox.MailboxContent;
import org.apache.pinot.query.mailbox.channel.ChannelUtils;
import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* GRPC implementation of the {@link SendingMailbox}.
* gRPC implementation of the {@link SendingMailbox}. The gRPC stream is created on the first call to {@link #send}.
*/
public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
private static final Logger LOGGER = LoggerFactory.getLogger(GrpcSendingMailbox.class);
private final String _mailboxId;
private final AtomicBoolean _initialized = new AtomicBoolean(false);
private final AtomicInteger _totalMsgSent = new AtomicInteger(0);

private final CountDownLatch _finishLatch;
private final StreamObserver<MailboxContent> _mailboxContentStreamObserver;
private StreamObserver<MailboxContent> _mailboxContentStreamObserver;
private final Function<Long, StreamObserver<MailboxContent>> _mailboxContentStreamObserverSupplier;
private final MailboxStatusStreamObserver _statusObserver;
private final long _deadlineMs;

public GrpcSendingMailbox(String mailboxId, StreamObserver<MailboxContent> mailboxContentStreamObserver,
CountDownLatch latch) {
public GrpcSendingMailbox(String mailboxId, MailboxStatusStreamObserver statusObserver,
Function<Long, StreamObserver<MailboxContent>> contentStreamObserverSupplier, long deadlineMs) {
_mailboxId = mailboxId;
_mailboxContentStreamObserver = mailboxContentStreamObserver;
_finishLatch = latch;
_initialized.set(false);
_mailboxContentStreamObserverSupplier = contentStreamObserverSupplier;
_statusObserver = statusObserver;
_deadlineMs = deadlineMs;
}

@Override
public void send(TransferableBlock block)
throws UnsupportedOperationException {
throws Exception {
if (!_initialized.get()) {
// initialization is special
open();
}
Preconditions.checkState(!_statusObserver.isFinished(),
"Called send when stream is already closed for mailbox=" + _mailboxId);
MailboxContent data = toMailboxContent(block.getDataBlock());
_mailboxContentStreamObserver.onNext(data);
_totalMsgSent.incrementAndGet();
}

@Override
public void complete() {
public void complete()
throws Exception {
_mailboxContentStreamObserver.onCompleted();
}

@Override
public void open() {
// TODO: Get rid of init call.
// send a begin-of-stream message.
_mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
.putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true").build());
_initialized.set(true);
public boolean isInitialized() {
return _initialized.get();
}

@Override
public String getMailboxId() {
return _mailboxId;
public void cancel(Throwable t) {
if (_initialized.get() && !_statusObserver.isFinished()) {
LOGGER.warn("GrpcSendingMailbox={} cancelling stream", _mailboxId);
try {
_mailboxContentStreamObserver.onError(Status.fromThrowable(
new RuntimeException("Cancelled by the sender")).asRuntimeException());
} catch (Exception e) {
// TODO: We don't necessarily need to log this since this is relatively quite likely to happen. Logging this
// anyways as info for now so we can see how frequently this happens.
LOGGER.info("Unexpected error issuing onError to MailboxContentStreamObserver: {}", e.getMessage());
}
}
}

@Override
public void waitForFinish(long timeout, TimeUnit unit)
throws InterruptedException {
_finishLatch.await(timeout, unit);
public String getMailboxId() {
return _mailboxId;
}

@Override
public void cancel(Throwable t) {
private void open() {
_mailboxContentStreamObserver = _mailboxContentStreamObserverSupplier.apply(_deadlineMs);
_initialized.set(true);
// send a begin-of-stream message.
_mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
.putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true").build());
}

private MailboxContent toMailboxContent(DataBlock dataBlock) {
Expand Down
Loading