Skip to content

Commit

Permalink
Fix cloud connect after claim (netdata#19547)
Browse files (browse the repository at this point in the history)
  • Loading branch information
stelfrag authored Jan 31, 2025
1 parent 30892d4 commit d35dcc3
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 40 deletions.
11 changes: 8 additions & 3 deletions src/aclk/aclk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1002,12 +1002,15 @@ void aclk_host_state_update(RRDHOST *host, int cmd, int queryable)
aclk_add_job(query);
}

void aclk_send_node_instances()
void aclk_send_node_instances(mqtt_wss_client client)
{
struct node_instance_list *list_head = get_node_list();
struct node_instance_list *list = list_head;
if (unlikely(!list)) {
error_report("Failure to get_node_list from DB!");
sleep_usec(USEC_PER_SEC);
aclk_query_t query = aclk_query_new(SEND_NODE_INSTANCES);
aclk_add_job(query);
return;
}
while (!uuid_is_null(list->host_id)) {
Expand Down Expand Up @@ -1045,7 +1048,8 @@ void aclk_send_node_instances()
freez((void*)node_state_update.node_id);
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
aclk_add_job(query);
send_bin_msg(client, query);
aclk_query_free(query);
} else {
aclk_query_t create_query;
create_query = aclk_query_new(REGISTER_NODE);
Expand All @@ -1067,7 +1071,8 @@ void aclk_send_node_instances()
(char*)node_instance_creation.machine_guid, list->hops);

freez((void *)node_instance_creation.machine_guid);
aclk_add_job(create_query);
send_bin_msg(client, create_query);
aclk_query_free(create_query);
}
freez(list->hostname);

Expand Down
2 changes: 1 addition & 1 deletion src/aclk/aclk.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ extern struct aclk_shared_state {
void aclk_host_state_update(RRDHOST *host, int cmd, int queryable);
bool aclk_host_state_update_auto(RRDHOST *host);

void aclk_send_node_instances(void);
void aclk_send_node_instances(mqtt_wss_client client);

void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);

Expand Down
56 changes: 28 additions & 28 deletions src/database/sqlite/sqlite_aclk.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ void sanity_check(void) {
#include "../aclk_query.h"
#include "../aclk_capas.h"

static void create_node_instance_result_job(const char *machine_guid, const char *node_id)
static void create_node_instance_result_job(mqtt_wss_client client __maybe_unused, const char *machine_guid, const char *node_id)
{
nd_uuid_t host_uuid, node_uuid;

Expand All @@ -32,33 +32,33 @@ static void create_node_instance_result_job(const char *machine_guid, const char
netdata_log_error("Cannot find machine_guid provided by CreateNodeInstanceResult");
return;
}

sql_update_node_id(&host_uuid, &node_uuid);

aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
node_instance_connection_t node_state_update = {
.hops = 1,
.live = 0,
.queryable = 1,
.session_id = aclk_session_newarch,
.node_id = node_id,
.capabilities = NULL};

node_state_update.live = rrdhost_is_local(host) ? 1 : 0;
node_state_update.hops = rrdhost_ingestion_hops(host);
node_state_update.capabilities = aclk_get_node_instance_capas(host);
schedule_node_state_update(host, 5000);

CLAIM_ID claim_id = claim_id_get();
node_state_update.claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);

freez((void *)node_state_update.capabilities);

query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;

aclk_add_job(query);
//
// aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
// node_instance_connection_t node_state_update = {
// .hops = 1,
// .live = 0,
// .queryable = 1,
// .session_id = aclk_session_newarch,
// .node_id = node_id,
// .capabilities = NULL};
//
// node_state_update.live = rrdhost_is_local(host) ? 1 : 0;
// node_state_update.hops = rrdhost_ingestion_hops(host);
// node_state_update.capabilities = aclk_get_node_instance_capas(host);
// schedule_node_state_update(host, 5000);
//
// CLAIM_ID claim_id = claim_id_get();
// node_state_update.claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL;
// query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
//
// freez((void *)node_state_update.capabilities);
//
// query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
// query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
//
// aclk_add_job(query);
}

struct aclk_sync_config_s {
Expand Down Expand Up @@ -395,7 +395,7 @@ static void aclk_run_query(struct aclk_sync_config_s *config, aclk_query_t query
break;
case SEND_NODE_INSTANCES:
worker_is_busy(UV_EVENT_SEND_NODE_INSTANCES);
aclk_send_node_instances();
aclk_send_node_instances(config->client);
ok_to_send = false;
break;
case ALERT_START_STREAMING:
Expand All @@ -410,7 +410,7 @@ static void aclk_run_query(struct aclk_sync_config_s *config, aclk_query_t query
break;
case CREATE_NODE_INSTANCE:
worker_is_busy(UV_EVENT_CREATE_NODE_INSTANCE);
create_node_instance_result_job(query->machine_guid, query->data.node_id);
create_node_instance_result_job(config->client, query->machine_guid, query->data.node_id);
ok_to_send = false;
break;

Expand Down
14 changes: 7 additions & 7 deletions src/database/sqlite/sqlite_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,11 +420,10 @@ bool sql_set_host_label(nd_uuid_t *host_id, const char *label_key, const char *l

#define SQL_UPDATE_NODE_ID "UPDATE node_instance SET node_id = @node_id WHERE host_id = @host_id"

int sql_update_node_id(nd_uuid_t *host_id, nd_uuid_t *node_id)
void sql_update_node_id(nd_uuid_t *host_id, nd_uuid_t *node_id)
{
sqlite3_stmt *res = NULL;
RRDHOST *host = NULL;
int rc = 2;

char host_guid[GUID_LEN + 1];
uuid_unparse_lower(*host_id, host_guid);
Expand All @@ -435,25 +434,23 @@ int sql_update_node_id(nd_uuid_t *host_id, nd_uuid_t *node_id)
rrd_wrunlock();

if (!REQUIRE_DB(db_meta))
return 1;
return;

if (!PREPARE_STATEMENT(db_meta, SQL_UPDATE_NODE_ID, &res))
return 1;
return;

int param = 0;
SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, node_id, sizeof(*node_id), SQLITE_STATIC));
SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, host_id, sizeof(*host_id), SQLITE_STATIC));

param = 0;
rc = execute_insert(res);
int rc = sqlite3_step_monitored(res);
if (unlikely(rc != SQLITE_DONE))
error_report("Failed to store node instance information, rc = %d", rc);
rc = sqlite3_changes(db_meta);

done:
REPORT_BIND_FAIL(res, param);
SQLITE_FINALIZE(res);
return rc - 1;
}

#define SQL_SELECT_NODE_ID "SELECT node_id FROM node_instance WHERE host_id = @host_id AND node_id IS NOT NULL"
Expand Down Expand Up @@ -534,6 +531,9 @@ struct node_instance_list *get_node_list(void)
while (sqlite3_step_monitored(res) == SQLITE_ROW)
row++;

if (row == 0)
return NULL;

if (sqlite3_reset(res) != SQLITE_OK) {
error_report("Failed to reset the prepared statement while fetching node instance information");
goto failed;
Expand Down
2 changes: 1 addition & 1 deletion src/database/sqlite/sqlite_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void vacuum_database(sqlite3 *database, const char *db_alias, int threshold, int
int sql_metadata_cache_stats(int op);

int get_node_id(nd_uuid_t *host_id, nd_uuid_t *node_id);
int sql_update_node_id(nd_uuid_t *host_id, nd_uuid_t *node_id);
void sql_update_node_id(nd_uuid_t *host_id, nd_uuid_t *node_id);
struct node_instance_list *get_node_list(void);
void sql_load_node_id(RRDHOST *host);

Expand Down

0 comments on commit d35dcc3

Please sign in to comment.