Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_file: Support placeholders in symlink_path parameter #2254

Merged
merged 1 commit into from
Feb 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion lib/fluent/plugin/out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class FileOutput < Output
attr_accessor :last_written_path # for tests

module SymlinkBufferMixin
def output_plugin_for_symlink=(output_plugin)
@_output_plugin_for_symlink = output_plugin
end

def symlink_path=(path)
@_symlink_path = path
end
Expand All @@ -83,7 +87,7 @@ def generate_chunk(metadata)
# These chunks will be enqueued immediately, and will be flushed soon.
latest_metadata = metadata_list.select{|m| m.timekey }.sort_by(&:timekey).last
if chunk.metadata == latest_metadata
FileUtils.ln_sf(chunk.path, @_symlink_path)
FileUtils.ln_sf(chunk.path, @_output_plugin_for_symlink.extract_placeholders(@_symlink_path, chunk))
end
chunk
end
Expand Down Expand Up @@ -161,6 +165,7 @@ def configure(conf)
else
@buffer.extend SymlinkBufferMixin
@buffer.symlink_path = @symlink_path
@buffer.output_plugin_for_symlink = self
end
end

Expand Down
47 changes: 36 additions & 11 deletions test/plugin/test_out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def setup
end

TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/out_file#{ENV['TEST_ENV_NUMBER']}")
SYMLINK_PATH = File.expand_path("#{TMP_DIR}/current")

CONFIG = %[
path #{TMP_DIR}/out_file_test
Expand Down Expand Up @@ -663,15 +662,43 @@ def parse_system(text)
end
end

test 'symlink' do
omit "Windows doesn't support symlink" if Fluent.windows?
conf = CONFIG + %[
symlink_path #{SYMLINK_PATH}
]
symlink_path = "#{SYMLINK_PATH}"
SYMLINK_PATH = File.expand_path("#{TMP_DIR}/current")

sub_test_case 'symlink' do
test 'static symlink' do
omit "Windows doesn't support symlink" if Fluent.windows?
conf = CONFIG + %[
symlink_path #{SYMLINK_PATH}
]
symlink_path = "#{SYMLINK_PATH}"

d = create_driver(conf)
begin
run_and_check(d, symlink_path)
ensure
FileUtils.rm_rf(symlink_path)
end
end

test 'symlink with placeholders' do
omit "Windows doesn't support symlink" if Fluent.windows?
conf = %[
path #{TMP_DIR}/${tag}/out_file_test
symlink_path #{SYMLINK_PATH}-${tag}
<buffer tag,time>
</buffer>
]
symlink_path = "#{SYMLINK_PATH}-tag"

d = create_driver(conf)
begin
run_and_check(d, symlink_path)
ensure
FileUtils.rm_rf(symlink_path)
end
end

d = create_driver(conf)
begin
def run_and_check(d, symlink_path)
d.run(default_tag: 'tag') do
es = Fluent::OneEventStream.new(event_time("2011-01-02 13:14:15 UTC"), {"a"=>1})
d.feed(es)
Expand All @@ -688,8 +715,6 @@ def parse_system(text)
meta = d.instance.metadata('tag', event_time("2011-01-03 14:15:16 UTC"), {})
assert_equal d.instance.buffer.instance_eval{ @stage[meta].path }, File.readlink(symlink_path)
end
ensure
FileUtils.rm_rf(symlink_path)
end
end

Expand Down