Skip to content

Commit 919e91b

Browse files
Transition filter_parser from single record to stream
Signed-off-by: Athish Pranav D <athishanna@gmail.com>
1 parent: 7f13c7a · commit: 919e91b

File tree

1 file changed

+59
-63
lines changed

1 file changed

+59
-63
lines changed

lib/fluent/plugin/filter_parser.rb

Lines changed: 59 additions & 63 deletions
Original file line number | Diff line number | Diff line change
@@ -57,78 +57,74 @@ def configure(conf)
5757
FAILED_RESULT = [nil, nil].freeze # reduce allocation cost
5858
REPLACE_CHAR = '?'.freeze
5959

60-
def filter_with_time(tag, time, record)
61-
raw_value = @accessor.call(record)
62-
if raw_value.nil?
63-
if @emit_invalid_record_to_error
64-
router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
65-
end
66-
if @reserve_data
67-
return time, handle_parsed(tag, record, time, {})
68-
else
69-
return FAILED_RESULT
70-
end
71-
end
72-
begin
73-
# Note: https://github.com/fluent/fluentd/issues/4100
74-
# If the parser returns multiple records from one raw_value,
75-
# this returns only the first one record.
76-
# This should be fixed in the future version.
77-
result_time = nil
78-
result_record = nil
79-
80-
@parser.parse(raw_value) do |t, values|
81-
if values
82-
t = if @reserve_time
83-
time
60+
def filter_stream(tag, es)
61+
new_es = Fluent::MultiEventStream.new
62+
es.each do |time, record|
63+
begin
64+
raw_value = @accessor.call(record)
65+
if raw_value.nil?
66+
if @emit_invalid_record_to_error
67+
router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
68+
end
69+
if @reserve_data
70+
new_es.add(time, handle_parsed(tag, record, time, {}))
71+
end
72+
next
73+
end
74+
begin
75+
result_time = nil
76+
result_record = nil
77+
78+
@parser.parse(raw_value) do |t, values|
79+
if values
80+
t = if @reserve_time
81+
time
82+
else
83+
t.nil? ? time : t
84+
end
85+
@accessor.delete(record) if @remove_key_name_field
86+
r = handle_parsed(tag, record, t, values)
87+
88+
if result_record.nil?
89+
result_time = t
90+
result_record = r
8491
else
85-
t.nil? ? time : t
92+
if @emit_invalid_record_to_error
93+
router.emit_error_event(tag, t, r, Fluent::Plugin::Parser::ParserError.new(
94+
"Could not emit the event. The parser returned multiple results, but currently filter_parser plugin only returns the first parsed result. Raw data: '#{raw_value}'"
95+
))
96+
end
8697
end
87-
@accessor.delete(record) if @remove_key_name_field
88-
r = handle_parsed(tag, record, t, values)
89-
90-
if result_record.nil?
91-
result_time = t
92-
result_record = r
93-
else
94-
if @emit_invalid_record_to_error
95-
router.emit_error_event(tag, t, r, Fluent::Plugin::Parser::ParserError.new(
96-
"Could not emit the event. The parser returned multiple results, but currently filter_parser plugin only returns the first parsed result. Raw data: '#{raw_value}'"
97-
))
98+
else
99+
if @emit_invalid_record_to_error
100+
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'"))
101+
end
102+
103+
next unless @reserve_data
104+
next unless result_record.nil?
105+
106+
result_time = time
107+
result_record = handle_parsed(tag, record, time, {})
98108
end
109+
new_es.add(result_time, result_record)
99110
end
100-
else
111+
112+
rescue Fluent::Plugin::Parser::ParserError => e
101113
if @emit_invalid_record_to_error
102-
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'"))
114+
raise e
103115
end
116+
rescue ArgumentError => e
117+
raise unless @replace_invalid_sequence
118+
raise unless e.message.index("invalid byte sequence in") == 0
104119

105-
next unless @reserve_data
106-
next unless result_record.nil?
107-
108-
result_time = time
109-
result_record = handle_parsed(tag, record, time, {})
120+
raw_value = raw_value.scrub(REPLACE_CHAR)
121+
retry
122+
rescue => e
123+
if @emit_invalid_record_to_error
124+
raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}"
125+
end
110126
end
111127
end
112-
113-
return result_time, result_record
114-
rescue Fluent::Plugin::Parser::ParserError => e
115-
if @emit_invalid_record_to_error
116-
raise e
117-
else
118-
return FAILED_RESULT
119-
end
120-
rescue ArgumentError => e
121-
raise unless @replace_invalid_sequence
122-
raise unless e.message.index("invalid byte sequence in") == 0
123-
124-
raw_value = raw_value.scrub(REPLACE_CHAR)
125-
retry
126-
rescue => e
127-
if @emit_invalid_record_to_error
128-
raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}"
129-
else
130-
return FAILED_RESULT
131-
end
132128
end
133129
end
134130

0 commit comments

Comments
 (0)