Skip to content

Commit

Permalink
in_opentelemetry: use ctraces to decode binary payload (fluent#6180)
Browse files Browse the repository at this point in the history
Signed-off-by: Aditya Prajapati <aditya@calyptia.com>
Signed-off-by: Manal Geries <mgeriesa@gmail.com>
  • Loading branch information
Aditya Prajapati authored and mgeriesa committed Oct 25, 2022
1 parent 6d31079 commit 5119b4c
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 6 deletions.
8 changes: 6 additions & 2 deletions plugins/in_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_opentelemetry, successful_response_code),
"Set successful response code. 200, 201 and 204 are supported."
},

{
FLB_CONFIG_MAP_BOOL, "raw_traces", "false",
0, FLB_TRUE, offsetof(struct flb_opentelemetry, raw_traces),
"Forward traces without processing"
},

/* EOF */
{0}
Expand All @@ -193,5 +197,5 @@ struct flb_input_plugin in_opentelemetry_plugin = {
.cb_exit = in_opentelemetry_exit,
.config_map = config_map,
.flags = FLB_INPUT_NET | FLB_IO_OPT_TLS,
.event_type = FLB_INPUT_LOGS | FLB_INPUT_METRICS
.event_type = FLB_INPUT_LOGS
};
1 change: 1 addition & 0 deletions plugins/in_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct flb_opentelemetry {
flb_sds_t listen;
flb_sds_t tcp_port;
const char *tag_key;
bool raw_traces;

size_t buffer_max_size; /* Maximum buffer size */
size_t buffer_chunk_size; /* Chunk allocation size */
Expand Down
48 changes: 44 additions & 4 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,33 @@ static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_co
return 0;
}

static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
struct mk_http_request *request)
static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
struct mk_http_request *request)
{
struct ctrace *decoded_context;
size_t offset;
int result;

offset = 0;
result = ctr_decode_opentelemetry_create(&decoded_context,
request->data.data,
request->data.len,
&offset);
if (result == 0) {
ctx->ins->event_type = FLB_INPUT_TRACES;
result = flb_input_trace_append(ctx->ins, NULL, 0, decoded_context);
ctr_decode_opentelemetry_destroy(decoded_context);
}

return result;
}

static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
struct mk_http_request *request)
{
int ret;
int root_type;
Expand Down Expand Up @@ -173,6 +196,23 @@ static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_con
return 0;
}

static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
struct mk_http_request *request)
{
int result;

if (ctx->raw_traces) {
result = process_payload_raw_traces(ctx, conn, tag, session, request);
}
else {
result = process_payload_traces_proto(ctx, conn, tag, session, request);
}

return result;
}

static inline int mk_http_point_header(mk_ptr_t *h,
struct mk_http_parser *parser, int key)
{
Expand Down

0 comments on commit 5119b4c

Please sign in to comment.