Skip to content

Buffer: Can't write already compressed MessagePack EventStream as is #4146

Closed
@daipom

Description

@daipom

Describe the bug

in_forward can receive a compressed MessagePack EventStream from out_forward.

On the in_forward side, I think there is no way to process the compressed data as is (i.e. without decompressing).
I get an unexpected error when I set compress gzip of Buffer on the in_forward side.

[warn]: #0 emit transaction failed: error_class=ArgumentError error="unknown keyword: :packer" location="/home/daipom/work/fluentd/fluentd/lib/fluent/event.rb:297:in `to_compressed_msgpack_stream'" tag="test"
[error]: #0 unexpected error on reading data host="127.0.0.1" port=51396 error_class=ArgumentError error="unknown keyword: :packer"

To me, this looks like a bug of Buffer and CompressedMessagePackEventStream.

A possible use case is a two-stage transfer.

  • Forwarder1(out_forward) -> Forwarder2(in_forward, out_forward) -> Aggregator(in_forward)

In this case, Forwarder2 should process the data of CompressedMessagePackEventStream as is (i.e. without decompressing) and re-transfer the data to Aggregator.

However, if enabling compressing both in Forwarder1 and Forwarder2, an unexpected error occurs in Forwarder2.

Note: if disabling the compression only in Forwarder2, the data is decompressed in Forwarder2. This decompression is completely useless.

To Reproduce

Make the out_forward side Fluentd and the in_forward side Fluentd work respectively with the settings in Your Configuration.

The assumed use case is this, as above.

  • Forwarder1(out_forward) -> Forwarder2(in_forward, out_forward) -> Aggregator(in_forward)

But we can easily reproduce this error by 2 Fluentd as those settings.
(Of course, if we start 3 Fluentd like the above and enable the compression setting for both out_forward in Forwarder1 and Forwarder2, this reproduces as well)

Expected behavior

When enabling the compression of the buffer and it tries to process an already compressed MessagePack EventStream, it should process the data as is and not raise an unexpected error.

Your Environment

- Fluentd version: 1.16.0
- TD Agent version: none
- Operating system: Ubuntu 20.04.6 LTS
- Kernel version: 5.15.0-69-generic

Your Configuration

# out_forward side
<source>
  @type sample
  tag test
</source>

<match test.**>
  @type forward
  compress gzip
  <buffer>
    @type file
    path /test/fluentd/forwarder/buffer
    flush_mode interval
    flush_interval 10s
  </buffer>
  <server>
    host localhost
    port 24224
  </server>
</match>

# in_forward side
<source>
  @type forward
</source>

<match test.**>
  @type null
  <buffer>
    @type file
    path /test/fluentd/aggregator/buffer
    flush_mode interval
    flush_interval 10s
    compress gzip
  </buffer>
</match>

Your Error Log

# in_forward side
2023-04-12 13:21:37 +0900 [info]: init supervisor logger path=nil rotate_age=nil rotate_size=nil
2023-04-12 13:21:37 +0900 [info]: parsing config file is succeeded path="/test/fluentd/aggregator/fluent.conf"
2023-04-12 13:21:37 +0900 [info]: gem 'fluentd' version '1.16.0'
2023-04-12 13:21:37 +0900 [info]: using configuration file: <ROOT>
  <source>
    @type forward
  </source>
  <match test.**>
    @type null
    <buffer>
      @type "file"
      path "/test/fluentd/aggregator/buffer"
      flush_mode interval
      flush_interval 10s
      compress gzip
    </buffer>
  </match>
</ROOT>
2023-04-12 13:21:37 +0900 [info]: starting fluentd-1.16.0 pid=107781 ruby="3.2.0"
2023-04-12 13:21:37 +0900 [info]: spawn command to main:  cmdline=["/home/daipom/.rbenv/versions/3.2.0/bin/ruby", "-r/home/daipom/.rbenv/versions/3.2.0/lib/ruby/site_ruby/3.2.0/bundler/setup", "-Eascii-8bit:ascii-8bit", "/home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/bin/fluentd", "-c", "/test/fluentd/aggregator/fluent.conf", "--under-supervisor"]
2023-04-12 13:21:37 +0900 [info]: #0 init worker0 logger path=nil rotate_age=nil rotate_size=nil
2023-04-12 13:21:37 +0900 [info]: adding match pattern="test.**" type="null"
2023-04-12 13:21:37 +0900 [info]: adding source type="forward"
2023-04-12 13:21:37 +0900 [info]: #0 starting fluentd worker pid=107802 ppid=107781 worker=0
2023-04-12 13:21:37 +0900 [info]: #0 listening port port=24224 bind="0.0.0.0"
2023-04-12 13:21:37 +0900 [info]: #0 fluentd worker is now running worker=0
2023-04-12 13:21:57 +0900 [warn]: #0 emit transaction failed: error_class=ArgumentError error="unknown keyword: :packer" location="/home/daipom/work/fluentd/fluentd/lib/fluent/event.rb:297:in `to_compressed_msgpack_stream'" tag="test"
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/event.rb:297:in `to_compressed_msgpack_stream'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:1017:in `block in <class:Output>'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:670:in `block in write_once'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/3.2.0/monitor.rb:202:in `synchronize'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/3.2.0/monitor.rb:202:in `mon_synchronize'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:661:in `write_once'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:345:in `block in write'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:343:in `each'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:343:in `write'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:1095:in `block in handle_stream_simple'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:977:in `write_guard'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:1094:in `handle_stream_simple'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:967:in `execute_chunking'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:897:in `emit_buffered'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/event_router.rb:115:in `emit_stream'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:318:in `on_message'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:226:in `block in handle_connection'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:263:in `block (3 levels) in read_messages'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:262:in `feed_each'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:262:in `block (2 levels) in read_messages'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:271:in `block in read_messages'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin_helper/server.rb:632:in `on_read_without_connection'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/gems/cool.io-1.7.1/lib/cool.io/io.rb:123:in `on_readable'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/gems/cool.io-1.7.1/lib/cool.io/io.rb:186:in `on_readable'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in `run_once'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in `run'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin_helper/event_loop.rb:93:in `block in start'
  2023-04-12 13:21:57 +0900 [warn]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2023-04-12 13:21:57 +0900 [error]: #0 unexpected error on reading data host="127.0.0.1" port=51396 error_class=ArgumentError error="unknown keyword: :packer"
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/event.rb:297:in `to_compressed_msgpack_stream'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:1017:in `block in <class:Output>'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:670:in `block in write_once'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/3.2.0/monitor.rb:202:in `synchronize'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/3.2.0/monitor.rb:202:in `mon_synchronize'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:661:in `write_once'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:345:in `block in write'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:343:in `each'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/buffer.rb:343:in `write'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:1095:in `block in handle_stream_simple'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:977:in `write_guard'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:1094:in `handle_stream_simple'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:967:in `execute_chunking'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/output.rb:897:in `emit_buffered'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/event_router.rb:115:in `emit_stream'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:318:in `on_message'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:226:in `block in handle_connection'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:263:in `block (3 levels) in read_messages'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:262:in `feed_each'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:262:in `block (2 levels) in read_messages'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:271:in `block in read_messages'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin_helper/server.rb:632:in `on_read_without_connection'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/gems/cool.io-1.7.1/lib/cool.io/io.rb:123:in `on_readable'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/gems/cool.io-1.7.1/lib/cool.io/io.rb:186:in `on_readable'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in `run_once'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/.rbenv/versions/3.2.0/lib/ruby/gems/3.2.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in `run'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin_helper/event_loop.rb:93:in `block in start'
  2023-04-12 13:21:57 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
^C2023-04-12 13:22:03 +0900 [info]: Received graceful stop
2023-04-12 13:22:04 +0900 [info]: #0 fluentd worker is now stopping worker=0
2023-04-12 13:22:04 +0900 [info]: #0 shutting down fluentd worker worker=0
2023-04-12 13:22:04 +0900 [info]: #0 shutting down input plugin type=:forward plugin_id="object:be0"
2023-04-12 13:22:04 +0900 [info]: #0 shutting down output plugin type=:null plugin_id="object:bb8"
2023-04-12 13:22:04 +0900 [info]: Worker 0 finished with status 0

Additional context

No response

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

Status

Done

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions