fix(ingest/datahub): Create Structured property templates in advance and batch processing #13355
base: master
Conversation
treff7es commented on Apr 29, 2025 (edited):
- Structured properties are ingested first, to make sure templates are created in advance.
- Database data is processed in batches, to make sure the database server does not run out of memory.
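The batching idea described above can be sketched as a LIMIT/OFFSET loop over a time window. This is a minimal illustration, not the PR's actual implementation: `ROWS`, `fetch_batch`, and `iter_rows` are hypothetical stand-ins for the real database query.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, Iterable, List

# In-memory stand-in for the aspect table (illustrative only).
ROWS: List[Dict[str, Any]] = [
    {"urn": f"urn:li:dataset:{i}", "createdon": datetime(2025, 1, 1) + timedelta(minutes=i)}
    for i in range(25)
]

def fetch_batch(since: datetime, until: datetime, limit: int, offset: int) -> List[Dict[str, Any]]:
    """Hypothetical LIMIT/OFFSET query against the database."""
    matching = [r for r in ROWS if since <= r["createdon"] < until]
    return matching[offset : offset + limit]

def iter_rows(start: datetime, stop: datetime, limit: int = 10) -> Iterable[Dict[str, Any]]:
    """Yield rows in fixed-size batches so the server never materializes the full result."""
    offset = 0
    while True:
        batch = fetch_batch(start, stop, limit, offset)
        yield from batch
        if len(batch) < limit:
            break
        offset += limit

all_rows = list(iter_rows(datetime(2025, 1, 1), datetime(2026, 1, 1)))
```

Each round trip returns at most `limit` rows, which bounds memory usage on both the client and the database side.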
Codecov Report: ✅ All tests successful. No failed tests found.
```python
time.sleep(
    self.config.structured_properties_template_cache_invalidation_interval
)
```
This wait time should be based on how many structured properties exist.
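One way to make the wait adaptive, as suggested above, is to scale it with the property count and cap it. This is a hedged sketch only: `base_interval`, `per_property_delay`, and `max_wait` are hypothetical names, not config options in this PR.

```python
def cache_invalidation_wait(
    num_structured_properties: int,
    base_interval: float = 5.0,       # assumed floor, in seconds
    per_property_delay: float = 0.1,  # assumed extra delay per property
    max_wait: float = 60.0,           # assumed ceiling, in seconds
) -> float:
    """Scale the cache-invalidation wait with the number of structured properties, capped."""
    return min(max_wait, base_interval + per_property_delay * num_structured_properties)
```

The caller would pass the result to `time.sleep(...)` instead of a fixed interval.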
LGTM, assuming green CI.
Ideally we could use something like: https://docs.datahub.com/docs/advanced/api-tracing to know when structured properties have been processed.
```python
query_timeout: Optional[int] = Field(
    hidden_from_docs=True,
```
This should not be hidden from docs
Agreed
```python
logger.debug("Fetching soft-deleted URNs")

# Use server-side cursor implementation
rows = self.execute_server_cursor(self.soft_deleted_urns_query, params)
```
@treff7es this query should also take the execution dates plus limits & offsets into account. Ingestion sources can emit a lot of soft deletes, so the disk issue on the server also applies here.
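A hedged sketch of what a bounded soft-deleted query could look like per this comment. Column names, parameter names, and the query shape are assumptions for illustration, not the PR's actual SQL:

```python
# Hypothetical parameterized query: constrain by a createdon window
# and page through results with LIMIT/OFFSET.
SOFT_DELETED_URNS_QUERY = """
SELECT urn
FROM metadata_aspect_v2
WHERE aspect = 'status'
  AND createdon >= %(since_createdon)s
  AND createdon < %(end_createdon)s
ORDER BY createdon, urn
LIMIT %(limit)s OFFSET %(offset)s
"""

def build_params(since: str, end: str, limit: int, offset: int) -> dict:
    """Bundle the window bounds and paging values for the parameterized query."""
    return {
        "since_createdon": since,
        "end_createdon": end,
        "limit": limit,
        "offset": offset,
    }
```

The `ORDER BY` on a stable key pair matters: without it, LIMIT/OFFSET paging can skip or repeat rows between batches.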
We should add an integration test for this now that the logic is getting more complex.
A couple of comments, but I think this looks good once they're addressed.
```diff
 def _get_rows(
-    self, from_createdon: datetime, stop_time: datetime
+    self,
+    from_createdon: datetime,
+    stop_time: datetime,
+    set_structured_properties_filter: bool = False,
 ) -> Iterable[Dict[str, Any]]:
-    params = {
-        "exclude_aspects": list(self.config.exclude_aspects),
-        "since_createdon": from_createdon.strftime(DATETIME_FORMAT),
-    }
-    yield from self.execute_server_cursor(self.query, params)
+    # Query and yield rows for this date batch
+    yield from self._get_rows_for_date_range(
+        from_createdon,
+        stop_time,
+        set_structured_properties_filter,
+        limit=self.config.database_query_batch_size,
+    )
```
Doesn't seem like we need both of these functions anymore.
```python
query_timeout: Optional[int] = Field(
    hidden_from_docs=True,
```
Agreed
```python
{"" if self.config.include_soft_deleted_entities else " AND (removed = false or removed is NULL)"}
{structured_prop_filter}
```
Nit: feels like either both expressions should be factored into a variable or neither
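One way to address the nit, with both filter expressions built the same way. The function and variable names here are illustrative, not taken from the PR:

```python
def build_filters(include_soft_deleted: bool, structured_prop_filter: str) -> str:
    """Compose both optional WHERE-clause fragments symmetrically."""
    # Both expressions are now plain variables, factored identically.
    removed_filter = (
        "" if include_soft_deleted else " AND (removed = false OR removed IS NULL)"
    )
    return f"{removed_filter}{structured_prop_filter}"
```

This keeps the f-string that assembles the SQL free of inline conditionals, which is the symmetry the comment asks for.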
```python
if start_date != last_createdon:
    start_date = last_createdon
    offset = 0
```
If start_date == last_createdon, then we still need to update the offset, don't we?
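The concern above can be sketched as follows: when the batch's last `createdon` equals the current window start, the next query matches the same timestamp again, so the offset must advance rather than reset. This is a minimal illustration with hypothetical names, not the PR's code:

```python
def advance_cursor(start_date, last_createdon, offset: int, batch_size: int):
    """Return the (start_date, offset) to use for the next batch."""
    if start_date != last_createdon:
        # Timestamp moved forward: restart paging at the new timestamp.
        return last_createdon, 0
    # Same timestamp as before: page deeper to avoid re-reading the same rows.
    return start_date, offset + batch_size
```

Without the second branch, a run of rows sharing one `createdon` larger than the batch size would loop forever over the same batch.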
🔴 Meticulous spotted visual differences in 36 of 612 screens tested. Meticulous evaluated ~10 hours of user flows against your PR. Last updated for commit 8afa715. This comment will update as new commits are pushed.
Nicely done!
```python
        duplicate_found = True
        break
if first_iteration:
    start_date = row.get("createdon", start_date)
```
Nice
Force-pushed from 8afa715 to 3f01d68 (compare).