Skip to content

Commit

Permalink
Improve PostgreSQL TestDecodingPlugin encoding (#28781)
Browse files Browse the repository at this point in the history
* Improve SimpleMemoryPipelineChannelTest

* Use UTF-8 for PostgreSQL TestDecodingPlugin
  • Loading branch information
sandynz authored Oct 17, 2023
1 parent 5c6a968 commit d1f4df8
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ void assertFetchRecordsTimeoutCorrectly() {
SimpleMemoryPipelineChannel simpleMemoryPipelineChannel = new SimpleMemoryPipelineChannel(10, new EmptyAckCallback());
long startMills = System.currentTimeMillis();
simpleMemoryPipelineChannel.fetchRecords(1, 1, TimeUnit.MILLISECONDS);
long endMills = System.currentTimeMillis();
assertTrue(endMills - startMills >= 1 && endMills - startMills < 50);
long delta = System.currentTimeMillis() - startMills;
assertTrue(delta >= 1 && delta < 50, "Delta is not in [1,50) : " + delta);
startMills = System.currentTimeMillis();
simpleMemoryPipelineChannel.fetchRecords(1, 500, TimeUnit.MILLISECONDS);
endMills = System.currentTimeMillis();
assertTrue(endMills - startMills >= 500 && endMills - startMills < 600);
delta = System.currentTimeMillis() - startMills;
assertTrue(delta >= 500 && delta < 650, "Delta is not in [500,650) : " + delta);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.SQLException;
import java.util.LinkedList;
Expand Down Expand Up @@ -252,7 +253,7 @@ private String readStringSegment(final ByteBuffer data, final int startPosition,
for (int i = 0; i < offset; i++) {
result[i] = data.get(startPosition + i);
}
return new String(result);
return new String(result, StandardCharsets.UTF_8);
}

private String readNextString(final ByteBuffer data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.postgresql.replication.LogSequenceNumber;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;

import static org.hamcrest.CoreMatchers.instanceOf;
Expand All @@ -47,7 +48,7 @@ class TestDecodingPluginTest {
@Test
void assertDecodeWriteRowEvent() {
ByteBuffer data = ByteBuffer.wrap(("table public.test: INSERT: data[character varying]:' 1 2 3'' 😊中' t_json_empty[json]:'{}' t_json[json]:'{\"test\":\"中中{中中}' 中\"}'"
+ " t_jsonb[jsonb]:'{\"test\":\"😊Emoji中\"}'").getBytes());
+ " t_jsonb[jsonb]:'{\"test\":\"😊Emoji中\"}'").getBytes(StandardCharsets.UTF_8));
WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
Expand All @@ -59,7 +60,8 @@ void assertDecodeWriteRowEvent() {

@Test
void assertDecodeUpdateRowEvent() {
ByteBuffer data = ByteBuffer.wrap("table public.test: UPDATE: unicode[character varying]:' 1 2 3'' 😊中 ' t_json_empty[json]:'{}' t_json[json]:'{\"test\":\"中中{中中}' 中\"}'".getBytes());
ByteBuffer data = ByteBuffer.wrap("table public.test: UPDATE: unicode[character varying]:' 1 2 3'' 😊中 ' t_json_empty[json]:'{}' t_json[json]:'{\"test\":\"中中{中中}' 中\"}'"
.getBytes(StandardCharsets.UTF_8));
UpdateRowEvent actual = (UpdateRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
Expand All @@ -70,7 +72,7 @@ void assertDecodeUpdateRowEvent() {

@Test
void assertDecodeDeleteRowEvent() {
ByteBuffer data = ByteBuffer.wrap("table public.test: DELETE: data[integer]:1".getBytes());
ByteBuffer data = ByteBuffer.wrap("table public.test: DELETE: data[integer]:1".getBytes(StandardCharsets.UTF_8));
DeleteRowEvent actual = (DeleteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
Expand All @@ -79,7 +81,7 @@ void assertDecodeDeleteRowEvent() {

@Test
void assertDecodeWriteRowEventWithByteA() {
ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[bytea]:'\\xff00ab'".getBytes());
ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[bytea]:'\\xff00ab'".getBytes(StandardCharsets.UTF_8));
WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
Expand All @@ -88,27 +90,28 @@ void assertDecodeWriteRowEventWithByteA() {

@Test
void assertDecodeUnknownTableType() {
ByteBuffer data = ByteBuffer.wrap("unknown".getBytes());
ByteBuffer data = ByteBuffer.wrap("unknown".getBytes(StandardCharsets.UTF_8));
assertThat(new TestDecodingPlugin(null).decode(data, logSequenceNumber), instanceOf(PlaceholderEvent.class));
}

@Test
void assertDecodeUnknownRowEventType() {
ByteBuffer data = ByteBuffer.wrap("table public.test: UNKNOWN: data[character varying]:'1 2 3'''".getBytes());
ByteBuffer data = ByteBuffer.wrap("table public.test: UNKNOWN: data[character varying]:'1 2 3'''".getBytes(StandardCharsets.UTF_8));
assertThrows(IngestException.class, () -> new TestDecodingPlugin(null).decode(data, logSequenceNumber));
}

@Test
void assertDecodeTime() throws SQLException {
TimestampUtils timestampUtils = mock(TimestampUtils.class);
when(timestampUtils.toTime(null, "1 2 3'")).thenThrow(new SQLException(""));
ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[time without time zone]:'1 2 3'''".getBytes());
ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[time without time zone]:'1 2 3'''".getBytes(StandardCharsets.UTF_8));
assertThrows(DecodingException.class, () -> new TestDecodingPlugin(new PostgreSQLTimestampUtils(timestampUtils)).decode(data, logSequenceNumber));
}

@Test
void assertDecodeInsertWithNullValue() {
ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: id[integer]:123 col0[integer]:null col1[character varying]:null col2[character varying]:'nonnull'".getBytes());
ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: id[integer]:123 col0[integer]:null col1[character varying]:null col2[character varying]:'nonnull'"
.getBytes(StandardCharsets.UTF_8));
AbstractWALEvent actual = new TestDecodingPlugin(null).decode(data, logSequenceNumber);
assertThat(actual, instanceOf(WriteRowEvent.class));
WriteRowEvent actualWriteRowEvent = (WriteRowEvent) actual;
Expand All @@ -120,7 +123,7 @@ void assertDecodeInsertWithNullValue() {

@Test
void assertDecodeJsonValue() {
ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: id[integer]:123 ".getBytes());
ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: id[integer]:123 ".getBytes(StandardCharsets.UTF_8));
AbstractWALEvent actual = new TestDecodingPlugin(null).decode(data, logSequenceNumber);
assertThat(actual, instanceOf(WriteRowEvent.class));
}
Expand Down

0 comments on commit d1f4df8

Please sign in to comment.