Skip to content

Commit

Permalink
[improve][io] JDBC sinks: implement JDBC Batch API (apache#18017)
Browse files Browse the repository at this point in the history
* [improve][io] JDBC sinks: implement JDBC Batch API

* more tests and transactions support

* remove .db files

* doc

* fix batch results and thread safety

* add next flush test - fix doc - improve code readability
  • Loading branch information
nicoloboschi authored Oct 20, 2022
1 parent e5b3ffd commit 7b52a92
Show file tree
Hide file tree
Showing 6 changed files with 433 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand All @@ -32,6 +38,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
Expand Down Expand Up @@ -64,15 +71,15 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
protected JdbcUtils.TableDefinition tableDefinition;

// for flush
private List<Record<T>> incomingList;
private List<Record<T>> swapList;
private Deque<Record<T>> incomingList;
private AtomicBoolean isFlushing;
private int batchSize;
private ScheduledExecutorService flushExecutor;

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
jdbcSinkConfig = JdbcSinkConfig.load(config);
jdbcSinkConfig.validate();

jdbcUrl = jdbcSinkConfig.getJdbcUrl();
if (jdbcSinkConfig.getJdbcUrl() == null) {
Expand Down Expand Up @@ -100,12 +107,13 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc

int timeoutMs = jdbcSinkConfig.getTimeoutMs();
batchSize = jdbcSinkConfig.getBatchSize();
incomingList = Lists.newArrayList();
swapList = Lists.newArrayList();
incomingList = new LinkedList<>();
isFlushing = new AtomicBoolean(false);

flushExecutor = Executors.newScheduledThreadPool(1);
flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
if (timeoutMs > 0) {
flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
}
}

private void initStatement() throws Exception {
Expand Down Expand Up @@ -173,11 +181,14 @@ public void close() throws Exception {
/**
 * Adds the record to the incoming queue and, once the queue has grown to the configured
 * batch size, schedules an immediate (zero-delay) flush on the flush executor.
 *
 * NOTE(review): this listing looks like a rendered diff without +/- markers, so a few
 * statements below appear in two forms back to back (apparently the pre-change line
 * followed by its replacement) - confirm against the real file before relying on the
 * exact text.
 *
 * @param record the record to enqueue
 * @throws Exception declared by the overridden method
 */
@Override
public void write(Record<T> record) throws Exception {
// queue size observed right after this record was added
int number;
synchronized (this) {
synchronized (incomingList) {
incomingList.add(record);
number = incomingList.size();
}
if (number == batchSize) {
// size-based flushing only applies when batchSize is positive
if (batchSize > 0 && number >= batchSize) {
if (log.isDebugEnabled()) {
log.debug("flushing by batches, hit batch size {}", batchSize);
}
// hand the flush over to the executor instead of running it on the caller's thread
flushExecutor.schedule(this::flush, 0, TimeUnit.MILLISECONDS);
}
}
Expand Down Expand Up @@ -220,63 +231,81 @@ protected enum MutationType {


private void flush() {
// if not in flushing state, do flush, else return;
if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
if (log.isDebugEnabled()) {
log.debug("Starting flush, queue size: {}", incomingList.size());
}
if (!swapList.isEmpty()) {
throw new IllegalStateException("swapList should be empty since last flush. swapList.size: "
+ swapList.size());
}
synchronized (this) {
List<Record<T>> tmpList;
swapList.clear();
boolean needAnotherRound;
final Deque<Record<T>> swapList = new LinkedList<>();

synchronized (incomingList) {
if (log.isDebugEnabled()) {
log.debug("Starting flush, queue size: {}", incomingList.size());
}
final int actualBatchSize = batchSize > 0 ? Math.min(incomingList.size(), batchSize) :
incomingList.size();

tmpList = swapList;
swapList = incomingList;
incomingList = tmpList;
for (int i = 0; i < actualBatchSize; i++) {
swapList.add(incomingList.removeFirst());
}
needAnotherRound = batchSize > 0 && !incomingList.isEmpty() && incomingList.size() >= batchSize;
}
long start = System.nanoTime();

int count = 0;
try {
PreparedStatement currentBatch = null;
final List<Mutation> mutations = swapList
.stream()
.map(this::createMutation)
.collect(Collectors.toList());
// bind each record value
for (Record<T> record : swapList) {
final Mutation mutation = createMutation(record);
PreparedStatement statement;
for (Mutation mutation : mutations) {
switch (mutation.getType()) {
case DELETE:
bindValue(deleteStatement, mutation);
count += 1;
deleteStatement.execute();
statement = deleteStatement;
break;
case UPDATE:
bindValue(updateStatement, mutation);
count += 1;
updateStatement.execute();
statement = updateStatement;
break;
case INSERT:
bindValue(insertStatement, mutation);
count += 1;
insertStatement.execute();
statement = insertStatement;
break;
case UPSERT:
bindValue(upsertStatement, mutation);
count += 1;
upsertStatement.execute();
statement = upsertStatement;
break;
default:
String msg = String.format(
"Unsupported action %s, can be one of %s, or not set which indicate %s",
mutation.getType(), Arrays.toString(MutationType.values()), MutationType.INSERT);
throw new IllegalArgumentException(msg);
}
bindValue(statement, mutation);
count += 1;
if (jdbcSinkConfig.isUseJdbcBatch()) {
if (currentBatch != null && statement != currentBatch) {
internalFlushBatch(swapList, currentBatch, count, start);
start = System.nanoTime();
}
statement.addBatch();
currentBatch = statement;
} else {
statement.execute();
if (!jdbcSinkConfig.isUseTransactions()) {
swapList.removeFirst().ack();
}
}
}
if (jdbcSinkConfig.isUseTransactions()) {
connection.commit();

if (jdbcSinkConfig.isUseJdbcBatch()) {
internalFlushBatch(swapList, currentBatch, count, start);
} else {
internalFlush(swapList);
}
swapList.forEach(Record::ack);
} catch (Exception e) {
log.error("Got exception {}", e.getMessage(), e);
log.error("Got exception {} after {} ms, failing {} messages",
e.getMessage(),
(System.nanoTime() - start) / 1000 / 1000,
swapList.size(),
e);
swapList.forEach(Record::fail);
try {
if (jdbcSinkConfig.isUseTransactions()) {
Expand All @@ -287,21 +316,70 @@ private void flush() {
}
}

if (swapList.size() != count) {
log.error("Update count {} not match total number of records {}", count, swapList.size());
}

// finish flush
if (log.isDebugEnabled()) {
log.debug("Finish flush, queue size: {}", swapList.size());
}
swapList.clear();
isFlushing.set(false);
if (needAnotherRound) {
flush();
}
} else {
if (log.isDebugEnabled()) {
log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size());
}
}
}

/**
 * Completes a non-batch flush when transactions are enabled: commits the connection and
 * then acks every record still present in {@code swapList}. Does nothing when
 * transactions are disabled.
 *
 * @param swapList records of the current flush that have not been acked yet
 * @throws SQLException if the commit fails
 */
private void internalFlush(Deque<Record<T>> swapList) throws SQLException {
    if (!jdbcSinkConfig.isUseTransactions()) {
        return;
    }
    connection.commit();
    for (Record<T> pending : swapList) {
        pending.ack();
    }
}

/**
 * Executes the pending JDBC batch through {@code executeBatch} and, when debug logging is
 * enabled, logs how many messages were flushed and how long it took.
 *
 * @param swapList     records of the current flush
 * @param currentBatch prepared statement holding the batched operations
 * @param count        number of messages reported in the debug log line
 * @param start        {@link System#nanoTime()} reading the elapsed time is measured from
 * @throws SQLException propagated from {@code executeBatch}
 */
private void internalFlushBatch(Deque<Record<T>> swapList, PreparedStatement currentBatch, int count, long start)
        throws SQLException {
    executeBatch(swapList, currentBatch);
    if (!log.isDebugEnabled()) {
        return;
    }
    final long elapsedMs = (System.nanoTime() - start) / 1000 / 1000;
    log.debug("Flushed {} messages in {} ms", count, elapsedMs);
}

/**
 * Executes the JDBC batch held by {@code statement} and acks or fails the records it covers.
 *
 * <p>One record is removed from the head of {@code swapList} for every entry of the array
 * returned by {@link PreparedStatement#executeBatch()}. When no entry reports a failure the
 * connection is committed (if transactions are enabled) and those records are acked.
 * Otherwise the connection is rolled back (if transactions are enabled), those records are
 * failed and an {@link SQLException} describing the failure codes is thrown.
 *
 * @param swapList  records of the current flush; the batch items are expected at its head
 * @param statement prepared statement holding the batched operations
 * @throws SQLException if the driver raises one, or if at least one batch item reports a
 *                      failure return code
 */
private void executeBatch(Deque<Record<T>> swapList, PreparedStatement statement) throws SQLException {
    final int[] results = statement.executeBatch();
    final boolean useTransactions = jdbcSinkConfig.isUseTransactions();

    // failure return code -> number of batch items that reported it.
    // merge() starts each code at 1; the previous computeIfAbsent + put pair
    // counted the first occurrence of a code twice.
    final Map<Integer, Integer> failuresMapping = new HashMap<>();
    for (int result : results) {
        if (isBatchItemFailed(result)) {
            failuresMapping.merge(result, 1, Integer::sum);
        }
    }

    if (failuresMapping.isEmpty()) {
        if (useTransactions) {
            connection.commit();
        }
        // one record per batch item, in insertion order
        for (int i = 0; i < results.length; i++) {
            swapList.removeFirst().ack();
        }
        return;
    }

    if (useTransactions) {
        connection.rollback();
    }
    for (int i = 0; i < results.length; i++) {
        swapList.removeFirst().fail();
    }
    String msg = "Batch failed, got error results (error_code->count): " + failuresMapping;
    // throwing an exception here means the main loop cycle will nack the messages in the next batch
    throw new SQLException(msg);
}

/**
 * Tells whether a single entry of a JDBC batch result array denotes a failed item.
 *
 * @param returnCode one element of the array returned by {@code executeBatch()}
 * @return {@code false} for non-negative codes and for {@link Statement#SUCCESS_NO_INFO},
 *         {@code true} for every other value
 */
private static boolean isBatchItemFailed(int returnCode) {
    return returnCode < 0 && returnCode != Statement.SUCCESS_NO_INFO;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,24 @@ public class JdbcSinkConfig implements Serializable {
@FieldDoc(
required = false,
defaultValue = "500",
help = "The jdbc operation timeout in milliseconds"
help = "Enable batch mode by time. After timeoutMs milliseconds the operations queue will be flushed."
)
private int timeoutMs = 500;
@FieldDoc(
required = false,
defaultValue = "200",
help = "The batch size of updates made to the database"
help = "Enable batch mode by number of operations. This value is the max number of operations "
+ "batched in the same transaction/batch."
)
private int batchSize = 200;

@FieldDoc(
required = false,
defaultValue = "false",
help = "Use the JDBC batch API. This option is suggested to improve write performance."
)
private boolean useJdbcBatch = false;

@FieldDoc(
required = false,
defaultValue = "true",
Expand Down Expand Up @@ -141,4 +150,11 @@ public static JdbcSinkConfig load(Map<String, Object> map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(new ObjectMapper().writeValueAsString(map), JdbcSinkConfig.class);
}

/**
 * Checks that at least one flush trigger is configured.
 *
 * @throws IllegalArgumentException if neither {@code timeoutMs} nor {@code batchSize} is positive
 */
public void validate() {
    final boolean flushByTime = timeoutMs > 0;
    final boolean flushBySize = batchSize > 0;
    if (!flushByTime && !flushBySize) {
        throw new IllegalArgumentException("timeoutMs or batchSize must be set to a positive value.");
    }
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.io.jdbc;

import lombok.extern.slf4j.Slf4j;
import java.util.Map;

/**
 * Variant of {@link SqliteJdbcSinkTest} that sets the {@code useJdbcBatch} sink option,
 * so the inherited tests run with the JDBC batch API enabled.
 */
@Slf4j
public class SqliteJdbcSinkBatchTest extends SqliteJdbcSinkTest {

    private static final String USE_JDBC_BATCH_KEY = "useJdbcBatch";
    private static final String ENABLED = "true";

    /**
     * Adds the batch-mode flag to the sink configuration map.
     *
     * @param configuration mutable sink configuration to adjust
     */
    @Override
    protected void configure(Map<String, Object> configuration) {
        configuration.put(USE_JDBC_BATCH_KEY, ENABLED);
    }
}
Loading

0 comments on commit 7b52a92

Please sign in to comment.