Skip to content

Commit ba73ba8

Browse files
authored
Merge pull request osheroff#130 from methodmissing/mysql8-gtid-event
Introduce support for MySQL 8 specific metadata on GTID events
2 parents 9d715a5 + c8a4acf commit ba73ba8

File tree

3 files changed

+212
-6
lines changed

3 files changed

+212
-6
lines changed

src/main/java/com/github/shyiko/mysql/binlog/event/GtidEventData.java

+56-1
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,28 @@ public class GtidEventData implements EventData {
2424

2525
private MySqlGtid gtid;
2626
private byte flags;
27+
private long lastCommitted;
28+
private long sequenceNumber;
29+
private long immediateCommitTimestamp;
30+
private long originalCommitTimestamp;
31+
private long transactionLength;
32+
private int immediateServerVersion;
33+
private int originalServerVersion;
2734

2835
@Deprecated
2936
public GtidEventData() {
3037
}
3138

32-
public GtidEventData(MySqlGtid gtid, byte flags) {
39+
public GtidEventData(MySqlGtid gtid, byte flags, long lastCommitted, long sequenceNumber, long immediateCommitTimestamp, long originalCommitTimestamp, long transactionLength, int immediateServerVersion, int originalServerVersion) {
3340
this.gtid = gtid;
3441
this.flags = flags;
42+
this.lastCommitted = lastCommitted;
43+
this.sequenceNumber = sequenceNumber;
44+
this.immediateCommitTimestamp = immediateCommitTimestamp;
45+
this.originalCommitTimestamp = originalCommitTimestamp;
46+
this.transactionLength = transactionLength;
47+
this.immediateServerVersion = immediateServerVersion;
48+
this.originalServerVersion = originalServerVersion;
3549
}
3650

3751
@Deprecated
@@ -52,6 +66,34 @@ public byte getFlags() {
5266
return flags;
5367
}
5468

69+
public long getLastCommitted() {
70+
return lastCommitted;
71+
}
72+
73+
public long getSequenceNumber() {
74+
return sequenceNumber;
75+
}
76+
77+
public long getImmediateCommitTimestamp() {
78+
return immediateCommitTimestamp;
79+
}
80+
81+
public long getOriginalCommitTimestamp() {
82+
return originalCommitTimestamp;
83+
}
84+
85+
public long getTransactionLength() {
86+
return transactionLength;
87+
}
88+
89+
public int getImmediateServerVersion() {
90+
return immediateServerVersion;
91+
}
92+
93+
public int getOriginalServerVersion() {
94+
return originalServerVersion;
95+
}
96+
5597
@Deprecated
5698
public void setFlags(byte flags) {
5799
this.flags = flags;
@@ -61,6 +103,19 @@ public String toString() {
61103
final StringBuilder sb = new StringBuilder();
62104
sb.append("GtidEventData");
63105
sb.append("{flags=").append(flags).append(", gtid='").append(gtid).append('\'');
106+
sb.append(", last_committed='").append(lastCommitted).append('\'');
107+
sb.append(", sequence_number='").append(sequenceNumber).append('\'');
108+
if (immediateCommitTimestamp != 0) {
109+
sb.append(", immediate_commit_timestamp='").append(immediateCommitTimestamp).append('\'');
110+
sb.append(", original_commit_timestamp='").append(originalCommitTimestamp).append('\'');
111+
}
112+
if (transactionLength != 0) {
113+
sb.append(", transaction_length='").append(transactionLength).append('\'');
114+
if (immediateServerVersion != 0) {
115+
sb.append(", immediate_server_version='").append(immediateServerVersion).append('\'');
116+
sb.append(", original_server_version='").append(originalServerVersion).append('\'');
117+
}
118+
}
64119
sb.append('}');
65120
return sb.toString();
66121
}

src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/GtidEventDataDeserializer.java

+72-5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,22 @@
2525
* @author <a href="mailto:pprasse@actindo.de">Patrick Prasse</a>
2626
*/
2727
public class GtidEventDataDeserializer implements EventDataDeserializer<GtidEventData> {
28+
public static final int LOGICAL_TIMESTAMP_TYPECODE_LENGTH = 1;
29+
// Type code used before the logical timestamps.
30+
public static final int LOGICAL_TIMESTAMP_TYPECODE = 2;
31+
public static final int LOGICAL_TIMESTAMP_LENGTH = 8;
32+
// Length of immediate and original commit timestamps
33+
public static final int IMMEDIATE_COMMIT_TIMESTAMP_LENGTH = 7;
34+
public static final int ORIGINAL_COMMIT_TIMESTAMP_LENGTH = 7;
35+
// Use 7 bytes out of which 1 bit is used as a flag.
36+
public static final int ENCODED_COMMIT_TIMESTAMP_LENGTH = 55;
37+
public static final int TRANSACTION_LENGTH_MIN_LENGTH = 1;
38+
// Length of immediate and original server versions
39+
public static final int IMMEDIATE_SERVER_VERSION_LENGTH = 4;
40+
public static final int ORIGINAL_SERVER_VERSION_LENGTH = 4;
41+
// Use 4 bytes out of which 1 bit is used as a flag.
42+
public static final int ENCODED_SERVER_VERSION_LENGTH = 31;
43+
public static final int UNDEFINED_SERVER_VERSION = 999999;
2844

2945
@Override
3046
public GtidEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
@@ -33,13 +49,63 @@ public GtidEventData deserialize(ByteArrayInputStream inputStream) throws IOExce
3349
long sourceIdLeastSignificantBits = readLongBigEndian(inputStream);
3450
long transactionId = inputStream.readLong(8);
3551

36-
return new GtidEventData(
37-
new MySqlGtid(
52+
final MySqlGtid gtid = new MySqlGtid(
3853
new UUID(sourceIdMostSignificantBits, sourceIdLeastSignificantBits),
3954
transactionId
40-
),
41-
flags
42-
);
55+
);
56+
57+
// MTR logical clock
58+
long lastCommitted = 0;
59+
long sequenceNumber = 0;
60+
// ImmediateCommitTimestamp/OriginalCommitTimestamp are introduced in MySQL-8.0.1, see:
61+
// https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-1.html
62+
long immediateCommitTimestamp = 0;
63+
long originalCommitTimestamp = 0;
64+
// Total transaction length (including this GTIDEvent), introduced in MySQL-8.0.2, see:
65+
// https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-2.html
66+
long transactionLength = 0;
67+
// ImmediateServerVersion/OriginalServerVersion are introduced in MySQL-8.0.14, see
68+
// https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-14.html
69+
int immediateServerVersion = 0;
70+
int originalServerVersion = 0;
71+
72+
// Logical timestamps - since MySQL 5.7.6
73+
if (inputStream.peek() == LOGICAL_TIMESTAMP_TYPECODE) {
74+
inputStream.skip(LOGICAL_TIMESTAMP_TYPECODE_LENGTH);
75+
lastCommitted = inputStream.readLong(LOGICAL_TIMESTAMP_LENGTH);
76+
sequenceNumber = inputStream.readLong(LOGICAL_TIMESTAMP_LENGTH);
77+
// Immediate and original commit timestamps are introduced in MySQL-8.0.1
78+
if (inputStream.available() >= IMMEDIATE_COMMIT_TIMESTAMP_LENGTH) {
79+
immediateCommitTimestamp = inputStream.readLong(IMMEDIATE_COMMIT_TIMESTAMP_LENGTH);
80+
// Check the MSB to determine how to populate the original commit timestamp
81+
if ((immediateCommitTimestamp & (1L << ENCODED_COMMIT_TIMESTAMP_LENGTH)) != 0) {
82+
immediateCommitTimestamp &= ~(1L << ENCODED_COMMIT_TIMESTAMP_LENGTH);
83+
originalCommitTimestamp = inputStream.readLong(ORIGINAL_COMMIT_TIMESTAMP_LENGTH);
84+
} else {
85+
// Transaction originated in the previous server eg. writer if direct connect
86+
originalCommitTimestamp = immediateCommitTimestamp;
87+
}
88+
// Total transaction length (including this GTIDEvent), introduced in MySQL-8.0.2
89+
if (inputStream.available() >= TRANSACTION_LENGTH_MIN_LENGTH) {
90+
transactionLength = inputStream.readPackedInteger();
91+
}
92+
immediateServerVersion = UNDEFINED_SERVER_VERSION;
93+
originalServerVersion = UNDEFINED_SERVER_VERSION;
94+
// Immediate and original server versions are introduced in MySQL-8.0.14
95+
if (inputStream.available() >= IMMEDIATE_SERVER_VERSION_LENGTH) {
96+
immediateServerVersion = inputStream.readInteger(IMMEDIATE_SERVER_VERSION_LENGTH);
97+
// Check the MSB to determine how to populate original server version
98+
if ((immediateServerVersion & (1L << ENCODED_SERVER_VERSION_LENGTH)) != 0) {
99+
immediateServerVersion &= ~(1L << ENCODED_SERVER_VERSION_LENGTH);
100+
originalServerVersion = inputStream.readInteger(ORIGINAL_SERVER_VERSION_LENGTH);
101+
} else {
102+
originalServerVersion = immediateServerVersion;
103+
}
104+
}
105+
}
106+
}
107+
108+
return new GtidEventData(gtid, flags, lastCommitted, sequenceNumber, immediateCommitTimestamp, originalCommitTimestamp, transactionLength, immediateServerVersion, originalServerVersion);
43109
}
44110

45111
private static long readLongBigEndian(ByteArrayInputStream input) throws IOException {
@@ -49,4 +115,5 @@ private static long readLongBigEndian(ByteArrayInputStream input) throws IOExcep
49115
}
50116
return result;
51117
}
118+
52119
}

src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/MysqlGtidEventDataDeserializerTest.java

+84
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,89 @@ public void testDeserialize() throws IOException {
2424
));
2525
assertEquals(data.getFlags(), 0x03);
2626
assertEquals(data.getMySqlGtid().toString(), "24bc7850-2c16-11e6-a073-0242ac110002:11");
27+
assertEquals(data.getLastCommitted(), 0);
28+
assertEquals(data.getSequenceNumber(), 0);
29+
assertEquals(data.toString(), "GtidEventData{flags=3, gtid='24bc7850-2c16-11e6-a073-0242ac110002:11', last_committed='0', sequence_number='0'}");
30+
}
31+
32+
@Test
33+
public void testDeserializeMySQL801() throws IOException {
34+
GtidEventData data = deserializer.deserialize(new ByteArrayInputStream(
35+
new byte[]{
36+
0x01, // flags
37+
(byte) 0xaa, (byte) 0xe5, 0x7b, 0x2f, (byte) 0x8e, 0x44, 0x11, (byte) 0xee, // sourceId mostSignificantBits big endian
38+
(byte) 0xa3, (byte) 0xd6, (byte) 0xa0, 0x36, (byte) 0xbc, (byte) 0xda, 0x1a, 0x41, // sourceId leastSignificantBits big endian
39+
0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // sequence little endian
40+
0x02, // MTR
41+
0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // last committed
42+
0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // sequence number
43+
(byte) 0x97, (byte) 0xef, 0x0c, 0x25, 0x3f, 0x0b, 0x06, // commit timestamp
44+
}
45+
));
46+
assertEquals(data.getFlags(), 0x01);
47+
assertEquals(data.getMySqlGtid().toString(), "aae57b2f-8e44-11ee-a3d6-a036bcda1a41:4");
48+
assertEquals(data.getLastCommitted(), 3);
49+
assertEquals(data.getSequenceNumber(), 4);
50+
assertEquals(data.getImmediateCommitTimestamp(), 1701215692713879L);
51+
assertEquals(data.getOriginalCommitTimestamp(), 1701215692713879L);
52+
assertEquals(data.getTransactionLength(), 0);
53+
assertEquals(data.getImmediateServerVersion(), 999999);
54+
assertEquals(data.getOriginalServerVersion(), 999999);
55+
assertEquals(data.toString(), "GtidEventData{flags=1, gtid='aae57b2f-8e44-11ee-a3d6-a036bcda1a41:4', last_committed='3', sequence_number='4', immediate_commit_timestamp='1701215692713879', original_commit_timestamp='1701215692713879'}");
56+
}
57+
58+
@Test
59+
public void testDeserializeMySQL802() throws IOException {
60+
GtidEventData data = deserializer.deserialize(new ByteArrayInputStream(
61+
new byte[]{
62+
0x00, // flags
63+
(byte) 0x99, 0x4a, (byte) 0xb8, 0x59, (byte) 0x8e, (byte) 0xa8, 0x11, (byte) 0xee, // sourceId mostSignificantBits big endian
64+
(byte) 0xa5, 0x68, (byte) 0xa0, 0x36, (byte) 0xbc, (byte) 0xda, 0x1a, 0x41, // sourceId leastSignificantBits big endian
65+
0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // sequence little endian
66+
0x02, // MTR
67+
0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // last committed
68+
0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // sequence number
69+
0x40, 0x55, 0x04, (byte) 0xc4, 0x48, 0x0b, 0x06, // commit timestamp
70+
(byte) 0xfc, 0x34, 0x01, // transaction length
71+
}
72+
));
73+
assertEquals(data.getFlags(), 0x00);
74+
assertEquals(data.getMySqlGtid().toString(), "994ab859-8ea8-11ee-a568-a036bcda1a41:3");
75+
assertEquals(data.getLastCommitted(), 2);
76+
assertEquals(data.getSequenceNumber(), 3);
77+
assertEquals(data.getImmediateCommitTimestamp(), 1701257014433088L);
78+
assertEquals(data.getOriginalCommitTimestamp(), 1701257014433088L);
79+
assertEquals(data.getTransactionLength(), 308);
80+
assertEquals(data.getImmediateServerVersion(), 999999);
81+
assertEquals(data.getOriginalServerVersion(), 999999);
82+
assertEquals(data.toString(), "GtidEventData{flags=0, gtid='994ab859-8ea8-11ee-a568-a036bcda1a41:3', last_committed='2', sequence_number='3', immediate_commit_timestamp='1701257014433088', original_commit_timestamp='1701257014433088', transaction_length='308', immediate_server_version='999999', original_server_version='999999'}");
83+
}
84+
85+
@Test
86+
public void testDeserializeMySQL810() throws IOException {
87+
GtidEventData data = deserializer.deserialize(new ByteArrayInputStream(
88+
new byte[]{
89+
0x00, // flags
90+
(byte) 0xbd, (byte) 0x97, (byte) 0x94, (byte) 0xe0, 0x1d, 0x65, 0x11, (byte) 0xed, // sourceId mostSignificantBits big endian
91+
(byte) 0xa7, (byte) 0xe7, 0x0a, (byte) 0xdb, 0x30, 0x5b, 0x3a, 0x12, // sourceId leastSignificantBits big endian
92+
0x09, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // sequence little endian
93+
0x02, // MTR
94+
0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // last committed
95+
0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // sequence number
96+
0x66, 0x29, (byte) 0xaa, 0x69, 0x55, 0x09, 0x06, // commit timestamp
97+
(byte) 0xfc, 0x3b, 0x01, // transaction length
98+
(byte) 0xe4, 0x38, 0x01, 0x00, // immediate server version
99+
}
100+
));
101+
assertEquals(data.getFlags(), 0x00);
102+
assertEquals(data.getMySqlGtid().toString(), "bd9794e0-1d65-11ed-a7e7-0adb305b3a12:9");
103+
assertEquals(data.getLastCommitted(), 7);
104+
assertEquals(data.getSequenceNumber(), 8);
105+
assertEquals(data.getImmediateCommitTimestamp(), 1699112309893478L);
106+
assertEquals(data.getOriginalCommitTimestamp(), 1699112309893478L);
107+
assertEquals(data.getTransactionLength(), 315);
108+
assertEquals(data.getImmediateServerVersion(), 80100);
109+
assertEquals(data.getOriginalServerVersion(), 80100);
110+
assertEquals(data.toString(), "GtidEventData{flags=0, gtid='bd9794e0-1d65-11ed-a7e7-0adb305b3a12:9', last_committed='7', sequence_number='8', immediate_commit_timestamp='1699112309893478', original_commit_timestamp='1699112309893478', transaction_length='315', immediate_server_version='80100', original_server_version='80100'}");
27111
}
28112
}

0 commit comments

Comments
 (0)