|
35 | 35 | import com.github.shyiko.mysql.binlog.network.SocketFactory;
|
36 | 36 | import com.github.shyiko.mysql.binlog.network.protocol.ErrorPacket;
|
37 | 37 | import com.github.shyiko.mysql.binlog.network.protocol.GreetingPacket;
|
| 38 | +import com.github.shyiko.mysql.binlog.network.protocol.Packet; |
38 | 39 | import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;
|
39 | 40 | import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket;
|
40 | 41 | import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand;
|
|
73 | 74 | */
|
74 | 75 | public class BinaryLogClient implements BinaryLogClientMXBean {
|
75 | 76 |
|
| 77 | + // https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html |
| 78 | + private static final int MAX_PACKET_LENGTH = 16777215; |
| 79 | + |
76 | 80 | private final Logger logger = Logger.getLogger(getClass().getName());
|
77 | 81 |
|
78 | 82 | private final String hostname;
|
@@ -592,7 +596,9 @@ private void listenForEventPackets() throws IOException {
|
592 | 596 | }
|
593 | 597 | Event event;
|
594 | 598 | try {
|
595 |
| - event = eventDeserializer.nextEvent(inputStream); |
| 599 | + event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ? |
| 600 | + new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) : |
| 601 | + inputStream); |
596 | 602 | } catch (Exception e) {
|
597 | 603 | Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
|
598 | 604 | if (cause instanceof EOFException || cause instanceof SocketException) {
|
@@ -628,6 +634,18 @@ private void listenForEventPackets() throws IOException {
|
628 | 634 | }
|
629 | 635 | }
|
630 | 636 |
|
| 637 | + private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int packetLength) throws IOException { |
| 638 | + byte[] result = inputStream.read(packetLength); |
| 639 | + int chunkLength; |
| 640 | + do { |
| 641 | + chunkLength = inputStream.readInteger(3); |
| 642 | + inputStream.skip(1); // 1 byte for sequence |
| 643 | + result = Arrays.copyOf(result, result.length + chunkLength); |
| 644 | + inputStream.fill(result, result.length - chunkLength, chunkLength); |
| 645 | + } while (chunkLength == Packet.MAX_LENGTH); |
| 646 | + return result; |
| 647 | + } |
| 648 | + |
631 | 649 | private void updateClientBinlogFilenameAndPosition(Event event) {
|
632 | 650 | EventHeader eventHeader = event.getHeader();
|
633 | 651 | if (eventHeader.getEventType() == EventType.ROTATE) {
|
|
0 commit comments