Skip to content

Commit b71a18f

Browse files
committed
Implementing new Buffer APIs, especially emitting data
1 parent 681fb02 commit b71a18f

File tree

4 files changed

+243
-99
lines changed

4 files changed

+243
-99
lines changed

lib/fluent/plugin/buffer.rb

Lines changed: 158 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,49 +14,55 @@
1414
# limitations under the License.
1515
#
1616

17+
require 'monitor'
18+
1719
module Fluent
1820
module Plugin
1921
class BufferError < StandardError; end
20-
class BufferChunkLimitError < BufferError; end
21-
class BufferQueueLimitError < BufferError; end
22+
class BufferOverflowError < BufferError; end
23+
class BufferChunkOverflowError < BufferError; end # A record size is larger than chunk size limit
2224

2325
# Buffer is to define an interface for all buffer plugins.
24-
# Use BasicBuffer as a superclass for 3rd party buffer plugins.
2526

26-
DEFAULT_CHUNK_SIZE = 8 * 1024 * 1024 # 8MB for memory
27-
DEFAULT_QUEUE_LENGTH = 256 # (8MB * 256 ==) 2GB for memory
27+
DEFAULT_CHUNK_BYTES_LIMIT = 8 * 1024 * 1024 # 8MB for memory
28+
DEFAULT_QUEUE_LENGTH_LIMIT = 256 # (8MB * 256 ==) 2GB for memory
29+
30+
MINIMUM_APPEND_ATTEMPT_SIZE = 10
2831

2932
# Buffers are built on 2 elements:
3033
# * stage: chunks currently being written, keyed by metadata
3134
# * queue: FIFO list of chunks which are already filled and waiting to be flushed
3235
# Queue of a Buffer instance is shared by variations of metadata
3336
class Buffer
3437
include Configurable
38+
include MonitorMixin
3539

3640
config_section :buffer, param_name: :buffer_config, required: false, multi: false do
37-
config_param :chunk_size, :size, default: DEFAULT_CHUNK_SIZE
38-
config_param :total_size, :size, default: DEFAULT_CHUNK_SIZE * DEFAULT_QUEUE_LENGTH
41+
config_param :chunk_bytes_limit, :size, default: DEFAULT_CHUNK_BYTES_LIMIT
42+
config_param :total_bytes_limit, :size, default: DEFAULT_CHUNK_BYTES_LIMIT * DEFAULT_QUEUE_LENGTH_LIMIT
3943

4044
config_param :flush_interval, :time, default: nil
4145

4246
# If the user specifies this value and (chunk_bytes_limit * queue_length_limit) is smaller than total_bytes_limit,
4347
# then total_bytes_limit is automatically set to that value
44-
config_param :queue_length, :integer, default: nil
48+
config_param :queue_length_limit, :integer, default: nil
4549

4650
# optional new limitations
47-
config_param :chunk_records, :integer, default: nil
51+
config_param :chunk_records_limit, :integer, default: nil
4852

4953
# TODO: pipeline mode? to flush ASAP after emit
5054
end
5155

56+
Metadata = Struct.new(:timekey, :tag, :variables)
57+
5258
def initialize(logger)
5359
super()
5460
@log = logger
5561

56-
@chunk_size = nil
62+
@chunk_size_limit = nil
5763
@chunk_records = nil
5864

59-
@total_size = nil
65+
@total_size_limit = nil
6066
@queue_length = nil
6167

6268
@flush_interval = nil
@@ -66,34 +72,92 @@ def configure(conf)
6672
super
6773

6874
if @buffer_config
69-
@chunk_size = @buffer_config.chunk_size
70-
@chunk_records = @buffer_config.chunk_records
71-
@total_size = @buffer_config.total_size
72-
@queue_length = @buffer_config.queue_length
73-
if @queue_length && @total_size > @chunk_size * @queue_length
74-
@total_size = @chunk_size * @queue_length
75+
@chunk_bytes_limit = @buffer_config.chunk_bytes_limit
76+
@total_bytes_limit = @buffer_config.total_bytes_limit
77+
78+
@chunk_records_limit = @buffer_config.chunk_records_limit
79+
80+
@queue_length_limit = @buffer_config.queue_length_limit
81+
if @queue_length_limit && @total_bytes_limit > @chunk_bytes_limit * @queue_length_limit
82+
@total_bytes_limit = @chunk_bytes_limit * @queue_length_limit
7583
end
7684
@flush_interval = @buffer_config.flush_interval
7785
else
78-
@chunk_size = DEFAULT_CHUNK_SIZE
79-
@total_size = DEFAULT_CHUNK_SIZE * DEFAULT_QUEUE_LENGTH
80-
@queue_length = DEFAULT_QUEUE_LENGTH
86+
@chunk_bytes_limit = DEFAULT_CHUNK_BYTES_LIMIT
87+
@total_bytes_limit = DEFAULT_CHUNK_BYTES_LIMIT * DEFAULT_QUEUE_LENGTH_LIMIT
88+
@queue_length_limit = DEFAULT_QUEUE_LENGTH_LIMIT
8189
end
8290
end
8391

84-
def allow_concurrent_pop?
92+
# Prepares the runtime state of the buffer.
#
# Takes the stage and queue returned by #resume, extends the queue with
# MonitorMixin so it can be locked via @queue.synchronize, resets both size
# counters to zero and starts with an empty metadata list.
#
# NOTE(review): the size counters start at 0 regardless of what #resume
# returned -- confirm whether resumed chunks should be counted here.
def start
93+
super
94+
@stage, @queue = resume
95+
@queue.extend(MonitorMixin)
96+
97+
@stage_size = @queue_size = 0
98+
@metadata_list = [] # keys of @stage
99+
end
100+
101+
# Tells whether the buffer still has room below its total byte limit.
#
# @return [Boolean] true while the bytes counted for stage and queue
#   together are strictly below @total_bytes_limit
def storable?
  # The limit is stored by #configure in @total_bytes_limit. The previously
  # referenced @total_size_limit is only ever assigned nil, which made this
  # comparison raise NoMethodError.
  @total_bytes_limit > @stage_size + @queue_size
end
104+
105+
# Compares current usage against a fraction of the total byte limit.
#
# @param ratio [Numeric] fraction of @total_bytes_limit to compare against
# @return [Boolean] true while stage + queue bytes are strictly below
#   @total_bytes_limit * ratio
#
# NOTE(review): the method name suggests "usage has reached ratio", but the
# comparison returns true while usage is still BELOW it -- confirm the
# intended direction with callers before relying on it.
def used?(ratio)
  # The limit is stored by #configure in @total_bytes_limit; the previously
  # referenced @total_size_limit is only ever assigned nil.
  @total_bytes_limit * ratio > @stage_size + @queue_size
end
108+
109+
def resume
85110
raise NotImplementedError, "Implement this method in child class"
86111
end
87112

88-
def start
89-
super
113+
# Returns the Metadata instance that represents the given key/value pairs.
#
# :timekey and :tag are taken out of the pairs; the values of all remaining
# keys, ordered by sorted key, become the metadata variables. When an equal
# Metadata is already registered in @metadata_list, that registered instance
# is returned, so equal inputs always yield the very same object.
#
# @param key_value_pairs [Hash] pairs describing the metadata (left untouched)
# @return [Metadata] the registered instance for these pairs
def metadata(key_value_pairs={})
  # Work on a copy: Hash#delete would otherwise strip :timekey and :tag from
  # the hash object owned by the caller.
  pairs = key_value_pairs.dup
  timekey = pairs.delete(:timekey)
  tag = pairs.delete(:tag)
  variables = pairs.keys.sort.map { |k| pairs[k] }

  meta = Metadata.new(timekey, tag, variables)
  synchronize do
    if (i = @metadata_list.index(meta))
      @metadata_list[i]
    else
      @metadata_list << meta
      meta
    end
  end
end
91128

92-
def emit(data, metadata)
129+
# metadata MUST have consistent object_id for each variation
130+
# data MUST be Array of serialized events
131+
# Stores +data+ into the staged chunk for +metadata+.
#
# First tries to append the whole data at once; if the chunk would exceed its
# limits (see #size_over?), the append is rolled back and the work is handed
# to #emit_step_by_step.
#
# @param metadata [Object] key of the staged chunk in @stage
# @param data [Array] serialized events; nothing happens when it is empty
# @return [nil] when data is empty or was stored in one go; otherwise the
#   return value of #emit_step_by_step
# @raise [BufferOverflowError] when #storable? is false
def emit(metadata, data)
  return if data.size < 1
  raise BufferOverflowError unless storable?

  stored = false

  # the case where the whole data fits into the staged chunk: almost all emits will succeed here
  chunk = synchronize { @stage[metadata] ||= generate_chunk(metadata) }
  chunk.synchronize do
    begin
      chunk.append(data)
      unless size_over?(chunk)
        chunk.commit
        stored = true
      end
    ensure
      # always issued, also after a successful commit, exactly as before
      chunk.rollback
    end
  end
  return if stored

  emit_step_by_step(metadata, data)
end
155+
156+
def generate_chunk(metadata)
93157
raise NotImplementedError, "Implement this method in child class"
94158
end
95159

96-
def enqueue_chunk(key)
160+
def enqueue_chunk(metadata)
97161
raise NotImplementedError, "Implement this method in child class"
98162
end
99163

@@ -113,15 +177,84 @@ def stop
113177
end
114178

115179
def before_shutdown(out)
180+
# at this point, the buffer may be flushed with flush_at_shutdown
116181
end
117182

118183
def shutdown
119184
end
120185

121186
# Closes every chunk held by this buffer.
#
# Inside the buffer's own synchronize block it first drains @queue (under
# @queue.synchronize), calling #close on each chunk as it is shifted off,
# and then calls #close on each chunk in @stage.
def close
187+
synchronize do
188+
@queue.synchronize do
189+
until @queue.empty?
190+
@queue.shift.close
191+
end
192+
end
193+
@stage.each_pair do |key, chunk|
194+
chunk.close
195+
end
196+
end
122197
end
123198

124199
# Drops the references to stage and queue by setting both to nil.
def terminate
200+
@stage = @queue = nil
201+
end
202+
203+
# Checks a chunk against the configured per-chunk limits.
#
# Truthy when chunk.size exceeds @chunk_bytes_limit, or when
# @chunk_records_limit is set and chunk.records exceeds it; falsy otherwise.
def size_over?(chunk)
204+
chunk.size > @chunk_bytes_limit || (@chunk_records_limit && chunk.records > @chunk_records_limit)
205+
end
206+
207+
# Stores +data+ into chunks for +metadata+ batch by batch.
#
# Starts with batches of a third of the data (never fewer than
# MINIMUM_APPEND_ATTEMPT_SIZE records). Whenever an append makes the chunk
# exceed its limits (see #size_over?), the append is rolled back and either
# the batch size is reduced, or -- once the minimum batch size no longer
# fits -- the chunk is enqueued and a new staged chunk is used.
#
# Stored records are removed from +data+ itself (Array#slice!), so the
# caller's array is empty after a successful call.
#
# @param metadata [Object] key of the staged chunk in @stage
# @param data [Array] serialized events, consumed in place
# @return [nil]
# @raise [BufferChunkOverflowError] when a minimum-size batch does not fit
#   even into an empty chunk
def emit_step_by_step(metadata, data)
  attempt_size = data.size / 3

  synchronize do # critical section for buffer (stage/queue)
    while data.size > 0
      attempt_size = MINIMUM_APPEND_ATTEMPT_SIZE if attempt_size < MINIMUM_APPEND_ATTEMPT_SIZE

      chunk = @stage[metadata] ||= generate_chunk(metadata)

      chunk.synchronize do # critical section for chunk (chunk append/commit/rollback)
        begin
          empty_chunk = chunk.empty?

          attempt = data.slice(0, attempt_size)
          chunk.append(attempt)

          if size_over?(chunk)
            chunk.rollback

            if attempt_size <= MINIMUM_APPEND_ATTEMPT_SIZE
              if empty_chunk # batch is too large even for an empty chunk
                raise BufferChunkOverflowError, "minimum append batch exceeds chunk bytes limit"
              end
              # no more records for this chunk -> enqueue -> to be flushed
              enqueue_chunk(metadata) # `chunk` will be removed from stage
              attempt_size = data.size # fresh chunk may have enough space
            else
              # the whole data can then be processed in two operations
              # (with attempt_size /= 2, three operations would be required for odd numbers of records)
              attempt_size = (attempt_size / 2) + 1
            end

            next # leave the chunk critical section and retry from the top of the loop
          end

          chunk.commit
          data.slice!(0, attempt_size)
          # keep the same attempt size for the next batch
          nil # discard the return value of data.slice!() immediately
        ensure
          chunk.rollback
        end
      end
    end
  end
  nil
end
126259
end
127260
end

lib/fluent/plugin/buffer/chunk.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
# limitations under the License.
1515
#
1616

17+
require 'monitor'
18+
1719
module Fluent
1820
module Plugin
1921
class Buffer
@@ -22,7 +24,7 @@ class Chunk
2224

2325
# Chunks have 2 parts:
2426
# * metadata: contains metadata which should be restored after resume (if possible)
25-
# v: [metadata_variable] (required)
27+
# v: [metadata_variable, ...] (required)
2628
# t: tag as string (optional)
2729
# k: time slice key (optional)
2830
#

0 commit comments

Comments
 (0)