Skip to content

Commit

Permalink
Stream for subscribing to Postgres connection notices
Browse files Browse the repository at this point in the history
[resolves pgjdbc#570]

Postgres may send notice messages through a connection, which may
contain log information or metadata related to the submitted commands.

Notices travel on the same backend messaging subsystem used by
notifications. The implementation processes NoticeResponse in a way
similar to NotificationResponse, and exposes a coherent API.
  • Loading branch information
Francesco Komauli committed Jan 13, 2023
1 parent 87a058e commit e327091
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 10 deletions.
94 changes: 84 additions & 10 deletions src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.r2dbc.postgresql.api.CopyInBuilder;
import io.r2dbc.postgresql.api.ErrorDetails;
import io.r2dbc.postgresql.api.Notice;
import io.r2dbc.postgresql.api.Notification;
import io.r2dbc.postgresql.api.PostgresTransactionDefinition;
import io.r2dbc.postgresql.api.PostgresqlResult;
Expand All @@ -31,6 +32,8 @@
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.CommandComplete;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.Field;
import io.r2dbc.postgresql.message.backend.NoticeResponse;
import io.r2dbc.postgresql.message.backend.NotificationResponse;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.postgresql.util.Operators;
Expand All @@ -53,11 +56,14 @@
import reactor.util.annotation.Nullable;

import java.time.Duration;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static io.r2dbc.postgresql.client.TransactionStatus.IDLE;
import static io.r2dbc.postgresql.client.TransactionStatus.OPEN;
import org.reactivestreams.Subscriber;

/**
* An implementation of {@link Connection} for connecting to a PostgreSQL database.
Expand All @@ -78,6 +84,8 @@ final class PostgresqlConnection implements io.r2dbc.postgresql.api.PostgresqlCo

private final AtomicReference<NotificationAdapter> notificationAdapter = new AtomicReference<>();

private final AtomicReference<NoticeAdapter> noticeAdapter = new AtomicReference<>();

private volatile IsolationLevel isolationLevel;

private volatile IsolationLevel previousIsolationLevel;
Expand Down Expand Up @@ -181,6 +189,12 @@ public Mono<Void> close() {
if (notificationAdapter != null && this.notificationAdapter.compareAndSet(notificationAdapter, null)) {
notificationAdapter.dispose();
}

NoticeAdapter noticeAdapter = this.noticeAdapter.get();

if (noticeAdapter != null && this.noticeAdapter.compareAndSet(noticeAdapter, null)) {
noticeAdapter.dispose();
}
}).then(Mono.empty());
}

Expand Down Expand Up @@ -282,6 +296,24 @@ public Flux<Notification> getNotifications() {
return notifications.getEvents();
}

@Override
public Flux<Notice> getNotices() {
NoticeAdapter notices = this.noticeAdapter.get();

if (notices == null) {

notices = new NoticeAdapter();

if (this.noticeAdapter.compareAndSet(null, notices)) {
notices.register(this.client);
} else {
notices = this.noticeAdapter.get();
}
}

return notices.getEvents();
}

@Override
public PostgresqlConnectionMetadata getMetadata() {
return new PostgresqlConnectionMetadata(this.client.getVersion());
Expand Down Expand Up @@ -486,11 +518,14 @@ private void cleanupIsolationLevel() {
}

/**
* Adapter to publish {@link Notification}s.
* Generic adapter that maps {@link BackendMessage}s received by subscribing
* to a {@link Client}
* @param <T> the exposed message type
* @param <M> the source message type
*/
static class NotificationAdapter {
static abstract class BackendMessageAdapter<T, M extends BackendMessage> {

private final Sinks.Many<Notification> sink = Sinks.many().multicast().directBestEffort();
private final Sinks.Many<T> sink = Sinks.many().multicast().directBestEffort();

@Nullable
private volatile Disposable subscription = null;
Expand All @@ -502,39 +537,78 @@ void dispose() {
}
}

abstract T mapMessage(M message);

abstract void registerSubscriber(Client client, Subscriber<M> subscriber);

void register(Client client) {

BaseSubscriber<NotificationResponse> subscriber = new BaseSubscriber<NotificationResponse>() {
BaseSubscriber<M> subscriber = new BaseSubscriber<M>() {

@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}

@Override
public void hookOnNext(NotificationResponse notificationResponse) {
NotificationAdapter.this.sink.emitNext(new NotificationResponseWrapper(notificationResponse), Sinks.EmitFailureHandler.FAIL_FAST);
public void hookOnNext(M notificationResponse) {
BackendMessageAdapter.this.sink.emitNext(mapMessage(notificationResponse), Sinks.EmitFailureHandler.FAIL_FAST);
}

@Override
public void hookOnError(Throwable throwable) {
NotificationAdapter.this.sink.emitError(throwable, Sinks.EmitFailureHandler.FAIL_FAST);
BackendMessageAdapter.this.sink.emitError(throwable, Sinks.EmitFailureHandler.FAIL_FAST);
}

@Override
public void hookOnComplete() {
NotificationAdapter.this.sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
BackendMessageAdapter.this.sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
}
};

this.subscription = subscriber;
client.addNotificationListener(subscriber);
registerSubscriber(client, subscriber);
}

Flux<Notification> getEvents() {
Flux<T> getEvents() {
return this.sink.asFlux();
}
}

/**
* Adapter to publish {@link Notification}s.
*/
static class NotificationAdapter extends BackendMessageAdapter<Notification, NotificationResponse> {

@Override
Notification mapMessage(NotificationResponse message) {
return new NotificationResponseWrapper(message);
}

@Override
void registerSubscriber(Client client, Subscriber<NotificationResponse> subscriber) {
client.addNotificationListener(subscriber);
}
}

/**
* Adapter to publish {@link Notice}s.
*/
static class NoticeAdapter extends BackendMessageAdapter<Notice, NoticeResponse> {

@Override
Notice mapMessage(NoticeResponse message) {
final Notice notice = new Notice(new EnumMap<>(Field.FieldType.class));
for (Field field : message.getFields()) {
notice.fields.put(field.getType(), field.getValue());
}
return notice;
}

@Override
void registerSubscriber(Client client, Subscriber<NoticeResponse> subscriber) {
client.addNoticeListener(subscriber);
}
}

enum EmptyTransactionDefinition implements TransactionDefinition {
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/io/r2dbc/postgresql/api/Notice.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.r2dbc.postgresql.api;

import io.r2dbc.postgresql.message.backend.Field;

import java.util.Map;

/**
* Postgres notice.
*/
public class Notice {

/**
* Notice messages by {@link Field.FieldType}.
*/
public final Map<Field.FieldType, String> fields;

/**
* @param fields notice messages by {@link Field.FieldType}
*/
public Notice(Map<Field.FieldType, String> fields) {
this.fields = fields;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ default Mono<Long> copyIn(String sql, Publisher<ByteBuf> stdin) {
*/
Flux<Notification> getNotifications();

/**
* Return a {@link Flux} of {@link Notice} received from the connection. The stream is a hot stream producing messages as they are received. Notices received by this
* connection are published as they are received. When the client gets {@link #close() closed}, the subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport
* connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}.
*
* @return a hot {@link Flux} of {@link Notice Notices}
*/
Flux<Notice> getNotices();

/**
* Cancel currently running query by sending {@link CancelRequest} to a server.
*
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/io/r2dbc/postgresql/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.netty.buffer.ByteBufAllocator;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.NoticeResponse;
import io.r2dbc.postgresql.message.backend.NotificationResponse;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.CancelRequest;
Expand Down Expand Up @@ -60,6 +61,28 @@ public interface Client {
*/
void addNotificationListener(Subscriber<NotificationResponse> consumer);

/**
* Add a consumer of notices. Notices received by this connection are sent to the {@link Consumer notice consumer}. Note that connection errors and events such as
* disconnects are not visible to the {@link Consumer notice consumer}.
*
* @param consumer the consumer of notices
* @return a new {@link Disposable} that can be used to cancel the underlying subscription
* @throws IllegalArgumentException if {@code consumer} is {@code null}
* @since 1.1.0
*/
Disposable addNoticeListener(Consumer<NoticeResponse> consumer);

/**
* Add a consumer of notices. Notices received by this connection are sent to the {@link Subscriber notice listener}. When the client gets {@link #close() closed}, the
* subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}.
*
* @param consumer the consumer of notices
* @return a new {@link Disposable} that can be used to cancel the underlying subscription
* @throws IllegalArgumentException if {@code consumer} is {@code null}
* @since 1.1.0
*/
void addNoticeListener(Subscriber<NoticeResponse> consumer);

/**
* Release any resources held by the {@link Client}.
*
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public final class ReactorNettyClient implements Client {
private final Sinks.Many<Publisher<FrontendMessage>> requestSink = Sinks.many().unicast().onBackpressureBuffer();

private final Sinks.Many<NotificationResponse> notificationProcessor = Sinks.many().multicast().directBestEffort();
private final Sinks.Many<NoticeResponse> noticeProcessor = Sinks.many().multicast().directBestEffort();

private final AtomicBoolean isClosed = new AtomicBoolean(false);

Expand Down Expand Up @@ -186,6 +187,7 @@ public Mono<Void> close() {
return Mono.defer(() -> {

this.notificationProcessor.tryEmitComplete();
this.noticeProcessor.tryEmitComplete();

drainError(EXPECTED);

Expand Down Expand Up @@ -269,6 +271,8 @@ private boolean consumeMessage(BackendMessage message) {

if (message.getClass() == NoticeResponse.class) {

this.noticeProcessor.tryEmitNext((NoticeResponse) message);

this.settings.getNoticeLogLevel().log(logger, () -> this.context.getMessage(String.format("Notice: %s", toString(((NoticeResponse) message).getFields()))));
return true;
}
Expand Down Expand Up @@ -441,6 +445,16 @@ public void addNotificationListener(Subscriber<NotificationResponse> consumer) {
this.notificationProcessor.asFlux().subscribe(consumer);
}

@Override
public Disposable addNoticeListener(Consumer<NoticeResponse> consumer) {
return this.noticeProcessor.asFlux().subscribe(consumer);
}

@Override
public void addNoticeListener(Subscriber<NoticeResponse> consumer) {
this.noticeProcessor.asFlux().subscribe(consumer);
}

@Override
public ByteBufAllocator getByteBufAllocator() {
return this.byteBufAllocator;
Expand Down Expand Up @@ -530,6 +544,7 @@ private void drainError(Supplier<? extends Throwable> supplier) {
this.messageSubscriber.close(supplier);

this.notificationProcessor.tryEmitError(supplier.get());
this.noticeProcessor.tryEmitError(supplier.get());
}

private final class EnsureSubscribersCompleteChannelHandler extends ChannelDuplexHandler {
Expand Down
Loading

0 comments on commit e327091

Please sign in to comment.