Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Resolve tap-dbt incremental replication #392

Conversation

s7clarke10
Copy link
Contributor

@s7clarke10 s7clarke10 commented Sep 23, 2024

Incremental replication from tap-dbt is failing with the following error on line 120 in the streams.py

   record_last_received_datetime = datetime.datetime.fromisoformat(
                    self.replication_key,
                )
   ValueError: Invalid isoformat string: 'finished_at'

I believe when the re-run occurs, it is reading the current state and obtaining the record_last_received_datetime from the state dictionary. I believe the code is currently point to the the self.replication_key rather than the self.replication_key_value which contains the timestamp.

To prove the scenario I wrote a simple python program to parse the JSON from the current state which was saved from the initial ingestion.

import json
import pendulum
from typing import cast
import datetime
my_state = '{"completed": {"singer_state": {"bookmarks": {"connections": {"partitions": [{"context": {"account_id": "5"}}]}, "environments": {"partitions": [{"context": {"account_id": "5"}}]}, "jobs": {"partitions": [{"context": {"account_id": "5"}}]}, "projects": {"partitions": [{"context": {"account_id": "5"}}]}, "repositories": {"partitions": [{"context": {"account_id": "5"}}]}, "runs": {"partitions": [{"context": {"account_id": "5"}, "replication_key": "finished_at", "replication_key_value": "2024-09-09 11:01:05.436229+00:00"}]}, "users": {"partitions": [{"context": {"account_id": "5"}}]}, "accounts": {}}}}, "partial": {}}'

my_state_dict = json.loads(my_state)

replication_key_value = my_state_dict['completed']['singer_state']['bookmarks']['runs']['partitions'][0]['replication_key_value']
replication_key       = my_state_dict['completed']['singer_state']['bookmarks']['runs']['partitions'][0]['replication_key']
print(f'Last replication_key_value as a string = {replication_key_value}')
print(f'Last replication_key       as a string = {replication_key}')

# Use pendulum for replication_key_value timestamp conversion
pendulum_last_received_datetime: pendulum.DateTime = cast(pendulum.DateTime, pendulum.parse(replication_key_value))

print(f'Pendulum = {pendulum_last_received_datetime}')

# Use datetime for replication_key_value timestamp conversion
new_last_received_datetime = datetime.datetime.fromisoformat(replication_key_value)

print(f'Datetime = {new_last_received_datetime}')

# Use Monkey Patch backport for replication_key_value timestamp conversion
if 1==1:
    from backports.datetime_fromisoformat import MonkeyPatch

    MonkeyPatch.patch_fromisoformat()
    
monkeypatch_last_received_datetime = datetime.datetime.fromisoformat(replication_key_value)

print(f'Monkey Patch Datetime = {monkeypatch_last_received_datetime}')

# Use replication_key for timestamp conversion - this should break
incorrect_datetime_key = datetime.datetime.fromisoformat(replication_key)

The result emulate the issue.

(venv)  test_tap_gitlab]$ python test_gitlab_replication.py 
Last replication_key_value as a string = 2024-09-09 11:01:05.436229+00:00
Last replication_key       as a string = finished_at
Pendulum = 2024-09-09 11:01:05.436229+00:00
Datetime = 2024-09-09 11:01:05.436229+00:00
Monkey Patch Datetime = 2024-09-09 11:01:05.436229+00:00
Traceback (most recent call last):
  File "/home/me/test_tap_gitlab/test_gitlab_replication.py", line 35, in <module>
    incorrect_datetime_key = datetime.datetime.fromisoformat(replication_key)
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: Invalid isoformat string: 'finished_at'

@s7clarke10 s7clarke10 changed the title Resolve tap-dbt incremental replication fix: Resolve tap-dbt incremental replication Sep 23, 2024
@s7clarke10 s7clarke10 changed the title fix: Resolve tap-dbt incremental replication Draft: Resolve tap-dbt incremental replication Sep 23, 2024
@s7clarke10
Copy link
Contributor Author

Caution requires further testing against GitLab Cloud.

@edgarrmondragon
Copy link
Member

edgarrmondragon commented Sep 23, 2024

Caution requires further testing against GitLab Cloud.

@s7clarke10 let me know how things look on that front when you've done more testing.

@s7clarke10 s7clarke10 changed the title Draft: Resolve tap-dbt incremental replication fix: Resolve tap-dbt incremental replication Sep 24, 2024
@s7clarke10
Copy link
Contributor Author

Hi @edgarrmondragon , okay had the right idea but the wrong implementation. I was chat with @mjsqu and he had pointed me in the right direction for the fix. I have been able to take the failed state and successfully test a run to completion without the error with the fix in place for an incremental run.

I believe we are now ready to go for this fix on tap-dbt. Thanks 😊

Copy link
Contributor

@mjsqu mjsqu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved - thank you!

Copy link

Copy link
Member

@edgarrmondragon edgarrmondragon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @s7clarke10 and @mjsqu, much appreciated!

@edgarrmondragon edgarrmondragon merged commit d7bce20 into MeltanoLabs:main Sep 26, 2024
9 checks passed
@s7clarke10
Copy link
Contributor Author

Hi @edgarrmondragon , thanks for your review and approval.

Here is some additional logging of variable just before the line that was changed. I am working on the assumption that ingestion is somehow extracting data from the point where the data was last ingested - i.e. the SDK logic is querying from the point in the timestamp.

Actual Value from the State File for last-timestamp

  • 2024-09-09 11:01:05.436229+00:00

Confirming the starting timestamp

print(f'XXXXXXXX Start from context    = {self.get_starting_timestamp(context)}')
XXXXXXXX Start from context    = 2024-09-09 11:01:05.436229+00:00

Verifying the starting timestamp via another method

print(f'XXXXXXXX Start from context    = {self.get_starting_replication_key_value(context)}')

result

XXXXXXXX Start from context    = 2024-09-09 11:01:05.436229+00:00

Test Scenario 1 - Verifying the proposed fix

print(f'XXXXXXXX Record Finished At    = {record['finished_at']}')

result

XXXXXXXX Record Finished At    = 2024-09-26 22:03:53.116865+00:00

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants