Skip to content

Commit

Permalink
fix-mysql-cdc: poll for 5 minutes only when we have not received a si…
Browse files Browse the repository at this point in the history
…ngle record (#3789)

* fix-mysql-cdc: poll for 5 minutes only when we have not received a single record

* fix format

* address review comment + bump docker version
  • Loading branch information
subodh1810 authored Jun 2, 2021
1 parent bc44cae commit cbf47ad
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.3.2",
"dockerImageTag": "0.3.3",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql",
"icon": "mysql.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
- sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
name: MySQL
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.3.2
dockerImageTag: 0.3.3
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
- sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.2
LABEL io.airbyte.version=0.3.3

LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,14 @@ public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String,

private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordIterator.class);

/**
* This is not private and final because we need to override in tests otherwise each test would
* continue to run for 5 minutes
*/
static TimeUnit sleepTimeUnit = TimeUnit.MINUTES;
private static final int SLEEP_TIME_AMOUNT = 5;
private static final WaitTime FIRST_RECORD_WAIT_TIME_MINUTES = new WaitTime(5, TimeUnit.MINUTES);
private static final WaitTime SUBSEQUENT_RECORD_WAIT_TIME_SECONDS = new WaitTime(5, TimeUnit.SECONDS);

private final LinkedBlockingQueue<ChangeEvent<String, String>> queue;
private final Optional<TargetFilePosition> targetFilePosition;
private final Supplier<Boolean> publisherStatusSupplier;
private final VoidCallable requestClose;
private boolean receivedFirstRecord;

public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> queue,
Optional<TargetFilePosition> targetFilePosition,
Expand All @@ -73,6 +70,7 @@ public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> q
this.targetFilePosition = targetFilePosition;
this.publisherStatusSupplier = publisherStatusSupplier;
this.requestClose = requestClose;
this.receivedFirstRecord = false;
}

@Override
Expand All @@ -83,7 +81,8 @@ protected ChangeEvent<String, String> computeNext() {
while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) {
final ChangeEvent<String, String> next;
try {
next = queue.poll(SLEEP_TIME_AMOUNT, sleepTimeUnit);
WaitTime waitTime = receivedFirstRecord ? SUBSEQUENT_RECORD_WAIT_TIME_SECONDS : FIRST_RECORD_WAIT_TIME_MINUTES;
next = queue.poll(waitTime.period, waitTime.timeUnit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand All @@ -100,7 +99,7 @@ protected ChangeEvent<String, String> computeNext() {
if (shouldSignalClose(next)) {
requestClose();
}

receivedFirstRecord = true;
return next;
}
return endOfData();
Expand Down Expand Up @@ -146,4 +145,16 @@ enum SnapshotMetadata {
LAST
}

private static class WaitTime {

public final int period;
public final TimeUnit timeUnit;

public WaitTime(int period, TimeUnit timeUnit) {
this.period = period;
this.timeUnit = timeUnit;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import io.airbyte.protocol.models.SyncMode;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.jooq.SQLDialect;
import org.testcontainers.containers.MySQLContainer;

Expand Down Expand Up @@ -109,7 +108,6 @@ protected List<String> getRegexTests() {

@Override
protected void setup(TestDestinationEnv testEnv) {
DebeziumRecordIterator.sleepTimeUnit = TimeUnit.SECONDS;
container = new MySQLContainer<>("mysql:8.0");
container.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.SQLDialect;
Expand Down Expand Up @@ -132,7 +131,6 @@ public void setup() {
}

private void init() {
DebeziumRecordIterator.sleepTimeUnit = TimeUnit.SECONDS;
container = new MySQLContainer<>("mysql:8.0");
container.start();
source = new MySqlSource();
Expand Down

0 comments on commit cbf47ad

Please sign in to comment.