Skip to content

Commit

Permalink
[enhancement](mysql-channel) avoid potential buffer overflow when flu…
Browse files Browse the repository at this point in the history
…shing send buffer occurs IOE (apache#30868)
  • Loading branch information
TangSiyang2001 authored Feb 8, 2024
1 parent f26c8c2 commit 20186b3
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 10 deletions.
22 changes: 12 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,11 @@ public void flush() throws IOException {
return;
}
sendBuffer.flip();
realNetSend(sendBuffer);
sendBuffer.clear();
try {
realNetSend(sendBuffer);
} finally {
sendBuffer.clear();
}
isSend = true;
}

Expand All @@ -423,18 +426,17 @@ private void writeHeader(int length, boolean isSsl) throws IOException {
sendBuffer.put((byte) sequenceId);
}

private void writeBuffer(ByteBuffer buffer, boolean isSsl) throws IOException {
private void writeBuffer(ByteBuffer buffer) throws IOException {
if (null == sendBuffer) {
return;
}
long leftLength = sendBuffer.capacity() - sendBuffer.position();
// If too long for buffer, send buffered data.
if (leftLength < buffer.remaining()) {
if (sendBuffer.remaining() < buffer.remaining()) {
// Flush data in buffer.
flush();
}
// Send this buffer if large enough
if (buffer.remaining() > sendBuffer.capacity()) {
if (buffer.remaining() > sendBuffer.remaining()) {
realNetSend(buffer);
return;
}
Expand All @@ -451,20 +453,20 @@ public void sendOnePacket(ByteBuffer packet) throws IOException {
bufLen = MAX_PHYSICAL_PACKET_LENGTH;
packet.limit(packet.position() + bufLen);
if (isSslHandshaking) {
writeBuffer(packet, true);
writeBuffer(packet);
} else {
writeHeader(bufLen, isSslMode);
writeBuffer(packet, isSslMode);
writeBuffer(packet);
accSequenceId();
}
}
if (isSslHandshaking) {
packet.limit(oldLimit);
writeBuffer(packet, true);
writeBuffer(packet);
} else {
writeHeader(oldLimit - packet.position(), isSslMode);
packet.limit(oldLimit);
writeBuffer(packet, isSslMode);
writeBuffer(packet);
accSequenceId();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.mysql;


import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.qe.ConnectContext;

import mockit.Delegate;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Test;
import org.xnio.StreamConnection;

import java.io.IOException;
import java.nio.ByteBuffer;

public class MysqlChannelTest {

@Mocked
StreamConnection streamConnection;

@Test
public void testSendAfterException() throws IOException {
// Mock.
new Expectations() {
{
streamConnection.getSinkChannel().write((ByteBuffer) any);
// The first call to `write()` throws IOException.
result = new IOException();
// The second call to `write()` executes normally.
result = new Delegate() {
int fakeRead(ByteBuffer buffer) {
int writeLen = buffer.remaining();
buffer.position(buffer.limit());
return writeLen;
}
};

streamConnection.getSinkChannel().flush();
result = true;
}
};

ConnectContext ctx = new ConnectContext(streamConnection);
MysqlChannel mysqlChannel = new MysqlChannel(streamConnection, ctx);
Deencapsulation.setField(mysqlChannel, "sendBuffer", ByteBuffer.allocate(5));
// The first call to `realNetSend()` in `flush()` throws IOException.
// If `flush()` doesn't consider this exception, `sendBuffer` won't be reset to write mode,
// which will cause BufferOverflowException at the next calling `sendOnePacket()`.
ByteBuffer buf = ByteBuffer.allocate(12);
buf.putInt(1);
buf.putInt(2);
// limit=8
buf.flip();
try {
mysqlChannel.sendOnePacket(buf);
Assert.fail();
} catch (IOException ignore) {
// do nothing
}
buf.clear();

buf.putInt(1);
// limit=4
buf.flip();
mysqlChannel.sendOnePacket(buf);
buf.clear();

buf.putInt(1);
buf.putInt(2);
// limit=8
buf.flip();
mysqlChannel.sendOnePacket(buf);
}
}

0 comments on commit 20186b3

Please sign in to comment.