Skip to content

Conversation

@JNSimba
Copy link
Member

@JNSimba JNSimba commented Nov 1, 2024

https://issues.apache.org/jira/browse/FLINK-36649

Oracle When reading via OracleIncrementalSource, the connection is occasionally closed.

14:57:56,432 INFO  org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader [Source Data Fetcher for Source: doris_source[1] -> SinkConversion[2] -> Sink: Print to Std. Out (1/1)#0] [] - Close snapshot reader org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher
14:57:56,597 INFO  io.debezium.jdbc.JdbcConnection                              [pool-14-thread-1] [] - Connection gracefully closed
14:57:56,602 ERROR io.debezium.connector.oracle.logminer.LogMinerHelper         [debezium-snapshot-reader-0] [] - Mining session stopped due to the java.sql.SQLException: 关闭的 Resultset: getLong
14:57:56,603 ERROR io.debezium.pipeline.ErrorHandler                            [debezium-snapshot-reader-0] [] - Producer failure
java.sql.SQLException: 关闭的 Resultset: getLong
    at oracle.jdbc.driver.GeneratedScrollableResultSet.getLong(GeneratedScrollableResultSet.java:254) ~[ojdbc8-19.3.0.0.jar:19.3.0.0.0]
    at io.debezium.connector.oracle.OracleConnection.lambda$getSessionStatisticByName$10(OracleConnection.java:373) ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642) ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510) ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
    at io.debezium.connector.oracle.OracleConnection.getSessionStatisticByName(OracleConnection.java:372) ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.captureSessionMemoryStatistics(LogMinerStreamingChangeEventSource.java:353) ~[flink-connector-oracle-cdc/:?]
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:258) ~[flink-connector-oracle-cdc/:?]
    at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:139) ~[flink-connector-oracle-cdc/:?]
    at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask.executeBackfillTask(OracleScanFetchTask.java:106) ~[flink-connector-oracle-cdc/:?]
    at org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:112) ~[flink-cdc-base/:?]
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:99) ~[flink-cdc-base/:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322] 
 

reason:
This is because after split is read, the reader will be closed, at which point LogMinerStreamingChangeEventSource will perform captureSessionMemoryStatistics to obtain statistical information.

Finally, in the code

public <T> T queryAndMap(String query, StatementFactory statementFactory, ResultSetMapper<T> mapper) throws SQLException {
    Objects.requireNonNull(mapper, "Mapper must be provided");
    Connection conn = connection();      // Check if the conn is connected
    try (Statement statement = statementFactory.createStatement(conn);) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("running '{}'", query);
        }
        try (ResultSet resultSet = statement.executeQuery(query);) {
            //When you get here, split executes the close method to close the connection, and an error will be reported
            return mapper.apply(resultSet);
        }
    }
} 

solve:

  1. we can regenerate a connection before calling the captureSessionMemoryStatistics(connection) method, but this will be time-consuming. In my local test, it took 6 seconds. (Not recommended)

  2. Since captureSessionMemoryStatistics is just statistical information, I think it can be placed before process, so that it can ensure that the connection is no longer in use when split reader close.(I think this is better)

@JNSimba
Copy link
Member Author

JNSimba commented Nov 1, 2024

@leonardBang @ruanhang1993 PTAL, Thanks

@leonardBang
Copy link
Contributor

https://issues.apache.org/jira/browse/FLINK-36649

@JNSimba Thanks for your contribution, could add a little description for this PR? You can copy from your jira issue, the jira issue description is well reported.

And, a test is welcome for this PR.

@JNSimba
Copy link
Member Author

JNSimba commented Nov 5, 2024

@leonardBang Thanks for your reviewd, I added a description. I see that there are similar cases already in ITCase, such as testConsumingAllEvents in OracleConnectorITCase class.
But it is strange that these ITCases have never reported errors, but they appear frequently in my local area, and from the code point of view, this situation may occur.

Copy link
Contributor

@ruanhang1993 ruanhang1993 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@ruanhang1993 ruanhang1993 merged commit 7f6d911 into apache:master Nov 8, 2024
30 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment