
Allow for "inferred deletes" on incrementally synced connections without CDC #6383

Closed
archaean opened this issue Sep 22, 2021 · 2 comments
Labels: area/connectors (Connector related issues) · type/enhancement (New feature or request) · wontfix (This will not be worked on)

@archaean (Contributor)

Tell us about the problem you're trying to solve

It would be convenient for Airbyte to support "inferred deletes" on "Incremental Sync" database connections that do not utilize CDC (e.g. not set up to use a replication slot on a Postgres database).

Describe the solution you’d like

Disclaimer - After thinking this through for a while, I realize that this feature might be dead on arrival, but I hadn't seen any mention of it and wanted to preserve my thoughts and get Airbyte developer feedback.

Pre-emptive TLDR

"Inferred deletes" can be constructed by a simple set operation: the set of destination primary keys minus the set of source primary keys for a given table.

Implementation - Create a new deletion propagation worker that:

  • collects and batches all the distinct primary keys from the destination table
  • runs the above set operation on the source system (with custom but similar SQL code added per connector)
  • sends tombstone records to the destination for any pk produced by the set operation

Despite being the best I could come up with, this implementation is quite computationally expensive on both the worker node and the source system.

The "How"

In general, inferring deletes on a table is a simple set operation between the destination and source tables: taking the set of destination primary keys minus the set of source primary keys yields the set of primary keys that need to be deleted (i.e. sent to the destination system as tombstones).

-- Keys still present (and not soft-deleted) in the destination, minus the
-- keys present in the source: whatever remains is the set of inferred deletes.
-- (EXCEPT is the Postgres spelling of the set-difference operator; some
-- dialects call it MINUS.)
WITH source_keys AS (
    SELECT DISTINCT composite, key
    FROM source_table),

     destination_keys AS (
         SELECT DISTINCT composite, key
         FROM destination_table
         EXCEPT
         SELECT composite, key
         FROM destination_table
         WHERE deleted = TRUE
     )

SELECT *
FROM destination_keys
EXCEPT
SELECT *
FROM source_keys;

The "Where"

The problem here isn't "how" to infer deletes, but "where". Specifically, for the set operation to be performed, both sets need to exist in the same place (the source set in its entirety and the destination set potentially partitioned or sent in batches). This place could be one of three basic locations: the source system, completely in memory on the worker node, or the destination system.

Destination System

This seems like a bad idea for 2 reasons:

  • In order for the set operation to be performed, ALL of the primary keys from the source system would have to be moved over to the destination system (essentially a FULL_REFRESH with less data), which in some ways defeats the purpose of syncing the table with INCREMENTAL instead of FULL_REFRESH.
  • The inferred-delete processing would rely heavily on the destination system instead of solely on the source system (which currently and trivially handles tombstones when CDC is utilized, e.g. via a Postgres replication slot).

A Worker Node (both sets in memory)

While this might work in some cases, storing the whole of a table's primary keys as a set in memory will likely become untenable or at least very difficult to manage. The worker node's memory requirements would need to scale with the size of the source table that it is processing, which adds - at the very least - additional deployment concerns.

Since the memory requirement of the set is the main issue, it did occur to me: why not employ a Bloom filter here to handle the majority of the set operation... But alas, this would not be an appropriate use case. Bloom filters answer either "definitely not in the set" or "possibly in the set", and the vast majority of lookups would return "possibly in the set", which is essentially a cache miss requiring access to the actual set. I don't know of any "reverse" Bloom filter ("definitely in the set" or "possibly not in the set") that provides the same memory advantages, so this would be a non-solution.
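
To make that concrete, here is a small, purely illustrative sketch (the TinyBloomFilter class and the key values are invented for this example, not anything Airbyte ships): a filter built over the source keys can only prove that a destination key is gone; every "possibly in the set" answer might be a false positive hiding a real delete, so it would still have to be verified against the actual source key set.

import hashlib

class TinyBloomFilter:
    """Minimal Bloom filter: k hash positions over an m-bit array."""

    def __init__(self, m_bits: int = 1 << 20, k: int = 4):
        self.m = m_bits
        self.k = k
        self.bits = bytearray(m_bits // 8)

    def _positions(self, item: str):
        # Derive k bit positions from salted SHA-256 digests.
        for salt in range(self.k):
            digest = hashlib.sha256(f"{salt}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.m

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, item: str) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))


# Hypothetical key sets: the source no longer contains ("composite_2", "key_2").
source_pks = [("composite_1", "key_1"), ("composite_3", "key_3")]
destination_pks = [("composite_1", "key_1"), ("composite_2", "key_2"), ("composite_3", "key_3")]

bloom = TinyBloomFilter()
for pk in source_pks:
    bloom.add("|".join(pk))

for pk in destination_pks:
    if not bloom.might_contain("|".join(pk)):
        print(f"{pk}: definitely absent from source -> safe to tombstone")
    else:
        # "Possibly in the set": either the row really exists in the source, or
        # this is a false positive and a genuine delete gets silently missed.
        # Telling the two apart requires the full source key set anyway.
        print(f"{pk}: possibly in source -> must still consult the real source set")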

Source System

Pushing the destination set into the source system to perform the set operation is likely a non-starter because the source system could be read-only and not something to which Airbyte could write.

However, it is possible to "send" PKs to the source system to perform the set operation (in a number of batches) without actually storing any data on the source system, namely by using a CTE/SQL query:

-- A batch of destination PKs inlined as literal rows (the UNION ALL chain
-- could equally be written with VALUES).
WITH batch_of_dest_pks AS (
    SELECT 'composite_1' AS composite, 'key_1' AS key
    UNION ALL
    SELECT 'composite_2', 'key_2'
    UNION ALL
    ...
    SELECT 'composite_n', 'key_n'
),
     source_keys AS (
         SELECT DISTINCT composite, key
         FROM source_table
     )
-- Batch keys that no longer exist in the source, i.e. the inferred deletes.
SELECT *
FROM batch_of_dest_pks
EXCEPT
SELECT *
FROM source_keys;
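
On Postgres specifically, the batch could also be shipped as bound array parameters instead of a long chain of UNION ALL literals. A rough sketch, assuming psycopg2, text-typed key columns, and the hypothetical source_table / column names from the example above:

import psycopg2

# Hypothetical batch of destination primary keys collected by the worker.
batch = [("composite_1", "key_1"), ("composite_2", "key_2"), ("composite_n", "key_n")]

INFERRED_DELETES_SQL = """
WITH batch_of_dest_pks (composite, key) AS (
    -- unnest two parallel arrays into (composite, key) rows
    SELECT * FROM unnest(%(composites)s::text[], %(keys)s::text[])
)
SELECT composite, key FROM batch_of_dest_pks
EXCEPT
SELECT composite, key FROM source_table;
"""

conn = psycopg2.connect("dbname=source_db user=airbyte")  # placeholder DSN
with conn, conn.cursor() as cur:
    cur.execute(
        INFERRED_DELETES_SQL,
        {
            "composites": [pk[0] for pk in batch],
            "keys": [pk[1] for pk in batch],
        },
    )
    deleted_pks = cur.fetchall()  # every row returned here needs a tombstone downstream

Binding the batch as parameters keeps the statement size predictable and avoids building SQL by string concatenation, but the per-batch cost on the source system is the same as in the literal-row version.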

All that said, the most feasible solution - for most databases - would be to create a new Worker (DeletionPropagationWorker) that periodically:

  • Collects the distinct primary keys from the destination system
  • For each batch of primary keys, constructs a CTE/SQL query and runs it on the source system
  • For any resulting PK that comes out of the set operation, propagates a tombstone record to the destination system.

This would likely also require a new, optionally implementable interface for each connector, so that the connector can determine how the CTE/SQL query is constructed and run on its source system.
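
For illustration only, the worker loop plus that optional interface might look roughly like the following sketch; every class, method, and attribute name here (InferredDeleteSource, DeletionPropagationWorker, read_primary_keys, write_tombstones, ...) is hypothetical and not an existing Airbyte API.

from abc import ABC, abstractmethod
from typing import Iterable, List, Tuple

PrimaryKey = Tuple[str, ...]


class InferredDeleteSource(ABC):
    """Optional interface a source connector could implement to support inferred deletes."""

    @abstractmethod
    def build_inferred_delete_query(self, dest_pk_batch: List[PrimaryKey]) -> str:
        """Return connector-specific SQL that subtracts source PKs from the given batch."""

    @abstractmethod
    def run_query(self, sql: str) -> Iterable[PrimaryKey]:
        """Execute the query read-only against the source and yield the PKs it returns."""


class DeletionPropagationWorker:
    """Periodically infers deletes by diffing destination PKs against the source."""

    def __init__(self, source: InferredDeleteSource, destination, batch_size: int = 10_000):
        self.source = source
        # The destination is assumed to expose read_primary_keys() and write_tombstones().
        self.destination = destination
        self.batch_size = batch_size

    def run_once(self) -> None:
        batch: List[PrimaryKey] = []
        # 1. Collect the distinct primary keys currently present in the destination.
        for pk in self.destination.read_primary_keys():
            batch.append(pk)
            if len(batch) >= self.batch_size:
                self._process_batch(batch)
                batch = []
        if batch:
            self._process_batch(batch)

    def _process_batch(self, batch: List[PrimaryKey]) -> None:
        # 2. Ask the source which of these keys no longer exist there.
        sql = self.source.build_inferred_delete_query(batch)
        missing = self.source.run_query(sql)
        # 3. Emit a tombstone record for every key the set operation returned.
        self.destination.write_tombstones(missing)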

Describe the alternative you’ve considered or used

  • A periodic FULL_REFRESH of the table will by its nature propagate deletes
  • Using CDC from the source system (e.g. reading the WAL from a Postgres replication slot) can trivially propagate tombstone records to the destination system.

Any way I think about this - even my "feasible" solution - it amounts to a significant computational burden and substantial data flowing back from the destination system (something that I assume doesn't happen extensively today). In a zero-sum game, doing "inferred deletes" could take up substantial processing resources that could otherwise be devoted to moving data from source to destination, or, to a lesser extent, compete with operational queries on the source system.

The alternatives of doing a FULL_REFRESH or leveraging CDC seem far more appealing.

Maybe I missed something, which would be good to hear, but I wanted to get confirmation of how likely this feature is to be supported.

Are you willing to submit a PR?

Yes, though I suspect this is a larger architectural issue that would have many downstream impacts, which Airbyte's code governance would likely want more control over.

archaean added the type/enhancement (New feature or request) label on Sep 22, 2021
@sherifnada (Contributor)

@archaean thanks for the really thorough write up! You've touched on a lot of the right points here.

I think my first inclination would be to understand the motivation behind such a solution and see if CDC is a better fit for it since in theory that's what it's made for.

Can you tell me more about the motivation behind the request?

sherifnada added the area/connectors (Connector related issues) label on Sep 24, 2021
@archaean (Contributor, Author) commented Sep 24, 2021

@sherifnada I think this turned into more of an anti-request for a feature, mostly cataloging my reasoning as to why Airbyte would not support this, but I will expand upon my motivation.

I think the motivation for having deletes propagate to the destination system is pretty straightforward: it is just as critical to representing the "current state" of a database as DEDUPE_HISTORY is (which does so by removing old instances of the same primary key). Further, without certain optimizations (e.g. implementing all of the Redshift best practices, which, depending on the cluster, could potentially yield an additional order-of-magnitude speed-up), a FULL_REFRESH can be prohibitively expensive given internal or external time-based SLAs.

Now, with respect to CDC, that is an option in our case (Postgres), but it doesn't come without a cost (at least an additional replication slot) and additional complexity:

  • The replication slot needs to be managed effectively, otherwise the WAL could grow and nuke the disk space on your Postgres instance (see the monitoring sketch after this list).
  • Additional configuration and management to make sure that the publication matches all the tables you want to copy.
  • It won't work with views, and views are currently our only workaround for de-identifying data moving through airflow; in combination with no small "t" between the E and the L and sub-optimal Redshift loads, that could be a non-starter for us.
  • And the other limitations already mentioned concisely here.
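
As an example of the slot management mentioned above, the WAL retained by each slot can be watched with a query against pg_replication_slots; a minimal sketch, assuming psycopg2 and a placeholder DSN:

import psycopg2

# How much WAL each replication slot is forcing Postgres to retain; an
# inactive or lagging slot here is what eventually fills the disk.
SLOT_LAG_SQL = """
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
"""

conn = psycopg2.connect("dbname=source_db user=airbyte")  # placeholder DSN
with conn, conn.cursor() as cur:
    cur.execute(SLOT_LAG_SQL)
    for slot_name, active, retained_wal in cur.fetchall():
        print(f"{slot_name}: active={active}, retained WAL={retained_wal}")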

Also, Airbyte is a batch-only system that can't fully leverage a CDC connection the same way that, say, Kafka / Kafka Connect could using a Lambda or Kappa architecture. Don't get me wrong, the simplicity that Airbyte provides is what is appealing here, but it is easy to balk at the idea of enabling a replication slot - and all its associated maintenance - "just to propagate deletes in an incremental batch process".

sherifnada added the wontfix (This will not be worked on) label on Oct 1, 2021