|
30 | 30 | import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
|
31 | 31 | import com.github.shyiko.mysql.binlog.event.deserialization.QueryEventDataDeserializer;
|
32 | 32 | import com.github.shyiko.mysql.binlog.event.deserialization.RotateEventDataDeserializer;
|
| 33 | +import com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream; |
33 | 34 | import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
|
34 | 35 | import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean;
|
35 | 36 | import com.github.shyiko.mysql.binlog.network.AuthenticationException;
|
|
49 | 50 |
|
50 | 51 | import java.io.EOFException;
|
51 | 52 | import java.io.IOException;
|
| 53 | +import java.io.InputStream; |
52 | 54 | import java.net.InetSocketAddress;
|
53 | 55 | import java.net.Socket;
|
54 | 56 | import java.net.SocketException;
|
|
76 | 78 | */
|
77 | 79 | public class BinaryLogClient implements BinaryLogClientMXBean {
|
78 | 80 |
|
| 81 | + private static final SocketFactory DEFAULT_SOCKET_FACTORY = new SocketFactory() { |
| 82 | + |
| 83 | + @Override |
| 84 | + public Socket createSocket() throws SocketException { |
| 85 | + return new Socket() { |
| 86 | + |
| 87 | + private InputStream inputStream; |
| 88 | + |
| 89 | + @Override |
| 90 | + public synchronized InputStream getInputStream() throws IOException { |
| 91 | + return inputStream != null ? inputStream : |
| 92 | + (inputStream = new BufferedSocketInputStream(super.getInputStream())); |
| 93 | + } |
| 94 | + }; |
| 95 | + } |
| 96 | + }; |
| 97 | + |
79 | 98 | private final Logger logger = Logger.getLogger(getClass().getName());
|
80 | 99 |
|
81 | 100 | private final String hostname;
|
@@ -322,7 +341,7 @@ public void setEventDeserializer(EventDeserializer eventDeserializer) {
|
322 | 341 | }
|
323 | 342 |
|
324 | 343 | /**
|
325 |
| - * @param socketFactory custom socket factory. If not provided, socket will be created with "new Socket()". |
| 344 | + * @param socketFactory custom socket factory |
326 | 345 | */
|
327 | 346 | public void setSocketFactory(SocketFactory socketFactory) {
|
328 | 347 | this.socketFactory = socketFactory;
|
@@ -402,7 +421,8 @@ private void reset() {
|
402 | 421 |
|
403 | 422 | private void establishConnection() throws IOException {
|
404 | 423 | try {
|
405 |
| - Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket(); |
| 424 | + SocketFactory socketFactory = this.socketFactory != null ? this.socketFactory : DEFAULT_SOCKET_FACTORY; |
| 425 | + Socket socket = socketFactory.createSocket(); |
406 | 426 | socket.connect(new InetSocketAddress(hostname, port));
|
407 | 427 | channel = new PacketChannel(socket);
|
408 | 428 | if (channel.getInputStream().peek() == -1) {
|
|
0 commit comments