Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add inject_message to the Heka output plugin API #241

Merged
merged 1 commit into from
Jan 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

cmake_minimum_required(VERSION 3.6 FATAL_ERROR)
project(luasandbox VERSION 1.3.4 LANGUAGES C)
project(luasandbox VERSION 1.4.0 LANGUAGES C)

set(CPACK_PACKAGE_DESCRIPTION_SUMMARY "Generic Lua sandbox for dynamic data analysis")
set(CPACK_PACKAGE_VERSION_MAJOR ${PROJECT_VERSION_MAJOR})
Expand Down
22 changes: 17 additions & 5 deletions docs/heka/output.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ Converts a Heka protobuf encoded message string into a Lua table. See
### encode_message

Returns a Heka protocol buffer message using the contents of the specified Lua
table. `Logger` and `Hostname` are restricted header values. An override
configuration option is provided `restricted_headers`; when true the headers are
always set to the configuration values; when false (default) the headers are set
to the values provide in the message table, if no value is provided it defaults
to the appropriate configuration value.
table. `Timestamp`, `Logger`, `Hostname` and `Pid` are restricted header values.
An override configuration option is provided `restricted_headers`; when true the
headers are always set to the configuration values; when false (default) the
headers are set to the values provided in the message table, if no value is
provided it defaults to the appropriate configuration value.

Note: this operation uses the internal output buffer so it is goverened by the
`output_limit` configuration setting.
Expand All @@ -95,6 +95,18 @@ Note: this operation uses the internal output buffer so it is goverened by the
* heka_pb (string) - Heka protobuf binary string, framed as specified
or an error is thrown

### inject_message

Creates a new Heka protocol buffer message, in the input queue, using the
contents of the specified Lua table. The `restricted_headers` configuration
defaults to false (see encode_message above for a full description).

*Arguments*
* msg ([Heka message table](message.md))

*Return*
* none (throws an error if the table does not match the Heka message schema)

### create_message_matcher

Returns a Heka protocol buffer message matcher; used to dynamically filter
Expand Down
24 changes: 24 additions & 0 deletions include/luasandbox/heka/sandbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,30 @@ lsb_heka_sandbox* lsb_heka_create_output(void *parent,
lsb_logger *logger,
lsb_heka_update_checkpoint ucp);

/**
* Create a sandbox supporting the Heka Output Plugin API with
* inject_message support
*
* @param parent Opaque pointer the host object owning this sandbox
* @param lua_file Fully qualified path to the Lua source file
* @param state_file Fully qualified filename to the state preservation file
* (NULL if no preservation is required)
* @param lsb_cfg Full configuration string as a Lua table (NULL for lsb
* defaults)
* @param logger Struct for error reporting/debug printing (NULL to disable)
* @param ucp checkpoint_updated callback when using batch or async output
* @param im inject_message callback
*
* @return lsb_heka_sandbox* On success a pointer to the sandbox otherwise NULL
*/
LSB_HEKA_EXPORT
lsb_heka_sandbox* lsb_heka_create_output_im(void *parent,
const char *lua_file,
const char *state_file,
const char *lsb_cfg,
lsb_logger *logger,
lsb_heka_update_checkpoint ucp,
lsb_heka_im_analysis im);
/**
* Host access to the output sandbox process_message API
*
Expand Down
27 changes: 23 additions & 4 deletions src/heka/sandbox.c
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,10 @@ static int update_checkpoint(lua_State *lua)
// fall thru
case 1:
luaL_checktype(lua, 1, LUA_TLIGHTUSERDATA);
result = hsb->cb.ucp(hsb->parent, lua_touserdata(lua, 1));
result = hsb->ucp(hsb->parent, lua_touserdata(lua, 1));
break;
case 0: // batch case
result = hsb->cb.ucp(hsb->parent, NULL);
result = hsb->ucp(hsb->parent, NULL);
break;
default:
return luaL_error(lua, "%s() invalid number of args: %d", __func__, n);
Expand Down Expand Up @@ -876,6 +876,19 @@ lsb_heka_sandbox* lsb_heka_create_output(void *parent,
const char *lsb_cfg,
lsb_logger *logger,
lsb_heka_update_checkpoint ucp)
{
return lsb_heka_create_output_im(parent, lua_file, state_file, lsb_cfg,
logger, ucp, NULL);
}


lsb_heka_sandbox* lsb_heka_create_output_im(void *parent,
const char *lua_file,
const char *state_file,
const char *lsb_cfg,
lsb_logger *logger,
lsb_heka_update_checkpoint ucp,
lsb_heka_im_analysis im)
{
if (!lua_file) {
if (logger && logger->cb) {
Expand All @@ -892,7 +905,6 @@ lsb_heka_sandbox* lsb_heka_create_output(void *parent,
return NULL;
}


lsb_heka_sandbox *hsb = calloc(1, sizeof(lsb_heka_sandbox));
if (!hsb) {
if (logger && logger->cb) {
Expand All @@ -904,7 +916,8 @@ lsb_heka_sandbox* lsb_heka_create_output(void *parent,
hsb->type = 'o';
hsb->parent = parent;
hsb->msg = NULL;
hsb->cb.ucp = ucp;
hsb->ucp = ucp;
hsb->cb.aim = im;
hsb->name = NULL;
hsb->hostname = NULL;

Expand All @@ -922,6 +935,11 @@ lsb_heka_sandbox* lsb_heka_create_output(void *parent,
lsb_add_function(hsb->lsb, heka_encode_message, "encode_message");
lsb_add_function(hsb->lsb, update_checkpoint, LSB_HEKA_UPDATE_CHECKPOINT);
lsb_add_function(hsb->lsb, mm_create, "create_message_matcher");
if (im) {
lsb_add_function(hsb->lsb, inject_message_analysis, "inject_message");
// inject_payload is intentionally excluded from output plugins
// you can construct whatever you need with inject_message
}

// start io.write override with zero copy functionality
lua_getfield(lua, LUA_REGISTRYINDEX, "_PRELOADED");
Expand Down Expand Up @@ -950,6 +968,7 @@ lsb_heka_sandbox* lsb_heka_create_output(void *parent,
}



void lsb_heka_stop_sandbox_clean(lsb_heka_sandbox *hsb)
{
lsb_stop_sandbox_clean(hsb->lsb);
Expand Down
6 changes: 3 additions & 3 deletions src/heka/sandbox_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ struct lsb_heka_sandbox {
char *name;
char *hostname;
union {
lsb_heka_im_input iim;
lsb_heka_im_analysis aim;
lsb_heka_update_checkpoint ucp;
lsb_heka_im_input iim; // used in input plugins only
lsb_heka_im_analysis aim; // used in analysis and output plugins
} cb;
struct heka_stats stats;
char type;
bool restricted_headers;
int pid;
lsb_heka_update_checkpoint ucp; // used in output plugins only
};

#endif
25 changes: 25 additions & 0 deletions src/heka/test/lua/oim.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.

require "string"

-- Table tests
local msgs = {
{Timestamp = 1, Uuid = "\001\002\003\000\000\000\000\000\000\000\000\000\000\000\000\000"},
{Pid = 2, Timestamp = 3, Uuid = "\004\005\006\000\000\000\000\000\000\000\000\000\000\000\000\000", Logger = "ignore", Hostname = "spoof"},
}

local err_msgs = {
{err = "bad argument #1 to '?' (table expected, got nil)"},
}

for i, v in ipairs(msgs) do
inject_message(v)
end

for i, v in ipairs(err_msgs) do
local ok, err = pcall(inject_message, v.msg)
if ok then error(string.format("test: %d should have failed", i)) end
assert(v.err == err, string.format("test: %d expected: %s received: %s", i, v.err, err))
end
73 changes: 66 additions & 7 deletions src/heka/test/test_heka_sandbox.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,14 @@ static int aim(void *parent, const char *pb, size_t pb_len)
struct im_result {
const char *pb;
size_t pb_len;
double cp_numeric;
const char *cp_string;
};

struct im_result results[] = {
{ .pb = "\x0a\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10\x8e\xa8\xf3\xde\x88\xb5\xb7\x93\x15\x22\x03\x61\x69\x6d\x40\x00\x4a\x07\x66\x6f\x6f\x2e\x63\x6f\x6d", .pb_len = 44, .cp_numeric = NAN, .cp_string = NULL },
{ .pb = "\x0a\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10\xbf\x97\x9c\xcc\xbe\xc2\xb7\x93\x15\x22\x03\x61\x69\x6d\x40\x00\x4a\x07\x66\x6f\x6f\x2e\x63\x6f\x6d\x00\x00\x00\x00\x0a\x10\x00\x00", .pb_len = 44, .cp_numeric = NAN, .cp_string = NULL },
{ .pb = "\x0a\x10\xea\x95\xd4\xfc\x7c\x10\x40\x95\xa8\x17\xcb\x56\x26\x91\x8c\x47\x10\xba\xf6\xd5\xf9\xfd\xce\xb7\x93\x15\x1a\x0e\x69\x6e\x6a\x65\x63\x74\x5f\x70\x61\x79\x6c\x6f\x61\x64\x22\x03\x61\x69\x6d\x32\x07\x66\x6f\x6f\x20\x62\x61\x72\x40\x00\x4a\x07\x66\x6f\x6f\x2e\x63\x6f\x6d\x52\x13\x0a\x0c\x70\x61\x79\x6c\x6f\x61\x64\x5f\x74\x79\x70\x65\x22\x03\x74\x78\x74", .pb_len = 90, .cp_numeric = NAN, .cp_string = NULL },
{ .pb = "\x0a\x10\xfd\x49\x92\x77\x02\x37\x4b\xf0\xaf\x86\x6f\x9b\x80\x26\xf4\x35\x10\xaf\xec\x9e\xa4\xd8\xcf\xb7\x93\x15\x1a\x0e\x69\x6e\x6a\x65\x63\x74\x5f\x70\x61\x79\x6c\x6f\x61\x64\x22\x03\x61\x69\x6d\x32\x07\x66\x6f\x6f\x20\x62\x61\x72\x40\x00\x4a\x07\x66\x6f\x6f\x2e\x63\x6f\x6d\x52\x13\x0a\x0c\x70\x61\x79\x6c\x6f\x61\x64\x5f\x74\x79\x70\x65\x22\x03\x64\x61\x74\x52\x14\x0a\x0c\x70\x61\x79\x6c\x6f\x61\x64\x5f\x6e\x61\x6d\x65\x22\x04\x74\x65\x73\x74", .pb_len = 112, .cp_numeric = NAN, .cp_string = NULL },
{ .pb = "\x0a\x10\x7c\x32\xd6\x23\x98\xe8\x49\x9e\xa2\xe8\x0d\x78\x84\x8e\x75\xb2\x10\xf7\xf5\xdb\x89\x88\xe4\xb7\x93\x15\x22\x03\x61\x69\x6d\x40\x00\x4a\x07\x66\x6f\x6f\x2e\x63\x6f\x6d", .pb_len = 0, .cp_numeric = NAN, .cp_string = NULL }, }; // intentionally fail on size to to test the custom return value
{ .pb = "\x0a\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10\x8e\xa8\xf3\xde\x88\xb5\xb7\x93\x15\x22\x03\x61\x69\x6d\x40\x00\x4a\x07\x66\x6f\x6f\x2e\x63\x6f\x6d", .pb_len = 44},
{ .pb = "\x0a\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10\xbf\x97\x9c\xcc\xbe\xc2\xb7\x93\x15\x22\x03\x61\x69\x6d\x40\x00\x4a\x07\x66\x6f\x6f\x2e\x63\x6f\x6d\x00\x00\x00\x00\x0a\x10\x00\x00", .pb_len = 44},
{ .pb = "\x0a\x10\xea\x95\xd4\xfc\x7c\x10\x40\x95\xa8\x17\xcb\x56\x26\x91\x8c\x47\x10\xba\xf6\xd5\xf9\xfd\xce\xb7\x93\x15\x1a\x0e\x69\x6e\x6a\x65\x63\x74\x5f\x70\x61\x79\x6c\x6f\x61\x64\x22\x03\x61\x69\x6d\x32\x07\x66\x6f\x6f\x20\x62\x61\x72\x40\x00\x4a\x07\x66\x6f\x6f\x2e\x63\x6f\x6d\x52\x13\x0a\x0c\x70\x61\x79\x6c\x6f\x61\x64\x5f\x74\x79\x70\x65\x22\x03\x74\x78\x74", .pb_len = 90},
{ .pb = "\x0a\x10\xfd\x49\x92\x77\x02\x37\x4b\xf0\xaf\x86\x6f\x9b\x80\x26\xf4\x35\x10\xaf\xec\x9e\xa4\xd8\xcf\xb7\x93\x15\x1a\x0e\x69\x6e\x6a\x65\x63\x74\x5f\x70\x61\x79\x6c\x6f\x61\x64\x22\x03\x61\x69\x6d\x32\x07\x66\x6f\x6f\x20\x62\x61\x72\x40\x00\x4a\x07\x66\x6f\x6f\x2e\x63\x6f\x6d\x52\x13\x0a\x0c\x70\x61\x79\x6c\x6f\x61\x64\x5f\x74\x79\x70\x65\x22\x03\x64\x61\x74\x52\x14\x0a\x0c\x70\x61\x79\x6c\x6f\x61\x64\x5f\x6e\x61\x6d\x65\x22\x04\x74\x65\x73\x74", .pb_len = 112},
{ .pb = "\x0a\x10\x7c\x32\xd6\x23\x98\xe8\x49\x9e\xa2\xe8\x0d\x78\x84\x8e\x75\xb2\x10\xf7\xf5\xdb\x89\x88\xe4\xb7\x93\x15\x22\x03\x61\x69\x6d\x40\x00\x4a\x07\x66\x6f\x6f\x2e\x63\x6f\x6d", .pb_len = 0}, }; // intentionally fail on size to to test the custom return value

if (cnt >= (int)(sizeof results / sizeof results[0])) {
fprintf(stderr, "tests and results are mis-matched\n");
Expand Down Expand Up @@ -225,6 +223,51 @@ static int ucp(void *parent, void *sequence_id)
}


static int oim(void *parent, const char *pb, size_t pb_len)
{
static int cnt = 0;
struct im_result {
const char *pb;
size_t pb_len;
};

struct im_result results[] = {
{ .pb = "\x0a\x10\x01\x02\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10\x01\x22\x03\x6f\x69\x6d\x40\x00\x4a\x04\x74\x65\x73\x74", .pb_len = 33 },
{ .pb = "\x0a\x10\x04\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10\x03\x22\x06\x69\x67\x6e\x6f\x72\x65\x40\x02\x4a\x05\x73\x70\x6f\x6f\x66", .pb_len = 37 }, };

if (cnt >= (int)(sizeof results / sizeof results[0])) {
fprintf(stderr, "tests and results are mis-matched\n");
return LSB_HEKA_IM_LIMIT;
}

if (parent) {
fprintf(stderr, "test: %d parent set\n", cnt);
}

if (pb_len != results[cnt].pb_len) {
fprintf(stderr, "test: %d pb len expected: %" PRIuSIZE " received: %"
PRIuSIZE "\n", cnt, results[cnt].pb_len, pb_len);
cnt++;
return 99;
}

if (memcmp(pb, results[cnt].pb, pb_len)) {
fprintf(stderr, "test: %d\nexpected: ", cnt);
for (size_t i = 0; i < results[cnt].pb_len; ++i) {
fprintf(stderr, "\\x%02hhx", results[cnt].pb[i]);
}
fprintf(stderr, "\nreceived: ");
for (size_t i = 0; i < pb_len; ++i) {
fprintf(stderr, "\\x%02hhx", pb[i]);
}
fprintf(stderr, "\n");
return 1;
}
cnt++;
return LSB_HEKA_IM_SUCCESS;
}


static char* test_api_assertion()
{
lsb_heka_message m;
Expand Down Expand Up @@ -621,6 +664,21 @@ static char* test_im_analysis()
}


static char* test_im_output()
{
lsb_heka_sandbox *hsb;
hsb = lsb_heka_create_output_im(NULL, "lua/oim.lua", NULL,
"Hostname = 'test';Logger = 'oim'",
&logger, ucp, oim);
lsb_heka_stats stats = lsb_heka_get_stats(hsb);
mu_assert(2 == stats.im_cnt, "received %llu", stats.im_cnt);
mu_assert(70 == stats.im_bytes, "received %llu", stats.im_bytes);
mu_assert(hsb, "lsb_heka_create_input failed");
e = lsb_heka_destroy_sandbox(hsb);
return NULL;
}


static char* test_encode_message()
{
lsb_heka_sandbox *hsb;
Expand Down Expand Up @@ -808,6 +866,7 @@ static char* all_tests()
mu_run_test(test_pm_output);
mu_run_test(test_im_input);
mu_run_test(test_im_analysis);
mu_run_test(test_im_output);
mu_run_test(test_encode_message);
mu_run_test(test_decode_message);
mu_run_test(test_read_message);
Expand Down