Skip to content

Commit b53a77b

Browse files
committed
add checksum detect in binlog file reader.
add some test code for checksum detection.
1 parent 6cdecfd commit b53a77b

File tree

7 files changed

+76
-5
lines changed

7 files changed

+76
-5
lines changed

src/main/java/com/github/shyiko/mysql/binlog/BinaryLogFileReader.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,13 @@ public BinaryLogFileReader(InputStream inputStream, EventDeserializer eventDeser
5858
if (eventDeserializer == null) {
5959
throw new IllegalArgumentException("Event deserializer cannot be NULL");
6060
}
61-
this.inputStream = new ByteArrayInputStream(inputStream);
61+
this.inputStream = new ByteArrayInputStream(new BufferedInputStream(inputStream));
6262
try {
6363
byte[] magicHeader = this.inputStream.read(MAGIC_HEADER.length);
6464
if (!Arrays.equals(magicHeader, MAGIC_HEADER)) {
6565
throw new IOException("Not a valid binary log");
6666
}
67+
this.eventDeserializer = eventDeserializer.detectChecksumType(this.inputStream);
6768
} catch (IOException e) {
6869
try {
6970
this.inputStream.close();
@@ -72,7 +73,6 @@ public BinaryLogFileReader(InputStream inputStream, EventDeserializer eventDeser
7273
}
7374
throw e;
7475
}
75-
this.eventDeserializer = eventDeserializer;
7676
}
7777

7878
/**

src/main/java/com/github/shyiko/mysql/binlog/event/EventHeaderV4.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
*/
2222
public class EventHeaderV4 implements EventHeader {
2323

24+
public static final int HEADER_LEN = 19;
2425
// v1 (MySQL 3.23)
2526
private long timestamp;
2627
private EventType eventType;
@@ -87,7 +88,7 @@ public void setFlags(int flags) {
8788

8889
@Override
8990
public long getHeaderLength() {
90-
return 19;
91+
return HEADER_LEN;
9192
}
9293

9394
@Override

src/main/java/com/github/shyiko/mysql/binlog/event/FormatDescriptionEventData.java

+9
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public class FormatDescriptionEventData implements EventData {
2323
private int binlogVersion;
2424
private String serverVersion;
2525
private int headerLength;
26+
private int eventLength;
2627

2728
public int getBinlogVersion() {
2829
return binlogVersion;
@@ -48,6 +49,14 @@ public void setHeaderLength(int headerLength) {
4849
this.headerLength = headerLength;
4950
}
5051

52+
public void setEventLength(int eventLength) {
53+
this.eventLength = eventLength;
54+
}
55+
56+
public int getEventLength() {
57+
return eventLength;
58+
}
59+
5160
@Override
5261
public String toString() {
5362
final StringBuilder sb = new StringBuilder();

src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java

+23
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
import com.github.shyiko.mysql.binlog.event.Event;
1919
import com.github.shyiko.mysql.binlog.event.EventData;
2020
import com.github.shyiko.mysql.binlog.event.EventHeader;
21+
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
2122
import com.github.shyiko.mysql.binlog.event.EventType;
23+
import com.github.shyiko.mysql.binlog.event.FormatDescriptionEventData;
2224
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
2325
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
2426

@@ -240,6 +242,27 @@ public EventDataDeserializer getEventDataDeserializer(EventType eventType) {
240242
return eventDataDeserializer != null ? eventDataDeserializer : defaultEventDataDeserializer;
241243
}
242244

245+
public EventDeserializer detectChecksumType(ByteArrayInputStream inputStream)
246+
throws IOException {
247+
inputStream.mark(EventHeaderV4.HEADER_LEN);
248+
EventHeader eventHeader = eventHeaderDeserializer.deserialize(inputStream);
249+
inputStream.reset();
250+
251+
inputStream.mark((int) (EventHeaderV4.HEADER_LEN + eventHeader.getDataLength()));
252+
inputStream.skip(EventHeaderV4.HEADER_LEN); // skip header, not interpret it twice
253+
FormatDescriptionEventDataDeserializer deserializer = new FormatDescriptionEventDataDeserializer();
254+
FormatDescriptionEventData formatDescriptionEventData = deserializer.deserialize(inputStream);
255+
int eventLength = formatDescriptionEventData.getEventLength();
256+
// 1 byte means checksum algo, 4 byte is crc checksum content
257+
if (eventLength + 1 + 4 == eventHeader.getDataLength()) {
258+
setChecksumType(ChecksumType.CRC32);
259+
} else {
260+
setChecksumType(ChecksumType.NONE);
261+
}
262+
inputStream.reset();
263+
return this;
264+
}
265+
243266
/**
244267
* @see CompatibilityMode#DATE_AND_TIME_AS_LONG
245268
* @see CompatibilityMode#DATE_AND_TIME_AS_LONG_MICRO

src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/FormatDescriptionEventDataDeserializer.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.github.shyiko.mysql.binlog.event.deserialization;
1717

18+
import com.github.shyiko.mysql.binlog.event.EventType;
1819
import com.github.shyiko.mysql.binlog.event.FormatDescriptionEventData;
1920
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
2021

@@ -32,7 +33,9 @@ public FormatDescriptionEventData deserialize(ByteArrayInputStream inputStream)
3233
eventData.setServerVersion(inputStream.readString(50).trim());
3334
inputStream.skip(4); // redundant, present in a header
3435
eventData.setHeaderLength(inputStream.readInteger(1));
35-
// lengths for all event types
36+
// skip lengths for some event types
37+
inputStream.skip(EventType.FORMAT_DESCRIPTION.ordinal() - 1);
38+
eventData.setEventLength(inputStream.readInteger(1));
3639
return eventData;
3740
}
3841
}

src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java

+15
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public class ByteArrayInputStream extends InputStream {
2828
private InputStream inputStream;
2929
private Integer peek;
3030
private int blockLength = -1;
31+
private Integer peekMark;
32+
private int blockMark;
3133

3234
public ByteArrayInputStream(InputStream inputStream) {
3335
this.inputStream = inputStream;
@@ -218,4 +220,17 @@ public void skipToTheEndOfTheBlock() throws IOException {
218220
}
219221
}
220222

223+
@Override
224+
public synchronized void mark(int readlimit) {
225+
inputStream.mark(readlimit);
226+
peekMark = peek;
227+
blockMark = blockLength;
228+
}
229+
230+
@Override
231+
public synchronized void reset() throws IOException {
232+
inputStream.reset();
233+
peek = peekMark;
234+
blockLength = blockMark;
235+
}
221236
}

src/test/java/com/github/shyiko/mysql/binlog/BinaryLogFileReaderIntegrationTest.java

+21-1
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,32 @@ public class BinaryLogFileReaderIntegrationTest {
4242
public void testNextEvent() throws Exception {
4343
BinaryLogFileReader reader = new BinaryLogFileReader(new GZIPInputStream(
4444
new FileInputStream("src/test/resources/mysql-bin.sakila.gz")));
45+
readAll(reader, 1462);
46+
}
47+
48+
@Test
49+
public void testNoChecksum() throws Exception {
50+
EventDeserializer eventDeserializer = new EventDeserializer();
51+
BinaryLogFileReader reader = new BinaryLogFileReader(new GZIPInputStream(
52+
new FileInputStream("src/test/resources/mysql-bin.sakila.gz")), eventDeserializer);
53+
readAll(reader, 1462);
54+
}
55+
56+
@Test
57+
public void testCRCChecksum() throws Exception {
58+
EventDeserializer eventDeserializer = new EventDeserializer();
59+
BinaryLogFileReader reader = new BinaryLogFileReader(
60+
new FileInputStream("/data/mysql-bin.000001"), eventDeserializer);
61+
readAll(reader, 303);
62+
}
63+
64+
private void readAll(BinaryLogFileReader reader, int expect) throws IOException {
4565
try {
4666
int numberOfEvents = 0;
4767
while ((reader.readEvent()) != null) {
4868
numberOfEvents++;
4969
}
50-
assertEquals(numberOfEvents, 1462);
70+
assertEquals(numberOfEvents, expect);
5171
} finally {
5272
reader.close();
5373
}

0 commit comments

Comments
 (0)