Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add Grafana Loki JSON format #1853

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
pack: add json_loki format
This allows to use the existing HTTP output to push logs into Grafana
Loki. This uses the JSON format for pushing logs into Loki, as using
protobuf would require more dependencies. (cf. #994)

Signed-off-by: Christian Simon <simon@swine.de>
  • Loading branch information
simonswine committed Jan 6, 2020
commit ddf88671b852121e3f436f95905aa5cc2452b687
1 change: 1 addition & 0 deletions include/fluent-bit/flb_pack.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#define FLB_PACK_JSON_FORMAT_JSON 1
#define FLB_PACK_JSON_FORMAT_STREAM 2
#define FLB_PACK_JSON_FORMAT_LINES 3
#define FLB_PACK_JSON_FORMAT_LOKI 4

struct flb_pack_state {
int multiple; /* support multiple jsons? */
Expand Down
4 changes: 3 additions & 1 deletion plugins/out_http/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ static int http_post(struct flb_out_http *ctx,
if ((ctx->out_format == FLB_PACK_JSON_FORMAT_JSON) ||
(ctx->out_format == FLB_PACK_JSON_FORMAT_STREAM) ||
(ctx->out_format == FLB_PACK_JSON_FORMAT_LINES) ||
(ctx->out_format == FLB_PACK_JSON_FORMAT_LOKI) ||
(ctx->out_format == FLB_HTTP_OUT_GELF)) {
flb_http_add_header(c,
FLB_HTTP_CONTENT_TYPE,
Expand Down Expand Up @@ -283,7 +284,8 @@ static void cb_http_flush(const void *data, size_t bytes,

if ((ctx->out_format == FLB_PACK_JSON_FORMAT_JSON) ||
(ctx->out_format == FLB_PACK_JSON_FORMAT_STREAM) ||
(ctx->out_format == FLB_PACK_JSON_FORMAT_LINES)) {
(ctx->out_format == FLB_PACK_JSON_FORMAT_LINES) ||
(ctx->out_format == FLB_PACK_JSON_FORMAT_LOKI)) {

json = flb_pack_msgpack_to_json_format(data, bytes,
ctx->out_format,
Expand Down
134 changes: 98 additions & 36 deletions src/flb_pack.c
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,9 @@ int flb_pack_to_json_format_type(const char *str)
else if (strcasecmp(str, "json_lines") == 0) {
return FLB_PACK_JSON_FORMAT_LINES;
}
else if (strcasecmp(str, "json_loki") == 0) {
return FLB_PACK_JSON_FORMAT_LOKI;
}

return -1;
}
Expand Down Expand Up @@ -717,6 +720,10 @@ flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes,
msgpack_object *obj;
struct tm tm;
struct flb_time tms;
const char *message_key = "message";
const char *stream_key = "stream";
const char *streams_key = "streams";
const char *values_key = "values";

if (!date_key) {
return NULL;
Expand Down Expand Up @@ -752,7 +759,12 @@ flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes,
* [T, M]...
* ]
*/
if (json_format == FLB_PACK_JSON_FORMAT_JSON) {
if (json_format == FLB_PACK_JSON_FORMAT_LOKI) {
msgpack_pack_map(&tmp_pck, 1);
msgpack_pack_str(&tmp_pck, strlen(streams_key));
msgpack_pack_str_body(&tmp_pck, streams_key, strlen(streams_key));
}
if (json_format == FLB_PACK_JSON_FORMAT_JSON || json_format == FLB_PACK_JSON_FORMAT_LOKI) {
msgpack_pack_array(&tmp_pck, records);
}

Expand All @@ -770,50 +782,100 @@ flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes,
/* Get the record/map */
map = root.via.array.ptr[1];
map_size = map.via.map.size;
msgpack_pack_map(&tmp_pck, map_size + 1);

/* Append date key */
msgpack_pack_str(&tmp_pck, flb_sds_len(date_key));
msgpack_pack_str_body(&tmp_pck, date_key, flb_sds_len(date_key));
if (json_format == FLB_PACK_JSON_FORMAT_LOKI){
const char *message = "";
size_t message_len;
size_t labels_size = map_size;
// try to find message in record
for (i = 0; i < map_size; i++) {
msgpack_object *k = &map.via.map.ptr[i].key;
if (k->type != MSGPACK_OBJECT_STR || strncasecmp(k->via.str.ptr,message_key, k->via.str.size) != 0)
continue;
msgpack_object *v = &map.via.map.ptr[i].val;
if (v->type != MSGPACK_OBJECT_STR)
continue;
message = v->via.str.ptr;
message_len = v->via.str.size;
labels_size --;
}

/* Append date value */
switch (date_format) {
case FLB_PACK_JSON_DATE_DOUBLE:
msgpack_pack_double(&tmp_pck, flb_time_to_double(&tms));
break;
case FLB_PACK_JSON_DATE_ISO8601:
/* Format the time, use microsecond precision not nanoseconds */
gmtime_r(&tms.tm.tv_sec, &tm);
s = strftime(time_formatted, sizeof(time_formatted) - 1,
FLB_PACK_JSON_DATE_ISO8601_FMT, &tm);

len = snprintf(time_formatted + s,
sizeof(time_formatted) - 1 - s,
".%06" PRIu64 "Z",
(uint64_t) tms.tm.tv_nsec / 1000);
s += len;
msgpack_pack_str(&tmp_pck, s);
msgpack_pack_str_body(&tmp_pck, time_formatted, s);
break;
case FLB_PACK_JSON_DATE_EPOCH:
msgpack_pack_uint64(&tmp_pck, (long long unsigned)(tms.tm.tv_sec));
break;
}
/* copy labels into stream object */
// TODO: this should be deduplicated
msgpack_pack_map(&tmp_pck, 2);
msgpack_pack_str(&tmp_pck, strlen(stream_key));
msgpack_pack_str_body(&tmp_pck, stream_key, strlen(stream_key));
msgpack_pack_map(&tmp_pck, labels_size);
for (i = 0; i < map_size; i++) {
msgpack_object *k = &map.via.map.ptr[i].key;
if (k->type == MSGPACK_OBJECT_STR && strncasecmp(k->via.str.ptr,message_key, k->via.str.size) == 0)
continue;
msgpack_object *v = &map.via.map.ptr[i].val;
msgpack_pack_object(&tmp_pck, *k);
msgpack_pack_object(&tmp_pck, *v);
}

/* Append remaining keys/values */
for (i = 0; i < map_size; i++) {
msgpack_object *k = &map.via.map.ptr[i].key;
msgpack_object *v = &map.via.map.ptr[i].val;
/* insert timestamp and message into values */
msgpack_pack_str(&tmp_pck, strlen(values_key));
msgpack_pack_str_body(&tmp_pck, values_key, strlen(values_key));
msgpack_pack_array(&tmp_pck, 1);
msgpack_pack_array(&tmp_pck, 2);

len = snprintf(time_formatted,
sizeof(time_formatted) - 1,
"%lli%09li",
(long long unsigned) tms.tm.tv_sec,
(long unsigned) tms.tm.tv_nsec);
msgpack_pack_str(&tmp_pck, len);
msgpack_pack_str_body(&tmp_pck, time_formatted, len);

msgpack_pack_str(&tmp_pck, message_len);
msgpack_pack_str_body(&tmp_pck, message, message_len);
} else {
msgpack_pack_map(&tmp_pck, map_size + 1);
/* Append date key */
msgpack_pack_str(&tmp_pck, flb_sds_len(date_key));
msgpack_pack_str_body(&tmp_pck, date_key, flb_sds_len(date_key));

/* Append date value */
switch (date_format) {
case FLB_PACK_JSON_DATE_DOUBLE:
msgpack_pack_double(&tmp_pck, flb_time_to_double(&tms));
break;
case FLB_PACK_JSON_DATE_ISO8601:
/* Format the time, use microsecond precision not nanoseconds */
gmtime_r(&tms.tm.tv_sec, &tm);
s = strftime(time_formatted, sizeof(time_formatted) - 1,
FLB_PACK_JSON_DATE_ISO8601_FMT, &tm);

len = snprintf(time_formatted + s,
sizeof(time_formatted) - 1 - s,
".%06" PRIu64 "Z",
(uint64_t) tms.tm.tv_nsec / 1000);
s += len;
msgpack_pack_str(&tmp_pck, s);
msgpack_pack_str_body(&tmp_pck, time_formatted, s);
break;
case FLB_PACK_JSON_DATE_EPOCH:
msgpack_pack_uint64(&tmp_pck, (long long unsigned)(tms.tm.tv_sec));
break;
}

msgpack_pack_object(&tmp_pck, *k);
msgpack_pack_object(&tmp_pck, *v);
/* Append remaining keys/values */
for (i = 0; i < map_size; i++) {
msgpack_object *k = &map.via.map.ptr[i].key;
msgpack_object *v = &map.via.map.ptr[i].val;

msgpack_pack_object(&tmp_pck, *k);
msgpack_pack_object(&tmp_pck, *v);
}
}

/*
* If the format is the original msgpack style, just continue since
* we don't care about separator or JSON convertion at this point.
*/
if (json_format == FLB_PACK_JSON_FORMAT_JSON) {
if (json_format == FLB_PACK_JSON_FORMAT_JSON || json_format == FLB_PACK_JSON_FORMAT_LOKI) {
continue;
}

Expand Down Expand Up @@ -882,7 +944,7 @@ flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes,
msgpack_unpacked_destroy(&result);

/* Format to JSON */
if (json_format == FLB_PACK_JSON_FORMAT_JSON) {
if (json_format == FLB_PACK_JSON_FORMAT_JSON || json_format == FLB_PACK_JSON_FORMAT_LOKI) {
out_buf = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size);
msgpack_sbuffer_destroy(&tmp_sbuf);

Expand Down