Skip to content

Commit

Permalink
out_splunk: Handle cmetrics' metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <hatake@calyptia.com>
  • Loading branch information
cosmo0920 authored and edsiper committed Oct 21, 2022
1 parent 68aba41 commit ce7dd79
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 6 deletions.
1 change: 1 addition & 0 deletions include/fluent-bit/flb_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <cmetrics/cmt_encode_prometheus.h>
#include <cmetrics/cmt_encode_prometheus_remote_write.h>
#include <cmetrics/cmt_encode_msgpack.h>
#include <cmetrics/cmt_encode_splunk_hec.h>

/* Metrics IDs for general purpose (used by core and Plugins */
#define FLB_METRIC_N_RECORDS 0
Expand Down
67 changes: 61 additions & 6 deletions plugins/out_splunk/splunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_ra_key.h>
#include <fluent-bit/flb_metrics.h>

#include <msgpack.h>
#include "splunk.h"
Expand Down Expand Up @@ -308,6 +309,45 @@ static inline int pack_event_key(struct flb_splunk *ctx, msgpack_packer *mp_pck,
return 0;
}

#ifdef FLB_HAVE_METRICS
static inline int splunk_metrics_format(struct flb_output_instance *ins,
const void *in_buf, size_t in_bytes,
char **out_buf, size_t *out_size,
struct flb_splunk *ctx)
{
int ret;
size_t off = 0;
cfl_sds_t text;
cfl_sds_t host;
struct cmt *cmt = NULL;

if (ctx->event_host != NULL) {
host = ctx->event_host;
}
else {
host = "localhost";
}

/* get cmetrics context */
ret = cmt_decode_msgpack_create(&cmt, (char *) in_buf, in_bytes, &off);
if (ret != 0) {
flb_plg_error(ins, "could not process metrics payload");
return -1;
}

/* convert to text representation */
text = cmt_encode_splunk_hec_create(cmt, host, ctx->event_index, ctx->event_source, ctx->event_sourcetype);

/* destroy cmt context */
cmt_destroy(cmt);

*out_buf = text;
*out_size = flb_sds_len(text);

return 0;
}
#endif

static inline int splunk_format(const void *in_buf, size_t in_bytes,
char *tag, int tag_len,
char **out_buf, size_t *out_size,
Expand Down Expand Up @@ -530,12 +570,24 @@ static void cb_splunk_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(FLB_RETRY);
}

/* Convert binary logs into a JSON payload */
ret = splunk_format(event_chunk->data,
event_chunk->size,
(char *) event_chunk->tag,
flb_sds_len(event_chunk->tag),
&buf_data, &buf_size, ctx);
#ifdef FLB_HAVE_METRICS
/* Check if the event type is metrics, handle the payload differently */
if (event_chunk->type == FLB_EVENT_TYPE_METRICS) {
ret = splunk_metrics_format(ctx->ins,
event_chunk->data,
event_chunk->size,
&buf_data, &buf_size, ctx);
}
#endif
if (event_chunk->type == FLB_EVENT_TYPE_LOGS) {
/* Convert binary logs into a JSON payload */
ret = splunk_format(event_chunk->data,
event_chunk->size,
(char *) event_chunk->tag,
flb_sds_len(event_chunk->tag),
&buf_data, &buf_size, ctx);
}

if (ret == -1) {
flb_upstream_conn_release(u_conn);
FLB_OUTPUT_RETURN(FLB_ERROR);
Expand Down Expand Up @@ -815,6 +867,9 @@ struct flb_output_plugin out_splunk_plugin = {
.cb_exit = cb_splunk_exit,
.config_map = config_map,
.workers = 2,
#ifdef FLB_HAVE_METRICS
.event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS,
#endif

/* for testing */
.test_formatter.callback = cb_splunk_format_test,
Expand Down

0 comments on commit ce7dd79

Please sign in to comment.