-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: log based replication #249
Conversation
_sdc_deleted_at fixes deletion (flushing) of old logs
tap_postgres/client.py
Outdated
# TODO: escape special characters | ||
return f"{schema_name}.{table_name}" | ||
|
||
def standardize_lsn(self, lsn_string: str | None) -> int: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you point to the docs for what this is converting https://www.postgresql.org/docs/current/datatype-pg-lsn.html#:~:text=This%20type%20is%20a%20representation,for%20example%2C%2016%2FB374D848%20.
I'm surprised the wal2json
plugin doesn't do this for you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, there's a way around it. I refactored and removed the standardize_lsn()
function entirely.
tap_postgres/client.py
Outdated
) | ||
row.update({"_sdc_lsn": message.data_start}) | ||
elif message_payload["action"] in truncate_actions: | ||
self.logger.warning( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to flood our logs, and it's really not a warning as there's nothing to warn about, maybe a debug?
tap_postgres/client.py
Outdated
message.payload, | ||
) | ||
elif message_payload["action"] in transaction_actions: | ||
self.logger.info( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to flood our logs, and it's really not a warning as there's nothing to warn about, maybe a debug?
tap_postgres/client.py
Outdated
) | ||
return psycopg2.connect( | ||
connection_string, | ||
application_name="tappostgres", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
application_name="tappostgres", | |
application_name="tap_postgres", |
tap_postgres/tap.py
Outdated
@@ -282,6 +291,16 @@ def __init__( | |||
+ " configuration option determines where that file is created." | |||
), | |||
), | |||
th.Property( | |||
"replication_method", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"replication_method", | |
"default_replication_method", |
tap_postgres/client.py
Outdated
schema = table_schema.to_dict() | ||
|
||
replication_key = None | ||
if replication_method == "LOG_BASED": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't happen at discovery, this should happen after discovery in the LogBased Stream Class
tap_postgres/client.py
Outdated
jsonschema_type: dict = self.to_jsonschema_type( | ||
typing.cast(sqlalchemy.types.TypeEngine, column_def["type"]), | ||
) | ||
table_schema.append( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should happen in the Log Based Stream class, not here.
No description provided.