Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -1609,6 +1611,7 @@ private static class ReadFn<ParameterT, OutputT> extends DoFn<ParameterT, Output
private final int fetchSize;
private final boolean disableAutoCommit;

private Lock connectionLock = new ReentrantLock();
private @Nullable DataSource dataSource;
private @Nullable Connection connection;
private @Nullable KV<@Nullable String, String> reportedLineage;
Expand Down Expand Up @@ -1637,8 +1640,13 @@ private Connection getConnection() throws SQLException {
Connection connection = this.connection;
if (connection == null) {
DataSource validSource = checkStateNotNull(this.dataSource);
connection = checkStateNotNull(validSource).getConnection();
this.connection = connection;
connectionLock.lock();
try {
connection = validSource.getConnection();
this.connection = connection;
} finally {
connectionLock.unlock();
}

// report Lineage if not haven't done so
KV<@Nullable String, String> schemaWithTable =
Expand Down Expand Up @@ -2663,6 +2671,7 @@ abstract Builder<T, V> setMaxBatchBufferingDuration(
Metrics.distribution(WriteFn.class, "milliseconds_per_batch");

private final WriteFnSpec<T, V> spec;
private Lock connectionLock = new ReentrantLock();
private @Nullable DataSource dataSource;
private @Nullable Connection connection;
private @Nullable PreparedStatement preparedStatement;
Expand Down Expand Up @@ -2700,7 +2709,13 @@ private Connection getConnection() throws SQLException {
Connection connection = this.connection;
if (connection == null) {
DataSource validSource = checkStateNotNull(dataSource);
connection = validSource.getConnection();
connectionLock.lock();
try {
connection = validSource.getConnection();
} finally {
connectionLock.unlock();
}

connection.setAutoCommit(false);
preparedStatement =
connection.prepareStatement(checkStateNotNull(spec.getStatement()).get());
Expand Down
Loading