|
35 | 35 | import com.github.shyiko.mysql.binlog.network.SocketFactory;
|
36 | 36 | import com.github.shyiko.mysql.binlog.network.protocol.ErrorPacket;
|
37 | 37 | import com.github.shyiko.mysql.binlog.network.protocol.GreetingPacket;
|
| 38 | +import com.github.shyiko.mysql.binlog.network.protocol.Packet; |
38 | 39 | import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;
|
39 | 40 | import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket;
|
40 | 41 | import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand;
|
@@ -597,7 +598,9 @@ private void listenForEventPackets() throws IOException {
|
597 | 598 | }
|
598 | 599 | Event event;
|
599 | 600 | try {
|
600 |
| - event = eventDeserializer.nextEvent(inputStream); |
| 601 | + event = eventDeserializer.nextEvent(packetLength == Packet.MAX_LENGTH ? |
| 602 | + new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) : |
| 603 | + inputStream); |
601 | 604 | } catch (Exception e) {
|
602 | 605 | Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
|
603 | 606 | if (cause instanceof EOFException || cause instanceof SocketException) {
|
@@ -633,6 +636,25 @@ private void listenForEventPackets() throws IOException {
|
633 | 636 | }
|
634 | 637 | }
|
635 | 638 |
|
| 639 | + private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int packetLength) throws IOException { |
| 640 | + List<byte[]> chunks = new LinkedList<byte[]>(); |
| 641 | + chunks.add(inputStream.read(packetLength)); |
| 642 | + int chunkLength; |
| 643 | + do { |
| 644 | + chunkLength = inputStream.readInteger(3); |
| 645 | + inputStream.skip(1); // 1 byte for sequence |
| 646 | + chunks.add(inputStream.read(chunkLength)); |
| 647 | + packetLength += chunkLength; |
| 648 | + } while (chunkLength == Packet.MAX_LENGTH); |
| 649 | + byte[] buffer = new byte[packetLength]; |
| 650 | + int offset = 0; |
| 651 | + for (byte[] chunk : chunks) { |
| 652 | + System.arraycopy(chunk, 0, buffer, offset, chunk.length); |
| 653 | + offset += chunk.length; |
| 654 | + } |
| 655 | + return buffer; |
| 656 | + } |
| 657 | + |
636 | 658 | private void updateClientBinlogFilenameAndPosition(Event event) {
|
637 | 659 | EventHeader eventHeader = event.getHeader();
|
638 | 660 | if (eventHeader.getEventType() == EventType.ROTATE) {
|
|
0 commit comments