Description
Proposal: New Pulsar Broker Load Balancer
Motivation
As previously shared with the community, we have observed many improvement areas around the Pulsar load balancer[1]. Since the improvement requires significant changes, we would first like to share the overall goals for this project and the high-level components to design. This doc highlights the architecture of the new broker load balancer.
Goals
We set up the project goals in the following areas.
User-facing goals
Logic
- Balance cluster utilization as uniformly as possible with minimal delays
Logs / Metrics
- Show transparent load balance decisions with logs and metrics.
Admin API / Configurations
- Provide ways to override system decisions.
- Reduce the number of configurations to tune.
- Provide better configuration default values with explanations.
- (second-phase) Provide ways to set a custom load balance strategy.
Internal Implementation goals
Logic
- Keep the three major load balance logics but make them more efficient and faster. We will discuss detailed algorithm improvements separately.
- Topic(Bundle)-broker-assignment: improve the randomization and assignment distribution
- Bundle-split: revisit the current threshold-based strategy
- Bundle-unload: revisit the frequency of unloading
Implementation
- Distribute bundle-broker assignment and bundle-split decisions to local brokers.
- Synchronize bundle unloading decisions by the leader broker.
- Reduce load data replication among brokers.
- Replace load data’s metadata stores with topic table-views.
- Remove the client dependency in the load balance logic.
- Remove the client redirection in the assignment logic.
- Add a bundle transfer option to the unload logic instead of relying on clients’ broker discovery calls.
- Minimize topic unavailability from bundle unloading with the bundle transfer option.
- Introduce the bundle state channel(table-view) to make bundle load balance operations consistent and fault-tolerant among brokers.
- Isolate the new load balancer code in new classes.
- Replace the bundle ownership metadata store(ZK znodes) with the bundle state channel.
Logs / Metrics
- Add meaningful logs and metrics for all major load balance events.
- Add documentation about how to read load balancer metrics and logs.
Admin API / Configurations
- Add an admin CLI to transfer a bundle to a specific broker.
- Add necessary configurations to override the load balance decisions.
- Dynamically adjust internal configuration thresholds based on the load data.
- Make the admin APIs fault-tolerant and easy to monitor.
Testing
- Document the testing plan and coverage status.
- Add unit tests for happy and unhappy cases.
- Add global load balance logic tests, and compare the current load manager with the new load manager.
API Changes
We will add a transfer unload option, --dest, to unload the topic(bundle) directly to the specified destination broker.
pulsar-admin topics unload persistent://tenant/namespace/topic --dest ${destination_broker}
Implementation (High-Level Components)
New Load Manager
- It refactors the existing load balance logic with better modularity.
- It isolates the new code in the new classes without breaking the existing logic.
- This new load manager will be disabled in the first releases until proven stable.
Load Data Models
LocalBrokerData: broker’s factual data
- e.g.) {webServiceUrl, pulsarServiceUrl, …}
- Persisted in MetadataStore(ZK)
BrokerLoadData: broker’s load data
- e.g.) {cpu, memory, io, msgIn/Out, ...}
- Published in BrokerLoadDataStore(TableView)
BundlesLoadData: bundle’s load data
- e.g.) { bundleName, msgIn/Out, ...}
- Cached in the local broker only
TopBundlesLoadData: top-n high-loaded bundle load data from the broker
- e.g.) {brokerUrl, high_load_bundles :[{bundleName, …}], …}
- Published in TopBundlesLoadDataStore(TableView)
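For illustration only, the sketch below shows how these load data models might look as plain Java classes; the class and field names are assumptions drawn from the examples above, not the final schema.

```java
import java.util.List;

// Hypothetical shapes of the load data models described above (names and fields are assumptions).
public final class LoadDataModels {

    // Factual broker data, persisted in the MetadataStore(ZK) ephemeral znode.
    public record LocalBrokerData(String webServiceUrl, String pulsarServiceUrl) {}

    // Periodically computed broker load, published to BrokerLoadDataStore(TableView).
    public record BrokerLoadData(double cpu, double memory, double io,
                                 double msgThroughputIn, double msgThroughputOut) {}

    // Per-bundle load, cached only in the owner broker's memory.
    public record BundleLoadData(String bundleName, double msgRateIn, double msgRateOut) {}

    // Top-n highest-loaded bundles reported by one broker, published to TopBundlesLoadDataStore(TableView).
    public record TopBundlesLoadData(String brokerUrl, List<BundleLoadData> highLoadBundles) {}
}
```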
Load Data Write-Read Flow
LocalBrokerData
Write:
- Upon init, each broker stores LocalBrokerData in its ephemeral znode in MetadataStore(ZK) to monitor live brokers(same as now)
- The broker-level load data is moved to BrokerLoadData
Read:
- All brokers check LocalBrokerData to confirm the list of live brokers.
BrokerLoadData
Write:
- Each broker periodically computes local load(BrokerLoadData) and publishes it to BrokerLoadDataStore(TableView)(non-persistent)
- Because non-persistent TableView can often lose data, we will add a TTL policy to tombstone old KVs in BrokerLoadDataStore.
Read:
- All brokers consume BrokerLoadDataStore
- With aggregated BrokerLoadData, all brokers perform bundle assignments without going through the leader.
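As a rough sketch of this write/read path using the public Pulsar client API (the topic name, schema, and field names below are assumptions; the broker-internal implementation may differ), each broker could publish its load keyed by its lookup URL and read every broker's latest load through a TableView:

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;

public class BrokerLoadDataStoreSketch {

    // Hypothetical broker load payload (see Load Data Models); a POJO keeps JSON schema handling simple.
    public static class BrokerLoadData {
        public double cpu;
        public double memory;
        public double io;
        public double msgThroughputIn;
        public double msgThroughputOut;

        public BrokerLoadData() {}

        public BrokerLoadData(double cpu, double memory, double io,
                              double msgThroughputIn, double msgThroughputOut) {
            this.cpu = cpu;
            this.memory = memory;
            this.io = io;
            this.msgThroughputIn = msgThroughputIn;
            this.msgThroughputOut = msgThroughputOut;
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumed system topic name; non-persistent, so stale entries rely on the TTL/tombstone policy above.
        String topic = "non-persistent://pulsar/system/broker-load-data";
        String brokerLookupUrl = "broker-1.example.com:8080"; // placeholder for the local broker

        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build()) {

            // Write path: each broker periodically publishes its load, keyed by its lookup URL.
            Producer<BrokerLoadData> producer = client.newProducer(Schema.JSON(BrokerLoadData.class))
                    .topic(topic)
                    .create();
            producer.newMessage()
                    .key(brokerLookupUrl)
                    .value(new BrokerLoadData(0.45, 0.60, 0.10, 5_000_000, 4_800_000))
                    .send();

            // Read path: every broker keeps a table-view of the latest load per broker
            // and scores candidate brokers from it during bundle assignment.
            TableView<BrokerLoadData> loadView = client.newTableView(Schema.JSON(BrokerLoadData.class))
                    .topic(topic)
                    .autoUpdatePartitionsInterval(60, TimeUnit.SECONDS)
                    .create();
            loadView.forEach((broker, load) ->
                    System.out.printf("broker=%s cpu=%.2f memory=%.2f%n", broker, load.cpu, load.memory));

            producer.close();
            loadView.close();
        }
    }
}
```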
BundlesLoadData
Write:
- Each broker monitors the allocated bundles' load and stores them in the local cache BundlesLoadData(In-memory-HashMap).
- BundlesLoadData will not be replicated to other brokers’ caches.
Read:
- Each broker locally reads BundlesLoadData and computes top n high load bundles, TopBundlesLoadData.
- With the local BundlesLoadData, all brokers perform bundle splits without going through the leader.
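A minimal sketch of how a broker might derive the top-n bundles from its local BundlesLoadData cache (class names and the ranking metric are assumptions):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class TopBundlesCalculator {

    // Hypothetical per-bundle load entry kept in the local in-memory cache.
    public record BundleLoadData(String bundleName, double msgRateIn, double msgRateOut) {
        double totalMsgRate() {
            return msgRateIn + msgRateOut;
        }
    }

    // Local-only cache (never replicated to other brokers): bundle name -> latest load sample.
    private final Map<String, BundleLoadData> bundlesLoadData = new ConcurrentHashMap<>();

    public void update(BundleLoadData sample) {
        bundlesLoadData.put(sample.bundleName(), sample);
    }

    // Top-n highest-loaded bundles, to be published to TopBundlesLoadDataStore for the leader.
    public List<BundleLoadData> computeTopBundles(int n) {
        return bundlesLoadData.values().stream()
                .sorted(Comparator.comparingDouble(BundleLoadData::totalMsgRate).reversed())
                .limit(n)
                .collect(Collectors.toList());
    }
}
```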
TopBundlesLoadData
Write:
- Each broker periodically computes TopBundlesLoadData and publishes it to TopBundlesLoadDataStore(TableView)(non-persistent)
- We will add a TTL policy to tombstone old KVs in TopBundlesLoadDataStore.
Read:
- Only the leader broker consumes TopBundlesLoadDataStore
- With the aggregated TopBundlesLoadData and BrokerLoadData, the leader initiates bundle unload(transfer) operations.
Load Data Flow
Major Modifications on Bundle Split, Unload, and Assignment Flow
- With the local BundlesLoadData, all brokers perform bundle splits without going through the leader. By default, newly split bundles will be the target to unload(transfer).
- With aggregated BrokerLoadData, all brokers perform bundle assignments without going through the leader.
- With aggregated TopBundlesLoadData and BrokerLoadData, the leader makes decisions to unload(transfer) bundles.
- We will add a new bundle unload option, transfer, which transfers bundles from one broker to another.
- We will introduce a global channel(Bundle State Channel) to share consistent/linearized bundle state changes with brokers.
Bundle State Channel
This bundle state channel is a persistent topic table-view used as a WAL to broadcast the total order of all bundle state changes in the cluster. All brokers will asynchronously consume messages in this channel in the same order and react to bundle state changes(sequential consistency). With the table-view compaction, the bundle state channel will eventually materialize the current bundle-broker ownership. Read operations on this channel(e.g., clients’ topic lookup requests) can be deferred for a few seconds, depending on the current state of the bundle.
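The sketch below illustrates this consumption model with a TableView listener over a persistent, compacted topic; the topic name, value type, and field names are assumptions and not the final implementation.

```java
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;

public class BundleStateChannelSketch {

    // Hypothetical value stored per bundle in the channel (the channel is keyed by bundle name).
    public static class BundleStateData {
        public String state;             // e.g. "assigning", "assigned", "splitting"
        public String action;            // e.g. "own", "transfer", "return", "split", "discard"
        public String sourceBroker;
        public String destinationBroker;

        public BundleStateData() {}
    }

    public static void main(String[] args) throws Exception {
        String localBroker = "broker-2.example.com:8080"; // placeholder for this broker's lookup URL

        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
             // Assumed persistent, compacted system topic acting as the WAL of bundle state changes.
             TableView<BundleStateData> channel = client
                     .newTableView(Schema.JSON(BundleStateData.class))
                     .topic("persistent://pulsar/system/bundle-state-channel")
                     .create()) {

            // Every broker consumes the same linearized stream of state changes (sequential consistency)
            // and acts only when it is the source or destination broker of the change.
            channel.listen((bundle, change) -> {
                if (change == null) {
                    return; // tombstoned entry: the bundle was discarded from the channel
                }
                if (localBroker.equals(change.destinationBroker)
                        || localBroker.equals(change.sourceBroker)) {
                    System.out.printf("bundle=%s action=%s state=%s -> perform local role%n",
                            bundle, change.action, change.state);
                    // ...perform the role, then publish the next state back to the channel.
                }
            });

            Thread.sleep(10_000); // keep the sketch alive briefly to observe state changes
        }
    }
}
```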
Bundle State Lifecycles
We define the following states and actions and linearize the bundle state changes.
(This is a high-level design to explain the concept here. The final version may differ.)
Bundle Actions
- Own: Own the bundle ownership
- The owner broker is selected by the local load manager.
- Transfer: Transfer the bundle ownership to the destination broker.
- The source broker internally disables the bundle ownership.
- The destination broker owns the bundle.
- Return: Return deferred client connections with the destination broker URL
- Close the connections if already being served
- Split: Split the target(parent) bundle into child bundles.
- Create: Create the child bundle entries in the channel, initially assigned to the local broker.
- Discard: Discard the bundle entry in the channel(tombstone operation)
- Unload: Unload the bundle ownership from the owner broker
- Disable the bundle ownership
- Close client connections under the bundle
- Run the Discard action
Bundle States
- Assigned: assigned to a broker
- Assigning: in the process of assigning the ownership
- Splitting: in the process of splitting a bundle range.
- Unassigned: unassigned to any broker (removed from the channel)
*New client connections to the bundle are deferred(with timeouts) in the Assigning state.
Bundle State Change Examples
The bundle state channel can be used as in the following examples.
Bundle Transfer Example
(State, Action) Sequence:
(Assigned, Transfer) => (Assigning, Return) => (Assigned,)
- The leader finds target bundles from TopBundlesLoadData and initiates a bundle unload(a transfer) by broadcasting the unload state change to the bundle state channel, keyed by the bundleName.
e.g. {key:bundleName, value:{flow:transfer, action:transfer, state:assigning, from:A, to:B}}
- All brokers will consume the state change message in the channel.
- Upon consuming a message from the channel, if the state change involves the local broker, the broker performs its role and publishes the next state back to the channel to continue the state change. Conflicting state changes against the ongoing one are ignored.
- Meanwhile, if other brokers(broker C) receive lookup requests for the bundle, the clients’ connections will be deferred(with timeouts) until the brokers receive the “Return” action. When the “Return” action is broadcasted, all brokers will return the pending connections with the owner broker’s URL. Also, the existing connections on the source broker will be closed.
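To illustrate the deferral above, here is a rough sketch (not the actual broker code) of how a broker could park a lookup for a bundle in the Assigning state and complete it when the “Return” action arrives, with a timeout as a safety net:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class DeferredLookupSketch {

    // Pending lookups per bundle, completed with the owner broker URL once the "Return" action arrives.
    private final Map<String, CompletableFuture<String>> pendingLookups = new ConcurrentHashMap<>();

    // Called when a lookup request hits a bundle that is still in the Assigning state.
    public CompletableFuture<String> deferLookup(String bundle) {
        return pendingLookups
                .computeIfAbsent(bundle, b -> new CompletableFuture<String>())
                // Simplified: the timeout is applied to the shared future as a safety net.
                .orTimeout(30, TimeUnit.SECONDS);
    }

    // Called when the "Return" action for the bundle is consumed from the bundle state channel.
    public void onReturnAction(String bundle, String ownerBrokerUrl) {
        CompletableFuture<String> pending = pendingLookups.remove(bundle);
        if (pending != null) {
            pending.complete(ownerBrokerUrl); // respond to the deferred clients with the owner's URL
        }
    }
}
```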
Bundle Split Example
(State, Action) Sequence:
(Assigned, Split) => (Splitting, Unload | Create) => {(Unassigned, ) | (Assigned, ), (Assigned, )}
- Each owner broker monitors local BundlesLoadData and initiates a bundle split by broadcasting the split state change to the bundle state channel, keyed by the bundleName.
e.g. {key:bundleName, value:{flow: split, action:split, state: splitting, from: A, to: B, transfer: true}}
- Same as Bundle Transfer Example step 2.
- Same as Bundle Transfer Example step 3.
a. After the “Split,” the owner broadcasts the child bundles’ ownership creation(state=assigned) and the parent bundle’s ownership unload(empty message).
b. By default, the owner publishes a message to the TopBundlesLoadData store asking the leader to unload(or transfer) the child bundles.
Bundle Assignment Example
(State, Action) Sequence:
(Unassigned, Own) => (Assigning, Return) => (Assigned,)
- When requested by clients, the first connected broker checks if any broker in the state channel owns the bundle and returns the owner broker URL if found. Otherwise, it initiates a bundle assignment by broadcasting the assignment state change.
e.g. {key:bundleName, value:{flow: assignment, action:own, state:assigning, to: B}}
- Same as Bundle Transfer Example step 2.
- Same as Bundle Transfer Example step 3.
- Same as Bundle Transfer Example step 4.
Bundle-Broker Ownership State
Because the bundle state channel shows the current bundle-broker ownership, we can remove the redundant bundle ownership store(ZK znodes). Each broker will look up the bundle ownership channel to check which broker currently owns the requested bundles or is in the ownership assignment/unload(transfer) process. In addition, before returning the owner, the broker availability metadata store(LocalBrokerData znode existence) could be checked to further confirm the owner broker’s availability.
Bundle State Channel Owner Selection and Discovery
The Bundle State Channel(BSC) is itself a topic, and because of this circular dependency, we can't use the BundleStateChannel to find the owner broker of the BSC topic. For example, when a cluster starts, each broker needs to initiate a BSC TopicLookUp(to find the owner broker) in order to consume the messages in the BSC. However, initially, each broker does not know which broker owns the BSC.
The ZK leader election can be a good option to break this circular dependency, as follows.
Channel Owner Selection
The cluster can use the ZK leader election to select the owner broker. If the owner becomes unavailable, one of the followers will become the new owner. We can elect the owner for each bundle state partition.
Channel Owner Discovery
Then, in brokers’ TopicLookUp logic, we will add a special case to return the current leader(the elected BSC owner) for the BSC topics.
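As a sketch of this idea, assuming Apache Curator (or an equivalent ZK recipe) is used for the election, brokers could join a LeaderLatch and the TopicLookUp path could special-case BSC topics to resolve to the current leader; the znode path and participant-id format below are assumptions.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class BundleStateChannelOwnerSketch {

    public static void main(String[] args) throws Exception {
        String localBrokerUrl = "broker-3.example.com:8080"; // placeholder participant id

        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();

        // Every broker joins the election; the elected broker owns the bundle state channel topic(s).
        LeaderLatch election = new LeaderLatch(zk, "/loadbalance/bundle-state-channel-owner", localBrokerUrl);
        election.start();

        // Channel owner discovery: the TopicLookUp path special-cases BSC topics and resolves them
        // to the elected leader, breaking the circular dependency described above.
        // (In practice, wait for the election to settle before resolving the leader.)
        String bscOwnerUrl = election.getLeader().getId();
        System.out.println("Bundle state channel owner: " + bscOwnerUrl);

        election.close();
        zk.close();
    }
}
```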
Conflict State Resolution(Race Conditions)
Without distributed locks, we can resolve conflicting state changes by a conflict state resolution algorithm in an optimistic and eventual manner. Brokers can take the first valid state change in the linearized view as the winner state and ignore the later ones.
One caveat is that because the current table-view compaction keeps only the last value as the result, we need to introduce an internal compaction algorithm for this channel that follows the conflict resolution algorithm(the first valid state change as the result value).
Bundle State Conflict Resolution Algorithm Example
for each bundle:
    // A[i] is the linearized bundle state change action at sequence number i, and
    // S is the current bundle state after applying A[i-1],
    // where i monotonically increases.
    for each A[i] with current state S:
        if A[i] is invalid from S:   // no such action arrow in the state diagram
            reject A[i]
        else:
            accept A[i] and advance S
For instance, let’s say for bundle x, there are two conflicting assignments initiated. The linearized state change messages will be like the following.
(own, to:B), (own, to:A)
By the conflict resolution algorithm, the second state change (own, to:A) will be ignored by all brokers(and by the compaction algorithm). Eventually, the “return” message will be broadcasted, declaring that the owner is “B.”
(own, to:B), (own, to:A), (return, to:B)
Let’s take another example. Let’s say bundle x is already assigned to broker B, but another broker initiates the “own” action(before consuming the “return” action). This last “own” state change will be ignored since the “own” action is invalid from the previous state, “assigned” (in the state diagram, there is no “own” action arrow from the “assigned” state).
(own, to:B), (return, to:B), (own, to:A)
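A compact sketch of the first-valid-wins resolution, using a hypothetical validity table for the states and actions above (the real transition table may differ):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class BundleStateConflictResolver {

    enum State { UNASSIGNED, ASSIGNING, ASSIGNED, SPLITTING }
    enum Action { OWN, TRANSFER, RETURN, SPLIT, CREATE, DISCARD, UNLOAD }

    // Hypothetical "arrows" of the state diagram: which actions are valid from which state.
    private static final Map<State, Set<Action>> VALID = Map.of(
            State.UNASSIGNED, Set.of(Action.OWN),
            State.ASSIGNING, Set.of(Action.RETURN),
            State.ASSIGNED, Set.of(Action.TRANSFER, Action.SPLIT, Action.UNLOAD),
            State.SPLITTING, Set.of(Action.UNLOAD, Action.CREATE, Action.DISCARD));

    // Per-bundle current state, materialized from the linearized channel.
    private final Map<String, State> current = new HashMap<>();

    /** Applies A[i] for a bundle; returns true if accepted, false if rejected (ignored). */
    public boolean apply(String bundle, Action action, State nextState) {
        State s = current.getOrDefault(bundle, State.UNASSIGNED);
        if (!VALID.getOrDefault(s, Set.of()).contains(action)) {
            return false; // no such action arrow from the current state: the later, conflicting change loses
        }
        current.put(bundle, nextState);
        return true;
    }

    public static void main(String[] args) {
        BundleStateConflictResolver resolver = new BundleStateConflictResolver();
        // Two conflicting assignments for bundle x: the first valid change wins.
        System.out.println(resolver.apply("x", Action.OWN, State.ASSIGNING));   // true  (own, to:B)
        System.out.println(resolver.apply("x", Action.OWN, State.ASSIGNING));   // false (own, to:A) ignored
        System.out.println(resolver.apply("x", Action.RETURN, State.ASSIGNED)); // true  (return, to:B)
    }
}
```

The same validity check can drive the channel's custom compaction, so that the compacted view also keeps the first valid state change per bundle rather than the last written one.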
Failure Recovery
When a broker is down
When state change participants(brokers) are suddenly unavailable, the state change could become an orphan, as the participants can no longer play their roles. For these orphan state changes, the leader broker will run orphan state clean-up logic. For instance, the leader can add the bundle state clean-up logic to the broker unavailability notification handler(znode watcher) in order to clean the pending bundle state changes and ownerships from unavailable brokers. Also, to make the clean-up logic further fault-tolerant, the leader broker will run the clean-up function when it initializes. Additionally, we could make the leader periodically call the clean-up in a separate monitor thread(we shouldn’t redundantly call this cleanup too often).
When the entire ZK is down and comes back
Every broker will be notified when its ZK session encounters a connection issue. Then, the brokers will enter a "safe" mode, serving the existing topics as-is but not allowing ZK-related operations. The leader won't run the bundle cleanup, transfer, or unload logic while it knows ZK is down.
When ZK comes back, each broker will know its ZK session is re-established. The brokers will wait 2-3 minutes for all brokers to complete the ZK handshaking. Then, they will recover the bundle state table-view and return to the normal mode.
Bundle State and Load Data TableView Scalability
Expected read/write traffic:
Write: there will be relatively few messages on the write path, with occasional spikes
Read: the fan-out broadcast could cause bottlenecks when the cluster is enormous.
This bundle state channel is relatively lightweight on the producer side because bundle state changes are relatively infrequent. Still, message dispatch to consumers could be heavy if the cluster is very large. The same issue can happen to the other table-views(BrokerLoadDataStore) introduced in this proposal. We could consider the following methods to scale the table-views’ produce/consume rates in a large cluster.
Split Broker Cluster to multiple clusters
Simply, one can split a massive broker cluster into multiple clusters with different endpoints. The bookkeeper and configuration layer can be shared among the broker clusters.
Partitioned Table-View (short-term)
One can make the table views based on partitioned topics. Then, we can distribute message load to multiple partition owner brokers.
Sharding (long-term)
As the conventional scalability method, one could shard the cluster into multiple groups of brokers. Then, we can create a separate channel for each shard of brokers. This means we need an additional discovery layer to map topics to broker shards(which also needs to align with Namespace Isolation Policies).
We need to mention that this metadata sync scalability issue is not new in Pulsar, as the current Pulsar uses n-replication. For instance, all brokers' and all bundles' load metadata are replicated to all brokers via ZK watchers. Currently, distributed ZK servers send znode watch notifications to their clients(brokers). In this proposal, multiple table-view owner brokers(with partitioned table-views) can dispatch metadata change messages to the participants(brokers).
We think this metadata sync scalability is relatively low-priority, as only a few customers run Pulsar clusters on such a large scale. We could ask the customers first to split the cluster into multiple clusters and then enable partitioned table views. It is not practical for a single cluster to have thousands of brokers. However, we still want to ensure this design is seamlessly extensible, as a two-way-door decision.
Rejected Alternatives
- Why can we not enhance the current load balancer?
Since the PIP changes almost every area (data models, event handlers, cache/storage, logs/metrics), creating a new load balancer and isolating the new code is safer and cleaner. Customers can then safely enable/disable the new load balancer by a configuration before the old one is deprecated.
It also gives us the flexibility to start fresh, without the baggage of existing choices, and to try a significantly different approach. The current ModularLoadManagerImpl will not go away. Once the new load manager is ready and considered stable enough, there might be a new discussion on whether to change the default implementation. Even then, users will still be able to opt for the old load manager.
Modification Summary
The following table excludes logic and algorithm modifications, as this PIP does not focus on logic and algorithm improvements.
Goals | Before | After |
---|---|---|
Make load balance operations fault-tolerant and consistent among brokers | The leader broker sends load balance commands to the owner brokers via RPC with retries. | We introduce a global bundle state channel(a persistent topic table-view), where a total order of bundle commands is reliably persisted and broadcasted by all brokers. |
Distribute load balance operations | The leader broker decides on bundle assignment, unload, and splitting. The owner brokers run the unload and split operations notified via RPC. | Each broker decides and runs bundle assignment and split operations. The leader decides bundle unload(transfer), and the owner brokers run the unload operation, notified via the bundle state channel. |
Reduce load data replication among brokers | All brokers’ and all bundles’ load data are stored in ZK and replicated to all brokers via ZK watchers. | All brokers’ load data is replicated to all brokers via a non-persistent topic(table-view). Only top-n bundles’ load data from each broker is replicated to the leader broker via a non-persistent topic(table-view). |
Minimize the topic unavailability from unloading | After topic connections are closed, clients reconnect to a new broker, and the new broker initiates a new topic assignment. The leader broker assigns a new owner, and eventually, the client will be redirected to the new owner broker. | We introduce a new unload option, “transfer”, where the new owner is pre-assigned before the topic connections are closed. Clients immediately redirect to the new owner broker without the client-initiated topic assignments. |
Share bundle-broker ownership metadata among brokers for owner broker discovery | The bundle-broker ownership data are stored in ZK. All brokers read bundle ownership info upon TopicLookUp requests(with caching local bundle ownership info). | The global ownership data is stored in the bundle state channel(a persistent topic table-view). With compaction, all brokers read its latest global ownership table-view(cached in memory) upon TopicLookUp requests. |
Show transparent load balance decisions with logs and metrics | Emit logs best-effort basis. | We design logging/metrics as separate logical components. We document and share major log messages and metrics for all important load balance events |
Post Update
Added ServiceConfiguration
### --- Load balancer extension --- ###
# Option to enable the debug mode for the load balancer logics.
# The debug mode prints more logs to provide more information such as load balance states and decisions.
# (only used in load balancer extension logics)
loadBalancerDebugModeEnabled=false
# The target standard deviation of the resource usage across brokers
# (100% resource usage is 1.0 load).
# The shedder logic tries to distribute bundle load across brokers to meet this target std.
# The smaller value will incur load balancing more frequently.
# (only used in load balancer extension TransferShedder)
loadBalancerBrokerLoadTargetStd=0.25
# Threshold to the consecutive count of fulfilled shedding(unload) conditions.
# If the unload scheduler consecutively finds bundles that meet unload conditions
# more times than this threshold, the scheduler will shed the bundles.
# The bigger value will incur less bundle unloading/transfers.
# (only used in load balancer extension TransferShedder)
loadBalancerSheddingConditionHitCountThreshold=3
# Option to enable the bundle transfer mode when distributing bundle loads.
# On: transfer bundles from overloaded brokers to underloaded brokers
#     (pre-assigns the destination broker upon unloading).
# Off: unload bundles from overloaded brokers
#     (post-assigns the destination broker upon lookups).
# (only used in load balancer extension TransferShedder)
loadBalancerTransferEnabled=true
# Maximum number of brokers to unload bundle load for each unloading cycle.
# The bigger value will incur more unloading/transfers for each unloading cycle.
# (only used in load balancer extension TransferShedder)
loadBalancerMaxNumberOfBrokerSheddingPerCycle=3
# Delay (in seconds) to the next unloading cycle after unloading.
# The logic tries to give enough time for brokers to recompute load after unloading.
# The bigger value will delay the next unloading cycle longer.
# (only used in load balancer extension TransferShedder)
loadBalanceSheddingDelayInSeconds=180
# Broker load data time to live (TTL in seconds).
# The logic tries to avoid (possibly unavailable) brokers with outdated load data,
# and those brokers will be ignored in the load computation.
# When tuning this value, please consider loadBalancerReportUpdateMaxIntervalMinutes.
# The current default is loadBalancerReportUpdateMaxIntervalMinutes * 2.
# (only used in load balancer extension TransferShedder)
loadBalancerBrokerLoadDataTTLInSeconds=1800
# Max number of bundles in bundle load report from each broker.
# The load balancer distributes bundles across brokers,
# based on topK bundle load data and other broker load data.
# The bigger value will increase the overhead of reporting many bundles in load data.
# (only used in load balancer extension logics)
loadBalancerMaxNumberOfBundlesInBundleLoadReport=10
# Service units'(bundles) split interval. The broker periodically checks whether
# some service units(e.g. bundles) should be split if they become hot-spots.
# (only used in load balancer extension logics)
loadBalancerSplitIntervalMinutes=1
# Max number of bundles to split to per cycle.
# (only used in load balancer extension logics)
loadBalancerMaxNumberOfBundlesToSplitPerCycle=10
# Threshold to the consecutive count of fulfilled split conditions.
# If the split scheduler consecutively finds bundles that meet split conditions
# more times than this threshold, the scheduler will trigger splits on the bundles
# (if the number of bundles is less than loadBalancerNamespaceMaximumBundles).
# (only used in load balancer extension logics)
loadBalancerNamespaceBundleSplitConditionHitCountThreshold=3
# After this delay, the service-unit state channel tombstones any service units (e.g., bundles)
# in semi-terminal states. For example, after splits, parent bundles will be `deleted`,
# and then after this delay, the parent bundles' state will be `tombstoned`
# in the service-unit state channel.
# Pulsar does not immediately remove such semi-terminal states
# to avoid unnecessary system confusion,
# as the bundles in the `tombstoned` state might temporarily look available to reassign.
# Rarely, one could lower this delay in order to aggressively clean
# the service-unit state channel when there are a large number of bundles.
# minimum value = 30 secs
# (only used in load balancer extension logics)
loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds=3600
# Option to automatically unload namespace bundles with affinity(isolation)
# or anti-affinity group policies.
# Such bundles are not ideal targets to auto-unload as destination brokers are limited.
# (only used in load balancer extension logics)
loadBalancerSheddingBundlesWithPoliciesEnabled=false
# Time to wait before fixing any stuck in-flight service unit states.
# The leader monitor fixes any in-flight service unit(bundle) states
# by reassigning the ownerships if stuck too long, longer than this period.
# (only used in load balancer extension logics)
loadBalancerInFlightServiceUnitStateWaitingTimeInMillis=30000
# The service unit(bundle) state channel is periodically monitored
# by the leader broker at this interval
# to fix any orphan bundle ownerships, stuck in-flight states, and other cleanup jobs.
# `loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds` * 1000 must be bigger than
# `loadBalancerInFlightServiceUnitStateWaitingTimeInMillis`.
# (only used in load balancer extension logics)
loadBalancerServiceUnitStateMonitorIntervalInSeconds=60