Incrementally transfer chunks per token to improve handover #1764

Closed
2 changes: 1 addition & 1 deletion Makefile
@@ -118,7 +118,7 @@ $(EXES):
protos: $(PROTO_GOS)

%.pb.go:
protoc -I $(GOPATH)/src:./vendor:./$(@D) --gogoslick_out=plugins=grpc,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,:./$(@D) ./$(patsubst %.pb.go,%.proto,$@)
protoc -I $(GOPATH)/src:./vendor:./$(@D):. --gogoslick_out=paths=source_relative,plugins=grpc,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,:./$(@D) ./$(patsubst %.pb.go,%.proto,$@)

lint:
misspell -error docs
12 changes: 12 additions & 0 deletions docs/configuration/arguments.md
@@ -274,6 +274,18 @@ It also talks to a KVStore and has its own copies of the same flags used by the

How many times a LEAVING ingester tries to find a PENDING ingester during the [hand-over process](../guides/ingester-handover.md). Each attempt takes a second or so. A negative value or zero disables the hand-over process completely. (default 10)

- `-ingester.join-incremental-transfer`

Enables incremental transfer of chunks when joining the ring. When enabled, a joining ingester inserts its tokens into the ring one at a time. For each inserted token, the ingester requests chunks from its peers; which chunks it requests is determined by the inserted token and its position in the ring (see the sketch below).

When this flag is enabled, the hand-over process is disabled and the `-ingester.join-after` flag is ignored.
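
To make the per-token mechanism concrete, here is a minimal sketch of how the range taken over by a newly inserted token could be derived from its position among the tokens already in the ring. The package, function name, and signature are illustrative assumptions, not Cortex's actual API.

```go
package example

import "sort"

// rangeForInsertedToken returns the (exclusive, inclusive] range of stream
// hashes that a newly inserted token takes over, given the sorted tokens
// already present in the ring.
func rangeForInsertedToken(ringTokens []uint32, inserted uint32) (from, to uint32) {
	if len(ringTokens) == 0 {
		// First token in the ring: it owns the whole key space.
		return inserted, inserted
	}
	// Find where the new token lands among the existing tokens.
	i := sort.Search(len(ringTokens), func(n int) bool { return ringTokens[n] >= inserted })
	if i == 0 {
		// The new token is the smallest; its predecessor wraps around
		// to the largest existing token.
		return ringTokens[len(ringTokens)-1], inserted
	}
	return ringTokens[i-1], inserted
}
```

Under this model, the joining ingester would request from its peers only the chunks for streams whose hash falls in the returned (from, to] range; all other streams stay where they are.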

- `-ingester.leave-incremental-transfer`

Enables incremental transfer of chunks when leaving the ring. When enabled, a leaving ingester removes its tokens from the ring one at a time. For each removed token, the ingester sends chunks to its peers; which chunks it sends is determined by the removed token and its position in the ring (a sketch of the successor lookup follows below).

When this flag is enabled, the hand-over process is disabled. Chunks that could not be transferred are still flushed.
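
The leave path mirrors the join path: when a token is removed, the range it owned is absorbed by the next token clockwise on the ring, and the chunks for that range are sent to whichever ingester holds that successor token. The helper below is a hypothetical sketch of the successor lookup, not the actual Cortex implementation.

```go
package example

import "sort"

// successorIndex returns the index of the token that follows tok on the
// sorted ring, wrapping around past the largest token. In this model the
// ingester owning that successor token absorbs tok's range once tok is
// removed, so it is the peer the leaving ingester sends chunks to.
func successorIndex(remainingTokens []uint32, tok uint32) int {
	if len(remainingTokens) == 0 {
		return -1 // nothing left in the ring to hand the range to
	}
	i := sort.Search(len(remainingTokens), func(n int) bool { return remainingTokens[n] > tok })
	return i % len(remainingTokens)
}
```

A leaving ingester would repeat this lookup for each of its tokens, sending the chunks in that token's range before removing the token, so each range keeps exactly one owner throughout the transfer.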

- `-ingester.normalise-tokens`

Deprecated. New ingesters always write "normalised" tokens to the ring. Normalised tokens consume less memory to encode and decode; as the ring is unmarshalled regularly, this significantly reduces memory usage of anything that watches the ring.
39 changes: 39 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -454,11 +454,46 @@ lifecycler:
# CLI flag: -ingester.tokens-file-path
[tokens_file_path: <string> | default = ""]

# Request chunks from neighboring ingesters on join. Disables the handoff
# process when set and ignores the -ingester.join-after flag.
# CLI flag: -ingester.join-incremental-transfer
[join_incremental_transfer: <boolean> | default = false]

# Send chunks to neighboring ingesters on leave. Takes precedence over chunk
# flushing when set and disables handoff.
# CLI flag: -ingester.leave-incremental-transfer
[leave_incremental_transfer: <boolean> | default = false]

# Minimum amount of time to wait before incrementally joining the ring. Allows
# time to receive ring updates so two ingesters do not join at once.
# CLI flag: -ingester.min-incremental-join-jitter
[min_incremental_join_jitter: <duration> | default = 0s]

# Maximum amount of time to wait before incrementally joining the ring. Allows
# time to receive ring updates so two ingesters do not join at once (a sketch
# of the jitter follows further below).
# CLI flag: -ingester.max-incremental-join-jitter
[max_incremental_join_jitter: <duration> | default = 2s]

# How long to wait after the incremental join process before notifying the
# target ingesters to clean up any blocked token ranges.
# CLI flag: -ingester.transfer-finish-delay
[transfer_finish_delay: <duration> | default = 5s]

token_checker:
# Period with which to check that all in-memory streams fall within expected
# token ranges. 0 to disable.
# CLI flag: -token-checker.check-on-interval
[check_on_interval: <duration> | default = 0s]

# Number of times to try to transfer chunks before falling back to flushing.
# A negative value or zero disables hand-over.
# CLI flag: -ingester.max-transfer-retries
[max_transfer_retries: <int> | default = 10]

# Period after which write blocks on ranges expire.
# CLI flag: -ingester.range-block-period
[range_block_period: <duration> | default = 1m0s]

# Period with which to attempt to flush chunks.
# CLI flag: -ingester.flush-period
[flushcheckperiod: <duration> | default = 1m0s]
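
The min/max incremental join jitter options above bound a random wait before the incremental join begins. Below is a minimal sketch of how such a jitter could be chosen, using an assumed helper name rather than Cortex's actual code.

```go
package example

import (
	"math/rand"
	"time"
)

// joinJitter picks a random delay in [min, max] to wait before starting the
// incremental join, so two ingesters started at the same moment are unlikely
// to insert their tokens simultaneously.
func joinJitter(min, max time.Duration) time.Duration {
	if max <= min {
		return min
	}
	return min + time.Duration(rand.Int63n(int64(max-min)+1))
}
```

An ingester would then sleep for joinJitter(min, max) before inserting its first token, where min and max correspond to -ingester.min-incremental-join-jitter and -ingester.max-incremental-join-jitter.
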
@@ -497,6 +532,10 @@ lifecycler:
# CLI flag: -ingester.spread-flushes
[spreadflushes: <boolean> | default = false]

# Check tokens for streams that are created or appended to (see the sketch
# after this block).
[check_tokens: <boolean> | default = false]

# Period with which to update the per-user ingestion rates.
# CLI flag: -ingester.rate-update-period
[rateupdateperiod: <duration> | default = 15s]
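
The token_checker.check_on_interval and check_tokens options above describe a periodic safety check that every in-memory stream still hashes into a token range the ingester expects to own. The sketch below shows the general shape of such a check using hypothetical types and helpers; it is not the Cortex implementation.

```go
package example

// tokenRange is an (exclusive, inclusive] slice of the hash ring.
type tokenRange struct{ from, to uint32 }

// contains reports whether hash falls inside the range, handling the
// wrap-around case where from is greater than to.
func (r tokenRange) contains(hash uint32) bool {
	if r.from < r.to {
		return hash > r.from && hash <= r.to
	}
	return hash > r.from || hash <= r.to
}

// unexpectedStreams returns the hashes of in-memory streams that do not fall
// in any range this ingester expects to own.
func unexpectedStreams(streamHashes []uint32, owned []tokenRange) []uint32 {
	var out []uint32
	for _, h := range streamHashes {
		inRange := false
		for _, r := range owned {
			if r.contains(h) {
				inRange = true
				break
			}
		}
		if !inRange {
			out = append(out, h)
		}
	}
	return out
}
```

Any hashes returned by unexpectedStreams would indicate streams that should have been transferred away (or never received), which is the kind of condition the checker is meant to surface.
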
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
@@ -161,7 +161,7 @@ func newCompactor(
// Initialize the compactors ring if sharding is enabled.
if compactorCfg.ShardingEnabled {
lifecyclerCfg := compactorCfg.ShardingRing.ToLifecyclerConfig()
lifecycler, err := ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ring.CompactorRingKey, false)
lifecycler, err := ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), nil, "compactor", ring.CompactorRingKey, false)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize compactor ring lifecycler")
}
7 changes: 4 additions & 3 deletions pkg/distributor/distributor.go
@@ -188,7 +188,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
if !canJoinDistributorsRing {
ingestionRateStrategy = newInfiniteIngestionRateStrategy()
} else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, true)
distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, nil, "distributor", ring.DistributorRingKey, true)
if err != nil {
return nil, err
}
@@ -288,7 +288,7 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
// Validates a single series from a write request. Will remove labels if
// any are configured to be dropped for the user ID.
// Returns the validated series with its labels/samples, and any error.
func (d *Distributor) validateSeries(ts ingester_client.PreallocTimeseries, userID string) (client.PreallocTimeseries, error) {
func (d *Distributor) validateSeries(ts ingester_client.PreallocTimeseries, userID string, token uint32) (client.PreallocTimeseries, error) {
labelsHistogram.Observe(float64(len(ts.Labels)))
if err := validation.ValidateLabels(d.limits, userID, ts.Labels); err != nil {
return emptyPreallocSeries, err
@@ -307,6 +307,7 @@ func (d *Distributor) validateSeries(ts ingester_client.PreallocTimeseries, user
TimeSeries: &client.TimeSeries{
Labels: ts.Labels,
Samples: samples,
Token: token,
},
},
nil
@@ -384,7 +385,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
return nil, err
}

validatedSeries, err := d.validateSeries(ts, userID)
validatedSeries, err := d.validateSeries(ts, userID, key)

// Errors in validation are considered non-fatal, as one series in a request may contain
// invalid data but all the remaining series could be perfectly valid.