Skip to content

Commit

Permalink
add support for pipeline.ordered setting for java execution (elastic#…
Browse files Browse the repository at this point in the history
…11524)

reuse rubyArray for single element batches

rename preserveBatchOrder to preserveEventOrder

allow boolean and string values for the pipeline.ordered setting, reorg validation

update docs

yml typo

Update docs/static/running-logstash-command-line.asciidoc

Co-Authored-By: Karen Metts <35154725+karenzone@users.noreply.github.com>

Update docs/static/running-logstash-command-line.asciidoc

Co-Authored-By: Karen Metts <35154725+karenzone@users.noreply.github.com>

java execution specs and spec support

docs corrections per review

typo

close not shutdown

Ruby pipeline spec
  • Loading branch information
colinsurprenant authored Jan 29, 2020
1 parent 13cf267 commit 0bc9fa5
Show file tree
Hide file tree
Showing 15 changed files with 301 additions and 15 deletions.
9 changes: 9 additions & 0 deletions config/logstash.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@
#
# pipeline.unsafe_shutdown: false
#
# Set the pipeline event ordering. Options are "auto" (the default), "true" or "false".
# "auto" will automatically enable ordering if the 'pipeline.workers' setting
# is also set to '1'.
# "true" will enforce ordering on the pipeline and prevent logstash from starting
# if there are multiple workers.
# "false" will disable any extra processing necessary for preserving ordering.
#
pipeline.ordered: auto
#
# ------------ Pipeline Configuration Settings --------------
#
# Where to fetch the pipeline configuration for the main pipeline
Expand Down
14 changes: 13 additions & 1 deletion docs/static/running-logstash-command-line.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ With this command, Logstash concatenates three config files, `/tmp/one`, `/tmp/t
backing up, or that the CPU is not saturated, consider increasing this number to better utilize
machine processing power. The default is the number of the host's CPU cores.

*`--pipeline.ordered ORDERED`*::
Preserves events order. Possible values are `auto` (default), `true` and `false`.
This setting
will work only when also using a single worker for the pipeline.
Note that when enabled, it may impact the performance of the filters
and ouput processing.
The `auto` option will automatically enable ordering if the
`pipeline.workers` setting is set to `1`.
Use `true` to enable ordering on the pipeline and prevent logstash
from starting if there are multiple workers.
Use `false` to disable any extra processing necessary for preserving
ordering.

*`-b, --pipeline.batch.size SIZE`*::
Size of batches the pipeline is to work in. This option defines the maximum number of events an
individual worker thread will collect from inputs before attempting to execute its filters and outputs.
Expand Down Expand Up @@ -189,4 +202,3 @@ With this command, Logstash concatenates three config files, `/tmp/one`, `/tmp/t

*`-h, --help`*::
Print help

16 changes: 16 additions & 0 deletions docs/static/settings-file.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,22 @@ The `logstash.yml` file includes the following settings.
| (Beta) Load Java plugins in independent classloaders to isolate their dependencies.
| `false`

| `pipeline.ordered`
a|
Set the pipeline event ordering.Valid options are:

* `auto`
* `true`
* `false`

`auto` will automatically enable ordering if the `pipeline.workers` setting is also set to `1`.
`true` will enforce ordering on the pipeline and prevent logstash from starting
if there are multiple workers.
`false` will disable the processing required to preserve order. Ordering will not be
guaranteed, but you save the processing cost of preserving order.

| `auto`

| `path.config`
| The path to the Logstash config for the main pipeline. If you specify a directory or wildcard,
config files are read from the directory in alphabetical order.
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ module Environment
Setting::Boolean.new("pipeline.reloadable", true),
Setting::Boolean.new("pipeline.plugin_classloaders", false),
Setting::Boolean.new("pipeline.separate_logs", false),
Setting::CoercibleString.new("pipeline.ordered", "auto", true, ["auto", "true", "false"]),
Setting.new("path.plugins", Array, []),
Setting::NullableString.new("interactive", nil, false),
Setting::Boolean.new("config.debug", false),
Expand Down
19 changes: 18 additions & 1 deletion logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ def start_workers
maybe_setup_out_plugins

pipeline_workers = safe_pipeline_worker_count
@preserve_event_order = preserve_event_order?(pipeline_workers)
batch_size = settings.get("pipeline.batch.size")
batch_delay = settings.get("pipeline.batch.delay")

Expand Down Expand Up @@ -488,7 +489,8 @@ def init_worker_loop
@flushRequested,
@flushing,
@shutdownRequested,
@drain_queue)
@drain_queue,
@preserve_event_order)
rescue => e
@logger.error(
"Worker loop initialization error",
Expand All @@ -509,4 +511,19 @@ def default_logging_keys(other_keys = {})
keys[:thread] ||= thread.inspect if thread
keys
end

def preserve_event_order?(pipeline_workers)
case settings.get("pipeline.ordered")
when "auto"
if settings.set?("pipeline.workers") && settings.get("pipeline.workers") == 1
@logger.warn("'pipeline.ordered' is enabled and is likely less efficient, consider disabling if preserving event order is not necessary")
return true
end
when "true"
fail("enabling the 'pipeline.ordered' setting requires the use of a single pipeline worker") if pipeline_workers > 1
return true
end

false
end
end; end
9 changes: 9 additions & 0 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ def start_workers
maybe_setup_out_plugins

pipeline_workers = safe_pipeline_worker_count
verify_event_ordering!(pipeline_workers)
batch_size = settings.get("pipeline.batch.size")
batch_delay = settings.get("pipeline.batch.delay")

Expand Down Expand Up @@ -653,4 +654,12 @@ def default_logging_keys(other_keys = {})
def draining_queue?
@drain_queue ? !filter_queue_client.empty? : false
end

def verify_event_ordering!(pipeline_workers)
# the Ruby execution keep event order by design but when using a single worker only
if settings.get("pipeline.ordered") == "true" && pipeline_workers > 1
fail("enabling the 'pipeline.ordered' setting requires the use of a single pipeline worker")
end
end

end; end
5 changes: 5 additions & 0 deletions logstash-core/lib/logstash/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ class LogStash::Runner < Clamp::StrictCommand
:attribute_name => "pipeline.workers",
:default => LogStash::SETTINGS.get_default("pipeline.workers")

option "--pipeline.ordered", "ORDERED",
I18n.t("logstash.runner.flag.pipeline-ordered"),
:attribute_name => "pipeline.ordered",
:default => LogStash::SETTINGS.get_default("pipeline.ordered")

option ["--java-execution"], :flag,
I18n.t("logstash.runner.flag.java-execution"),
:attribute_name => "pipeline.java_execution",
Expand Down
23 changes: 23 additions & 0 deletions logstash-core/lib/logstash/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class Settings
"pipeline.reloadable",
"pipeline.system",
"pipeline.workers",
"pipeline.ordered",
"queue.checkpoint.acks",
"queue.checkpoint.interval",
"queue.checkpoint.writes",
Expand Down Expand Up @@ -464,6 +465,28 @@ def validate(value)
end
end

# The CoercibleString allows user to enter any value which coerces to a String.
# For example for true/false booleans; if the possible_strings are ["foo", "true", "false"]
# then these options in the config file or command line will be all valid: "foo", true, false, "true", "false"
#
class CoercibleString < Coercible
def initialize(name, default=nil, strict=true, possible_strings=[], &validator_proc)
@possible_strings = possible_strings
super(name, Object, default, strict, &validator_proc)
end

def coerce(value)
value.to_s
end

def validate(value)
super(value)
unless @possible_strings.empty? || @possible_strings.include?(value)
raise ArgumentError.new("Invalid value \"#{value}\". Options are: #{@possible_strings.inspect}")
end
end
end

class ExistingFilePath < Setting
def initialize(name, default=nil, strict=true)
super(name, ::String, default, strict) do |file_path|
Expand Down
12 changes: 12 additions & 0 deletions logstash-core/locales/en.yml
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,18 @@ en:
Sets the ID of the pipeline.
pipeline-workers: |+
Sets the number of pipeline workers to run.
pipeline-ordered: |+
Preserve events order. Possible values are `auto` (default), `true` and `false`.
This setting
will only work when also using a single worker for the pipeline.
Note that when enabled, it may impact the performance of the filters
and ouput processing.
The `auto` option will automatically enable ordering if the
`pipeline.workers` setting is set to `1`.
Use `true` to enable ordering on the pipeline and prevent logstash
from starting if there are multiple workers.
Use `false` to disable any extra processing necessary for preserving
ordering.
java-execution: |+
Use Java execution engine.
plugin-classloaders: |+
Expand Down
14 changes: 13 additions & 1 deletion logstash-core/spec/conditionals_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
require 'support/pipeline/pipeline_helpers'

module ConditionalFanciness
include PipelineHelpers
def description
return self.metadata[:description]
end
Expand Down Expand Up @@ -63,6 +62,19 @@ def multi_receive(events)

describe "conditionals in filter" do
extend ConditionalFanciness
extend PipelineHelpers

let(:settings) do
# settings is used by sample_one.
# This was originally set directly in sample_one and
# pipeline.workers was also set to 1. I am preserving
# this setting here for the sake of minimizing change
# but unsure if this is actually required.

s = LogStash::SETTINGS.clone
s.set_value("pipeline.workers", 1)
s
end

describe "simple" do
config <<-CONFIG
Expand Down
11 changes: 11 additions & 0 deletions logstash-core/spec/logstash/filters/base_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ def filter(event)

describe LogStash::Filters::NOOP do
extend PipelineHelpers
let(:settings) do
# settings is used by sample_one.
# This was originally set directly in sample_one and
# pipeline.workers was also set to 1. I am preserving
# this setting here for the sake of minimizing change
# but unsure if this is actually required.

s = LogStash::SETTINGS.clone
s.set_value("pipeline.workers", 1)
s
end

describe "adding multiple values to one field" do
config <<-CONFIG
Expand Down
Loading

0 comments on commit 0bc9fa5

Please sign in to comment.