diff --git a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java index 04b0fb6f..e3294eb4 100644 --- a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java +++ b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java @@ -18,6 +18,7 @@ import io.r2dbc.postgresql.api.CopyInBuilder; import io.r2dbc.postgresql.api.ErrorDetails; +import io.r2dbc.postgresql.api.Notice; import io.r2dbc.postgresql.api.Notification; import io.r2dbc.postgresql.api.PostgresTransactionDefinition; import io.r2dbc.postgresql.api.PostgresqlResult; @@ -31,6 +32,8 @@ import io.r2dbc.postgresql.message.backend.BackendMessage; import io.r2dbc.postgresql.message.backend.CommandComplete; import io.r2dbc.postgresql.message.backend.ErrorResponse; +import io.r2dbc.postgresql.message.backend.Field; +import io.r2dbc.postgresql.message.backend.NoticeResponse; import io.r2dbc.postgresql.message.backend.NotificationResponse; import io.r2dbc.postgresql.util.Assert; import io.r2dbc.postgresql.util.Operators; @@ -53,11 +56,14 @@ import reactor.util.annotation.Nullable; import java.time.Duration; +import java.util.EnumMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import static io.r2dbc.postgresql.client.TransactionStatus.IDLE; import static io.r2dbc.postgresql.client.TransactionStatus.OPEN; +import org.reactivestreams.Subscriber; /** * An implementation of {@link Connection} for connecting to a PostgreSQL database. @@ -78,6 +84,8 @@ final class PostgresqlConnection implements io.r2dbc.postgresql.api.PostgresqlCo private final AtomicReference notificationAdapter = new AtomicReference<>(); + private final AtomicReference noticeAdapter = new AtomicReference<>(); + private volatile IsolationLevel isolationLevel; private volatile IsolationLevel previousIsolationLevel; @@ -181,6 +189,12 @@ public Mono close() { if (notificationAdapter != null && this.notificationAdapter.compareAndSet(notificationAdapter, null)) { notificationAdapter.dispose(); } + + NoticeAdapter noticeAdapter = this.noticeAdapter.get(); + + if (noticeAdapter != null && this.noticeAdapter.compareAndSet(noticeAdapter, null)) { + noticeAdapter.dispose(); + } }).then(Mono.empty()); } @@ -282,6 +296,24 @@ public Flux getNotifications() { return notifications.getEvents(); } + @Override + public Flux getNotices() { + NoticeAdapter notices = this.noticeAdapter.get(); + + if (notices == null) { + + notices = new NoticeAdapter(); + + if (this.noticeAdapter.compareAndSet(null, notices)) { + notices.register(this.client); + } else { + notices = this.noticeAdapter.get(); + } + } + + return notices.getEvents(); + } + @Override public PostgresqlConnectionMetadata getMetadata() { return new PostgresqlConnectionMetadata(this.client.getVersion()); @@ -486,11 +518,14 @@ private void cleanupIsolationLevel() { } /** - * Adapter to publish {@link Notification}s. + * Generic adapter that maps {@link BackendMessage}s received by subscribing + * to a {@link Client} + * @param the exposed message type + * @param the source message type */ - static class NotificationAdapter { + static abstract class BackendMessageAdapter { - private final Sinks.Many sink = Sinks.many().multicast().directBestEffort(); + private final Sinks.Many sink = Sinks.many().multicast().directBestEffort(); @Nullable private volatile Disposable subscription = null; @@ -502,9 +537,13 @@ void dispose() { } } + abstract T mapMessage(M message); + + abstract void registerSubscriber(Client client, Subscriber subscriber); + void register(Client client) { - BaseSubscriber subscriber = new BaseSubscriber() { + BaseSubscriber subscriber = new BaseSubscriber() { @Override protected void hookOnSubscribe(Subscription subscription) { @@ -512,29 +551,64 @@ protected void hookOnSubscribe(Subscription subscription) { } @Override - public void hookOnNext(NotificationResponse notificationResponse) { - NotificationAdapter.this.sink.emitNext(new NotificationResponseWrapper(notificationResponse), Sinks.EmitFailureHandler.FAIL_FAST); + public void hookOnNext(M notificationResponse) { + BackendMessageAdapter.this.sink.emitNext(mapMessage(notificationResponse), Sinks.EmitFailureHandler.FAIL_FAST); } @Override public void hookOnError(Throwable throwable) { - NotificationAdapter.this.sink.emitError(throwable, Sinks.EmitFailureHandler.FAIL_FAST); + BackendMessageAdapter.this.sink.emitError(throwable, Sinks.EmitFailureHandler.FAIL_FAST); } @Override public void hookOnComplete() { - NotificationAdapter.this.sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); + BackendMessageAdapter.this.sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); } }; this.subscription = subscriber; - client.addNotificationListener(subscriber); + registerSubscriber(client, subscriber); } - Flux getEvents() { + Flux getEvents() { return this.sink.asFlux(); } + } + + /** + * Adapter to publish {@link Notification}s. + */ + static class NotificationAdapter extends BackendMessageAdapter { + + @Override + Notification mapMessage(NotificationResponse message) { + return new NotificationResponseWrapper(message); + } + + @Override + void registerSubscriber(Client client, Subscriber subscriber) { + client.addNotificationListener(subscriber); + } + } + + /** + * Adapter to publish {@link Notice}s. + */ + static class NoticeAdapter extends BackendMessageAdapter { + + @Override + Notice mapMessage(NoticeResponse message) { + final Notice notice = new Notice(new EnumMap<>(Field.FieldType.class)); + for (Field field : message.getFields()) { + notice.fields.put(field.getType(), field.getValue()); + } + return notice; + } + @Override + void registerSubscriber(Client client, Subscriber subscriber) { + client.addNoticeListener(subscriber); + } } enum EmptyTransactionDefinition implements TransactionDefinition { diff --git a/src/main/java/io/r2dbc/postgresql/api/Notice.java b/src/main/java/io/r2dbc/postgresql/api/Notice.java new file mode 100644 index 00000000..10dc6d73 --- /dev/null +++ b/src/main/java/io/r2dbc/postgresql/api/Notice.java @@ -0,0 +1,23 @@ +package io.r2dbc.postgresql.api; + +import io.r2dbc.postgresql.message.backend.Field; + +import java.util.Map; + +/** + * Postgres notice. + */ +public class Notice { + + /** + * Notice messages by {@link Field.FieldType}. + */ + public final Map fields; + + /** + * @param fields notice messages by {@link Field.FieldType} + */ + public Notice(Map fields) { + this.fields = fields; + } +} diff --git a/src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java b/src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java index a1309e46..f96400aa 100644 --- a/src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java +++ b/src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java @@ -110,6 +110,15 @@ default Mono copyIn(String sql, Publisher stdin) { */ Flux getNotifications(); + /** + * Return a {@link Flux} of {@link Notice} received from the connection. The stream is a hot stream producing messages as they are received. Notices received by this + * connection are published as they are received. When the client gets {@link #close() closed}, the subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport + * connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}. + * + * @return a hot {@link Flux} of {@link Notice Notices} + */ + Flux getNotices(); + /** * Cancel currently running query by sending {@link CancelRequest} to a server. * diff --git a/src/main/java/io/r2dbc/postgresql/client/Client.java b/src/main/java/io/r2dbc/postgresql/client/Client.java index e563e2d0..edafa06c 100644 --- a/src/main/java/io/r2dbc/postgresql/client/Client.java +++ b/src/main/java/io/r2dbc/postgresql/client/Client.java @@ -18,6 +18,7 @@ import io.netty.buffer.ByteBufAllocator; import io.r2dbc.postgresql.message.backend.BackendMessage; +import io.r2dbc.postgresql.message.backend.NoticeResponse; import io.r2dbc.postgresql.message.backend.NotificationResponse; import io.r2dbc.postgresql.message.backend.ReadyForQuery; import io.r2dbc.postgresql.message.frontend.CancelRequest; @@ -60,6 +61,28 @@ public interface Client { */ void addNotificationListener(Subscriber consumer); + /** + * Add a consumer of notices. Notices received by this connection are sent to the {@link Consumer notice consumer}. Note that connection errors and events such as + * disconnects are not visible to the {@link Consumer notice consumer}. + * + * @param consumer the consumer of notices + * @return a new {@link Disposable} that can be used to cancel the underlying subscription + * @throws IllegalArgumentException if {@code consumer} is {@code null} + * @since 1.1.0 + */ + Disposable addNoticeListener(Consumer consumer); + + /** + * Add a consumer of notices. Notices received by this connection are sent to the {@link Subscriber notice listener}. When the client gets {@link #close() closed}, the + * subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}. + * + * @param consumer the consumer of notices + * @return a new {@link Disposable} that can be used to cancel the underlying subscription + * @throws IllegalArgumentException if {@code consumer} is {@code null} + * @since 1.1.0 + */ + void addNoticeListener(Subscriber consumer); + /** * Release any resources held by the {@link Client}. * diff --git a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java index c7abfb40..171461cd 100644 --- a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java +++ b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java @@ -112,6 +112,7 @@ public final class ReactorNettyClient implements Client { private final Sinks.Many> requestSink = Sinks.many().unicast().onBackpressureBuffer(); private final Sinks.Many notificationProcessor = Sinks.many().multicast().directBestEffort(); + private final Sinks.Many noticeProcessor = Sinks.many().multicast().directBestEffort(); private final AtomicBoolean isClosed = new AtomicBoolean(false); @@ -186,6 +187,7 @@ public Mono close() { return Mono.defer(() -> { this.notificationProcessor.tryEmitComplete(); + this.noticeProcessor.tryEmitComplete(); drainError(EXPECTED); @@ -269,6 +271,8 @@ private boolean consumeMessage(BackendMessage message) { if (message.getClass() == NoticeResponse.class) { + this.noticeProcessor.tryEmitNext((NoticeResponse) message); + this.settings.getNoticeLogLevel().log(logger, () -> this.context.getMessage(String.format("Notice: %s", toString(((NoticeResponse) message).getFields())))); return true; } @@ -441,6 +445,16 @@ public void addNotificationListener(Subscriber consumer) { this.notificationProcessor.asFlux().subscribe(consumer); } + @Override + public Disposable addNoticeListener(Consumer consumer) { + return this.noticeProcessor.asFlux().subscribe(consumer); + } + + @Override + public void addNoticeListener(Subscriber consumer) { + this.noticeProcessor.asFlux().subscribe(consumer); + } + @Override public ByteBufAllocator getByteBufAllocator() { return this.byteBufAllocator; @@ -530,6 +544,7 @@ private void drainError(Supplier supplier) { this.messageSubscriber.close(supplier); this.notificationProcessor.tryEmitError(supplier.get()); + this.noticeProcessor.tryEmitError(supplier.get()); } private final class EnsureSubscribersCompleteChannelHandler extends ChannelDuplexHandler { diff --git a/src/test/java/io/r2dbc/postgresql/PostgresNoticeIntegrationTests.java b/src/test/java/io/r2dbc/postgresql/PostgresNoticeIntegrationTests.java new file mode 100644 index 00000000..95e60a7e --- /dev/null +++ b/src/test/java/io/r2dbc/postgresql/PostgresNoticeIntegrationTests.java @@ -0,0 +1,91 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.r2dbc.postgresql; + +import io.netty.channel.Channel; +import io.r2dbc.postgresql.api.Notice; +import io.r2dbc.postgresql.api.PostgresqlConnection; +import io.r2dbc.postgresql.api.PostgresqlResult; +import io.r2dbc.postgresql.message.backend.Field; +import io.r2dbc.postgresql.util.ConnectionIntrospector; +import io.r2dbc.spi.R2dbcNonTransientResourceException; +import java.time.Duration; +import org.junit.jupiter.api.Test; +import reactor.core.Disposable; +import reactor.test.StepVerifier; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration tests for {@link Notice} through {@link PostgresqlConnection#getNotices()}. + */ +final class PostgresNoticeIntegrationTests extends AbstractIntegrationTests { + + private static final String RAISE_INFO_FUNCTION = + "CREATE OR REPLACE FUNCTION raise_info(text)" + + " RETURNS void AS $$" + + " BEGIN" + + " RAISE INFO '%', $1;" + + " END;" + + " $$ LANGUAGE plpgsql;"; + + @Test + void shouldReceivePubSubNotices() throws Exception { + + BlockingQueue notices = new LinkedBlockingQueue<>(); + + this.connectionFactory.create().flatMap(it -> { + it.getNotices().doOnNext(notices::add).subscribe(); + return it.createStatement(RAISE_INFO_FUNCTION).execute().then() + .then(it.createStatement("SELECT raise_info('Test Message')").execute().then()); + }).block(Duration.ofSeconds(10)); + + Notice notice = notices.poll(10, TimeUnit.SECONDS); + + assertThat(notice).isNotNull(); + assertThat(notice.fields.containsKey(Field.FieldType.MESSAGE)).isTrue(); + assertThat(notice.fields.get(Field.FieldType.MESSAGE)).isEqualTo("Test Message"); + } + + @Test + void listenShouldCompleteOnConnectionClose() { + + PostgresqlConnection connection = this.connectionFactory.create().block(); + + connection.getNotices().as(StepVerifier::create).expectSubscription() + .then(() -> connection.close().subscribe()) + .verifyComplete(); + } + + @Test + void listenShouldFailOnConnectionDisconnected() { + + PostgresqlConnection connection = this.connectionFactory.create().block(); + + connection.getNotices().as(StepVerifier::create).expectSubscription() + .then(() -> { + Channel channel = ConnectionIntrospector.of(connection).getChannel(); + channel.close(); + }) + .verifyError(R2dbcNonTransientResourceException.class); + } +} diff --git a/src/test/java/io/r2dbc/postgresql/api/MockPostgresqlConnection.java b/src/test/java/io/r2dbc/postgresql/api/MockPostgresqlConnection.java index 1d7709ee..2e4efbb5 100644 --- a/src/test/java/io/r2dbc/postgresql/api/MockPostgresqlConnection.java +++ b/src/test/java/io/r2dbc/postgresql/api/MockPostgresqlConnection.java @@ -77,6 +77,11 @@ public Flux getNotifications() { return Flux.empty(); } + @Override + public Flux getNotices() { + return Flux.empty(); + } + @Override public Mono cancelRequest() { return Mono.empty(); diff --git a/src/test/java/io/r2dbc/postgresql/client/ReactorNettyClientIntegrationTests.java b/src/test/java/io/r2dbc/postgresql/client/ReactorNettyClientIntegrationTests.java index f989cede..f4302d60 100644 --- a/src/test/java/io/r2dbc/postgresql/client/ReactorNettyClientIntegrationTests.java +++ b/src/test/java/io/r2dbc/postgresql/client/ReactorNettyClientIntegrationTests.java @@ -26,6 +26,8 @@ import io.r2dbc.postgresql.message.backend.BackendMessage; import io.r2dbc.postgresql.message.backend.CommandComplete; import io.r2dbc.postgresql.message.backend.DataRow; +import io.r2dbc.postgresql.message.backend.Field.FieldType; +import io.r2dbc.postgresql.message.backend.NoticeResponse; import io.r2dbc.postgresql.message.backend.NotificationResponse; import io.r2dbc.postgresql.message.backend.RowDescription; import io.r2dbc.postgresql.message.frontend.FrontendMessage; @@ -805,6 +807,32 @@ void handleNotify() { .verify(); } + @Test + void handleNotice() { + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); + this.client.addNoticeListener(sink::tryEmitNext); + + SERVER.getJdbcOperations().execute( + "CREATE OR REPLACE FUNCTION raise_info(text)" + + " RETURNS void AS $$" + + " BEGIN" + + " RAISE INFO '%', $1;" + + " END;" + + " $$ LANGUAGE plpgsql;" + ); + + this.client + .exchange(Mono.just(new Query("SELECT raise_info('Test Message')"))) + .blockLast(); + + io.r2dbc.postgresql.message.backend.Field expected = new io.r2dbc.postgresql.message.backend.Field(FieldType.MESSAGE, "Test Message"); + + StepVerifier.create(sink.asFlux()) + .assertNext(message -> assertThat(message.getFields()).contains(expected)) + .thenCancel() + .verify(); + } + @Test void handleTrigger() { SERVER.getJdbcOperations().execute( diff --git a/src/test/java/io/r2dbc/postgresql/client/TestClient.java b/src/test/java/io/r2dbc/postgresql/client/TestClient.java index b783bf18..53a65eba 100644 --- a/src/test/java/io/r2dbc/postgresql/client/TestClient.java +++ b/src/test/java/io/r2dbc/postgresql/client/TestClient.java @@ -18,6 +18,7 @@ import io.netty.buffer.ByteBufAllocator; import io.r2dbc.postgresql.message.backend.BackendMessage; +import io.r2dbc.postgresql.message.backend.NoticeResponse; import io.r2dbc.postgresql.message.backend.NotificationResponse; import io.r2dbc.postgresql.message.frontend.FrontendMessage; import io.r2dbc.postgresql.util.Assert; @@ -52,6 +53,7 @@ public final class TestClient implements Client { private final boolean connected; private final Sinks.Many notificationProcessor = Sinks.many().multicast().onBackpressureBuffer(); + private final Sinks.Many noticeProcessor = Sinks.many().multicast().onBackpressureBuffer(); private final Integer processId; @@ -183,10 +185,24 @@ public void addNotificationListener(Subscriber consumer) { this.notificationProcessor.asFlux().subscribe(consumer); } + @Override + public Disposable addNoticeListener(Consumer consumer) { + return this.noticeProcessor.asFlux().subscribe(consumer); + } + + @Override + public void addNoticeListener(Subscriber consumer) { + this.noticeProcessor.asFlux().subscribe(consumer); + } + public void notify(NotificationResponse notification) { this.notificationProcessor.tryEmitNext(notification); } + public void notice(NoticeResponse notice) { + this.noticeProcessor.tryEmitNext(notice); + } + public static final class Builder { private final List> windows = new ArrayList<>();