diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java index eebc8ef17dfa0..ad7af129271f1 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java @@ -31,11 +31,11 @@ void assertFetchRecordsTimeoutCorrectly() { SimpleMemoryPipelineChannel simpleMemoryPipelineChannel = new SimpleMemoryPipelineChannel(10, new EmptyAckCallback()); long startMills = System.currentTimeMillis(); simpleMemoryPipelineChannel.fetchRecords(1, 1, TimeUnit.MILLISECONDS); - long endMills = System.currentTimeMillis(); - assertTrue(endMills - startMills >= 1 && endMills - startMills < 50); + long delta = System.currentTimeMillis() - startMills; + assertTrue(delta >= 1 && delta < 50, "Delta is not in [1,50) : " + delta); startMills = System.currentTimeMillis(); simpleMemoryPipelineChannel.fetchRecords(1, 500, TimeUnit.MILLISECONDS); - endMills = System.currentTimeMillis(); - assertTrue(endMills - startMills >= 500 && endMills - startMills < 600); + delta = System.currentTimeMillis() - startMills; + assertTrue(delta >= 500 && delta < 650, "Delta is not in [500,650) : " + delta); } } diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java index a8c5499c6586b..17532f617dcff 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java @@ -32,6 +32,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.sql.Date; import java.sql.SQLException; import java.util.LinkedList; @@ -252,7 +253,7 @@ private String readStringSegment(final ByteBuffer data, final int startPosition, for (int i = 0; i < offset; i++) { result[i] = data.get(startPosition + i); } - return new String(result); + return new String(result, StandardCharsets.UTF_8); } private String readNextString(final ByteBuffer data) { diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java index 7689aa706f14b..8ed9a27ed28b3 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java @@ -28,6 +28,7 @@ import org.postgresql.replication.LogSequenceNumber; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.sql.SQLException; import static org.hamcrest.CoreMatchers.instanceOf; @@ -47,7 +48,7 @@ class TestDecodingPluginTest { @Test void assertDecodeWriteRowEvent() { ByteBuffer data = ByteBuffer.wrap(("table public.test: INSERT: data[character varying]:' 1 2 3'' 😊中' t_json_empty[json]:'{}' t_json[json]:'{\"test\":\"中中{中中}' 中\"}'" - + " t_jsonb[jsonb]:'{\"test\":\"😊Emoji中\"}'").getBytes()); + + " t_jsonb[jsonb]:'{\"test\":\"😊Emoji中\"}'").getBytes(StandardCharsets.UTF_8)); WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber); assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber)); assertThat(actual.getTableName(), is("test")); @@ -59,7 +60,8 @@ void assertDecodeWriteRowEvent() { @Test void assertDecodeUpdateRowEvent() { - ByteBuffer data = ByteBuffer.wrap("table public.test: UPDATE: unicode[character varying]:' 1 2 3'' 😊中 ' t_json_empty[json]:'{}' t_json[json]:'{\"test\":\"中中{中中}' 中\"}'".getBytes()); + ByteBuffer data = ByteBuffer.wrap("table public.test: UPDATE: unicode[character varying]:' 1 2 3'' 😊中 ' t_json_empty[json]:'{}' t_json[json]:'{\"test\":\"中中{中中}' 中\"}'" + .getBytes(StandardCharsets.UTF_8)); UpdateRowEvent actual = (UpdateRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber); assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber)); assertThat(actual.getTableName(), is("test")); @@ -70,7 +72,7 @@ void assertDecodeUpdateRowEvent() { @Test void assertDecodeDeleteRowEvent() { - ByteBuffer data = ByteBuffer.wrap("table public.test: DELETE: data[integer]:1".getBytes()); + ByteBuffer data = ByteBuffer.wrap("table public.test: DELETE: data[integer]:1".getBytes(StandardCharsets.UTF_8)); DeleteRowEvent actual = (DeleteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber); assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber)); assertThat(actual.getTableName(), is("test")); @@ -79,7 +81,7 @@ void assertDecodeDeleteRowEvent() { @Test void assertDecodeWriteRowEventWithByteA() { - ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[bytea]:'\\xff00ab'".getBytes()); + ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[bytea]:'\\xff00ab'".getBytes(StandardCharsets.UTF_8)); WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber); assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber)); assertThat(actual.getTableName(), is("test")); @@ -88,13 +90,13 @@ void assertDecodeWriteRowEventWithByteA() { @Test void assertDecodeUnknownTableType() { - ByteBuffer data = ByteBuffer.wrap("unknown".getBytes()); + ByteBuffer data = ByteBuffer.wrap("unknown".getBytes(StandardCharsets.UTF_8)); assertThat(new TestDecodingPlugin(null).decode(data, logSequenceNumber), instanceOf(PlaceholderEvent.class)); } @Test void assertDecodeUnknownRowEventType() { - ByteBuffer data = ByteBuffer.wrap("table public.test: UNKNOWN: data[character varying]:'1 2 3'''".getBytes()); + ByteBuffer data = ByteBuffer.wrap("table public.test: UNKNOWN: data[character varying]:'1 2 3'''".getBytes(StandardCharsets.UTF_8)); assertThrows(IngestException.class, () -> new TestDecodingPlugin(null).decode(data, logSequenceNumber)); } @@ -102,13 +104,14 @@ void assertDecodeUnknownRowEventType() { void assertDecodeTime() throws SQLException { TimestampUtils timestampUtils = mock(TimestampUtils.class); when(timestampUtils.toTime(null, "1 2 3'")).thenThrow(new SQLException("")); - ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[time without time zone]:'1 2 3'''".getBytes()); + ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[time without time zone]:'1 2 3'''".getBytes(StandardCharsets.UTF_8)); assertThrows(DecodingException.class, () -> new TestDecodingPlugin(new PostgreSQLTimestampUtils(timestampUtils)).decode(data, logSequenceNumber)); } @Test void assertDecodeInsertWithNullValue() { - ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: id[integer]:123 col0[integer]:null col1[character varying]:null col2[character varying]:'nonnull'".getBytes()); + ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: id[integer]:123 col0[integer]:null col1[character varying]:null col2[character varying]:'nonnull'" + .getBytes(StandardCharsets.UTF_8)); AbstractWALEvent actual = new TestDecodingPlugin(null).decode(data, logSequenceNumber); assertThat(actual, instanceOf(WriteRowEvent.class)); WriteRowEvent actualWriteRowEvent = (WriteRowEvent) actual; @@ -120,7 +123,7 @@ void assertDecodeInsertWithNullValue() { @Test void assertDecodeJsonValue() { - ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: id[integer]:123 ".getBytes()); + ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: id[integer]:123 ".getBytes(StandardCharsets.UTF_8)); AbstractWALEvent actual = new TestDecodingPlugin(null).decode(data, logSequenceNumber); assertThat(actual, instanceOf(WriteRowEvent.class)); }