
fix(ingest/datahub): Create Structured property templates in advance and batch processing #13355


Open
wants to merge 11 commits into master

Conversation

@treff7es (Contributor) commented Apr 29, 2025

  • Structured properties are ingested first, so that their property templates are created in advance.
  • Process database data in batches, so that the query does not run out of memory on the database side (a rough sketch of the batching idea follows below).
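
A minimal sketch of the batching idea, assuming a hypothetical fetch_page callable that runs the paginated query; the function and parameter names are illustrative, not the PR's actual implementation:

from datetime import datetime
from typing import Any, Callable, Dict, Iterable, List


def read_in_batches(
    fetch_page: Callable[[datetime, datetime, int, int], List[Dict[str, Any]]],
    since: datetime,
    until: datetime,
    batch_size: int = 10_000,
) -> Iterable[Dict[str, Any]]:
    """Page through the aspect rows in fixed-size chunks so the database
    never has to materialize the full result set at once (illustrative only)."""
    offset = 0
    while True:
        page = fetch_page(since, until, batch_size, offset)
        if not page:
            return
        yield from page
        offset += len(page)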

@github-actions bot added the ingestion label (PR or Issue related to the ingestion of metadata) on Apr 29, 2025

codecov bot commented Apr 29, 2025

Codecov Report

Attention: Patch coverage is 57.97101% with 29 lines in your changes missing coverage. Please review.

✅ All tests successful. No failed tests found.

| Files with missing lines | Patch % | Lines missing |
| --- | --- | --- |
| ...ngestion/source/datahub/datahub_database_reader.py | 57.57% | 28 Missing ⚠️ |
| ...datahub/ingestion/source/datahub/datahub_source.py | 0.00% | 1 Missing ⚠️ |


)

time.sleep(
self.config.structured_properties_template_cache_invalidation_interval

Collaborator commented:

This time should be based on how many structured properties exist.
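
One way to act on this suggestion, scaling the wait with the number of structured properties rather than sleeping for a fixed configured interval; the helper and its constants are hypothetical, not part of the PR:

import time


def wait_for_template_cache(num_structured_properties: int) -> None:
    """Sleep long enough for the server-side template cache to pick up the
    newly created structured property templates (hypothetical scaling)."""
    per_property_wait_seconds = 0.1  # assumed cost per property
    min_wait_seconds = 5.0  # floor so small counts still wait a little
    time.sleep(max(min_wait_seconds, num_structured_properties * per_property_wait_seconds))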

@pedro93 (Collaborator) left a comment:

LGTM, assuming green CI.

Ideally we could use something like: https://docs.datahub.com/docs/advanced/api-tracing to know when structured properties have been processed.

@datahub-cyborg bot added the pending-submitter-merge label and removed the needs-review label (for PRs that need review from a maintainer) on Apr 29, 2025
@treff7es requested a review from pedro93 on Apr 29, 2025 at 16:19
@treff7es changed the title from "fix(ingest/datahub): Create Structured property templates in advance" to "fix(ingest/datahub): Create Structured property templates in advance and batch processing" on Apr 29, 2025
)

query_timeout: Optional[int] = Field(
hidden_from_docs=True,

Collaborator commented:

This should not be hidden from docs

Collaborator commented:

Agreed
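
A sketch of how the option could be exposed instead of hidden, using a plain pydantic Field with a description; the class name, the description text, and the seconds unit are assumptions, not the PR's actual config:

from typing import Optional

from pydantic import BaseModel, Field


class DataHubSourceConfigSketch(BaseModel):
    # Illustrative only: document the option rather than passing hidden_from_docs=True.
    query_timeout: Optional[int] = Field(
        default=None,
        description="Timeout (in seconds, assumed) applied to the queries issued "
        "against the DataHub database. Unset means no explicit timeout.",
    )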

logger.debug("Fetching soft-deleted URNs")

# Use server-side cursor implementation
rows = self.execute_server_cursor(self.soft_deleted_urns_query, params)

Collaborator commented:

@treff7es this query should also take the execution dates plus limits and offsets into account. Ingestion sources can emit a lot of soft deletes, so the disk issue on the server side applies here as well.
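
A rough illustration of this suggestion: constrain the soft-deleted-URN query to the same createdon window and page it with LIMIT/OFFSET. The table name, predicates, and pyformat placeholders below are assumptions for illustration, not the PR's actual query:

# Hypothetical paginated variant of the soft-deleted URN query
# (soft-delete predicate elided; placeholders in pyformat style).
SOFT_DELETED_URNS_QUERY_SKETCH = """
    SELECT DISTINCT urn
    FROM metadata_aspect_v2
    WHERE aspect = 'status'
      AND version = 0
      AND createdon >= %(since_createdon)s
      AND createdon < %(end_createdon)s
    ORDER BY urn
    LIMIT %(limit)s OFFSET %(offset)s
"""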

@asikowitz (Collaborator) left a comment:

We should add an integration test for this now that the logic is getting more complex.

A couple of comments, but I think this looks good once they're addressed.

Comment on lines 288 to 300
 def _get_rows(
-    self, from_createdon: datetime, stop_time: datetime
+    self,
+    from_createdon: datetime,
+    stop_time: datetime,
+    set_structured_properties_filter: bool = False,
 ) -> Iterable[Dict[str, Any]]:
     params = {
         "exclude_aspects": list(self.config.exclude_aspects),
         "since_createdon": from_createdon.strftime(DATETIME_FORMAT),
     }
-    yield from self.execute_server_cursor(self.query, params)
+    # Query and yield rows for this date batch
+    yield from self._get_rows_for_date_range(
+        from_createdon,
+        stop_time,
+        set_structured_properties_filter,
+        limit=self.config.database_query_batch_size,
+    )

Collaborator commented:

Doesn't seem like we need both these functions anymore


Comment on lines +145 to +146
{"" if self.config.include_soft_deleted_entities else " AND (removed = false or removed is NULL)"}
{structured_prop_filter}

Collaborator commented:

Nit: feels like either both expressions should be factored into variables, or neither should be.
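
A small sketch of the nit: build both optional predicates as variables before interpolating them into the query template, so they are treated uniformly. The function name and the structured-property predicate are illustrative, not the PR's actual code:

def build_optional_filters(
    include_soft_deleted_entities: bool,
    set_structured_properties_filter: bool,
) -> str:
    """Compute both optional predicates up front, then interpolate them the same way."""
    removed_filter = (
        ""
        if include_soft_deleted_entities
        else " AND (removed = false OR removed IS NULL)"
    )
    structured_prop_filter = (
        " AND urn LIKE 'urn:li:structuredProperty:%'"  # assumed predicate
        if set_structured_properties_filter
        else ""
    )
    return removed_filter + structured_prop_filter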

Comment on lines +273 to +278
if start_date != last_createdon:
    start_date = last_createdon
    offset = 0

Collaborator commented:

If start_date == last_createdon then we need to update offset, for sure, no?
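
To make this concrete, a hypothetical helper showing the invariant under discussion: when the createdon watermark advances, the offset resets; when it stays on the same value, the offset must grow by the rows already read, otherwise the next batch re-reads the same page. Names are illustrative only:

from datetime import datetime
from typing import Tuple


def advance_window(
    start_date: datetime, last_createdon: datetime, offset: int, rows_read: int
) -> Tuple[datetime, int]:
    # Watermark moved forward: restart paging at the new timestamp.
    if last_createdon != start_date:
        return last_createdon, 0
    # Many rows share the same createdon: keep the timestamp but skip the
    # rows already consumed so the next query does not repeat this page.
    return start_date, offset + rows_read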


alwaysmeticulous bot commented May 6, 2025

🔴 Meticulous spotted visual differences in 36 of 612 screens tested: view and approve the detected differences.

Meticulous evaluated ~10 hours of user flows against your PR.

Last updated for commit 8afa715. This comment will update as new commits are pushed.

@asikowitz (Collaborator) left a comment:

Nicely done!

            duplicate_found = True
            break
    if first_iteration:
        start_date = row.get("createdon", start_date)

Collaborator commented:

Nice

Labels: ingestion (PR or Issue related to the ingestion of metadata), pending-submitter-merge, publish-docker

3 participants