jasobrown
Contributor

No description provided.

// if we allocated too much buffer for this message, we'll log here.
// if we allocated too little buffer space, we would have hit an exception when trying to write more bytes to it
if (out.isWritable())
errorLogger.error("{} reported message size {}, actual message size {}, msg {}",
Member

How likely are we to cause log spam?

Contributor Author

The errorLogger is an instance of NoSpamLogger, so we can adjust the log intervals. I'm not sure if making it an assert is better (as that would throw an exception, and the connection would get killed). As it's mostly a developer-level logging thing, I could just remove it as well. wdyt?

Member

It's fine. We can leave it in.
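
The exchange above relies on a rate-limited logger to avoid log spam. The following is a minimal sketch of that general technique, not Cassandra's NoSpamLogger: the class name `RateLimitedLog`, its constructor, and the injectable clock are hypothetical and exist only to show how a minimum interval suppresses repeated messages.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical illustration of interval-based log suppression.
// This is NOT Cassandra's NoSpamLogger API.
final class RateLimitedLog
{
    private final long minIntervalNanos;
    private final LongSupplier nanoClock;      // injectable so the behaviour can be tested
    private final AtomicLong nextAllowedNanos; // earliest time the next message may be emitted

    RateLimitedLog(long minInterval, TimeUnit unit, LongSupplier nanoClock)
    {
        this.minIntervalNanos = unit.toNanos(minInterval);
        this.nanoClock = nanoClock;
        this.nextAllowedNanos = new AtomicLong(nanoClock.getAsLong());
    }

    /** Emits the message and returns true, or returns false if it was suppressed. */
    boolean error(String message)
    {
        long now = nanoClock.getAsLong();
        long next = nextAllowedNanos.get();
        // Subtraction keeps the comparison correct even if the nano clock wraps around.
        if (now - next < 0)
            return false;
        // Only one thread wins the right to log for this interval.
        if (!nextAllowedNanos.compareAndSet(next, now + minIntervalNanos))
            return false;
        System.err.println(message);
        return true;
    }
}
```

With a one-minute interval, a second call made within the same minute is dropped, which is the property the reviewer was asking about.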

/**
* Default to a very large value.
*/
private static final long DEFAULT_REBUFFER_BLOCK_IN_MILLIS = TimeUnit.DAYS.toMillis(2);
Member

It's risky to default to a very large value. Why do we want to default to such a large value?

Contributor Author

The corresponding pre-4.0 code just does blocking IO on a socket, with no timeouts. Thus it blocks forever. 2 days seems shorter than forever, but I'm game to change this to any time-bounded value.

Member (@dineshjoshi, Sep 7, 2018)

I think it doesn't make sense to wait for more than 5 minutes to rebuffer. Given buffer sizes are on the order of a few megabytes at most, 5 minutes is an eternity. It's best to keep it short so failures are exposed sooner rather than later.

Contributor Author

I've lowered it to 3 minutes.
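
To illustrate the trade-off discussed in this thread, here is a small sketch of a time-bounded rebuffer: the reader waits for the next buffer for a limited period and fails loudly if none arrives. The class `BoundedRebufferQueue` and its methods are hypothetical stand-ins, not the code under review; only the constant name and the 3-minute value come from the conversation.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a consumer that waits a bounded time for the next buffer.
final class BoundedRebufferQueue
{
    // 3 minutes, the value settled on in the thread above.
    static final long DEFAULT_REBUFFER_BLOCK_IN_MILLIS = TimeUnit.MINUTES.toMillis(3);

    private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<>();
    private final long blockMillis;

    BoundedRebufferQueue(long blockMillis)
    {
        this.blockMillis = blockMillis;
    }

    /** Called by the producer (for example, a network event loop) when bytes arrive. */
    void enqueue(ByteBuffer buf)
    {
        queue.add(buf);
    }

    /** Blocks for at most blockMillis; throws instead of waiting forever. */
    ByteBuffer reBuffer() throws IOException
    {
        try
        {
            ByteBuffer next = queue.poll(blockMillis, TimeUnit.MILLISECONDS);
            if (next == null)
                throw new IOException("no buffer arrived within " + blockMillis + " ms");
            return next;
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IOException("interrupted while waiting for a buffer", e);
        }
    }
}
```

A short bound surfaces a stalled peer as an exception within minutes, whereas the pre-4.0 behaviour described above would simply hang.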

@@ -183,6 +195,11 @@ public int available() throws EOFException
return availableBytes;
}

public boolean isEmpty() throws EOFException
{
return available() == 0;
Member

The method available() has a side effect of channelConfig.setAutoRead(true) when the availableBytes falls below the lowWatermark. Are you sure that is ok?

Contributor Author

Nice find. I added that side effect to StreamingInboundHandler to help deserialization. Clearly, I could move that into a separate method, and have StreamingInboundHandler invoke that after the available() call. wdyt?

Contributor Author

So, looking at this again, I needed to figure out why available() does the autoRead check. For posterity, here's the reasoning behind it:

There is a case where we enqueue a buffer and disable autoRead, then read all of the buffer exactly (so no more bytes are available to read in the instance). In an effort to not block indefinitely (so I could check if StreamingInboundHandler.close is true), I called available() (and did the autoRead check) instead of attempting to read a value from the inputPlus (which would call reBuffer() and either block indefinitely or throw an exception if I added a timeout). I didn't want the indefinite block or an exception to be thrown, so I bolted the autoRead check onto available().

I think it still makes sense to move this into a separate method call.
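
The separation proposed in this thread can be sketched as follows. This is an assumption-laden illustration, not the actual patch: `WatermarkedInput`, `maybeResumeReading()`, and the boolean `autoRead` field (standing in for Netty's channel auto-read setting) are all hypothetical names.

```java
// Hypothetical sketch: keep available()/isEmpty() as pure queries and move the
// auto-read re-enable into its own explicitly named method.
final class WatermarkedInput
{
    private final int lowWaterMark;
    private int availableBytes;
    private boolean autoRead = true; // stand-in for the channel's auto-read flag

    WatermarkedInput(int lowWaterMark)
    {
        this.lowWaterMark = lowWaterMark;
    }

    /** Producer side: add bytes and optionally pause further reads from the channel. */
    void enqueue(int bytes, boolean pauseReads)
    {
        availableBytes += bytes;
        if (pauseReads)
            autoRead = false;
    }

    /** Consumer side: mark bytes as read. */
    void consume(int bytes)
    {
        if (bytes < 0 || bytes > availableBytes)
            throw new IllegalArgumentException("cannot consume " + bytes + " bytes");
        availableBytes -= bytes;
    }

    /** Pure query: no side effects. */
    int available()
    {
        return availableBytes;
    }

    /** Pure query built on available(). */
    boolean isEmpty()
    {
        return available() == 0;
    }

    /** Explicit side effect: resume reading once we drop below the low water mark. */
    boolean maybeResumeReading()
    {
        if (!autoRead && availableBytes < lowWaterMark)
            autoRead = true;
        return autoRead;
    }

    boolean isAutoRead()
    {
        return autoRead;
    }
}
```

Under this shape, a caller such as StreamingInboundHandler would call `available()` and then `maybeResumeReading()`, so `isEmpty()` can no longer flip channel state by accident.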

{
try
{
session.onError(t).get(5, TimeUnit.MINUTES);
Member

Why do we have a 5 minute timeout? We should pull this out as a constant.

Contributor Author

done

}

@SuppressWarnings("resource")
public void process(ByteBuf in) throws IOException
Member

This method is a bit complicated to understand. Could you refactor it? For example, RedisDecoder provides a nice model of this: https://github.com/netty/netty/blob/4.1/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java

Contributor Author

done. I also pulled the main loop logic into the base class.
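
The style the reviewer points to is an explicit state machine: the decoder remembers which field it is waiting for and returns early when the input runs dry. Below is a minimal sketch of that pattern over a plain `java.nio.ByteBuffer`. The class `LengthPrefixedDecoder` and its two-state, length-prefixed wire format are assumptions made for illustration; they are neither the refactored `process` method nor Netty's RedisDecoder.

```java
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical sketch of a state-machine decoder for frames of the form
// [int length][length bytes of payload]. State survives across calls, so a
// frame split over several network reads is decoded once it is complete.
final class LengthPrefixedDecoder
{
    private enum State { READ_LENGTH, READ_PAYLOAD }

    private State state = State.READ_LENGTH;
    private int payloadLength;

    /** Decodes as many complete frames as 'in' holds; unread bytes stay in 'in'. */
    void decode(ByteBuffer in, List<byte[]> out)
    {
        while (true)
        {
            switch (state)
            {
                case READ_LENGTH:
                    if (in.remaining() < Integer.BYTES)
                        return; // wait for more bytes
                    payloadLength = in.getInt();
                    if (payloadLength < 0)
                        throw new IllegalStateException("negative frame length: " + payloadLength);
                    state = State.READ_PAYLOAD;
                    break;

                case READ_PAYLOAD:
                    if (in.remaining() < payloadLength)
                        return; // wait for more bytes
                    byte[] payload = new byte[payloadLength];
                    in.get(payload);
                    out.add(payload);
                    state = State.READ_LENGTH;
                    break;
            }
        }
    }
}
```

The caller is expected to keep any unread bytes (for example with `ByteBuffer.compact()`) and call `decode` again when more data arrives. Each `case` handles exactly one field, which is what makes this shape easier to follow than a single long method.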

{
String key = DataInputStream.readUTF(in);
ParameterType parameterType = ParameterType.byName.get(key);
int valueLength = in.readInt();
Member

Unused variable.

Contributor Author

removed

{
String key = DataInputStream.readUTF(inputTracker);
ParameterType parameterType = ParameterType.byName.get(key);
long valueLength = VIntCoding.readUnsignedVInt(inputTracker);
Member

Unused variable.

@@ -196,10 +220,12 @@ protected void doFlush(int count) throws IOException
int byteCount = buffer.position();
currentBuf.writerIndex(byteCount);

-if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 2, TimeUnit.MINUTES))
+if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, rateLimiterBlockTime, rateLimiterBlockTimeUnit))
Member

This is going up from 2 minutes to potentially 5 minutes. I think this should be ok but just making sure.

Contributor Author

Good catch. ftr, the only place where this went from 2 up to 5 minutes was in NettyStreamingMessageSender.FileSendTask, where I defaulted to 5 minutes. I've switched that call site to default to 2 minutes.
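
The change under discussion replaces a hard-coded wait with a configurable one. A minimal sketch of that idea follows, using `java.util.concurrent.Semaphore.tryAcquire` with a timeout instead of the Guava helper seen in the diff. The class `ThrottledWriter`, its fields, and `onWriteComplete` are hypothetical; only the 2-minute default is taken from the thread.

```java
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: acquire "byte permits" before writing, waiting no longer
// than a configurable block time.
final class ThrottledWriter
{
    static final long DEFAULT_BLOCK_TIME = 2;                    // matches the original 2 minutes
    static final TimeUnit DEFAULT_BLOCK_TIME_UNIT = TimeUnit.MINUTES;

    private final Semaphore limiter;
    private final long blockTime;
    private final TimeUnit blockTimeUnit;

    ThrottledWriter(int maxBytesInFlight, long blockTime, TimeUnit blockTimeUnit)
    {
        this.limiter = new Semaphore(maxBytesInFlight);
        this.blockTime = blockTime;
        this.blockTimeUnit = blockTimeUnit;
    }

    /** Reserves byteCount permits or fails once the block time has elapsed. */
    void write(int byteCount) throws IOException
    {
        boolean acquired;
        try
        {
            acquired = limiter.tryAcquire(byteCount, blockTime, blockTimeUnit);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IOException("interrupted while waiting for the rate limiter", e);
        }
        if (!acquired)
            throw new IOException("rate limiter did not free " + byteCount + " bytes in time");
        // A real implementation would hand the buffer to the channel here.
    }

    /** Returns permits once the bytes have actually left the process. */
    void onWriteComplete(int byteCount)
    {
        limiter.release(byteCount);
    }
}
```

Passing the block time in through the constructor makes each call site's default visible, which is how a silent jump from 2 to 5 minutes becomes easy to spot in review.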


private void buildParamBufPre40(int valueLength) throws IOException
{
buf = Unpooled.buffer(1024, 1024); // 1k should be enough for everybody!
Member

🤣

*/
@SuppressWarnings("resource")
public void handleDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
class NonblockingBufferHandler implements BufferHandler
Member

This can be made into a static inner class.

Contributor Author

done

{
for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE)
{
if (((ParameterType)parameters.get(ii + PARAMETER_TUPLE_TYPE_OFFSET)).equals(type))
Member

Don't need the typecast to ParameterType

Contributor Author

done

{
try
{
UUID sessionId = (UUID)getParameter(ParameterType.TRACE_SESSION);
Member

Nit: Spacing

Contributor Author

fixed

try
{
UUID sessionId = (UUID)getParameter(ParameterType.TRACE_SESSION);
if (sessionId != null)
Member

I assume sessionId != null means that tracing is enabled? Otherwise we should explicitly check whether tracing is enabled.

Contributor Author

Yes. Further, this method is basically taken verbatim from 3.11's OutboundTcpConnection.writeConnected() method.

@jasobrown jasobrown changed the title 13630 CASSANDRA-13630 Sep 9, 2018
if (type == ChannelWriterType.COALESCING)
coalescingStrategy = CoalescingStrategies.newCoalescingStrategy(CoalescingStrategies.Strategy.FIXED.name(), COALESCE_WINDOW_MS, null, "test");
else
coalescingStrategy = Optional.empty();
Member

This is beyond the scope of this PR, however, we should refactor OutboundConnectionParams to not store an Optional.

Contributor Author

Wait for CASSANDRA-14503 :)
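
For context on the reviewer's suggestion, a common Java convention is to keep `Optional` out of fields and expose it only from accessors. The sketch below shows that convention in isolation. `ConnectionParams` and its single string field are hypothetical; this is not OutboundConnectionParams and makes no claim about what CASSANDRA-14503 changed.

```java
import java.util.Optional;

// Hypothetical sketch: store a plain nullable reference, expose Optional at the edge.
final class ConnectionParams
{
    // null means "no coalescing strategy configured"
    private final String coalescingStrategyName;

    ConnectionParams(String coalescingStrategyName)
    {
        this.coalescingStrategyName = coalescingStrategyName;
    }

    /** Callers still get the null-safety of Optional without it being stored. */
    Optional<String> coalescingStrategyName()
    {
        return Optional.ofNullable(coalescingStrategyName);
    }
}
```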

@smiklosovic smiklosovic changed the title CASSANDRA-13630 CASSANDRA-13630 support large internode messages with netty Mar 16, 2022
@smiklosovic
Contributor

@jasobrown can this PR be closed?

@belliottsmith belliottsmith force-pushed the trunk branch 2 times, most recently from df3eb40 to 54e39a9 Compare July 23, 2025 11:19