-
Notifications
You must be signed in to change notification settings - Fork 3.7k
CASSANDRA-13630 support large internode messages with netty #253
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
// if we allocated too much buffer for this message, we'll log here. | ||
// if we allocated to little buffer space, we would have hit an exception when trying to write more bytes to it | ||
if (out.isWritable()) | ||
errorLogger.error("{} reported message size {}, actual message size {}, msg {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How likely are we to cause log spam?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The errorLogger
is an instance of NoSpamLogger
, so we can adjust the log intervals. I'm not sure if making it an assert is better (as that would throw and Exception, and the connection would get killed). As it's mostly a developer-level logging thing, I could just remove it, as well. wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's fine. We can leave it in.
/** | ||
* Default to a very large value. | ||
*/ | ||
private static final long DEFAULT_REBUFFER_BLOCK_IN_MILLIS = TimeUnit.DAYS.toMillis(2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its risky to default to a very large value. Why do we want to default to such a large value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The corresponding pre-4.0 code just does blocking IO on a socket, with no timeouts. Thus it blocks forever. 2 days seems shorter than forever, but I'm game to change this to any time-bounded value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it doesn't make sense to wait for more than 5 minutes to rebuffer. Given buffer sizes are on the order of a few megabytes at most, 5 minutes is an eternity. It's best to keep it short so failures are exposed sooner than later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've lowered it to 3 minutes.
@@ -183,6 +195,11 @@ public int available() throws EOFException | |||
return availableBytes; | |||
} | |||
|
|||
public boolean isEmpty() throws EOFException | |||
{ | |||
return available() == 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method available()
has a side effect of channelConfig.setAutoRead(true)
when the availableBytes
falls below the lowWatermark
. Are you sure that is ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice find. I added that side effect to
StreamingInboundHandler to help deserialization. Clearly, I could move that into a separate method, and have StreamingInboundHandler
invoke that after the available()
call. wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, looking at this again, I needed to figure out why available()
does the autoRead check. For posterity, here's the reasoning behind it:
There a case where we enqueue a buffer, and disable autoRead. Then read all of the buffer exactly (and no more bytes are available to read in the instance). In an effort to not block indefinitely (so I could check if StreamingInboundHandler.close
is true), i called available() (and did the autoRead check) instead of attempt to read a value from the inputPlus (which would call reBuffer()
and block either indefinitely or throw an exception if I added a timeout). I didn't want the indefinite block nor an exception to be thrown, so I bolted the autoRead check onto available()
.
I think it still makes sense to move this into a separate method call.
{ | ||
try | ||
{ | ||
session.onError(t).get(5, TimeUnit.MINUTES); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we have a 5 minute timeout? We should pull this out as a constant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
@SuppressWarnings("resource") | ||
public void process(ByteBuf in) throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is a bit complicated to understand. Could you refactor it? For exampleRedisDecoder
provides a nice example of this - https://github.com/netty/netty/blob/4.1/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done. I also pulled the main loop logic into the base class.
…ing on the queue in RebufferingByteBufDataInputPlus; fix MIHTest
…o the base class, and moved logic for each case statement into submethods rather than directly in-line with the loop
{ | ||
String key = DataInputStream.readUTF(in); | ||
ParameterType parameterType = ParameterType.byName.get(key); | ||
int valueLength = in.readInt(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
{ | ||
String key = DataInputStream.readUTF(inputTracker); | ||
ParameterType parameterType = ParameterType.byName.get(key); | ||
long valueLength = VIntCoding.readUnsignedVInt(inputTracker); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused variable.
@@ -196,10 +220,12 @@ protected void doFlush(int count) throws IOException | |||
int byteCount = buffer.position(); | |||
currentBuf.writerIndex(byteCount); | |||
|
|||
if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 2, TimeUnit.MINUTES)) | |||
if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, rateLimiterBlockTime, rateLimiterBlockTimeUnit)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going up from 2 minutes to potentially 5 minutes. I think this should be ok but just making sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. ftr, the only place where this went from 2 up to 5 minutes was in NettyStreamingMessageSender.FIleSendTask
where I defaulted to 5 minutes. I've switched that call site to default to 2 minutes.
|
||
private void buildParamBufPre40(int valueLength) throws IOException | ||
{ | ||
buf = Unpooled.buffer(1024, 1024); // 1k should be enough for everybody! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤣
*/ | ||
@SuppressWarnings("resource") | ||
public void handleDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception | ||
class NonblockingBufferHandler implements BufferHandler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be made into a static inner class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
{ | ||
for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE) | ||
{ | ||
if (((ParameterType)parameters.get(ii + PARAMETER_TUPLE_TYPE_OFFSET)).equals(type)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't need the typecast to ParameterType
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
{ | ||
try | ||
{ | ||
UUID sessionId = (UUID)getParameter(ParameterType.TRACE_SESSION); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Spacing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
try | ||
{ | ||
UUID sessionId = (UUID)getParameter(ParameterType.TRACE_SESSION); | ||
if (sessionId != null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume sessionId != null
means that tracing is enabled? Otherwise we should explicitly check whether tracing is enabled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Further, this method is basically taken verbatim from 3.11's OutboundTcpConnection.writeConnected() method.
if (type == ChannelWriterType.COALESCING) | ||
coalescingStrategy = CoalescingStrategies.newCoalescingStrategy(CoalescingStrategies.Strategy.FIXED.name(), COALESCE_WINDOW_MS, null, "test"); | ||
else | ||
coalescingStrategy = Optional.empty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is beyond the scope of this PR, however, we should refactor OutboundConnectionParams
to not store an Optional
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait for CASSANDRA-14503 :)
@jasobrown can be this PR closed? |
df3eb40
to
54e39a9
Compare
No description provided.