[Feature] Add batch event consumption #13261
Conversation
Moved here @Megafredo

Hello @Renizmy, thank you for the switch!

Fixed, sorry
Codecov Report
✅ All modified and coverable lines are covered by tests.

```diff
@@            Coverage Diff             @@
##           master   #13261      +/-   ##
==========================================
+ Coverage   31.11%   31.28%   +0.16%
==========================================
  Files        2922     2929       +7
  Lines      193795   194733     +938
  Branches    39561    39718     +157
==========================================
+ Hits        60294    60916     +622
- Misses     133501   133817     +316
```
Megafredo
left a comment
Hello @Renizmy, thanks for your work!
This new method for streams that allows batch processing will make a lot of people happy!
@Renizmy FYI, we'd like to improve and refactor the code a bit before merging! :)
Hi @Renizmy, thank you for your contribution. As @helene-nguyen mentioned, we'd like the code to be refactored before merging. The main concern is with the new class. Instead of creating a new class and method, we suggest implementing an adapter. Each batch-capable connector (depending on the targeted API) could then use this adapter to receive batches of messages instead of individual messages.

Usage (assuming the wrapper is created via a helper factory method):

```python
self.helper.listen_stream(message_callback=self.process_message)
```

becomes:

```python
batch_callback = self.helper.create_batch_callback(
    self.process_message_batch,
    self.batch_size,
    self.batch_timeout,
    self.max_batches_per_minute,
)
self.helper.listen_stream(message_callback=batch_callback)
```

Would you be open to making this change?
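For illustration, a minimal sketch of what such a batching adapter could look like. The class name, parameters, and flush semantics here are assumptions for the sake of the example, not the actual pycti implementation:

```python
# Hypothetical sketch of a batching callback adapter. Names and
# parameters are illustrative, not the real pycti API.
import threading
import time


class BatchCallbackWrapper:
    """Accumulates stream messages and flushes them to a batch callback
    when the batch is full or a timeout has elapsed."""

    def __init__(self, batch_callback, batch_size=100, batch_timeout=5.0):
        self._batch_callback = batch_callback
        self._batch_size = batch_size
        self._batch_timeout = batch_timeout
        self._buffer = []
        self._lock = threading.Lock()
        self._last_flush = time.monotonic()

    def __call__(self, message):
        # The wrapper itself is a per-message callback, so it can be
        # passed anywhere a single-message callback is expected.
        with self._lock:
            self._buffer.append(message)
            full = len(self._buffer) >= self._batch_size
            timed_out = time.monotonic() - self._last_flush >= self._batch_timeout
        if full or timed_out:
            self.flush()

    def flush(self):
        # Swap the buffer under the lock, then invoke the callback
        # outside it so slow batch processing never blocks producers.
        with self._lock:
            batch, self._buffer = self._buffer, []
            self._last_flush = time.monotonic()
        if batch:
            self._batch_callback(batch)
```

The key design point is that the adapter presents the same single-message interface as a regular callback, so existing stream plumbing does not need to change.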
xfournet
left a comment
Thanks for the update! I made some comments; I will resume the review after this first round of feedback has been processed.
Hi @xfournet, thanks for the review! All points are addressed. The changes to the rate limiter have led to simplifications. I haven't implemented any rate-limiting code for basic stream consumption (out of scope?).
Force-pushed from 1c222ef to 2cb4539
Pull request overview
This PR adds batch event consumption capabilities to the OpenCTI Python client, enabling connectors to accumulate and process multiple events together. The feature includes rate limiting support using a sliding window algorithm and proper cleanup mechanisms for thread-safe operations.
Changes:
- Adds `RateLimiter` and `RateLimitedCallback` classes for rate-limited event processing with a sliding window algorithm
- Adds `BatchCallbackWrapper` class for accumulating events into batches based on size/timeout triggers
- Modifies `ListenStream` to support callbacks with custom state management (bypassing automatic state updates)
- Adds factory methods `create_batch_callback()` and `create_rate_limiter()` to `OpenCTIConnectorHelper`
- Refactors `listen_stream()` to extract parameter resolution into `_resolve_stream_parameters()`
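Since the overview mentions a sliding window algorithm for rate limiting, here is a minimal sketch of that technique. The class name, parameters, and injectable clock are illustrative assumptions, not the PR's actual code:

```python
# Hypothetical sliding-window rate limiter sketch; not the real
# pycti RateLimiter implementation.
import threading
import time
from collections import deque


class RateLimiter:
    """Allows at most `max_calls` events per `window` seconds,
    measured over a moving (sliding) time window."""

    def __init__(self, max_calls, window=60.0, clock=time.monotonic):
        self._max_calls = max_calls
        self._window = window
        self._clock = clock  # injectable for deterministic testing
        self._timestamps = deque()
        self._lock = threading.Lock()

    def try_acquire(self):
        """Record the event and return True if under the limit, else False."""
        now = self._clock()
        with self._lock:
            # Evict timestamps that have slid out of the window.
            while self._timestamps and now - self._timestamps[0] >= self._window:
                self._timestamps.popleft()
            if len(self._timestamps) < self._max_calls:
                self._timestamps.append(now)
                return True
            return False
```

Unlike a fixed-window counter, the sliding window never admits a burst of `2 * max_calls` events straddling a window boundary, at the cost of keeping one timestamp per admitted event.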
Comments suppressed due to low confidence (1)
client-python/pycti/connector/opencti_connector_helper.py:1
- The docstring contains malformed content. Lines 2822-2832 appear to be duplicate or misplaced parameter declarations that should not be inside the docstring; they are not valid docstring formatting and create confusion. The docstring should end at line 2821, after the `:rtype: dict` line.
"""OpenCTI Connector Helper module.
Force-pushed from 476538d to b08096d

Force-pushed from 0d81560 to e499ed4
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 21 comments.
client-python/tests/01-unit/connector_helper/test_batch_callback_wrapper.py
Force-pushed from 16b399c to 2b2b7b5

Force-pushed from c949aa6 to 070eaa7
Related to: #13372