-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Resolve tap-dbt incremental replication #392
fix: Resolve tap-dbt incremental replication #392
Conversation
Caution requires further testing against GitLab Cloud. |
@s7clarke10 let me know how things look on that front when you've done more testing. |
Hi @edgarrmondragon , okay had the right idea but the wrong implementation. I was chat with @mjsqu and he had pointed me in the right direction for the fix. I have been able to take the failed state and successfully test a run to completion without the error with the fix in place for an incremental run. I believe we are now ready to go for this fix on tap-dbt. Thanks 😊 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approved - thank you!
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @s7clarke10 and @mjsqu, much appreciated!
Hi @edgarrmondragon , thanks for your review and approval. Here is some additional logging of variable just before the line that was changed. I am working on the assumption that ingestion is somehow extracting data from the point where the data was last ingested - i.e. the SDK logic is querying from the point in the timestamp. Actual Value from the State File for last-timestamp
Confirming the starting timestamp print(f'XXXXXXXX Start from context = {self.get_starting_timestamp(context)}') XXXXXXXX Start from context = 2024-09-09 11:01:05.436229+00:00 Verifying the starting timestamp via another method print(f'XXXXXXXX Start from context = {self.get_starting_replication_key_value(context)}') result XXXXXXXX Start from context = 2024-09-09 11:01:05.436229+00:00 Test Scenario 1 - Verifying the proposed fix print(f'XXXXXXXX Record Finished At = {record['finished_at']}') result XXXXXXXX Record Finished At = 2024-09-26 22:03:53.116865+00:00 |
Incremental replication from tap-dbt is failing with the following error on line 120 in the streams.py
record_last_received_datetime = datetime.datetime.fromisoformat( self.replication_key, ) ValueError: Invalid isoformat string: 'finished_at'
I believe when the re-run occurs, it is reading the current state and obtaining the
record_last_received_datetime
from the state dictionary. I believe the code is currently point to the theself.replication_key
rather than theself.replication_key_value
which contains the timestamp.To prove the scenario I wrote a simple python program to parse the JSON from the current state which was saved from the initial ingestion.
The result emulate the issue.