diff --git a/CHANGELOG.md b/CHANGELOG.md index 805767f6..92babdb3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## [0.25.1](https://github.com/osheroff/mysql-binlog-connector-java/compare/0.25.0...0.25.1) - 2021-04-20 + +- performance improves in ByteArrayInputStream#read + ## [0.25.0](https://github.com/osheroff/mysql-binlog-connector-java/compare/0.24.1...0.25.0) - 2021-03-04 - bring back jdk 8 support, this caused... ahem. Issues. diff --git a/README.md b/README.md index f665442c..407e27a3 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,18 @@ -# mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven Central](https://img.shields.io/maven-central/v/com.github.shyiko/mysql-binlog-connector-java.svg)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22) +# mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven Central](https://img.shields.io/maven-central/v/com.zendesk/mysql-binlog-connector-java.svg)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22) -MySQL Binary Log connector. +MySQL Binary Log connector. @osheroff's fork of @shiyko's project, probably +the "official" version of this. With help from the Debezium devs. + +## Usage + +```xml + + com.zendesk + mysql-binlog-connector-java + 0.25.0 + +``` Initially project was started as a fork of [open-replicator](https://code.google.com/p/open-replicator), but ended up as a complete rewrite. Key differences/features: @@ -21,17 +32,8 @@ but ended up as a complete rewrite. Key differences/features: [siddontang/go-mysql](https://github.com/siddontang/go-mysql) (Go), [noplay/python-mysql-replication](https://github.com/noplay/python-mysql-replication) (Python). -## Usage -Get the latest JAR(s) from [here](https://search.maven.org/search?q=g:com.zendesk%20AND%20a:mysql-binlog-connector-java). Alternatively you can include following Maven dependency (available through Maven Central): - -```xml - - com.zendesk - mysql-binlog-connector-java - 0.23.4 - -``` +Or get the latest JAR(s) from [here](https://search.maven.org/search?q=g:com.zendesk%20AND%20a:mysql-binlog-connector-java). #### Reading binary log file @@ -197,6 +199,7 @@ For the insight into the internals of MySQL look [here](https://dev.mysql.com/do Some of the OSS using / built on top of mysql-binlog-conector-java: * [apache/nifi](https://github.com/apache/nifi) An easy to use, powerful, and reliable system to process and distribute data. * [debezium](https://github.com/debezium/debezium) A low latency data streaming platform for change data capture (CDC). +* [zendesk/maxwell](https://github.com/zendesk/maxwell) A MySQL-to-JSON Kafka producer. * [mavenlink/changestream](https://github.com/mavenlink/changestream) - A stream of changes for MySQL built on Akka. * [mardambey/mypipe](https://github.com/mardambey/mypipe) MySQL binary log consumer with the ability to act on changed rows and publish changes to different systems with emphasis on Apache Kafka. * [ngocdaothanh/mydit](https://github.com/ngocdaothanh/mydit) MySQL to MongoDB data replicator. @@ -204,7 +207,6 @@ Some of the OSS using / built on top of mysql-binlog-conector-java: * [shyiko/rook](https://github.com/shyiko/rook) Generic Change Data Capture (CDC) toolkit. * [streamsets/datacollector](https://github.com/streamsets/datacollector) Continuous big data ingestion infrastructure. * [twingly/ecco](https://github.com/twingly/ecco) MySQL replication binlog parser in JRuby. -* [zendesk/maxwell](https://github.com/zendesk/maxwell) A MySQL-to-JSON Kafka producer. * [zzt93/syncer](https://github.com/zzt93/syncer) A tool sync & manipulate data from MySQL/MongoDB to ES/Kafka/MySQL, which make 'Eventual Consistency' promise. It's also used [on a large scale](https://twitter.com/atwinmutt/status/626816601078300672) in MailChimp. You can read about it [here](http://devs.mailchimp.com/blog/powering-mailchimp-pro-reporting/). diff --git a/pom.xml b/pom.xml index 8c788d4c..8df99701 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.zendesk mysql-binlog-connector-java - 0.25.0 + 0.25.1 mysql-binlog-connector-java MySQL Binary Log connector diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java index 81e7f1ae..d8b824eb 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java @@ -261,8 +261,6 @@ protected void parse(ValueType type, JsonFormatter formatter) throws IOException * json_binary.h file: *

Grammar

* - *

Grammar

- * *
      *   value ::=
      *       object  |
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java b/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java
index b883394f..3cfaf979 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java
@@ -222,6 +222,50 @@ private int readWithinBlockBoundaries() throws IOException {
         return inputStream.read();
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException();
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+            return 0;
+        }
+
+        if (peek != null) {
+            b[off] = (byte)(int)peek;
+            off += 1;
+            len -= 1;
+        }
+
+        int read = readWithinBlockBoundaries(b, off, len);
+
+        if (read > 0) {
+            this.pos += read;
+        }
+
+        if (peek != null) {
+            peek = null;
+            read = read <= 0 ? 1 : read + 1;
+        }
+
+        return read;
+    }
+
+    private int readWithinBlockBoundaries(byte[] b, int off, int len) throws IOException {
+        if (blockLength == -1) {
+            return inputStream.read(b, off, len);
+        } else if (blockLength == 0) {
+            return -1;
+        }
+
+        int read = inputStream.read(b, off, Math.min(len, blockLength));
+        if (read > 0) {
+            blockLength -= read;
+        }
+        return read;
+    }
+
     @Override
     public void close() throws IOException {
         inputStream.close();
diff --git a/src/test/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStreamTest.java b/src/test/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStreamTest.java
new file mode 100644
index 00000000..8f0eb112
--- /dev/null
+++ b/src/test/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStreamTest.java
@@ -0,0 +1,102 @@
+package com.github.shyiko.mysql.binlog.io;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class ByteArrayInputStreamTest {
+    @Test
+    public void testReadToArray() throws Exception {
+        byte[] buff = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+        ByteArrayInputStream in = new ByteArrayInputStream(buff);
+        assertEquals(in.getPosition(), 0);
+
+        byte[] b = new byte[20];
+
+        int read = in.read(b, 0, 0);
+        assertEquals(read, 0);
+        assertEquals(in.getPosition(), 0);
+
+        read = in.read(b, 0, 4);
+        assertEquals(read, 4);
+        assertEquals(b[3], 3);
+        assertEquals(in.getPosition(), 4);
+
+        read = in.read(b, 4, 4);
+        assertEquals(read, 4);
+        assertEquals(b[7], 7);
+        assertEquals(in.getPosition(), 8);
+
+        read = in.read(b, 8, 4);
+        assertEquals(read, 4);
+        assertEquals(b[11], 11);
+        assertEquals(in.getPosition(), 12);
+
+        read = in.read(b, 12, 4);
+        assertEquals(read, 4);
+        assertEquals(b[15], 15);
+        assertEquals(in.getPosition(), 16);
+
+        read = in.read(b, 16, 4);
+        assertEquals(read, -1);
+        assertEquals(in.getPosition(), 16);
+    }
+
+    @Test
+    public void testReadToArrayWithinBlockBoundaries() throws Exception {
+        byte[] buff = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8};
+        ByteArrayInputStream in = new ByteArrayInputStream(buff);
+        byte[] b = new byte[8];
+
+        in.enterBlock(4);
+
+        int read = in.read(b, 0, 3);
+        assertEquals(read, 3);
+        assertEquals(b[2], 2);
+
+        read = in.read(b, 3, 3);
+        assertEquals(read, 1);
+        assertEquals(b[3], 3);
+
+        read = in.read(b, 4, 3);
+        assertEquals(read, -1);
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testReadToArrayWithNullBuff() throws Exception {
+        ByteArrayInputStream in = new ByteArrayInputStream(new byte[]{});
+        in.read(null, 0, 4);
+    }
+
+    @Test(expectedExceptions = IndexOutOfBoundsException.class)
+    public void testReadToArrayWhenLenExceedsBuffSize() throws Exception {
+        ByteArrayInputStream in = new ByteArrayInputStream(new byte[]{0, 1, 2});
+        byte[] b = new byte[1];
+        in.read(b, 0, 4);
+    }
+
+    @Test(expectedExceptions = IndexOutOfBoundsException.class)
+    public void testReadToArrayWhenOffsetNegative() throws Exception {
+        ByteArrayInputStream in = new ByteArrayInputStream(new byte[]{0, 1, 2});
+        byte[] b = new byte[1];
+        in.read(b, -1, 1);
+    }
+
+    @Test(expectedExceptions = IndexOutOfBoundsException.class)
+    public void testReadToArrayWhenLengthNegative() throws Exception {
+        ByteArrayInputStream in = new ByteArrayInputStream(new byte[]{0, 1, 2});
+        byte[] b = new byte[1];
+        in.read(b, 0, -1);
+    }
+
+    @Test
+    public void testPeekAndReadToArray() throws Exception {
+        ByteArrayInputStream in = new ByteArrayInputStream(new byte[]{5, 6, 7});
+        byte[] b = new byte[3];
+        assertEquals(in.peek(), 5);
+        int read = in.read(b, 0, 3);
+        assertEquals(read, 3);
+        assertEquals(b[0], 5);
+        assertEquals(b[2], 7);
+    }
+}