Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/neuron/define.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#define NEU_DEFAULT_GROUP_INTERVAL 100
#define NEU_NODE_NAME_LEN 128
#define NEU_PLUGIN_NAME_LEN 32
#define NEU_NODE_TAGS_LEN 256
#define NEU_PLUGIN_LIBRARY_LEN 64
#define NEU_PLUGIN_DESCRIPTION_LEN 512
#define NEU_TEMPLATE_NAME_LEN 128
Expand Down
2 changes: 2 additions & 0 deletions include/neuron/errcodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ typedef enum {
NEU_ERR_NODE_NOT_ALLOW_UPDATE = 2013,
NEU_ERR_NODE_NOT_ALLOW_MAP = 2014,
NEU_ERR_NODE_NAME_EMPTY = 2015,
NEU_ERR_NODE_TAGS_TOO_LONG = 2016,
NEU_ERR_NODE_TAGS_TOO_MANY = 2017,

NEU_ERR_GROUP_ALREADY_SUBSCRIBED = 2101,
NEU_ERR_GROUP_NOT_SUBSCRIBE = 2102,
Expand Down
15 changes: 15 additions & 0 deletions include/neuron/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ typedef enum neu_reqresp_type {
NEU_RESP_NODE_UNINIT,
NEU_REQ_ADD_NODE,
NEU_REQ_UPDATE_NODE,
NEU_REQ_UPDATE_NODE_TAG,
NEU_REQ_DEL_NODE,
NEU_REQ_GET_NODE,
NEU_RESP_GET_NODE,
Expand Down Expand Up @@ -242,6 +243,7 @@ static const char *neu_reqresp_type_string_t[] = {
[NEU_RESP_NODE_UNINIT] = "NEU_RESP_NODE_UNINIT",
[NEU_REQ_ADD_NODE] = "NEU_REQ_ADD_NODE",
[NEU_REQ_UPDATE_NODE] = "NEU_REQ_UPDATE_NODE",
[NEU_REQ_UPDATE_NODE_TAG] = "NEU_REQ_UPDATE_NODE_TAG",
[NEU_REQ_DEL_NODE] = "NEU_REQ_DEL_NODE",
[NEU_REQ_GET_NODE] = "NEU_REQ_GET_NODE",
[NEU_RESP_GET_NODE] = "NEU_RESP_GET_NODE",
Expand Down Expand Up @@ -522,18 +524,26 @@ typedef struct neu_req_add_node {
char node[NEU_NODE_NAME_LEN];
char plugin[NEU_PLUGIN_NAME_LEN];
char *setting;
char *tags;
} neu_req_add_node_t;

static inline void neu_req_add_node_fini(neu_req_add_node_t *req)
{
free(req->setting);
if (req->tags)
free(req->tags);
}

typedef struct neu_req_update_node {
char node[NEU_NODE_NAME_LEN];
char new_name[NEU_NODE_NAME_LEN];
} neu_req_update_node_t;

typedef struct neu_req_update_node_tag {
char node[NEU_NODE_NAME_LEN];
char tags[NEU_NODE_TAGS_LEN];
} neu_req_update_node_tag_t;

typedef struct neu_req_del_node {
char node[NEU_NODE_NAME_LEN];
} neu_req_del_node_t;
Expand All @@ -557,6 +567,7 @@ typedef struct neu_resp_node_info {
int64_t delay;
char node[NEU_NODE_NAME_LEN];
char plugin[NEU_PLUGIN_NAME_LEN];
char tags[NEU_NODE_TAGS_LEN];
} neu_resp_node_info_t;

typedef struct neu_resp_get_node {
Expand Down Expand Up @@ -1148,6 +1159,7 @@ typedef struct {
char * node;
char * plugin;
char * setting;
char * tags;
uint16_t n_group;
neu_gdatatag_t *groups;
} neu_req_driver_t;
Expand All @@ -1157,6 +1169,9 @@ static inline void neu_req_driver_fini(neu_req_driver_t *req)
free(req->node);
free(req->plugin);
free(req->setting);
if (req->tags)
free(req->tags);

for (uint16_t i = 0; i < req->n_group; i++) {
for (int j = 0; j < req->groups[i].n_tag; j++) {
neu_tag_fini(&req->groups[i].tags[j]);
Expand Down
5 changes: 5 additions & 0 deletions include/neuron/persist/persist.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ typedef struct {
int type;
char *plugin_name;
int state;
char *tags;
} neu_persist_node_info_t;

typedef struct {
Expand Down Expand Up @@ -125,6 +126,9 @@ static inline void neu_persist_node_info_fini(neu_persist_node_info_t *info)
{
free(info->name);
free(info->plugin_name);
if (info->tags) {
free(info->tags);
}
}

static inline void neu_persist_group_info_fini(neu_persist_group_info_t *info)
Expand Down Expand Up @@ -250,6 +254,7 @@ int neu_persister_delete_node(const char *node_name);
* @return 0 on success, none-zero on failure
*/
int neu_persister_update_node(const char *node_name, const char *new_name);
int neu_persister_update_node_tags(const char *node_name, const char *tags);

/**
* Update node state.
Expand Down
6 changes: 6 additions & 0 deletions persistence/0114_2.14.0_modbus_simulator.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
BEGIN TRANSACTION;

CREATE TABLE IF NOT EXISTS modbus_tcp_simulator (
id INTEGER PRIMARY KEY CHECK (id = 1),
enabled INTEGER NOT NULL DEFAULT 0,
tags_json TEXT NOT NULL DEFAULT '[]',
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);

alter TABLE nodes add column tags TEXT NULL;

COMMIT;
63 changes: 63 additions & 0 deletions plugins/restful/adapter_handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@
#include "utils/utextend.h"
#include "utils/uthash.h"

static int tags_length(const char *tags)
{
int length = 0;
char *tag = NULL;
char *tmp = strdup(tags);
char *save = NULL;

tag = strtok_r(tmp, ",", &save);
while (tag != NULL) {
length++;
tag = strtok_r(NULL, ",", &save);
}

free(tmp);
return length;
}

void handle_add_adapter(nng_aio *aio)
{
neu_plugin_t *plugin = neu_rest_get_plugin();
Expand All @@ -45,6 +62,10 @@ void handle_add_adapter(nng_aio *aio)
aio, neu_json_add_node_req_t, neu_json_decode_add_node_req, {
if (strlen(req->name) >= NEU_NODE_NAME_LEN) {
CHECK_NODE_NAME_LENGTH_ERR;
} else if (req->tags != NULL && tags_length(req->tags) > 5) {
NEU_JSON_RESPONSE_ERROR(NEU_ERR_NODE_TAGS_TOO_MANY, {
neu_http_response(aio, error_code.error, result_error);
});
} else if (strcmp(req->name, "monitor") == 0 ||
strcmp(req->name, "DataStorage") == 0) {
CHECK_NODE_SINGLE_ERR;
Expand All @@ -59,7 +80,9 @@ void handle_add_adapter(nng_aio *aio)
strcpy(cmd.node, req->name);
strcpy(cmd.plugin, req->plugin);
cmd.setting = req->setting;
cmd.tags = req->tags;
req->setting = NULL; // moved
req->tags = NULL; // moved
ret = neu_plugin_op(plugin, header, &cmd);
if (ret != 0) {
neu_req_add_node_fini(&cmd);
Expand Down Expand Up @@ -236,6 +259,7 @@ void handle_get_adapter_resp(nng_aio *aio, neu_resp_get_node_t *nodes)

nodes_res.nodes[index].name = info->node;
nodes_res.nodes[index].plugin = info->plugin;
nodes_res.nodes[index].tags = info->tags;
nodes_res.nodes[index].support_import_tags =
neu_plugin_support_import_tags_simple(info->plugin);
}
Expand Down Expand Up @@ -501,6 +525,12 @@ static int send_drivers(nng_aio *aio, neu_json_drivers_req_t *req)
ret = NEU_ERR_NODE_NAME_TOO_LONG;
goto check_end;
}
if (driver->node.tags != NULL &&
strlen(driver->node.tags) >= NEU_NODE_TAGS_LEN) {
ret = NEU_ERR_NODE_TAGS_TOO_LONG;
goto check_end;
}

if (strlen(driver->node.plugin) >= NEU_PLUGIN_NAME_LEN) {
ret = NEU_ERR_PLUGIN_NAME_TOO_LONG;
goto check_end;
Expand Down Expand Up @@ -592,9 +622,11 @@ static int send_drivers(nng_aio *aio, neu_json_drivers_req_t *req)
cmd.drivers[i].plugin = driver->node.plugin;
cmd.drivers[i].setting = driver->node.setting;
cmd.drivers[i].n_group = driver->gtags.len;
cmd.drivers[i].tags = driver->node.tags;
driver->node.name = NULL; // moved
driver->node.plugin = NULL; // moved
driver->node.setting = NULL; // moved
driver->node.tags = NULL; // moved
cmd.n_driver += 1;

ret = json_to_gdatatags(&driver->gtags, &cmd.drivers[i].groups);
Expand Down Expand Up @@ -623,3 +655,34 @@ void handle_put_drivers(nng_aio *aio)
}
})
}

void handle_put_node_tag(nng_aio *aio)
{
neu_plugin_t *plugin = neu_rest_get_plugin();

NEU_PROCESS_HTTP_REQUEST_VALIDATE_JWT(
aio, neu_json_update_node_tag_req_t,
neu_json_decode_update_node_tag_req, {
int ret = 0;
neu_reqresp_head_t header = { 0 };
neu_req_update_node_tag_t cmd = { 0 };

if (req->tags != NULL && tags_length(req->tags) > 5) {
NEU_JSON_RESPONSE_ERROR(NEU_ERR_NODE_TAGS_TOO_MANY, {
neu_http_response(aio, error_code.error, result_error);
});
return;
}

header.ctx = aio;
header.type = NEU_REQ_UPDATE_NODE_TAG;
strcpy(cmd.node, req->name);
strcpy(cmd.tags, req->tags);
ret = neu_plugin_op(plugin, header, &cmd);
if (ret != 0) {
NEU_JSON_RESPONSE_ERROR(NEU_ERR_IS_BUSY, {
neu_http_response(aio, NEU_ERR_IS_BUSY, result_error);
});
}
})
}
1 change: 1 addition & 0 deletions plugins/restful/adapter_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ void handle_get_nodes_state_resp(nng_aio * aio,
neu_resp_get_nodes_state_t *states);

void handle_put_drivers(nng_aio *aio);
void handle_put_node_tag(nng_aio *aio);

#endif
4 changes: 4 additions & 0 deletions plugins/restful/global_config_handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,7 @@ static int get_drivers_resp(context_t *ctx, neu_resp_get_node_t *resp)
neu_resp_node_info_t *info = utarray_eltptr(nodes, i);
drivers.drivers[i].node.name = info->node;
drivers.drivers[i].node.plugin = info->plugin;
drivers.drivers[i].node.tags = info->tags;
}

end:
Expand Down Expand Up @@ -1654,6 +1655,7 @@ static int get_apps_resp(context_t *ctx, neu_resp_get_node_t *resp)
neu_resp_node_info_t *info = utarray_eltptr(nodes, i);
apps.apps[i].node.name = info->node;
apps.apps[i].node.plugin = info->plugin;
apps.apps[i].node.tags = info->tags;
}

end:
Expand Down Expand Up @@ -1767,7 +1769,9 @@ static int add_app_node(context_t *ctx, neu_json_app_t *app)
strcpy(cmd.node, app->node.name);
strcpy(cmd.plugin, app->node.plugin);
cmd.setting = app->node.setting;
cmd.tags = app->node.tags;
app->node.setting = NULL;
app->node.tags = NULL;

if (0 != neu_plugin_op(plugin, header, &cmd)) {
return NEU_ERR_IS_BUSY;
Expand Down
6 changes: 6 additions & 0 deletions plugins/restful/handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,12 @@ static struct neu_http_handler rest_handlers[] = {
.url = "/api/v2/simulator/export",
.value.handler = handle_simulator_export,
},
{
.method = NEU_HTTP_METHOD_PUT,
.type = NEU_HTTP_HANDLER_FUNCTION,
.url = "/api/v2/node/tag",
.value.handler = handle_put_node_tag,
},
};

void neu_rest_handler(const struct neu_http_handler **handlers, uint32_t *size)
Expand Down
20 changes: 20 additions & 0 deletions src/adapter/driver/driver.c
Original file line number Diff line number Diff line change
Expand Up @@ -1397,6 +1397,8 @@ int neu_adapter_driver_write_tags(neu_adapter_driver_t *driver,
group_t *g = find_group(driver, cmd->group);

if (g == NULL) {
nlog_warn("driver write tags group not exist, driver: %s, group: %s",
driver->adapter.name, cmd->group);
driver->adapter.cb_funs.driver.write_response(&driver->adapter, req,
NEU_ERR_GROUP_NOT_EXIST);
return NEU_ERR_GROUP_NOT_EXIST;
Expand All @@ -1417,6 +1419,9 @@ int neu_adapter_driver_write_tags(neu_adapter_driver_t *driver,
cmd->tags[i].value.value.d64,
cmd->tags[i].value.type, tag->decimal);
} else {
nlog_warn("driver write tags not exist, driver: %s, group: %s, "
"tag: %s",
driver->adapter.name, cmd->group, cmd->tags[i].tag);
value_check = NEU_ERR_TAG_NOT_EXIST;
}

Expand Down Expand Up @@ -1450,6 +1455,8 @@ int neu_adapter_driver_write_tags(neu_adapter_driver_t *driver,
}

if (value_err != NEU_ERR_SUCCESS) {
nlog_warn("driver write tags value error(%d), driver: %s, group: %s",
value_err, driver->adapter.name, cmd->group);
driver->adapter.cb_funs.driver.write_response(&driver->adapter, req,
value_err);
utarray_foreach(tags, neu_plugin_tag_value_t *, tv)
Expand All @@ -1463,6 +1470,9 @@ int neu_adapter_driver_write_tags(neu_adapter_driver_t *driver,
}

if (utarray_len(tags) != (unsigned int) cmd->n_tag) {
nlog_warn("driver write tags some tag not exist, driver: %s, group: "
"%s",
driver->adapter.name, cmd->group);
driver->adapter.cb_funs.driver.write_response(&driver->adapter, req,
NEU_ERR_TAG_NOT_EXIST);
utarray_foreach(tags, neu_plugin_tag_value_t *, tv)
Expand Down Expand Up @@ -1662,18 +1672,25 @@ int neu_adapter_driver_write_tag(neu_adapter_driver_t *driver,
group_t * g = find_group(driver, cmd->group);

if (g == NULL) {
nlog_warn("driver write tag group not exist, driver: %s, group: %s",
driver->adapter.name, cmd->group);
driver->adapter.cb_funs.driver.write_response(&driver->adapter, req,
NEU_ERR_GROUP_NOT_EXIST);
return NEU_ERR_GROUP_NOT_EXIST;
}
neu_datatag_t *tag = neu_group_find_tag(g->group, cmd->tag);

if (tag == NULL) {
nlog_warn("driver write tag not exist, driver: %s, group: %s, tag: %s",
driver->adapter.name, cmd->group, cmd->tag);
driver->adapter.cb_funs.driver.write_response(&driver->adapter, req,
NEU_ERR_TAG_NOT_EXIST);
return NEU_ERR_TAG_NOT_EXIST;
} else {
if ((tag->attribute & NEU_ATTRIBUTE_WRITE) != NEU_ATTRIBUTE_WRITE) {
nlog_warn("driver write tag not allow write, driver: %s, group: "
"%s, tag: %s",
driver->adapter.name, cmd->group, cmd->tag);
driver->adapter.cb_funs.driver.write_response(
&driver->adapter, req, NEU_ERR_PLUGIN_TAG_NOT_ALLOW_WRITE);
neu_tag_free(tag);
Expand All @@ -1685,6 +1702,9 @@ int neu_adapter_driver_write_tag(neu_adapter_driver_t *driver,
cmd->value.type, tag->decimal);

if (value_check != NEU_ERR_SUCCESS) {
nlog_warn("driver write tag value error(%d), driver: %s, group: "
"%s, tag: %s",
value_check, driver->adapter.name, cmd->group, cmd->tag);
driver->adapter.cb_funs.driver.write_response(&driver->adapter, req,
value_check);
neu_tag_free(tag);
Expand Down
1 change: 1 addition & 0 deletions src/base/msg_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ extern "C" {
XX(NEU_RESP_NODE_UNINIT, neu_resp_node_uninit_t) \
XX(NEU_REQ_ADD_NODE, neu_req_add_node_t) \
XX(NEU_REQ_UPDATE_NODE, neu_req_update_node_t) \
XX(NEU_REQ_UPDATE_NODE_TAG, neu_req_update_node_tag_t) \
XX(NEU_REQ_DEL_NODE, neu_req_del_node_t) \
XX(NEU_REQ_GET_NODE, neu_req_get_node_t) \
XX(NEU_RESP_GET_NODE, neu_resp_get_node_t) \
Expand Down
Loading