Skip to content

Commit

Permalink
Merge pull request #3449 from opensourcerouting/network-wide-transact…
Browse files Browse the repository at this point in the history
…ions

lib: fix NETCONF network-wide transactions for confd and sysrepo
  • Loading branch information
donaldsharp authored Dec 9, 2018
2 parents ea2abdc + 88a7d12 commit ca65419
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 48 deletions.
127 changes: 95 additions & 32 deletions lib/northbound_confd.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ static struct confd_daemon_ctx *dctx;
static struct confd_notification_ctx *live_ctx;
static bool confd_connected;
static struct list *confd_spoints;
static struct nb_transaction *transaction;

static void frr_confd_finish_cdb(void);
static void frr_confd_finish_dp(void);
Expand Down Expand Up @@ -270,41 +271,12 @@ frr_confd_cdb_diff_iter(confd_hkeypath_t *kp, enum cdb_iter_op cdb_op,
return ITER_RECURSE;
}

static int frr_confd_cdb_read_cb(struct thread *thread)
static int frr_confd_cdb_read_cb_prepare(int fd, int *subp, int reslen)
{
int fd = THREAD_FD(thread);
int *subp = NULL;
enum cdb_sub_notification cdb_ev;
int flags;
int reslen = 0;
struct nb_config *candidate;
struct cdb_iter_args iter_args;
int ret;

thread = NULL;
thread_add_read(master, frr_confd_cdb_read_cb, NULL, fd, &thread);

if (cdb_read_subscription_socket2(fd, &cdb_ev, &flags, &subp, &reslen)
!= CONFD_OK) {
flog_err_confd("cdb_read_subscription_socket2");
return -1;
}

/*
* Ignore CDB_SUB_ABORT and CDB_SUB_COMMIT. We'll leverage the
* northbound layer itself to abort or apply the configuration changes
* when a transaction is created.
*/
if (cdb_ev != CDB_SUB_PREPARE) {
free(subp);
if (cdb_sync_subscription_socket(fd, CDB_DONE_PRIORITY)
!= CONFD_OK) {
flog_err_confd("cdb_sync_subscription_socket");
return -1;
}
return 0;
}

candidate = nb_config_dup(running_config);

/* Iterate over all configuration changes. */
Expand Down Expand Up @@ -332,8 +304,13 @@ static int frr_confd_cdb_read_cb(struct thread *thread)
return 0;
}

ret = nb_candidate_commit(candidate, NB_CLIENT_CONFD, true, NULL, NULL);
nb_config_free(candidate);
/*
* Validate the configuration changes and allocate all resources
* required to apply them.
*/
transaction = NULL;
ret = nb_candidate_commit_prepare(candidate, NB_CLIENT_CONFD, NULL,
&transaction);
if (ret != NB_OK && ret != NB_ERR_NO_CHANGES) {
enum confd_errcode errcode;
const char *errmsg;
Expand All @@ -353,23 +330,109 @@ static int frr_confd_cdb_read_cb(struct thread *thread)
break;
}

/* Reject the configuration changes. */
if (cdb_sub_abort_trans(cdb_sub_sock, errcode, 0, 0, "%s",
errmsg)
!= CONFD_OK) {
flog_err_confd("cdb_sub_abort_trans");
return -1;
}
} else {
/* Acknowledge the notification. */
if (cdb_sync_subscription_socket(fd, CDB_DONE_PRIORITY)
!= CONFD_OK) {
flog_err_confd("cdb_sync_subscription_socket");
return -1;
}

/* No configuration changes. */
if (!transaction)
nb_config_free(candidate);
}

return 0;
}

static int frr_confd_cdb_read_cb_commit(int fd, int *subp, int reslen)
{
/*
* No need to process the configuration changes again as we're already
* keeping track of them in the "transaction" variable.
*/
free(subp);

/* Apply the transaction. */
if (transaction) {
struct nb_config *candidate = transaction->config;

nb_candidate_commit_apply(transaction, true, NULL);
nb_config_free(candidate);
}

/* Acknowledge the notification. */
if (cdb_sync_subscription_socket(fd, CDB_DONE_PRIORITY) != CONFD_OK) {
flog_err_confd("cdb_sync_subscription_socket");
return -1;
}

return 0;
}

static int frr_confd_cdb_read_cb_abort(int fd, int *subp, int reslen)
{
/*
* No need to process the configuration changes again as we're already
* keeping track of them in the "transaction" variable.
*/
free(subp);

/* Abort the transaction. */
if (transaction) {
struct nb_config *candidate = transaction->config;

nb_candidate_commit_abort(transaction);
nb_config_free(candidate);
}

/* Acknowledge the notification. */
if (cdb_sync_subscription_socket(fd, CDB_DONE_PRIORITY) != CONFD_OK) {
flog_err_confd("cdb_sync_subscription_socket");
return -1;
}

return 0;
}

static int frr_confd_cdb_read_cb(struct thread *thread)
{
int fd = THREAD_FD(thread);
enum cdb_sub_notification cdb_ev;
int flags;
int *subp = NULL;
int reslen = 0;

thread = NULL;
thread_add_read(master, frr_confd_cdb_read_cb, NULL, fd, &thread);

if (cdb_read_subscription_socket2(fd, &cdb_ev, &flags, &subp, &reslen)
!= CONFD_OK) {
flog_err_confd("cdb_read_subscription_socket2");
return -1;
}

switch (cdb_ev) {
case CDB_SUB_PREPARE:
return frr_confd_cdb_read_cb_prepare(fd, subp, reslen);
case CDB_SUB_COMMIT:
return frr_confd_cdb_read_cb_commit(fd, subp, reslen);
case CDB_SUB_ABORT:
return frr_confd_cdb_read_cb_abort(fd, subp, reslen);
default:
flog_err_confd("unknown CDB event");
return -1;
}
}

/* Trigger CDB subscriptions to read the startup configuration. */
static void *thread_cdb_trigger_subscriptions(void *data)
{
Expand Down
90 changes: 74 additions & 16 deletions lib/northbound_sysrepo.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ static struct thread_master *master;
static struct list *sysrepo_threads;
static sr_session_ctx_t *session;
static sr_conn_ctx_t *connection;
static struct nb_transaction *transaction;

static int frr_sr_read_cb(struct thread *thread);
static int frr_sr_write_cb(struct thread *thread);
Expand Down Expand Up @@ -232,10 +233,9 @@ static int frr_sr_process_change(struct nb_config *candidate,
return NB_OK;
}

/* Callback for changes in the running configuration. */
static int frr_sr_config_change_cb(sr_session_ctx_t *session,
const char *module_name,
sr_notif_event_t sr_ev, void *private_ctx)
static int frr_sr_config_change_cb_verify(sr_session_ctx_t *session,
const char *module_name,
bool startup_config)
{
sr_change_iter_t *it;
int ret;
Expand All @@ -244,14 +244,6 @@ static int frr_sr_config_change_cb(sr_session_ctx_t *session,
char xpath[XPATH_MAXLEN];
struct nb_config *candidate;

/*
* Ignore SR_EV_ABORT and SR_EV_APPLY. We'll leverage the northbound
* layer itself to abort or apply the configuration changes when a
* transaction is created.
*/
if (sr_ev != SR_EV_ENABLED && sr_ev != SR_EV_VERIFY)
return SR_ERR_OK;

snprintf(xpath, sizeof(xpath), "/%s:*", module_name);
ret = sr_get_changes_iter(session, xpath, &it);
if (ret != SR_ERR_OK) {
Expand Down Expand Up @@ -280,15 +272,30 @@ static int frr_sr_config_change_cb(sr_session_ctx_t *session,
return SR_ERR_INTERNAL;
}

/* Commit changes. */
ret = nb_candidate_commit(candidate, NB_CLIENT_SYSREPO, true, NULL,
NULL);
nb_config_free(candidate);
transaction = NULL;
if (startup_config) {
/*
* sysrepod sends the entire startup configuration using a
* single event (SR_EV_ENABLED). This means we need to perform
* the full two-phase commit protocol in one go here.
*/
ret = nb_candidate_commit(candidate, NB_CLIENT_SYSREPO, true,
NULL, NULL);
} else {
/*
* Validate the configuration changes and allocate all resources
* required to apply them.
*/
ret = nb_candidate_commit_prepare(candidate, NB_CLIENT_SYSREPO,
NULL, &transaction);
}

/* Map northbound return code to sysrepo return code. */
switch (ret) {
case NB_OK:
return SR_ERR_OK;
case NB_ERR_NO_CHANGES:
nb_config_free(candidate);
return SR_ERR_OK;
case NB_ERR_LOCKED:
return SR_ERR_LOCKED;
Expand All @@ -299,6 +306,57 @@ static int frr_sr_config_change_cb(sr_session_ctx_t *session,
}
}

static int frr_sr_config_change_cb_apply(sr_session_ctx_t *session,
const char *module_name)
{
/* Apply the transaction. */
if (transaction) {
struct nb_config *candidate = transaction->config;

nb_candidate_commit_apply(transaction, true, NULL);
nb_config_free(candidate);
}

return SR_ERR_OK;
}

static int frr_sr_config_change_cb_abort(sr_session_ctx_t *session,
const char *module_name)
{
/* Abort the transaction. */
if (transaction) {
struct nb_config *candidate = transaction->config;

nb_candidate_commit_abort(transaction);
nb_config_free(candidate);
}

return SR_ERR_OK;
}

/* Callback for changes in the running configuration. */
static int frr_sr_config_change_cb(sr_session_ctx_t *session,
const char *module_name,
sr_notif_event_t sr_ev, void *private_ctx)
{
switch (sr_ev) {
case SR_EV_ENABLED:
return frr_sr_config_change_cb_verify(session, module_name,
true);
case SR_EV_VERIFY:
return frr_sr_config_change_cb_verify(session, module_name,
false);
case SR_EV_APPLY:
return frr_sr_config_change_cb_apply(session, module_name);
case SR_EV_ABORT:
return frr_sr_config_change_cb_abort(session, module_name);
default:
flog_err(EC_LIB_LIBSYSREPO, "%s: unknown sysrepo event: %u",
__func__, sr_ev);
return SR_ERR_INTERNAL;
}
}

static int frr_sr_state_data_iter_cb(const struct lys_node *snode,
struct yang_translator *translator,
struct yang_data *data, void *arg)
Expand Down

0 comments on commit ca65419

Please sign in to comment.