Skip to content

Index Lifecycle Management Support #805

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Dec 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ci/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ else
extra_tag_args="--tag secure_integration"
fi

if [[ "$DISTRIBUTION" == "oss" ]]; then
extra_tag_args="$extra_tag_args --tag distribution:oss --tag ~distribution:xpack"
elif [[ "$DISTRIBUTION" == "default" ]]; then
extra_tag_args="$extra_tag_args --tag ~distribution:oss --tag distribution:xpack"
fi

case "$ES_VERSION" in
LATEST-SNAPSHOT-*)
split_latest=${ES_VERSION##*-}
Expand Down
4 changes: 4 additions & 0 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,17 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
require "logstash/outputs/elasticsearch/http_client_builder"
require "logstash/outputs/elasticsearch/common_configs"
require "logstash/outputs/elasticsearch/common"
require "logstash/outputs/elasticsearch/ilm"

# Protocol agnostic (i.e. non-http, non-java specific) configs go here
include(LogStash::Outputs::ElasticSearch::CommonConfigs)

# Protocol agnostic methods
include(LogStash::Outputs::ElasticSearch::Common)

# Methods for ILM support
include(LogStash::Outputs::ElasticSearch::Ilm)

config_name "elasticsearch"

# The Elasticsearch action to perform. Valid actions are:
Expand Down
13 changes: 8 additions & 5 deletions lib/logstash/outputs/elasticsearch/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ def register

setup_hosts # properly sets @hosts
build_client
setup_after_successful_connection
check_action_validity
@bulk_request_metrics = metric.namespace(:bulk_requests)
@document_level_metrics = metric.namespace(:documents)
install_template_after_successful_connection
@logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s))
end

Expand All @@ -38,15 +38,19 @@ def multi_receive(events)
retrying_submit(events.map {|e| event_action_tuple(e)})
end

def install_template_after_successful_connection
def setup_after_successful_connection
@template_installer ||= Thread.new do
sleep_interval = @retry_initial_interval
until successful_connection? || @stopping.true?
@logger.debug("Waiting for connectivity to Elasticsearch cluster. Retrying in #{sleep_interval}s")
Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
sleep_interval = next_sleep_interval(sleep_interval)
end
install_template if successful_connection?
if successful_connection?
verify_ilm_readiness if ilm_enabled?
install_template
setup_ilm if ilm_enabled?
end
end
end

Expand Down Expand Up @@ -114,7 +118,6 @@ def maximum_seen_major_version
client.maximum_seen_major_version
end


def routing_field_name
maximum_seen_major_version >= 6 ? :routing : :_routing
end
Expand Down Expand Up @@ -353,4 +356,4 @@ def dlq_enabled?
!execution_context.dlq_writer.inner_writer.is_a?(::LogStash::Util::DummyDeadLetterQueueWriter)
end
end
end; end; end
end end end
24 changes: 23 additions & 1 deletion lib/logstash/outputs/elasticsearch/common_configs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

module LogStash; module Outputs; class ElasticSearch
module CommonConfigs

DEFAULT_INDEX_NAME = "logstash-%{+YYYY.MM.dd}"
DEFAULT_POLICY = "logstash-policy"

def self.included(mod)
# The index to write events to. This can be dynamic using the `%{foo}` syntax.
# The default value will partition your indices by day so you can more easily
Expand All @@ -10,7 +14,7 @@ def self.included(mod)
# For weekly indexes ISO 8601 format is recommended, eg. logstash-%{+xxxx.ww}.
# LS uses Joda to format the index pattern from event timestamp.
# Joda formats are defined http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html[here].
mod.config :index, :validate => :string, :default => "logstash-%{+YYYY.MM.dd}"
mod.config :index, :validate => :string, :default => DEFAULT_INDEX_NAME

mod.config :document_type,
:validate => :string,
Expand Down Expand Up @@ -136,6 +140,24 @@ def self.included(mod)
# Set which ingest pipeline you wish to execute for an event. You can also use event dependent configuration
# here like `pipeline => "%{INGEST_PIPELINE}"`
mod.config :pipeline, :validate => :string, :default => nil


# -----
# ILM configurations (beta)
# -----
# Flag for enabling Index Lifecycle Management integration.
mod.config :ilm_enabled, :validate => :boolean, :default => false

# Rollover alias used for indexing data. If rollover alias doesn't exist, Logstash will create it and map it to the relevant index
mod.config :ilm_rollover_alias, :validate => :string, :default => 'logstash'

# appends “{now/d}-000001” by default for new index creation, subsequent rollover indices will increment based on this pattern i.e. “000002”
# {now/d} is date math, and will insert the appropriate value automatically.
mod.config :ilm_pattern, :validate => :string, :default => '{now/d}-000001'

# ILM policy to use, if undefined the default policy will be used.
mod.config :ilm_policy, :validate => :string, :default => DEFAULT_POLICY

end
end
end end end
14 changes: 14 additions & 0 deletions lib/logstash/outputs/elasticsearch/default-ilm-policy.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"policy" : {
"phases": {
"hot" : {
"actions" : {
"rollover" : {
"max_size" : "50gb",
"max_age":"30d"
}
}
}
}
}
}
49 changes: 47 additions & 2 deletions lib/logstash/outputs/elasticsearch/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -337,17 +337,62 @@ def host_to_url(h)
::LogStash::Util::SafeURI.new(raw_url)
end

def template_exists?(name)
response = @pool.head("/_template/#{name}")
def exists?(path, use_get=false)
response = use_get ? @pool.get(path) : @pool.head(path)
response.code >= 200 && response.code <= 299
end

def template_exists?(name)
exists?("/_template/#{name}")
end

def template_put(name, template)
path = "_template/#{name}"
logger.info("Installing elasticsearch template to #{path}")
@pool.put(path, nil, LogStash::Json.dump(template))
end

# ILM methods

# check whether rollover alias already exists
def rollover_alias_exists?(name)
exists?(name)
end

# Create a new rollover alias
def rollover_alias_put(alias_name, alias_definition)
logger.info("Creating rollover alias #{alias_name}")
begin
@pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
# If the rollover alias already exists, ignore the error that comes back from Elasticsearch
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
if e.response_code == 400
logger.info("Rollover Alias #{alias_name} already exists. Skipping")
return
end
raise e
end
end

def get_xpack_info
get("/_xpack")
end

def get_ilm_endpoint
@pool.get("/_ilm/policy")
end

def ilm_policy_exists?(name)
exists?("/_ilm/policy/#{name}", true)
end

def ilm_policy_put(name, policy)
path = "_ilm/policy/#{name}"
logger.info("Installing ILM policy #{policy} to #{path}")
@pool.put(path, nil, LogStash::Json.dump(policy))
end


# Build a bulk item for an elasticsearch update action
def update_action_builder(args, source)
if args[:_script]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ def format_url(url, path_and_query=nil)
end

request_uri.query = new_query_parts.join("&") unless new_query_parts.empty?
request_uri.path = "#{request_uri.path}/#{parsed_path_and_query.path}".gsub(/\/{2,}/, "/")

request_uri.path = "#{request_uri.path}/#{parsed_path_and_query.raw_path}".gsub(/\/{2,}/, "/")

request_uri
end

Expand Down
84 changes: 84 additions & 0 deletions lib/logstash/outputs/elasticsearch/ilm.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
module LogStash; module Outputs; class ElasticSearch
module Ilm

ILM_POLICY_PATH = "default-ilm-policy.json"

def setup_ilm
return unless ilm_enabled?
@logger.info("Using Index lifecycle management - this feature is currently in beta.")
@logger.warn "Overwriting supplied index name with rollover alias #{@ilm_rollover_alias}" if @index != LogStash::Outputs::ElasticSearch::CommonConfigs::DEFAULT_INDEX_NAME
@index = ilm_rollover_alias

maybe_create_rollover_alias
maybe_create_ilm_policy
end

def ilm_enabled?
@ilm_enabled
end

def verify_ilm_readiness
return unless ilm_enabled?

# Check the Elasticsearch instance for ILM readiness - this means that the version has to be a non-OSS release, with ILM feature
# available and enabled.
begin
xpack = client.get_xpack_info
features = xpack["features"]
ilm = features.nil? ? nil : features["ilm"]
raise LogStash::ConfigurationError, "Index Lifecycle management is enabled in logstash, but not installed on your Elasticsearch cluster" if features.nil? || ilm.nil?
raise LogStash::ConfigurationError, "Index Lifecycle management is enabled in logstash, but not available in your Elasticsearch cluster" unless ilm['available']
raise LogStash::ConfigurationError, "Index Lifecycle management is enabled in logstash, but not enabled in your Elasticsearch cluster" unless ilm['enabled']

unless ilm_policy_default? || client.ilm_policy_exists?(ilm_policy)
raise LogStash::ConfigurationError, "The specified ILM policy #{ilm_policy} does not exist on your Elasticsearch instance"
end

rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
# Check xpack endpoint: If no xpack endpoint, then this version of Elasticsearch is not compatible
if e.response_code == 404
raise LogStash::ConfigurationError, "Index Lifecycle management is enabled in logstash, but not installed on your Elasticsearch cluster"
elsif e.response_code == 400
raise LogStash::ConfigurationError, "Index Lifecycle management is enabled in logstash, but not installed on your Elasticsearch cluster"
else
raise e
end
end
end

private

def ilm_policy_default?
ilm_policy == LogStash::Outputs::ElasticSearch::DEFAULT_POLICY
end

def maybe_create_ilm_policy
if ilm_policy_default? && !client.ilm_policy_exists?(ilm_policy)
client.ilm_policy_put(ilm_policy, policy_payload)
end
end

def maybe_create_rollover_alias
client.rollover_alias_put(rollover_alias_target, rollover_alias_payload) unless client.rollover_alias_exists?(ilm_rollover_alias)
end

def rollover_alias_target
"<#{ilm_rollover_alias}-#{ilm_pattern}>"
end

def rollover_alias_payload
{
'aliases' => {
ilm_rollover_alias =>{
'is_write_index' => true
}
}
}
end

def policy_payload
policy_path = ::File.expand_path(ILM_POLICY_PATH, ::File.dirname(__FILE__))
LogStash::Json.load(::IO.read(policy_path))
end
end
end end end
22 changes: 21 additions & 1 deletion lib/logstash/outputs/elasticsearch/template_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ def self.install_template(plugin)
return unless plugin.manage_template
plugin.logger.info("Using mapping template from", :path => plugin.template)
template = get_template(plugin.template, plugin.maximum_seen_major_version)
add_ilm_settings_to_template(plugin, template) if plugin.ilm_enabled?
plugin.logger.info("Attempting to install template", :manage_template => template)
install(plugin.client, plugin.template_name, template, plugin.template_overwrite)
install(plugin.client, template_name(plugin), template, plugin.template_overwrite)
rescue => e
plugin.logger.error("Failed to install template.", :message => e.message, :class => e.class.name, :backtrace => e.backtrace)
end
Expand All @@ -21,6 +22,25 @@ def self.install(client, template_name, template, template_overwrite)
client.template_install(template_name, template, template_overwrite)
end

def self.add_ilm_settings_to_template(plugin, template)
plugin.logger.info("Overwriting index patterns, as ILM is enabled.")
# Overwrite any index patterns, and use the rollover alias. Use 'index_patterns' rather than 'template' for pattern
# definition - remove any existing definition of 'template'
template.delete('template') if template.include?('template')
template['index_patterns'] = "#{plugin.ilm_rollover_alias}-*"
if template['settings'] && (template['settings']['index.lifecycle.name'] || template['settings']['index.lifecycle.rollover_alias'])
plugin.logger.info("Overwriting index lifecycle name and rollover alias as ILM is enabled.")
end
template['settings'].update({ 'index.lifecycle.name' => plugin.ilm_policy, 'index.lifecycle.rollover_alias' => plugin.ilm_rollover_alias})
end

# Template name - if template_name set, use it
# if not and ILM is enabled, use the rollover alias
# else use the default value of template_name
def self.template_name(plugin)
plugin.ilm_enabled? && !plugin.original_params.key?('template_name') ? plugin.ilm_rollover_alias : plugin.template_name
end

def self.default_template_path(es_major_version)
template_version = es_major_version == 1 ? 2 : es_major_version
default_template_name = "elasticsearch-template-es#{template_version}x.json"
Expand Down
Loading