
Conversation


@davyam davyam commented May 18, 2022

NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes from PostgreSQL tables via Logical Replication API

Authors:
Davy Machado machado.davy@gmail.com
Gerdan Santos gerdan@gmail.com

Thank you for submitting a contribution to Apache NiFi.

Please provide a short description of the PR here:

Description of PR

The CaptureChangePostgreSQL processor retrieves Change Data Capture (CDC) events from a PostgreSQL database. It works with PostgreSQL version 10+. Events include INSERT, UPDATE, and DELETE operations and are output as individual FlowFiles ordered by the time at which the operation occurred. This processor uses a Logical Replication Connection to stream data and a SQL Connection to query system views.

This new pull request builds upon PR #5710.
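For reviewers unfamiliar with the PostgreSQL side, the Logical Replication API the processor relies on is exposed by the standard pgjdbc driver roughly as follows. This is a minimal sketch, not the PR's actual code; the slot name, publication name, connection URL, and credentials are made up for illustration, and a replication connection requires a live PostgreSQL 10+ server with `wal_level=logical`:

```java
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.replication.PGReplicationStream;

public class LogicalReplicationSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        PGProperty.USER.set(props, "nifi");
        PGProperty.PASSWORD.set(props, "secret");
        // A logical replication connection needs these three options
        PGProperty.REPLICATION.set(props, "database");
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "10");
        PGProperty.PREFER_QUERY_MODE.set(props, "simple");

        try (Connection con = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/mydb", props)) {
            PGConnection replCon = con.unwrap(PGConnection.class);
            PGReplicationStream stream = replCon.getReplicationAPI()
                    .replicationStream()
                    .logical()
                    .withSlotName("nifi_slot")                      // hypothetical slot
                    .withSlotOption("proto_version", 1)
                    .withSlotOption("publication_names", "nifi_pub") // hypothetical publication
                    .start();

            ByteBuffer msg;
            while ((msg = stream.readPending()) != null) {
                // decode the pgoutput message here, then acknowledge progress
                stream.setAppliedLSN(stream.getLastReceiveLSN());
                stream.setFlushedLSN(stream.getLastReceiveLSN());
            }
        }
    }
}
```

The processor wraps this stream handling and additionally keeps a plain SQL connection open to resolve relation OIDs and column metadata from the system catalogs.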

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically main)?

  • Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not squash or use --force when pushing to allow for clean monitoring of changes.

For code changes:

  • Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
  • Have you written or updated unit tests to verify your changes?
  • Have you verified that the full build is successful on JDK 8?
  • Have you verified that the full build is successful on JDK 11?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
  • If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
  • If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

For documentation related changes:

  • Have you ensured that the format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.

@davyam
Author

davyam commented May 18, 2022

@joewitt @mattyb149 @exceptionfactory

Hey guys, this is the new PR for NIFI-4239 (CaptureChangePostgreSQL processor).

Sorry for the mess in #5710 =/

Please, run the workflow. Thanks!

@gerdansantos

Now, after two years! Will we go on? I believe so!

Contributor

@exceptionfactory exceptionfactory left a comment


Thanks for updating and preparing the new PR @davyam. Other reviewers are probably in a better position to evaluate some of the functionality, but I provided a few comments on style and functionality.

At a high level, one particular question concerns the use of JSON as the output format and one message per FlowFile. For high-volume processing, the ability to combine multiple records in a single FlowFile allows for much higher throughput. As mentioned in the detailed comments, adding support for a RecordWriter service would also make the output much more flexible, supporting JSON as well as other formats.

Comment on lines +50 to +59
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
Contributor

These dependencies can be removed since they are included by default in the root Maven configuration.

message.put("originLSN", buffer.getLong(1));

buffer.position(9);
byte[] bytes_O = new byte[buffer.remaining()];
Contributor

Use of the underscore character should be avoided in variable names.

Suggested change
byte[] bytes_O = new byte[buffer.remaining()];
byte[] origin = new byte[buffer.remaining()];
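As a self-contained illustration of the naming point, the sketch below parses a synthetic pgoutput-style BEGIN message from a ByteBuffer using descriptive variable names instead of `bytes_O`/`string_R`-style identifiers. The field layout (type byte, final LSN, commit timestamp, transaction id) follows the pgoutput BEGIN message; the values are made up:

```java
import java.nio.ByteBuffer;

public class PgOutputNaming {
    public static void main(String[] args) {
        // Synthetic pgoutput BEGIN message: 'B', final LSN (8 bytes),
        // commit timestamp (8 bytes), transaction id (4 bytes).
        ByteBuffer buffer = ByteBuffer.allocate(21);
        buffer.put((byte) 'B');
        buffer.putLong(123456789L);   // final LSN of the transaction
        buffer.putLong(700000000L);   // commit timestamp (microseconds since 2000-01-01)
        buffer.putInt(42);            // transaction id

        // Descriptive names make each absolute read self-documenting
        char messageType = (char) buffer.get(0);
        long finalLsn = buffer.getLong(1);
        long commitTimestamp = buffer.getLong(9);
        int transactionId = buffer.getInt(17);

        System.out.println(messageType + " lsn=" + finalLsn + " xid=" + transactionId);
        // prints: B lsn=123456789 xid=42
    }
}
```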

buffer.position(0);
byte[] bytes_R = new byte[buffer.capacity()];
buffer.get(bytes_R);
String string_R = new String(bytes_R, StandardCharsets.UTF_8);
Contributor

Suggested change
String string_R = new String(bytes_R, StandardCharsets.UTF_8);
String relationDefinition = new String(bytes_R, StandardCharsets.UTF_8);

position += 4;

buffer.position(0);
byte[] bytes_Y = new byte[buffer.capacity()];
Contributor

Suggested change
byte[] bytes_Y = new byte[buffer.capacity()];
byte[] type = new byte[buffer.capacity()];

buffer.position(0);
byte[] bytes_Y = new byte[buffer.capacity()];
buffer.get(bytes_Y);
String string_Y = new String(bytes_Y, StandardCharsets.UTF_8);
Contributor

Suggested change
String string_Y = new String(bytes_Y, StandardCharsets.UTF_8);
String typeDefinition = new String(bytes_Y, StandardCharsets.UTF_8);

Comment on lines +361 to +363
if (this.replicationReader == null || this.replicationReader.getReplicationStream() == null) {
setup(context);
}
Contributor

This null check and call to setup() does not appear to be thread safe. Did you consider annotating the setup() method with OnScheduled instead of this approach?
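For reference, the annotation-based alternative suggested here would look roughly like this sketch. The NiFi framework invokes @OnScheduled methods once before any onTrigger call, which avoids the unsynchronized lazy initialization; the method body is illustrative, not the PR's actual code:

```java
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext;

// Invoked once by the framework when the processor is scheduled,
// before onTrigger runs, so no null check is needed in onTrigger.
@OnScheduled
public void setup(final ProcessContext context) {
    // establish the replication and SQL connections here
}
```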

continue;
}

String data = this.replicationReader.convertMessageToJSON(message);
Contributor

Rather than hard-coding this implementation to JSON, this would be a good opportunity to introduce a RecordWriter Service property. That would support writing messages as JSON, or any number of other supported Record formats. Using a record-oriented approach would also make the flow design much more efficient, allowing multiple messages to be packed into a single FlowFile.
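As a sketch of what such a property could look like against the NiFi API (the descriptor name and description are illustrative, not part of this PR):

```java
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.serialization.RecordSetWriterFactory;

// Hypothetical property; linking it to RecordSetWriterFactory lets the user
// pick JSON, Avro, CSV, or any other configured Record Writer service.
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
        .name("record-writer")
        .displayName("Record Writer")
        .description("Record Writer used to serialize CDC events, allowing many events per FlowFile")
        .identifiesControllerService(RecordSetWriterFactory.class)
        .required(true)
        .build();
```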

Comment on lines +403 to +410
session.transfer(listFlowFiles, REL_SUCCESS);

this.lastLSN = this.replicationReader.getLastReceiveLSN();

// Feedback is sent after the flowfiles transfer completes.
this.replicationReader.sendFeedback(this.lastLSN);

updateState(context.getStateManager());
Contributor

The current ProcessSession implementation does not actually transfer FlowFiles until the Session is committed. It looks like this should be adjusted to use ProcessSession.commitAsync() with a callback to send feedback and update the state after a successful commit.
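A sketch of the suggested pattern, reusing the field and method names from this diff (commitAsync with a success callback is available in the ProcessSession API since NiFi 1.14; the error handling here is illustrative):

```java
session.transfer(listFlowFiles, REL_SUCCESS);

// Capture the LSN now, but acknowledge it only after the commit succeeds,
// so a failed commit leaves the slot position unacknowledged for replay.
final var receivedLsn = this.replicationReader.getLastReceiveLSN();
session.commitAsync(() -> {
    try {
        this.replicationReader.sendFeedback(receivedLsn);
        this.lastLSN = receivedLsn;
        updateState(context.getStateManager());
    } catch (Exception e) {
        getLogger().error("Failed to acknowledge LSN after commit", e);
    }
});
```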

Comment on lines +459 to +468
@OnShutdown
public void onShutdown(ProcessContext context) {
try {
// In case we get shutdown while still running, save off the current state,
// disconnect, and shut down gracefully.
stop(context.getStateManager());
} catch (CDCException ioe) {
throw new ProcessException(ioe);
}
}
Contributor

Is there a reason for this method in addition to OnStopped? As indicated in the documentation, it is not guaranteed that this method will be called, and in general OnStopped should be sufficient since it is doing the same thing.

}
}

private static class DriverShim implements Driver {
Contributor

Is there a particular reason for using this DriverShim wrapper?

@davyam
Author

davyam commented May 18, 2022

@exceptionfactory thanks for review!

I'll check all suggestions and apply the necessary adjustments as soon as possible.

About the RecordWriter service: you're right, there are many benefits, but we don't see it as essential for now. There are a lot of processors that don't use record services yet. Since this improvement requires a lot of changes, we'll leave it for a future version of the processor.

@exceptionfactory
Contributor

About the RecordWriter service: you're right, there are many benefits, but we don't see it as essential for now. There are a lot of processors that don't use record services yet. Since this improvement requires a lot of changes, we'll leave it for a future version of the processor.

Thanks for the reply @davyam. Although record-oriented support may not be required in some cases, introducing this processor without support for a RecordWriter would lock it into JSON-specific output.

Understanding that it requires some changes, and that a lot of work has already gone into this component, it is also important to consider community maintainability. Components designed for narrow use cases are difficult to maintain over time, which is why it can be harder to introduce new components. Generating large numbers of small FlowFiles leads to poor flow performance, which is why it is important to implement that capability initially, as opposed to introducing it later. If you need assistance integrating record handling, perhaps this could be an opportunity for additional collaboration on this PR.

@davyam
Author

davyam commented May 18, 2022

Generating large numbers of small FlowFiles leads to poor flow performance, which is why it is important to implement that capability initially, as opposed to introducing it later. If you need assistance integrating record handling, perhaps this could be an opportunity for additional collaboration on this PR.

@exceptionfactory I understand. OK, I'll analyze the effort required to use the RecordWriter Service.

@kuleshov01

kuleshov01 commented May 20, 2022

I have a problem. I have a clean NiFi instance running through Docker, and I use these JVM memory settings in bootstrap.conf:

java.arg.2=-Xms2048m
java.arg.3=-Xmx4096m

Initially, docker stats displays:
CONTAINER ID   NAME          CPU %    MEM USAGE / LIMIT     MEM %    NET I/O            BLOCK I/O         PIDS
5017cf957d26   nifi_docker   0.60%    2.185GiB / 7.771GiB   28.12%   40.5 kB / 121 kB   757 MB / 995 kB   109

But as soon as I start the processor on a table weighing 370 MB, docker stats displays:
CONTAINER ID   NAME          CPU %    MEM USAGE / LIMIT     MEM %    NET I/O             BLOCK I/O          PIDS
5017cf957d26   nifi_docker   26.17%   4.985GiB / 7.771GiB   64.15%   1.15 GB / 26.4 MB   767 MB / 1.01 MB   113

And I also start getting the error: Java heap space

Here are the settings of my processor:

[processor configuration screenshots]

Comment on lines +62 to +66
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.3.3</version>
</dependency>
Contributor

Reviewing this dependency, given that the processor has a property for the driver location, either this dependency should be marked as provided, or the property can be removed. It seems best to avoid including this library in the NAR to support more runtime flexibility.

Contributor

Agreed, I think we should keep the property and remove this JAR from the NAR so we can support maximum compatibility across past, present, and future versions of PostgreSQL without having to maintain/update the version of the JDBC driver across releases.
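Concretely, the change discussed would mark the driver as provided in the processor module's pom, keeping it on the compile classpath but out of the NAR (the version shown is simply the one currently in the diff):

```xml
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.3.3</version>
    <scope>provided</scope>
</dependency>
```

Users would then supply the JDBC driver JAR at runtime via the processor's driver location property.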

@davyam
Author

davyam commented May 22, 2022

I have a problem.

Hey @djshura2008, you are using a previous version of the CaptureChangePostgreSQL processor, maybe from PR 4065. I know that because we no longer have the "Take an initial snapshot?" property. A lot of improvements have been made since then, so please repeat the tests with the latest version. Thanks for your support.

@kuleshov01

kuleshov01 commented May 23, 2022

I have a problem.

Hey @djshura2008, you are using a previous version of the CaptureChangePostgreSQL processor, maybe from PR 4065. I know that because we no longer have the "Take an initial snapshot?" property. A lot of improvements have been made since then, so please repeat the tests with the latest version. Thanks for your support.

Okay, but how do you take an initial snapshot of the database and immediately start the CDC process?

@davyam
Author

davyam commented May 23, 2022

Okay, but how do you take an initial snapshot of the database and immediately start the CDC process?

There were several problems with the snapshot (delay, memory, etc.), so we decided to remove this option when the code was refactored. Our goal is to capture the changes. But you can still do that with a backup/dump/copy before starting the processor.

@MaheshKungaria

Hi @davyam - May I know when we can expect the "CaptureChangePostgreSQL" processor in an actual released version of NiFi? We have a requirement where we need to use this feature of NiFi. Thank you.

@davyam
Author

davyam commented Oct 5, 2022

Hi @davyam - May I know when we can expect the "CaptureChangePostgreSQL" processor in an actual released version of NiFi? We have a requirement where we need to use this feature of NiFi. Thank you.

Hi @MaheshKungaria, we expect to complete the improvements requested by reviewers by the end of the year. So we believe our processor will be in NiFi v1.19.

@MaheshKungaria

Thanks for the confirmation @davyam

@MaheshKungaria

Hi @davyam - We need the NiFi CaptureChangePostgreSQL processor for one of our requirements, and we checked that version 1.19 doesn't have it. May I know when we can get a stable CaptureChangePostgreSQL processor?

@janis-ax

Hey @MaheshKungaria, if it's urgent for you: I built the processor, so you can try it out by putting the NAR file into your lib folder.

nifi-cdc-postgresql-nar-1.20.0-SNAPSHOT.nar.zip

@davyam
Author

davyam commented Dec 30, 2022

Hey guys,

we are a little late, I know; other personal activities got in the way, but I'm still working on the improvements.

ASAP we will have the CaptureChangePostgreSQL processor in NiFi.

Best regards

@shilpaprotiviti

shilpaprotiviti commented Jan 18, 2023

Hi @davyam: We need the NiFi CaptureChangePostgreSQL processor for one of our requirements. I have gone through the above comments and understand you are busy with other activities as well. But could you please share a tentative date for when we can get a stable CaptureChangePostgreSQL processor?

@janis-ax

Hi @davyam: We need the NiFi CaptureChangePostgreSQL processor for one of our requirements. I have gone through the above comments and understand you are busy with other activities as well. But could you please share a tentative date for when we can get a stable CaptureChangePostgreSQL processor?

@shilpaprotiviti if the processor is so urgent for you: I built and uploaded the NAR directly here, so you can download and use the processor. :)

@davyam
Author

davyam commented Jan 24, 2023

@shilpaprotiviti if the processor is so urgent for you: I built and uploaded the NAR directly here, so you can download and use the processor. :)

Thanks @janis-ax for your support!

@mqofori

mqofori commented May 19, 2023

@davyam thanks for all your work on this.

I am using the NAR that @janis-ax helped build, and I keep losing the connection to my server. This usually happens after the processor has been running for a while with no active changes. When I stop the processor, re-enter the DB server password, and start it again, the error goes away.

Do you know what could be going on or how I can resolve this? Also, should we expect a stable official version any time soon?


@gerdansantos

gerdansantos commented May 22, 2023 via email

@mqofori

mqofori commented May 23, 2023

Hello @martinson, this is happening because of the way logical replication works on PostgreSQL. If you look at the Debezium Kafka connector, it has the same problem. The workaround is to create a table with only one row and send an update to that row (changing a datetime column, for example) on a schedule, e.g. minute by minute. This generally happens on PostgreSQL RDS. Is this your case?


Thank you @gerdansantos, I tried this yesterday and had a few issues, but I couldn't spend more time due to some other production issues. In principle, I think this solution should work. I will test and feed back with some more concrete results early tomorrow. Thanks.
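For anyone hitting the same idle-connection issue, the heartbeat workaround described above amounts to periodically touching a one-row table so the replication stream always has WAL traffic. The sketch below shows one way to do it from plain JDBC; the table name, column names, URL, and credentials are all made up, and the `cdc_heartbeat` table (one row with `id = 1` and a timestamp column) is assumed to already exist. In practice the scheduled UPDATE could equally come from cron, pg_cron, or a small NiFi flow:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CdcHeartbeat {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Update the single heartbeat row every minute so the logical
        // replication slot never sits idle long enough to drop the connection.
        scheduler.scheduleAtFixedRate(() -> {
            try (Connection con = DriverManager.getConnection(
                     "jdbc:postgresql://localhost:5432/mydb", "nifi", "secret");
                 Statement st = con.createStatement()) {
                st.executeUpdate("UPDATE cdc_heartbeat SET last_beat = now() WHERE id = 1");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, 0, 1, TimeUnit.MINUTES);
    }
}
```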

@mqofori

mqofori commented May 23, 2023

Thanks for updating and preparing the new PR @davyam. Other reviewers are probably in a better position to evaluate some of the functionality, but I provided a few comments on style and functionality.

At a high level, one particular question concerns the use of JSON as the output format and one message per FlowFile. For high-volume processing, the ability to combine multiple records in a single FlowFile allows for much higher throughput. As mentioned in the detailed comments, adding support for a RecordWriter service would also make the output much more flexible, supporting JSON as well as other formats.

By the way, I agree with this. After testing this for some time, enhancements can definitely be made to improve the performance and scalability of this processor. The current one-record-per-FlowFile approach is very slow for bulk operations, say deleting or updating 100K+ records. Some sort of batching strategy could be particularly useful for high-volume database operations, helping to maximize throughput and address the potential performance issues @exceptionfactory has mentioned.

@exceptionfactory
Contributor

This pull request has generated a lot of helpful feedback and interest, but given the fundamental issues with the lack of event batching and some design concerns with event decoding, there are significant areas to address.

Additional discussion can be continued on the associated Jira issue NIFI-4239. The Jira issue is a better place to continue the discussion until this pull request can be revisited or restarted.

Given the lack of code changes in the last year, I am closing this pull request for now. This pull request can be reopened when ready, or a new pull request that addresses the current issues is another option.
