Expose API to subscribe to Postgres notice messages #570

Closed
fkomauli opened this issue Dec 1, 2022 · 7 comments
Labels
type: enhancement A general enhancement

@fkomauli
Contributor

fkomauli commented Dec 1, 2022

Add a notices flux to PostgresqlConnection to receive Postgres notice messages.

Postgres may send notice messages through a connection, which may contain log information or metadata related to the submitted commands.

Example use case

A Postgres foreign data wrapper implementation may access external tables stored in multiple files (CSVs, proprietary formats, etc.), and it must send their metadata along with extracted records (type, size, data ranges, etc.). A straightforward implementation would use Postgres notices to send text messages to the client (e.g. in a parsable format like JSON).

Implementation

Notices travel on the same backend messaging subsystem used for notifications. The implementation needs to process NoticeResponses in a way similar to NotificationResponses.

This implementation, together with its integration tests, is already available in a forked repository.
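For readers unfamiliar with what a notice looks like on the wire, here is a minimal sketch of decoding the body of a NoticeResponse as described by the PostgreSQL frontend/backend protocol: a sequence of fields, each a one-byte field code followed by a NUL-terminated string, with a single zero byte ending the list. The class name `NoticeBodyParser` is hypothetical and is not part of r2dbc-postgresql; the sketch also assumes the body has already been separated from the message type byte and length, and that the client encoding is UTF-8.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not the driver's decoder. */
final class NoticeBodyParser {

    private NoticeBodyParser() {}

    /**
     * Parses a NoticeResponse body into a map of field code to value,
     * for example 'S' (severity), 'C' (SQLSTATE code), 'M' (message).
     * Assumes UTF-8 text and a well-formed body.
     */
    static Map<Character, String> parse(ByteBuffer body) {
        Map<Character, String> fields = new LinkedHashMap<>();
        while (body.hasRemaining()) {
            byte code = body.get();
            if (code == 0) {
                break; // a zero byte terminates the field list
            }
            int start = body.position();
            while (body.get() != 0) {
                // advance to the NUL that terminates this field's value
            }
            int length = body.position() - 1 - start;
            byte[] value = new byte[length];
            body.position(start);
            body.get(value);
            body.get(); // skip the terminating NUL
            fields.put((char) code, new String(value, StandardCharsets.UTF_8));
        }
        return fields;
    }
}
```

The field map mirrors what the example usage below reads through `Field.FieldType.MESSAGE`, which corresponds to the `M` field code.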

Example usage

import io.r2dbc.postgresql.api.PostgresqlConnection;
import io.r2dbc.postgresql.message.backend.Field;
import reactor.core.Disposable;
import java.util.function.Consumer;

public class Example {

    public static Disposable tryRegisterNoticesToConnection(PostgresqlConnection pgsql, Consumer<String> onNotice) {
        return pgsql.getNotices()
                .filter(notice -> notice.fields.containsKey(Field.FieldType.MESSAGE))
                .doOnNext(notice -> onNotice.accept(notice.fields.get(Field.FieldType.MESSAGE)))
                .subscribe();
    }
}

Other

I will open a pull request with the already implemented changes as soon as you agree to evaluate this feature.

Best regards

@fkomauli fkomauli added the type: enhancement A general enhancement label Dec 1, 2022
@davecramer
Member

FWIW, it's possible to lose NOTICE messages. I prefer logical replication for this problem, but that would entail putting the data into a table.

@mp911de
Collaborator

mp911de commented Dec 1, 2022

Do you have more details on that? I was under the assumption that notice messages are side products, in the form of out-of-band data, that result from exchanging commands with Postgres (such as conversion warnings).

Is there an API to capture these with PGJDBC?

@fkomauli
Contributor Author

fkomauli commented Dec 1, 2022

Reading notices with JDBC is a bit convoluted, as they are stored in the warning chains of the ResultSet, Statement and Connection objects:

private static void discoverAndClearNotices(ResultSet rs, BiConsumer<SqlWarningSource, SQLWarning> onNotice) throws SQLException {
    for (var warning = PgResultSetWarnings.from(rs); warning != null; warning = warning.getNextWarning()) {
        onNotice.accept(SqlWarningSource.ResultSet, warning);
    }
    PgResultSetWarnings.clear(rs);

    final var statement = rs.getStatement();
    for (var warning = statement.getWarnings(); warning != null; warning = warning.getNextWarning()) {
        onNotice.accept(SqlWarningSource.Statement, warning);
    }
    statement.clearWarnings();

    final var connection = statement.getConnection();
    for (var warning = connection.getWarnings(); warning != null; warning = warning.getNextWarning()) {
        onNotice.accept(SqlWarningSource.Connection, warning);
    }
    connection.clearWarnings();
}
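The traversal pattern used three times above can be tried in isolation with nothing but the JDK, since `java.sql.SQLWarning` can be constructed and chained without a driver. The following is a minimal sketch; the class name `WarningChains` is hypothetical and only illustrates how a warning chain is walked, not how PGJDBC populates it.

```java
import java.sql.SQLWarning;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only. */
final class WarningChains {

    private WarningChains() {}

    /** Collects the message of every warning in the chain, head first. */
    static List<String> messages(SQLWarning head) {
        List<String> out = new ArrayList<>();
        // Each SQLWarning links to the next one; null marks the end of the chain.
        for (SQLWarning w = head; w != null; w = w.getNextWarning()) {
            out.add(w.getMessage());
        }
        return out;
    }
}
```

Calling `messages` on a chain built with `setNextWarning` returns the messages in the order they were appended.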

(To access and clear ResultSet notices correctly, you need to bypass package-private access by declaring a helper in the driver's own package):

package org.postgresql.jdbc;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;

public class PgResultSetWarnings {

    private PgResultSetWarnings() {}

    public static SQLWarning from(ResultSet rs) throws SQLException {
        if (rs.isWrapperFor(PgResultSet.class)) {
            return rs.unwrap(PgResultSet.class).warnings;
        }
        return rs.getWarnings();
    }

    public static void clear(ResultSet rs) throws SQLException {
        if (rs.isWrapperFor(PgResultSet.class)) {
            rs.unwrap(PgResultSet.class).warnings = null;
        } else {
            rs.clearWarnings();
        }
    }
}

(Actually, the choice to use R2DBC instead of JDBC was forced by the implementation in Spring Data, which forbids streaming records from Postgres connections, not by the implementation quirks above.)

fkomauli pushed a commit to optionfactory/r2dbc-postgresql that referenced this issue Dec 1, 2022
[resolves pgjdbc#570]

Postgres may send notice messages through a connection, which may
contain log information or metadata related to the submitted commands.

Notices travel on the same backend messaging subsystem used by
notifications. The implementation processes NoticeResponse in a way
similar to NotificationResponse, and exposes a coherent API.
fkomauli pushed a commit to optionfactory/r2dbc-postgresql that referenced this issue Dec 2, 2022
@mp911de
Collaborator

mp911de commented Dec 7, 2022

We could probably allow generic listeners for BackendMessage so you can consume all sorts of messages for greater serviceability. A stream for NotificationResponse makes sense because we pretend it is a stream of Pub/Sub messages that is independent from backpressure/demand (obviously it isn't). For NoticeResponse, I would like to keep the exposed API minimal and approach the issue through a generic listener and retrofit addNotificationListener(Consumer<NotificationResponse> consumer).

fkomauli pushed a commit to optionfactory/r2dbc-postgresql that referenced this issue Jan 13, 2023
@fkomauli
Contributor Author

There are two issues bugging me about this:

  1. A generic listener system for BackendMessage could have a significant performance impact; a good implementation would require dynamic lookup to avoid dispatching messages for which there are no registered listeners.
  2. Apart from NotificationResponse and NoticeResponse, there is perhaps the ParameterStatus message, which is not used by the driver itself and is meant for consumption by the application; exposing backend messages that are meant for consumption by the driver seems wrong, and could give the wrong idea that those are meant to be used in application code.

Given the above, we could address the code duplication in the proposed PR by abstracting the common logic between NotificationResponse and NoticeResponse, while still:

  • exposing a high-level API for the specific use cases, that is PostgresqlConnection::getNotifications, PostgresqlConnection::getNotices and maybe (maybe!) PostgresqlConnection::getParameterStatusChanges
  • statically dispatching only the appropriate BackendMessage instances, thus avoiding dynamic lookups and potential performance issues.
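To make the "static dispatch" idea above concrete, here is a rough sketch of what it could look like. Every type in it is a hypothetical stand-in defined inside the example (including the `BackendMessage`, `NotificationResponse`, `NoticeResponse` and `DataRow` classes, which only borrow the names of the driver's message types); it is not the code from the fork or from r2dbc-postgresql. Only the two application-facing message types are checked with `instanceof` and handed to typed listeners, so no per-message lookup in a listener registry is needed and driver-internal messages are never exposed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical sketch; all nested types are stand-ins, not the driver's classes. */
final class StaticDispatchSketch {

    interface BackendMessage {}

    static final class NotificationResponse implements BackendMessage {
        final String payload;
        NotificationResponse(String payload) { this.payload = payload; }
    }

    static final class NoticeResponse implements BackendMessage {
        final String message;
        NoticeResponse(String message) { this.message = message; }
    }

    /** Stand-in for a message that only the driver should consume. */
    static final class DataRow implements BackendMessage {}

    private final List<Consumer<NotificationResponse>> notificationListeners = new CopyOnWriteArrayList<>();
    private final List<Consumer<NoticeResponse>> noticeListeners = new CopyOnWriteArrayList<>();

    void onNotification(Consumer<NotificationResponse> listener) {
        notificationListeners.add(listener);
    }

    void onNotice(Consumer<NoticeResponse> listener) {
        noticeListeners.add(listener);
    }

    /**
     * Hands application-facing messages to their typed listeners.
     * Returns false for any other message, which stays internal to the driver.
     */
    boolean dispatch(BackendMessage message) {
        if (message instanceof NotificationResponse) {
            NotificationResponse notification = (NotificationResponse) message;
            notificationListeners.forEach(l -> l.accept(notification));
            return true;
        }
        if (message instanceof NoticeResponse) {
            NoticeResponse notice = (NoticeResponse) message;
            noticeListeners.forEach(l -> l.accept(notice));
            return true;
        }
        return false;
    }
}
```

The trade-off compared with a generic listener is that each new application-facing message type needs its own branch and registration method, which is the price of keeping the exposed surface small.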

fkomauli pushed a commit to optionfactory/r2dbc-postgresql that referenced this issue Jan 13, 2023
fkomauli pushed a commit to optionfactory/r2dbc-postgresql that referenced this issue Jan 13, 2023
@fkomauli
Contributor Author

fkomauli commented Jun 7, 2023

Hi, do you have any update on this issue? What's your opinion about my last considerations?

fkomauli pushed a commit to optionfactory/r2dbc-postgresql that referenced this issue Jun 12, 2023
fkomauli pushed a commit to optionfactory/r2dbc-postgresql that referenced this issue Jun 19, 2023
@mp911de mp911de added this to the 1.0.2.RELEASE milestone Jul 11, 2023
@mp911de
Collaborator

mp911de commented Jul 11, 2023

With the availability of the Segment API, it makes sense to expose notices through flatMap(Result.Message) so that they can be consumed within the context of a query. We only need to pass these through. Consuming would work similarly to:

connection.createStatement("DO language plpgsql $$\n" +
        "BEGIN\n" +
        "  RAISE NOTICE 'hello, world!';\n" +
        "END\n" +
        "$$;")
    .execute()
    .flatMap(it -> it.flatMap(segment -> {
        if (segment instanceof Result.Message) {
            return Mono.just(((Result.Message) segment).message());
        }
        return Mono.empty();
    }))
    .as(StepVerifier::create)
    .expectNext("hello, world!")
    .verifyComplete();

Let me know whether that works for you.

mp911de added a commit that referenced this issue Jul 11, 2023
We now pass-thru NoticeResponse so that notices can be consumed as Result.Message.

[#570]

Signed-off-by: Mark Paluch <mpaluch@vmware.com>

3 participants