Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
import org.apache.drill.exec.proto.BitControl.CustomMessage;
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.InitializeFragments;
Expand Down Expand Up @@ -50,6 +51,7 @@ public static RpcConfig getMapping(DrillConfig config, Executor executor) {
.add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
.add(RpcType.REQ_UNPAUSE_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_CUSTOM, CustomMessage.class, RpcType.RESP_CUSTOM, CustomMessage.class)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
*/
package org.apache.drill.exec.rpc.control;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.drill.exec.proto.BitControl.CustomMessage;
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.InitializeFragments;
Expand All @@ -26,12 +33,16 @@
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.FutureBitCommand;
import org.apache.drill.exec.rpc.ListeningCommand;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.Parser;


public class ControlTunnel {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlTunnel.class);
Expand Down Expand Up @@ -176,4 +187,165 @@ public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection
connection.send(outcomeListener, RpcType.REQ_QUERY_CANCEL, queryId, Ack.class);
}
}

public <SEND extends Message, RECEIVE extends Message> CustomTunnel<SEND, RECEIVE> getCustomTunnel(
int messageTypeId, Class<SEND> clazz, Parser<RECEIVE> parser) {
return new CustomTunnel<SEND, RECEIVE>(messageTypeId, parser);
}

private static class CustomMessageSender extends ListeningCommand<CustomMessage, ControlConnection> {

private CustomMessage message;
private ByteBuf[] dataBodies;

public CustomMessageSender(RpcOutcomeListener<CustomMessage> listener, CustomMessage message, ByteBuf[] dataBodies) {
super(listener);
this.message = message;
this.dataBodies = dataBodies;
}

@Override
public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, ControlConnection connection) {
connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, CustomMessage.class, dataBodies);
}

}

private static class SyncCustomMessageSender extends FutureBitCommand<CustomMessage, ControlConnection> {

private CustomMessage message;
private ByteBuf[] dataBodies;

public SyncCustomMessageSender(CustomMessage message, ByteBuf[] dataBodies) {
super();
this.message = message;
this.dataBodies = dataBodies;
}

@Override
public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, ControlConnection connection) {
connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, CustomMessage.class, dataBodies);
}
}

/**
* A class used to return a synchronous future when doing custom rpc messages.
* @param <RECEIVE>
* The type of message that will be returned.
*/
public class CustomFuture<RECEIVE> {

private Parser<RECEIVE> parser;
private DrillRpcFuture<CustomMessage> future;

public CustomFuture(Parser<RECEIVE> parser, DrillRpcFuture<CustomMessage> future) {
super();
this.parser = parser;
this.future = future;
}

public RECEIVE get() throws RpcException, InvalidProtocolBufferException {
CustomMessage message = future.checkedGet();
return parser.parseFrom(message.getMessage());
}

public RECEIVE get(long timeout, TimeUnit unit) throws RpcException, TimeoutException,
InvalidProtocolBufferException {
CustomMessage message = future.checkedGet(timeout, unit);
return parser.parseFrom(message.getMessage());
}

public DrillBuf getBuffer() throws RpcException {
return (DrillBuf) future.getBuffer();
}

}

/**
* A special tunnel that can be used for custom types of messages. Its lifecycle is tied to the underlying
* ControlTunnel.
* @param <SEND>
* The type of message the control tunnel will be able to send.
* @param <RECEIVE>
* The expected response the control tunnel expects to receive.
*/
public class CustomTunnel<SEND extends Message, RECEIVE extends Message> {
private int messageTypeId;
private Parser<RECEIVE> parser;

private CustomTunnel(int messageTypeId, Parser<RECEIVE> parser) {
super();
this.messageTypeId = messageTypeId;
this.parser = parser;
}

/**
* Send a message and receive a future for monitoring the outcome.
* @param messageToSend
* The structured message to send.
* @param dataBodies
* One or more optional unstructured messages to append to the structure message.
* @return The CustomFuture that can be used to wait for the response.
*/
public CustomFuture<RECEIVE> send(SEND messageToSend, ByteBuf... dataBodies) {
final CustomMessage customMessage = CustomMessage.newBuilder()
.setMessage(messageToSend.toByteString())
.setType(messageTypeId)
.build();
final SyncCustomMessageSender b = new SyncCustomMessageSender(customMessage, dataBodies);
manager.runCommand(b);
DrillRpcFuture<CustomMessage> innerFuture = b.getFuture();
return new CustomFuture<RECEIVE>(parser, innerFuture);
}

/**
* Send a message using a custom listener.
* @param listener
* The listener to inform of the outcome of the sent message.
* @param messageToSend
* The structured message to send.
* @param dataBodies
* One or more optional unstructured messages to append to the structure message.
*/
public void send(RpcOutcomeListener<RECEIVE> listener, SEND messageToSend, ByteBuf... dataBodies) {
final CustomMessage customMessage = CustomMessage.newBuilder()
.setMessage(messageToSend.toByteString())
.setType(messageTypeId)
.build();
manager.runCommand(new CustomMessageSender(new CustomTunnelListener(listener), customMessage, dataBodies));
}

private class CustomTunnelListener implements RpcOutcomeListener<CustomMessage> {
final RpcOutcomeListener<RECEIVE> innerListener;

public CustomTunnelListener(RpcOutcomeListener<RECEIVE> innerListener) {
super();
this.innerListener = innerListener;
}

@Override
public void failed(RpcException ex) {
innerListener.failed(ex);
}

@Override
public void success(CustomMessage value, ByteBuf buffer) {
try {
RECEIVE message = parser.parseFrom(value.getMessage());
innerListener.success(message, buffer);
} catch (InvalidProtocolBufferException e) {
innerListener.failed(new RpcException("Failure while parsing message locally.", e));
}

}

@Override
public void interrupted(InterruptedException e) {
innerListener.interrupted(e);
}

}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@
*/
package org.apache.drill.exec.rpc.control;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;

import java.io.Closeable;

import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.rpc.UserRpcException;

import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;

/**
* Service that allows one Drillbit to communicate with another. Internally manages whether each particular bit is a
Expand All @@ -41,5 +48,61 @@ public interface Controller extends Closeable {

public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException;

/**
* Register a new handler for custom message types. Should be done before any messages. This is threadsafe as this
* method manages locking internally.
*
* @param messageTypeId
* The type of message id to handle. This corresponds to the CustomMessage.type field. Note that only a
* single handler for a particular type of message can be registered within a particular Drillbit.
* @param handler
* The handler that should be used to handle this type of message.
* @param parser
* The parser used to handle the types of messages the handler above handles.
*/
public <REQUEST extends MessageLite, RESPONSE extends MessageLite> void registerCustomHandler(int messageTypeId,
CustomMessageHandler<REQUEST, RESPONSE> handler, Parser<REQUEST> parser);

/**
* Defines how the Controller should handle custom messages. Implementations need to be threadsafe.
* @param <REQUEST>
* The type of request message.
* @param <RESPONSE>
* The type of the response message.
*/
public interface CustomMessageHandler<REQUEST extends MessageLite, RESPONSE extends MessageLite> {

/**
* Handle an incoming message.
* @param pBody
* The protobuf body message object of type REQUEST that was sent.
* @param dBody
* An optional byte body that was sent along with the structured message.
* @return The response that should be sent to the message sender.
* @throws UserRpcException
* throw this exception if there is an RPC failure that should be communicated to the sender.
*/
public CustomResponse<RESPONSE> onMessage(REQUEST pBody, DrillBuf dBody) throws UserRpcException;
}

/**
* A simple interface that describes the nature of the response to the custom incoming message.
*
* @param <RESPONSE>
* The type of message that the respopnse contains. Must be a protobuf message type.
*/
public interface CustomResponse<RESPONSE extends MessageLite> {

/**
* The structured portion of the response.
* @return A protobuf message of type RESPONSE
*/
public RESPONSE getMessage();

/**
* The optional unstructured portion of the message.
* @return null or one or more unstructured bodies.
*/
public ByteBuf[] getBodies();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.drill.exec.work.batch.ControlMessageHandler;

import com.google.common.io.Closeables;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;

/**
* Manages communication tunnels between nodes.
Expand All @@ -36,13 +38,15 @@ public class ControllerImpl implements Controller {
private final BootStrapContext context;
private final ConnectionManagerRegistry connectionRegistry;
private final boolean allowPortHunting;
private final CustomHandlerRegistry handlerRegistry;

public ControllerImpl(BootStrapContext context, ControlMessageHandler handler, boolean allowPortHunting) {
super();
this.handler = handler;
this.context = context;
this.connectionRegistry = new ConnectionManagerRegistry(handler, context);
this.allowPortHunting = allowPortHunting;
this.handlerRegistry = handler.getHandlerRegistry();
}

@Override
Expand All @@ -52,6 +56,7 @@ public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitS
port = server.bind(port, allowPortHunting);
DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setControlPort(port).build();
connectionRegistry.setEndpoint(completeEndpoint);
handlerRegistry.setEndpoint(completeEndpoint);
return completeEndpoint;
}

Expand All @@ -60,11 +65,19 @@ public ControlTunnel getTunnel(DrillbitEndpoint endpoint) {
return new ControlTunnel(endpoint, connectionRegistry.getConnectionManager(endpoint));
}


@Override
public <REQUEST extends MessageLite, RESPONSE extends MessageLite> void registerCustomHandler(int messageTypeId,
CustomMessageHandler<REQUEST, RESPONSE> handler, Parser<REQUEST> parser) {
handlerRegistry.registerCustomHandler(messageTypeId, handler, parser);
}

public void close() {
Closeables.closeQuietly(server);
for (ControlConnectionManager bt : connectionRegistry) {
bt.close();
}
}


}
Loading