Skip to content

Commit 64977b3

Browse files
Add additional entry in the record for parsed result
Signed-off-by: Athish Pranav D <athishanna@gmail.com>
1 parent d02a383 commit 64977b3

File tree

3 files changed

+99
-5
lines changed

3 files changed

+99
-5
lines changed

include/fluent-bit/flb_msgpack_append_message.h

+8
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,12 @@ int flb_msgpack_append_message_to_record(char **result_buffer,
3636
char *message_buffer,
3737
size_t message_size,
3838
int message_type);
39+
40+
int flb_msgpack_append_map_to_record(char **result_buffer,
41+
size_t *result_size,
42+
flb_sds_t message_key_name,
43+
char *base_object_buffer,
44+
size_t base_object_size,
45+
char *map_data,
46+
size_t map_size);
3947
#endif

plugins/filter_parser/filter_parser.c

+33-5
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@
2828
#include <fluent-bit/flb_kv.h>
2929
#include <fluent-bit/flb_log_event_decoder.h>
3030
#include <fluent-bit/flb_log_event_encoder.h>
31+
#include <fluent-bit/flb_msgpack_append_message.h>
3132
#include <msgpack.h>
3233

3334
#include <string.h>
3435
#include <fluent-bit.h>
35-
#include <time.h>
3636

3737
#include "filter_parser.h"
3838

@@ -186,6 +186,8 @@ static int cb_parser_filter(const void *data, size_t bytes,
186186
int key_len;
187187
const char *val_str;
188188
int val_len;
189+
char *parsed_buf;
190+
size_t parsed_size;
189191
char *out_buf;
190192
size_t out_size;
191193
struct flb_time parsed_time;
@@ -229,6 +231,7 @@ static int cb_parser_filter(const void *data, size_t bytes,
229231
&log_decoder,
230232
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
231233
out_buf = NULL;
234+
parsed_buf = NULL;
232235
append_arr_i = 0;
233236

234237
flb_time_copy(&tm, &log_event.timestamp);
@@ -276,7 +279,7 @@ static int cb_parser_filter(const void *data, size_t bytes,
276279
flb_time_zero(&parsed_time);
277280

278281
parse_ret = flb_parser_do(fp->parser, val_str, val_len,
279-
(void **) &out_buf, &out_size,
282+
(void **) &parsed_buf, &parsed_size,
280283
&parsed_time);
281284
if (parse_ret >= 0) {
282285
/*
@@ -320,13 +323,13 @@ static int cb_parser_filter(const void *data, size_t bytes,
320323
&log_encoder, log_event.metadata);
321324
}
322325

323-
if (out_buf != NULL) {
326+
if (parsed_buf != NULL) {
327+
324328
if (ctx->reserve_data) {
325329
char *new_buf = NULL;
326330
int new_size;
327331
int ret;
328-
329-
ret = flb_msgpack_expand_map(out_buf, out_size,
332+
ret = flb_msgpack_expand_map(parsed_buf, parsed_size,
330333
append_arr, append_arr_len,
331334
&new_buf, &new_size);
332335
if (ret == -1) {
@@ -339,6 +342,30 @@ static int cb_parser_filter(const void *data, size_t bytes,
339342
return FLB_FILTER_NOTOUCH;
340343
}
341344

345+
out_buf = new_buf;
346+
out_size = new_size;
347+
}
348+
else {
349+
out_buf = strdup(parsed_buf);
350+
out_size = parsed_size;
351+
}
352+
if (ctx->hash_value_field) {
353+
char *new_buf = NULL;
354+
size_t new_size;
355+
int ret;
356+
ret = flb_msgpack_append_map_to_record(&new_buf, &new_size,
357+
flb_sds_create("parsed"),
358+
out_buf, out_size,
359+
parsed_buf,parsed_size);
360+
if ( ret != FLB_MAP_EXPAND_SUCCESS){
361+
flb_plg_error(ctx->ins, "cannot append parsed entry to record");
362+
363+
flb_log_event_decoder_destroy(&log_decoder);
364+
flb_log_event_encoder_destroy(&log_encoder);
365+
flb_free(append_arr);
366+
367+
return FLB_FILTER_NOTOUCH;
368+
}
342369
flb_free(out_buf);
343370
out_buf = new_buf;
344371
out_size = new_size;
@@ -351,6 +378,7 @@ static int cb_parser_filter(const void *data, size_t bytes,
351378
}
352379

353380
flb_free(out_buf);
381+
flb_free(parsed_buf);
354382
ret = FLB_FILTER_MODIFIED;
355383
}
356384
else {

src/flb_msgpack_append_message.c

+58
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,61 @@ int flb_msgpack_append_message_to_record(char **result_buffer,
8080

8181
return result;
8282
}
83+
84+
int flb_msgpack_append_map_to_record(char **result_buffer,
85+
size_t *result_size,
86+
flb_sds_t message_key_name,
87+
char *base_object_buffer,
88+
size_t base_object_size,
89+
char *map_data,
90+
size_t map_size)
91+
{
92+
msgpack_unpacked unpacker;
93+
msgpack_object_kv *new_map_entries[1];
94+
msgpack_object_kv message_entry;
95+
char *modified_data_buffer;
96+
int modified_data_size;
97+
size_t off = 0;
98+
int i;
99+
int result = FLB_MAP_NOT_MODIFIED;
100+
*result_buffer = NULL;
101+
*result_size = 0;
102+
103+
if (message_key_name == NULL || map_data == NULL){
104+
return result;
105+
}
106+
107+
new_map_entries[0] = &message_entry;
108+
109+
message_entry.key.type = MSGPACK_OBJECT_STR;
110+
message_entry.key.via.str.size = flb_sds_len(message_key_name);
111+
message_entry.key.via.str.ptr = message_key_name;
112+
113+
msgpack_unpacked_init(&unpacker);
114+
if ((i=msgpack_unpack_next(&unpacker, map_data, map_size, &off)) !=
115+
MSGPACK_UNPACK_SUCCESS ) {
116+
msgpack_unpacked_destroy(&unpacker);
117+
return FLB_MAP_EXPANSION_ERROR;
118+
}
119+
if (unpacker.data.type != MSGPACK_OBJECT_MAP) {
120+
msgpack_unpacked_destroy(&unpacker);
121+
return FLB_MAP_EXPANSION_ERROR;
122+
}
123+
124+
message_entry.val = unpacker.data;
125+
result = flb_msgpack_expand_map(base_object_buffer,
126+
base_object_size,
127+
new_map_entries, 1,
128+
&modified_data_buffer,
129+
&modified_data_size);
130+
if (result == 0) {
131+
result = FLB_MAP_EXPAND_SUCCESS;
132+
*result_buffer = modified_data_buffer;
133+
*result_size = modified_data_size;
134+
}
135+
else {
136+
result = FLB_MAP_EXPANSION_ERROR;
137+
}
138+
139+
return result;
140+
}

0 commit comments

Comments
 (0)