Skip to content

Commit 90c8501

Browse files
Transition filter_parser from single record to stream
Signed-off-by: Athish Pranav D <athishanna@gmail.com>
1 parent 7f13c7a commit 90c8501

File tree

1 file changed

+33
-49
lines changed

1 file changed

+33
-49
lines changed

lib/fluent/plugin/filter_parser.rb

Lines changed: 33 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -57,26 +57,32 @@ def configure(conf)
5757
FAILED_RESULT = [nil, nil].freeze # reduce allocation cost
5858
REPLACE_CHAR = '?'.freeze
5959

60-
def filter_with_time(tag, time, record)
61-
raw_value = @accessor.call(record)
62-
if raw_value.nil?
63-
if @emit_invalid_record_to_error
64-
router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
65-
end
66-
if @reserve_data
67-
return time, handle_parsed(tag, record, time, {})
68-
else
69-
return FAILED_RESULT
60+
def filter_stream(tag, es)
61+
new_es = Fluent::MultiEventStream.new
62+
es.each do |time, record|
63+
begin
64+
raw_value = @accessor.call(record)
65+
if raw_value.nil?
66+
if @emit_invalid_record_to_error
67+
router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
68+
end
69+
if @reserve_data
70+
new_es.add(time, handle_parsed(tag, record, time, {}))
71+
end
72+
next
73+
end
74+
filter_one_record(tag, time, record, raw_value) do |result_time, result_record|
75+
new_es.add(result_time, result_record)
76+
end
7077
end
7178
end
72-
begin
73-
# Note: https://github.com/fluent/fluentd/issues/4100
74-
# If the parser returns multiple records from one raw_value,
75-
# this returns only the first one record.
76-
# This should be fixed in the future version.
77-
result_time = nil
78-
result_record = nil
79+
new_es
80+
end
81+
82+
private
7983

84+
def filter_one_record(tag, time, record, raw_value)
85+
begin
8086
@parser.parse(raw_value) do |t, values|
8187
if values
8288
t = if @reserve_time
@@ -85,55 +91,33 @@ def filter_with_time(tag, time, record)
8591
t.nil? ? time : t
8692
end
8793
@accessor.delete(record) if @remove_key_name_field
88-
r = handle_parsed(tag, record, t, values)
89-
90-
if result_record.nil?
91-
result_time = t
92-
result_record = r
93-
else
94-
if @emit_invalid_record_to_error
95-
router.emit_error_event(tag, t, r, Fluent::Plugin::Parser::ParserError.new(
96-
"Could not emit the event. The parser returned multiple results, but currently filter_parser plugin only returns the first parsed result. Raw data: '#{raw_value}'"
97-
))
98-
end
99-
end
10094
else
10195
if @emit_invalid_record_to_error
10296
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'"))
10397
end
104-
10598
next unless @reserve_data
106-
next unless result_record.nil?
107-
108-
result_time = time
109-
result_record = handle_parsed(tag, record, time, {})
99+
t = time
100+
values = {}
110101
end
102+
yield(t, handle_parsed(tag, record, t, values))
111103
end
112104

113-
return result_time, result_record
114105
rescue Fluent::Plugin::Parser::ParserError => e
115106
if @emit_invalid_record_to_error
116-
raise e
117-
else
118-
return FAILED_RESULT
107+
router.emit_error_event(tag, time, record, e)
119108
end
120109
rescue ArgumentError => e
121-
raise unless @replace_invalid_sequence
122-
raise unless e.message.index("invalid byte sequence in") == 0
123-
124-
raw_value = raw_value.scrub(REPLACE_CHAR)
125-
retry
126-
rescue => e
127110
if @emit_invalid_record_to_error
128-
raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}"
129-
else
130-
return FAILED_RESULT
111+
router.emit_error_event(tag, time, record, e)
112+
end
113+
114+
if @replace_invalid_sequence
115+
raw_value = raw_value.scrub(REPLACE_CHAR)
116+
retry
131117
end
132118
end
133119
end
134120

135-
private
136-
137121
def handle_parsed(tag, record, t, values)
138122
if values && @inject_key_prefix
139123
values = Hash[values.map { |k, v| [@inject_key_prefix + k, v] }]

0 commit comments

Comments
 (0)