iterative rebalance tolerance #253

Merged (8 commits) on Apr 23, 2019

2 changes: 1 addition & 1 deletion cmd/topicmappr/README.md
@@ -117,7 +117,7 @@ Flags:
--partition-size-threshold int Size in megabytes where partitions below this value will not be moved in a rebalance (default 512)
--storage-threshold float Percent below the harmonic mean storage free to target for partition offload (0 targets all brokers) (default 0.2)
--storage-threshold-gb float Storage free in gigabytes to target for partition offload (those below the specified value); 0 [default] defers target selection to --storage-threshold
--tolerance float Percent distance from the mean storage free to limit storage scheduling (default 0.1)
--tolerance float Percent distance from the mean storage free to limit storage scheduling (0 performs automatic tolerance selection)
--topics string Rebuild topics (comma delim. list) by lookup in ZooKeeper
--verbose Verbose output
--zk-metrics-prefix string ZooKeeper namespace prefix for Kafka metrics (default "topicmappr")
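For illustration (numbers are hypothetical): with a mean storage free of 1,000 GB and --tolerance 0.10, only brokers with at most 1,100 GB free are scheduled as offload sources and only brokers with at least 900 GB free as relocation destinations. With the new default of 0, topicmappr instead tries tolerance values iteratively and keeps the plan that yields the smallest post-rebalance storage range (see the rebalance.go changes below).
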
2 changes: 1 addition & 1 deletion cmd/topicmappr/commands/config.go
@@ -17,7 +17,7 @@ import (

const (
indent = "\x20\x20"
div = 1073741824.00
div = 1 << 30
)

var (
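For reference, 1 << 30 = 1,073,741,824, so the new expression is the same bytes-per-GiB divisor as before, written more readably.
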
159 changes: 107 additions & 52 deletions cmd/topicmappr/commands/rebalance.go
@@ -17,6 +17,19 @@ var rebalanceCmd = &cobra.Command{
Run: rebalance,
}

// Rebalance may be configured to run a series
// of rebalance plans. A rebalanceResults holds
// any relevant output along with metadata that
// hints at the quality of the output, such as
// the resulting storage utilization range.
type rebalanceResults struct {
storageRange float64
tolerance float64
partitionMap *kafkazk.PartitionMap
relocations map[int][]relocation
brokers kafkazk.BrokerMap
}

func init() {
rootCmd.AddCommand(rebalanceCmd)

@@ -26,7 +39,7 @@ func init() {
rebalanceCmd.Flags().String("brokers", "", "Broker list to scope all partition placements to ('-1' automatically expands to all currently mapped brokers)")
rebalanceCmd.Flags().Float64("storage-threshold", 0.20, "Percent below the harmonic mean storage free to target for partition offload (0 targets a brokers)")
rebalanceCmd.Flags().Float64("storage-threshold-gb", 0.00, "Storage free in gigabytes to target for partition offload (those below the specified value); 0 [default] defers target selection to --storage-threshold")
rebalanceCmd.Flags().Float64("tolerance", 0.10, "Percent distance from the mean storage free to limit storage scheduling")
rebalanceCmd.Flags().Float64("tolerance", 0.0, "Percent distance from the mean storage free to limit storage scheduling (0 performs automatic tolerance selection)")
rebalanceCmd.Flags().Int("partition-limit", 30, "Limit the number of top partitions by size eligible for relocation per broker")
rebalanceCmd.Flags().Int("partition-size-threshold", 512, "Size in megabytes where partitions below this value will not be moved in a rebalance")
rebalanceCmd.Flags().Bool("locality-scoped", false, "Disallow a relocation to traverse rack.id values among brokers")
@@ -58,30 +71,21 @@ func rebalance(cmd *cobra.Command, _ []string) {
partitionMeta := getPartitionMeta(cmd, zk)

// Get the current partition map.
partitionMap, err := kafkazk.PartitionMapFromZK(Config.topics, zk)
partitionMapIn, err := kafkazk.PartitionMapFromZK(Config.topics, zk)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

partitionMapOrig := partitionMap.Copy()

// Print topics matched to input params.
printTopics(partitionMap)

// Get a mapping of broker IDs to topics, partitions.
mappings := partitionMap.Mappings()
printTopics(partitionMapIn)

// Get a broker map.
brokers := kafkazk.BrokerMapFromPartitionMap(partitionMap, brokerMeta, false)
brokersIn := kafkazk.BrokerMapFromPartitionMap(partitionMapIn, brokerMeta, false)

// Validate all broker params, get a copy of the
// broker IDs targeted for partition offloading.
offloadTargets := validateBrokersForRebalance(cmd, brokers, brokerMeta)

// Store a copy of the original
// broker map, post updates.
brokersOrig := brokers.Copy()
offloadTargets := validateBrokersForRebalance(cmd, brokersIn, brokerMeta)

partitionLimit, _ := cmd.Flags().GetInt("partition-limit")
partitionSizeThreshold, _ := cmd.Flags().GetInt("partition-size-threshold")
@@ -91,52 +95,103 @@ func rebalance(cmd *cobra.Command, _ []string) {
otm[id] = struct{}{}
}

// Bundle planRelocationsForBrokerParams.
params := planRelocationsForBrokerParams{
relos: map[int][]relocation{},
mappings: mappings,
brokers: brokers,
partitionMeta: partitionMeta,
plan: relocationPlan{},
topPartitionsLimit: partitionLimit,
partitionSizeThreshold: partitionSizeThreshold,
offloadTargetsMap: otm,
}
resultsByRange := []rebalanceResults{}

for i := 0.01; i < 0.99; i += 0.01 {
partitionMap := partitionMapIn.Copy()

// Whether we're using a fixed tolerance
// (non 0.00) set via flag or an iterative value.
tolFlag, _ := cmd.Flags().GetFloat64("tolerance")
var tol float64

if tolFlag == 0.00 {
tol = i
} else {
tol = tolFlag
}

// Sort offloadTargets by storage free ascending.
sort.Sort(offloadTargetsBySize{t: offloadTargets, bm: brokers})

// Iterate over offload targets, planning
// at most one relocation per iteration.
// Continue this loop until no more relocations
// can be planned.
for exhaustedCount := 0; exhaustedCount < len(offloadTargets); {
params.pass++
for _, sourceID := range offloadTargets {
// Update the source broker ID
params.sourceID = sourceID

relos := planRelocationsForBroker(cmd, params)

// If no relocations could be planned,
// increment the exhaustion counter.
if relos == 0 {
exhaustedCount++
// Bundle planRelocationsForBrokerParams.
params := planRelocationsForBrokerParams{
relos: map[int][]relocation{},
mappings: partitionMap.Mappings(),
brokers: brokersIn.Copy(),
partitionMeta: partitionMeta,
plan: relocationPlan{},
topPartitionsLimit: partitionLimit,
partitionSizeThreshold: partitionSizeThreshold,
offloadTargetsMap: otm,
tolerance: tol,
}

// Sort offloadTargets by storage free ascending.
sort.Sort(offloadTargetsBySize{t: offloadTargets, bm: params.brokers})

// Iterate over offload targets, planning
// at most one relocation per iteration.
// Continue this loop until no more relocations
// can be planned.
for exhaustedCount := 0; exhaustedCount < len(offloadTargets); {
params.pass++
for _, sourceID := range offloadTargets {
// Update the source broker ID
params.sourceID = sourceID

relos := planRelocationsForBroker(cmd, params)

// If no relocations could be planned,
// increment the exhaustion counter.
if relos == 0 {
exhaustedCount++
}
}
}

// Update the partition map with the relocation plan.
applyRelocationPlan(cmd, partitionMap, params.plan)

// Populate the output.
resultsByRange = append(resultsByRange, rebalanceResults{
storageRange: params.brokers.StorageRange(),
tolerance: tol,
partitionMap: partitionMap,
relocations: params.relos,
brokers: params.brokers,
})

// Break early if we're using a fixed tolerance value.
if tolFlag != 0.00 {
break
}
}

// Print planned relocations.
printPlannedRelocations(offloadTargets, params.relos, partitionMeta)
// Sort the rebalance results by range ascending.
sort.Slice(resultsByRange, func(i, j int) bool {
switch {
case resultsByRange[i].storageRange < resultsByRange[j].storageRange:
return true
case resultsByRange[i].storageRange > resultsByRange[j].storageRange:
return false
}

return resultsByRange[i].tolerance < resultsByRange[j].tolerance
})

// Choose the result with the lowest range.
m := resultsByRange[0]
partitionMapOut, brokersOut, relos := m.partitionMap, m.brokers, m.relocations

// Update the partition map with the relocation plan.
applyRelocationPlan(cmd, partitionMap, params.plan)
// Print parameters used for rebalance decisions.
printRebalanceParams(cmd, resultsByRange, brokersIn, m.tolerance)

// Print planned relocations.
printPlannedRelocations(offloadTargets, relos, partitionMeta)

// Print map change results.
printMapChanges(partitionMapOrig, partitionMap)
printMapChanges(partitionMapIn, partitionMapOut)

// Print broker assignment statistics.
errs := printBrokerAssignmentStats(cmd, partitionMapOrig, partitionMap, brokersOrig, brokers)
errs := printBrokerAssignmentStats(cmd, partitionMapIn, partitionMapOut, brokersIn, brokersOut)

// Handle errors that are possible
// to be overridden by the user (aka
@@ -145,8 +200,8 @@ func rebalance(cmd *cobra.Command, _ []string) {

// Ignore no-ops; rebalances will naturally have
// a high percentage of these.
partitionMapOrig, partitionMap = skipReassignmentNoOps(partitionMapOrig, partitionMap)
partitionMapIn, partitionMapOut = skipReassignmentNoOps(partitionMapIn, partitionMapOut)

// Write maps.
writeMaps(cmd, partitionMap)
writeMaps(cmd, partitionMapOut)
}
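
The core of this change is the iterate-and-pick loop above: when --tolerance is 0, the command plans a full rebalance for each tolerance from 0.01 upward, records the resulting storage range, and keeps the best plan. Below is a minimal, self-contained sketch of that selection strategy; planFor and its simulated curve are hypothetical stand-ins for the real planning path (planRelocationsForBroker plus applyRelocationPlan), and only the iterate/sort/pick logic mirrors the command.

package main

import (
	"fmt"
	"sort"
)

// result mirrors the rebalanceResults fields that drive selection.
type result struct {
	tolerance    float64
	storageRange float64 // max minus min post-rebalance storage free
}

// planFor is a stand-in: pretend the achievable storage range is some
// function of the tolerance used. The real command computes it by
// planning relocations against a copy of the partition map.
func planFor(tol float64) result {
	simulated := 100.0*tol + 5.0/(tol+0.05) // arbitrary illustrative curve
	return result{tolerance: tol, storageRange: simulated}
}

func main() {
	fixedTol := 0.0 // --tolerance; 0 means "search for the best value"

	var results []result
	for i := 0.01; i < 0.99; i += 0.01 {
		tol := i
		if fixedTol != 0.0 {
			tol = fixedTol
		}
		results = append(results, planFor(tol))
		if fixedTol != 0.0 {
			break // a fixed tolerance is planned exactly once
		}
	}

	// Prefer the smallest resulting storage range; break ties on the
	// smaller tolerance, as the rebalance command does.
	sort.Slice(results, func(i, j int) bool {
		if results[i].storageRange != results[j].storageRange {
			return results[i].storageRange < results[j].storageRange
		}
		return results[i].tolerance < results[j].tolerance
	})

	best := results[0]
	fmt.Printf("chose tolerance %.2f with range %.2f\n", best.tolerance, best.storageRange)
}
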
47 changes: 32 additions & 15 deletions cmd/topicmappr/commands/rebalance_steps.go
@@ -53,6 +53,7 @@ type planRelocationsForBrokerParams struct {
topPartitionsLimit int
partitionSizeThreshold int
offloadTargetsMap map[int]struct{}
tolerance float64
}

// relocationPlan is a mapping of topic,
@@ -173,12 +174,27 @@ func validateBrokersForRebalance(cmd *cobra.Command, brokers kafkazk.BrokerMap,
}
}

fmt.Printf("\n%s:\n", selectorMethod.String())

// Exit if no target brokers were found.
if len(offloadTargets) == 0 {
fmt.Printf("%s[none]\n", indent)
os.Exit(0)
} else {
for _, id := range offloadTargets {
fmt.Printf("%s%d\n", indent, id)
}
}

return offloadTargets
}

func printRebalanceParams(cmd *cobra.Command, results []rebalanceResults, brokers kafkazk.BrokerMap, tol float64) {
// Print rebalance parameters as a result of
// input configurations and brokers found
// to be beyond the storage threshold.
fmt.Println("\nRebalance parameters:")

tol, _ := cmd.Flags().GetFloat64("tolerance")
pst, _ := cmd.Flags().GetInt("partition-size-threshold")
mean, hMean := brokers.Mean(), brokers.HMean()

@@ -192,24 +208,24 @@ func validateBrokersForRebalance(cmd *cobra.Command, brokers kafkazk.BrokerMap,
fmt.Printf("%s%sSources limited to <= %.2fGB\n", indent, indent, mean*(1+tol)/div)
fmt.Printf("%s%sDestinations limited to >= %.2fGB\n", indent, indent, mean*(1-tol)/div)

fmt.Printf("\n%s:\n", selectorMethod.String())
verbose, _ := cmd.Flags().GetBool("verbose")

// Exit if no target brokers were found.
if len(offloadTargets) == 0 {
fmt.Printf("%s[none]\n", indent)
os.Exit(0)
} else {
for _, id := range offloadTargets {
fmt.Printf("%s%d\n", indent, id)
// Print the top 10 rebalance results
// in verbose.
if verbose {
fmt.Printf("%s-\n%sTop 10 rebalance map results\n", indent, indent)
for i := range results {
fmt.Printf("%stolerance: %.2f -> range: %.2fGB\n",
indent, results[i].tolerance, results[i].storageRange/div)
if i == 10 {
break
}
}
}

return offloadTargets
}

func planRelocationsForBroker(cmd *cobra.Command, params planRelocationsForBrokerParams) int {
verbose, _ := cmd.Flags().GetBool("verbose")
tolerance, _ := cmd.Flags().GetFloat64("tolerance")
localityScoped, _ := cmd.Flags().GetBool("locality-scoped")

relos := params.relos
@@ -219,8 +235,9 @@ func planRelocationsForBroker
plan := params.plan
sourceID := params.sourceID
topPartitionsLimit := params.topPartitionsLimit
partitionSizeThreshold := float64(params.partitionSizeThreshold * 1048576)
partitionSizeThreshold := float64(params.partitionSizeThreshold * 1 << 20)
offloadTargetsMap := params.offloadTargetsMap
tolerance := params.tolerance

// Use the arithmetic mean for target
// thresholds.
@@ -239,8 +256,8 @@ func planRelocationsForBroker
}

if verbose {
fmt.Printf("\n[pass %d] Broker %d has a storage free of %.2fGB. Top partitions:\n",
params.pass, sourceID, brokers[sourceID].StorageFree/div)
fmt.Printf("\n[pass %d with tolerance %.2f] Broker %d has a storage free of %.2fGB. Top partitions:\n",
params.pass, tolerance, sourceID, brokers[sourceID].StorageFree/div)

for _, p := range topPartn {
pSize, _ := partitionMeta.Size(p)
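
To make the tolerance arithmetic in printRebalanceParams and planRelocationsForBroker concrete, here is a small sketch of how the source and destination bounds derive from the mean storage free. The mean value and tolerance below are hypothetical inputs; the real code reads them from the kafkazk.BrokerMap and the chosen rebalanceResults.

package main

import "fmt"

const gib = float64(1 << 30) // same bytes-per-GiB divisor as config.go's div

func main() {
	// Hypothetical inputs: mean storage free across brokers and the
	// tolerance that was either passed via --tolerance or selected
	// iteratively by the rebalance command.
	meanFree := 900.0 * gib
	tol := 0.10

	// Mirrors the bounds printed by printRebalanceParams: offload sources
	// must have at most mean*(1+tol) storage free, and relocation
	// destinations must have at least mean*(1-tol) storage free.
	sourceCeiling := meanFree * (1 + tol)
	destFloor := meanFree * (1 - tol)

	fmt.Printf("Sources limited to <= %.2fGB\n", sourceCeiling/gib)  // 990.00GB
	fmt.Printf("Destinations limited to >= %.2fGB\n", destFloor/gib) // 810.00GB
}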