-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(airbyte-cdk): Add limitation for number of partitions to PerPartitionCursor #42406
feat(airbyte-cdk): Add limitation for number of partitions to PerPartitionCursor #42406
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Skipped Deployment
|
55f2cb7
to
d4a303b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tolik0 shouldn't we only use the limit on the number of partitions with the fallback to global state to avoid re-processing partitions as full refresh or is the rationale that re-processing is better than blowing up the state size?
@@ -41,6 +43,7 @@ class PerPartitionCursor(DeclarativeCursor): | |||
Therefore, we need to manage state per partition. | |||
""" | |||
|
|||
DEFAULT_MAX_PARTITIONS_NUMBER = 10000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can this be private?
_DEFAULT_MAX_PARTITIONS_NUMBER = 10_000
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
second question: is it safe to increase this number?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used the same value as in file-based; the user can have multiple streams with such states, so it is not safe to use a bigger value.
@@ -49,12 +52,15 @@ class PerPartitionCursor(DeclarativeCursor): | |||
def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter): | |||
self._cursor_factory = cursor_factory | |||
self._partition_router = partition_router | |||
self._cursor_per_partition: MutableMapping[str, DeclarativeCursor] = {} | |||
self._cursor_per_partition: OrderedDict[str, DeclarativeCursor] = OrderedDict() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a comment in the code explaining why the cursor_per_partition needs to be ordered?
@girarda The |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎸
/approve-regression-tests
|
What
Add limitation for the number of partitions in PerPartitionCursor
How
Reused the approach from file-based CDK. Old partitions are deleted when the state overflows.
User Impact
This update should help to avoid OOM errors.
Can this PR be safely reverted and rolled back?