
Replace global hub transport with cloudevents #310

Closed · yanmxa opened this issue Jan 30, 2023 · 10 comments

yanmxa (Member) commented Jan 30, 2023

1. Kafka offset committer in Global Hub manager

[image: offset committer flow in the global hub manager]

When the global hub manager receives a message through the transport, there is not only a goroutine for the consumer client but also a committer goroutine that commits the Kafka offset periodically. The consumer receives each message and forwards it to the message handler; after the message has been processed, the committer updates the offset in Kafka. Consuming messages and committing the offset are asynchronous.

But when we use cloudevents to deliver messages, even though it is built on the Kafka protocol, we cannot update the offset directly through the Kafka client after receiving a message. Instead, the cloudevents client receives the message and returns an ACK or NACK result that decides whether the Kafka offset is updated. Message consumption and offset updating are therefore synchronous throughout the process.
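For context, a minimal sketch of the asynchronous pattern described above, assuming the confluent-kafka-go client with auto-commit disabled; the brokers, topic, group ID, and commit interval are placeholders, not the actual global hub code:

```go
package main

import (
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  "localhost:9092",
		"group.id":           "global-hub-manager", // placeholder
		"enable.auto.commit": false,                // commit manually, not on a broker-driven timer
	})
	if err != nil {
		panic(err)
	}
	_ = consumer.Subscribe("status", nil)

	// Committer goroutine: periodically commits the offsets of messages
	// already handed to the handler, independently of the consume loop.
	go func() {
		for range time.Tick(5 * time.Second) {
			consumer.Commit() // in real code, log and retry on error
		}
	}()

	// Consume loop: forward each message to the handler and keep polling;
	// the committer above catches up asynchronously.
	for {
		if msg, ok := consumer.Poll(100).(*kafka.Message); ok {
			go handle(msg) // processing never blocks offset commits
		}
	}
}

func handle(msg *kafka.Message) { /* forward to the message handler */ }
```

The cloudevents counterpart, where the commit happens synchronously via the receiver's return value, is sketched later in this thread.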

Now the questions are:

  1. Both approaches commit the Kafka offset manually, so that when the consumer crashes and restarts it can resume consuming from the previously received messages. Is this manual offset commit necessary in the global hub?
  2. If we use cloudevents, what are the possible impacts of synchronously updating the Kafka offset on the global hub?
  3. Do these effects really prevent us from using cloudevents instead of the original transport?
vMaroon self-assigned this Jan 31, 2023
nirrozenbaum (Collaborator) commented Feb 1, 2023

I see @vMaroon has assigned himself. He was in charge of the initial Kafka work, so he probably has the best answers.
Having said that, if I remember correctly we had discussions about cloud events when we started the collaboration, and we agreed that it should be possible in the spec path but would have some issues in the status path.

@yanmxa continuing the other thread where we discussed performance, the same applies here.
In the status path we need to keep in mind that the scale is extremely high: 1M MCs and 100M policy status updates.
We intentionally separated the offset commit into a different goroutine in order to achieve the best performance we could get.
If we have to commit the offset synchronously after every bundle is processed, it will slow the processing rate and, as a result, may reduce performance significantly.

As I suggested in the other thread, I suggest here as well running high-scale simulations in order to understand the performance effect a change can make (that's true of any change, not this one specifically).
It's highly important in order to understand which changes are acceptable and which aren't.

Here are the original performance results we were able to achieve:

[image: original performance results]

nirrozenbaum (Collaborator) commented:

High-scale simulation setup:

[image: high-scale simulation setup]

vMaroon (Member) commented Feb 2, 2023

As @nirrozenbaum mentioned, regardless of what we say here, any substantial change to the status-path flow should be followed by a set of high-scale tests; otherwise it would not be right to claim the same scalability, since the system underwent iterations of improvements and compactions to achieve a very efficient and compact data flow.

  1. The first major issue in the proposition is synchronous reading from Kafka:

    • Since the conflation logic is a consumer of the transport through go-channels, synchronous reading from Kafka would propagate all the way to the conflators, unless you decouple them with a middle layer of conflation, which is a recursive problem.
    • Regardless, at high scale it is against best practices to go synchronous. Please keep in mind that the demand here is 100k+ managed clusters, the natural next step is 1M+ and so on, and the current design supports more than 1M.
  2. Cloud-events: if it is beyond structuring a message, keep the following in mind:

    1. Delta messages: while the tests showed that the system can still manage extreme scales without the delta-messaging mechanism, the required bandwidth is considerably larger. Therefore delta messages should not be dropped.
    2. Message fragmentation: with the Kafka message size limit being 1MB, the transport layer manages fragmentation transparently, while satisfying one of the major assumptions - that committed messages are certainly processed (or irrelevant) - as compactly as possible.
      • This means that on the same Kafka partition there can be multiple open streams of fragmented messages, and their offsets are managed such that if two or more streams intersect, committing one when complete and then crashing does not lose fragments of the other intersecting streams (see the sketch at the end of this comment).

The above are the tips of each point; if requested, I can dive into the reasoning and more details.
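To illustrate the intersecting-streams invariant, a hypothetical sketch of the commit rule: the committable offset is capped by the earliest fragment of any still-open stream, so completing and committing one stream and then crashing can never discard fragments of another. Names and types are illustrative, not the actual transport code.

```go
package transport

// stream tracks one in-flight fragmented message on a partition.
type stream struct {
	firstOffset int64 // offset of the stream's first fragment
	complete    bool  // true once every fragment has arrived
}

// safeCommitOffset returns the highest offset that may be committed without
// risking fragment loss: one before the earliest first-fragment offset among
// still-open streams, or the last processed offset when none are open.
func safeCommitOffset(streams []stream, lastProcessed int64) int64 {
	commit := lastProcessed
	for _, s := range streams {
		if !s.complete && s.firstOffset-1 < commit {
			commit = s.firstOffset - 1
		}
	}
	return commit
}
```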

vMaroon removed their assignment Feb 2, 2023
yanmxa (Member, Author) commented Feb 7, 2023

@vMaroon I very much appreciate the information!

  1. As you said, the main purpose of using delta messages here is to reduce bandwidth.
  2. The way message fragmentation is used to assemble messages and then commit the message offset asynchronously is very clever. I will have to think about how to deliver and consume chunked messages in cloudevents.
  3. I am running a simple A/B test that just synchronizes managed clusters from the leaf hubs to the hub-of-hubs database. Could you help check the following test results to see if they are reasonable?

100 K MCs: 100 RHs with 1000 MCs

| Scenario \ Run | 1 | 2 | 3 |
| --- | --- | --- | --- |
| Kafka | 4 seconds | 4 seconds | 4 seconds |
| Cloudevents | 3 seconds | 6 seconds | 4 seconds |

1 M MCs: 1000 RHs with 1000 MCs

| Scenario \ Run | 1 | 2 | 3 |
| --- | --- | --- | --- |
| Kafka | 49 seconds | 47 seconds | 48 seconds |
| Cloudevents | 59 seconds | 57 seconds | 60 seconds |

yanmxa (Member, Author) commented Feb 9, 2023

Transport Spec Path

[image: transport spec path diagram]

Updates:

  1. Transport Producer: reduce the methods of the producer interface.
  2. Transport Consumer: only responsible for delivering messages, not for handling bundle-related logic.
  3. Global Hub Agent: add a dispatcher that registers syncers for different bundles and dispatches messages to the corresponding syncer according to the received message ID (a sketch follows this list).
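A minimal sketch of the dispatcher idea, assuming syncers keyed by message ID; all names are illustrative rather than the actual agent code.

```go
package dispatcher

import "context"

// Syncer handles one kind of bundle, identified by its message ID.
type Syncer interface {
	Sync(ctx context.Context, payload []byte) error
}

// Dispatcher routes received transport messages to registered syncers.
type Dispatcher struct {
	syncers map[string]Syncer
}

func New() *Dispatcher {
	return &Dispatcher{syncers: map[string]Syncer{}}
}

// Register binds a syncer to a message ID, e.g. "ManagedClusters".
func (d *Dispatcher) Register(msgID string, s Syncer) {
	d.syncers[msgID] = s
}

// Dispatch looks up the syncer for the message ID and forwards the payload.
func (d *Dispatcher) Dispatch(ctx context.Context, msgID string, payload []byte) error {
	s, ok := d.syncers[msgID]
	if !ok {
		return nil // no syncer registered; in real code, log and drop
	}
	return s.Sync(ctx, payload)
}
```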

Transport Status Path

[images: transport status path diagrams]

Updates:

  1. Transport Producer: reduce the methods of the producer interface; the status producer now shares the spec producer interface.
  2. Transport Consumer: align with the spec consumer. It is only responsible for receiving and forwarding messages, not for processing them.
  • Bundle syncers no longer need to be registered to a specific consumer. Add a TransportDispatcher so that the consumer only forwards the message to the dispatcher, which invokes the registered syncer to process the message based on the MsgID.
  • Remove the asynchronous offset committer. The cloudevents client receives the message and returns an ACK or NACK result to decide whether to update the Kafka offset, so offset updating is synchronous in this flow (see the sketch below).
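A minimal sketch of the synchronous receive-and-ack flow, assuming the kafka_sarama binding from cloudevents/sdk-go: the receiver's return value is the ACK/NACK decision, so the offset only advances once processing succeeds. Brokers, topic, and group ID are placeholders.

```go
package main

import (
	"context"
	"log"

	"github.com/Shopify/sarama"
	"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
)

func main() {
	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_0_0_0

	consumer, err := kafka_sarama.NewConsumer(
		[]string{"localhost:9092"}, saramaConfig, "global-hub", "status")
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close(context.Background())

	c, err := cloudevents.NewClient(consumer)
	if err != nil {
		log.Fatal(err)
	}

	// Returning nil ACKs the event (the offset advances); returning an
	// error NACKs it, leaving the offset uncommitted for redelivery.
	err = c.StartReceiver(context.Background(),
		func(ctx context.Context, e cloudevents.Event) error {
			return dispatch(ctx, e) // hand off to the TransportDispatcher
		})
	if err != nil {
		log.Fatal(err)
	}
}

// dispatch is a stand-in for the dispatcher described above.
func dispatch(ctx context.Context, e cloudevents.Event) error { return nil }
```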

A/B Testing

Synchronize managed clusters from Regional Hubs to Global Hub database.

100 K MCs: 100 RHs with 1000 MCs

| Scenario \ Run | 1 | 2 | 3 |
| --- | --- | --- | --- |
| Kafka | 4 seconds | 4 seconds | 4 seconds |
| Cloudevents (Kafka) | 3 seconds | 6 seconds | 4 seconds |

1 M MCs: 1000 RHs with 1000 MCs

| Scenario \ Run | 1 | 2 | 3 |
| --- | --- | --- | --- |
| Kafka | 49 seconds | 47 seconds | 48 seconds |
| Cloudevents (Kafka) | 59 seconds | 57 seconds | 60 seconds |

Improvements

  1. Increase the size of messages that can be sent at a time.
  2. Since messages are sent in chunks, the manager's consumer locks the receiver while assembling chunks after receiving an event. Reducing the use of locks should reduce delivery time.
  3. Compressing the data may reduce transfer time (see the note after this list).
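On point 3: Kafka can already compress batches at the producer with no consumer-side changes; a one-function sketch assuming confluent-kafka-go (compression.type is a standard librdkafka property):

```go
package main

import "github.com/confluentinc/confluent-kafka-go/kafka"

// newCompressingProducer enables transparent batch compression; consumers
// decompress automatically, so only the producer config changes.
func newCompressingProducer() (*kafka.Producer, error) {
	return kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder
		"compression.type":  "gzip",           // alternatives: snappy, lz4, zstd
	})
}
```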

yanmxa (Member, Author) commented Feb 10, 2023

Test Results after Increasing Transport Message Limit Size to 940 KB

1 M MCs: 1000 RHs with 1000 MCs

| Scenario \ Run | 1 | 2 | 3 |
| --- | --- | --- | --- |
| Kafka | 54 seconds | 55 seconds | 53 seconds |
| Cloudevents (Kafka) | 51 seconds | 50 seconds | 52 seconds |

Conclusion: from the test results so far, replacing the original transport with cloudevents does not cause significant performance degradation.
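For reference, the producer-side knob involved here is librdkafka's message.max.bytes (roughly 1 MB by default); a hedged sketch, with 940 KB standing in for whatever headroom the chunking layer leaves for the envelope:

```go
package main

import "github.com/confluentinc/confluent-kafka-go/kafka"

// newLargeMessageProducer raises the per-message cap so each chunk can be
// close to, but safely under, the broker's 1 MB default limit.
func newLargeMessageProducer() (*kafka.Producer, error) {
	return kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder
		"message.max.bytes": 940 * 1024,
	})
}
```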

vMaroon (Member) commented Feb 10, 2023

@yanmxa these results seem fine at first glance. Did you try testing the load/rotation of 100 policies with the setup above? It's highly suggested to do so.

yanmxa (Member, Author) commented Feb 16, 2023

@vMaroon Since we focus on the performance change of the transport, I only compared and tested the cases of 1 M policies and 1 M managed clusters during HoH initialization.

  • 10 Policies
  • 100 RHs with 1000 MCs
  • Total: 100 K MCs and 1 M Policies

| Scenario \ Run | 1 | 2 | 3 |
| --- | --- | --- | --- |
| Kafka | 12 seconds | 15 seconds | 13 seconds |
| Cloudevents (Kafka) | 18 seconds | 16 seconds | 12 seconds |

yanmxa (Member, Author) commented Feb 17, 2023

ref: cloudevents/sdk-go#846

yanmxa (Member, Author) commented Feb 24, 2023

For the transport status path, we added a transportFormat field in the API to support both Kafka directly and cloudevents (a rough sketch of the switch follows).
ref: #319
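Roughly, such a switch could look like the following; the exact field and value names live in #319, so treat these as assumptions:

```go
package transport

// TransportFormat selects how bundles are encoded on the wire.
// Value names are illustrative; see #319 for the actual API.
type TransportFormat string

const (
	KafkaMessageFormat TransportFormat = "message"     // original raw-Kafka transport
	CloudEventsFormat  TransportFormat = "cloudEvents" // CloudEvents over Kafka
)
```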

yanmxa closed this as completed Feb 24, 2023