Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,24 @@ protected String logProxyOptionsString() {
+ String.format(" 'rootserver-list' = '%s'", METADATA.getRsList());
}

/**
* Current OceanBase connector uses timestamp (in seconds) to mark the offset during the
* transition from {@code SNAPSHOT} to {@code STREAMING} mode. Thus, if some snapshot inserting
* events are too close to the transitioning offset, snapshot inserting events might be emitted
* multiple times. <br>
* This could be safely removed after switching to incremental snapshot framework which provides
* Exactly-once guarantee.
*/
private void waitForTableInitialization() throws InterruptedException {
Thread.sleep(5000L);
}

@Test
public void testTableList() throws Exception {
inventoryDatabase = new UniqueDatabase(OB_SERVER, "inventory");
inventoryDatabase.createAndInitialize("mysql");
waitForTableInitialization();

String sourceDDL =
String.format(
"CREATE TABLE ob_source ("
Expand Down Expand Up @@ -212,6 +226,8 @@ public void testTableList() throws Exception {
public void testMetadataColumns() throws Exception {
inventoryDatabase = new UniqueDatabase(OB_SERVER, "inventory");
inventoryDatabase.createAndInitialize("mysql");
waitForTableInitialization();

String sourceDDL =
String.format(
"CREATE TABLE ob_source ("
Expand Down Expand Up @@ -297,6 +313,7 @@ public void testAllDataTypes() throws Exception {

columnTypesDatabase = new UniqueDatabase(OB_SERVER, "column_type_test");
columnTypesDatabase.createAndInitialize("mysql");
waitForTableInitialization();

String sourceDDL =
String.format(
Expand Down Expand Up @@ -488,6 +505,7 @@ public void testTimeDataTypes(String serverTimeZone) throws Exception {

columnTypesDatabase = new UniqueDatabase(OB_SERVER, "column_type_test");
columnTypesDatabase.createAndInitialize("mysql");
waitForTableInitialization();

String sourceDDL =
String.format(
Expand Down Expand Up @@ -559,6 +577,7 @@ public void testTimeDataTypes(String serverTimeZone) throws Exception {
public void testSnapshotOnly() throws Exception {
inventoryDatabase = new UniqueDatabase(OB_SERVER, "inventory");
inventoryDatabase.createAndInitialize("mysql");
waitForTableInitialization();

String sourceDDL =
String.format(
Expand Down Expand Up @@ -611,7 +630,7 @@ public void testSnapshotOnly() throws Exception {

while (result.getJobClient().get().getJobStatus().get().equals(JobStatus.RUNNING)) {
Thread.sleep(100);
// Waiting for job to quit, in case if
// Waiting for job to finish (SNAPSHOT job will end spontaneously)
}
}
}
Loading