Skip to content

Commit 415c401

Browse files
committed
wrap dynamic subscription functions
This patch wraps the dynamic subscription functions. We add a new handle for the subscription that contains the file descriptor where clients are supposed to send the notifications. We tried to reuse the current Subscription class for dynamic subscriptions as well and not let sysrepo create its own internal background threads. However, subscribing to yang push periodic timer creates a new thread in sysrepo anyway, so completely decoupling from the C API would be much work. This patch does not yet wrap the functions for modifying subscriptions. The new DynamicSubscription class implements pimpl idiom. The class is movable. When the subscription is moved, the destructor is called on the old object. However, the destructor is supposed to close the file descriptor. We need to make sure that the file descriptor is not closed twice. I was thinking about using std::unique_ptr to manage the file descriptor or by setting the fd to something "invalid" after the move. I found the former solution more elegant. Maybe we should refactor the Subscription class to use pimpl idiom as well. Depends-on: https://gerrit.cesnet.cz/c/CzechLight/libyang-cpp/+/8149 Change-Id: I1cee5e730fcc762b2e0a58d3d5c40bf358504397
1 parent aa78815 commit 415c401

15 files changed

+7392
-0
lines changed

CMakeLists.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,19 @@ if(BUILD_TESTING)
7676
set(fixture-test-module
7777
--install ${CMAKE_CURRENT_SOURCE_DIR}/tests/test_module.yang
7878
)
79+
set(fixture-dynamic-subscriptions
80+
--install ${CMAKE_CURRENT_SOURCE_DIR}/tests/test_module.yang
81+
--install ${CMAKE_CURRENT_SOURCE_DIR}/tests/yang/iana-if-type@2014-05-08.yang
82+
--install ${CMAKE_CURRENT_SOURCE_DIR}/tests/yang/ietf-interfaces@2018-02-20.yang
83+
--install ${CMAKE_CURRENT_SOURCE_DIR}/tests/yang/ietf-ip@2018-02-22.yang
84+
--install ${CMAKE_CURRENT_SOURCE_DIR}/tests/yang/ietf-network-instance@2019-01-21.yang
85+
--install ${CMAKE_CURRENT_SOURCE_DIR}/tests/yang/ietf-subscribed-notifications@2019-09-09.yang -e replay
86+
--install ${CMAKE_CURRENT_SOURCE_DIR}/tests/yang/ietf-yang-push@2019-09-09.yang -e on-change
87+
)
7988

8089
sysrepo_cpp_test(NAME session FIXTURE fixture-test-module)
8190
sysrepo_cpp_test(NAME subscriptions FIXTURE fixture-test-module LIBRARIES Threads::Threads)
91+
sysrepo_cpp_test(NAME subscriptions-dynamic FIXTURE fixture-dynamic-subscriptions LIBRARIES PkgConfig::SYSREPO)
8292
sysrepo_cpp_test(NAME unsafe FIXTURE fixture-test-module LIBRARIES PkgConfig::SYSREPO)
8393
endif()
8494

include/sysrepo-cpp/Session.hpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,22 @@ class Session {
136136
ExceptionHandler handler = nullptr,
137137
const std::optional<FDHandling>& callbacks = std::nullopt);
138138

139+
[[nodiscard]] DynamicSubscription yangPushPeriodic(
140+
const std::optional<std::string>& xpathFilter,
141+
std::chrono::milliseconds periodTime,
142+
const std::optional<NotificationTimeStamp>& anchorTime = std::nullopt,
143+
const std::optional<NotificationTimeStamp>& stopTime = std::nullopt);
144+
[[nodiscard]] DynamicSubscription yangPushOnChange(
145+
const std::optional<std::string>& xpathFilter,
146+
const std::optional<std::chrono::milliseconds>& dampeningPeriod = std::nullopt,
147+
SyncOnStart syncOnStart = SyncOnStart::No,
148+
const std::optional<NotificationTimeStamp>& stopTime = std::nullopt);
149+
[[nodiscard]] DynamicSubscription subscribeNotifications(
150+
const std::optional<std::string>& xpathFilter,
151+
const std::optional<std::string>& stream = std::nullopt,
152+
const std::optional<NotificationTimeStamp>& stopTime = std::nullopt,
153+
const std::optional<NotificationTimeStamp>& startTime = std::nullopt);
154+
139155
ChangeCollection getChanges(const std::string& xpath = "//.");
140156
void setErrorMessage(const std::string& msg);
141157
void setNetconfError(const NetconfErrorInfo& info);

include/sysrepo-cpp/Subscription.hpp

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,13 @@ using RpcActionCb = std::function<ErrorCode(Session session, uint32_t subscripti
189189
*/
190190
using NotifCb = std::function<void(Session session, uint32_t subscriptionId, const NotificationType type, const std::optional<libyang::DataNode> notificationTree, const NotificationTimeStamp timestamp)>;
191191

192+
/**
193+
* A callback for YANG push notification subscriptions.
194+
* @param notification The notification tree.
195+
* @param timestamp Time when the notification was generated.
196+
*/
197+
using YangPushNotifCb = std::function<void(const std::optional<libyang::DataNode> notificationTree, const NotificationTimeStamp timestamp)>;
198+
192199
/**
193200
* Exception handler type for handling exceptions thrown in user callbacks.
194201
*/
@@ -268,4 +275,46 @@ class Subscription {
268275

269276
bool m_didNacmInit;
270277
};
278+
279+
enum class SyncOnStart : bool {
280+
No,
281+
Yes,
282+
};
283+
284+
/**
285+
* @brief Manages lifetime of YANG push subscriptions.
286+
*
287+
* Users are supposed to create instances of this class via Session::yangPushPeriodic, Session::yangPushOnChange or Session::subscribeNotifications.
288+
* Whenever notified about a change (by polling the file descriptor obtained by fd() function),
289+
* there is at least one event waiting to be processed by a call to YangPushSubscription::processEvent.
290+
*
291+
* Internally, the sysrepo C library creates some background thread(s). These are used either for managing internal,
292+
* sysrepo-level module subscriptions, or for scheduling of periodic timers. These threads are fully encapsulated by
293+
* the C code, and there is no control over them from this C++ wrapper. The public interface of this class is a file
294+
* descriptor that the caller is expected to poll for readability/closing (and the subscription ID). Once the FD is
295+
* readable, invoke this class' processEvents(). There is no automatic event loop which would take care of this
296+
* functionality, and users are expected to integrate this FD into their own event handling.
297+
*/
298+
class DynamicSubscription {
299+
public:
300+
DynamicSubscription(const DynamicSubscription&) = delete;
301+
DynamicSubscription& operator=(const DynamicSubscription&) = delete;
302+
DynamicSubscription(DynamicSubscription&&) noexcept;
303+
DynamicSubscription& operator=(DynamicSubscription&&) noexcept;
304+
~DynamicSubscription();
305+
306+
int fd() const;
307+
uint64_t subscriptionId() const;
308+
std::optional<NotificationTimeStamp> replayStartTime() const;
309+
void processEvent(YangPushNotifCb cb) const;
310+
void terminate(const std::optional<std::string>& reason = std::nullopt);
311+
312+
private:
313+
DynamicSubscription(std::shared_ptr<sr_session_ctx_s> sess, int fd, uint64_t subId, const std::optional<NotificationTimeStamp>& replayStartTime = std::nullopt);
314+
315+
struct Data;
316+
std::unique_ptr<Data> m_data;
317+
318+
friend class Session;
319+
};
271320
}

src/Session.cpp

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ extern "C" {
1111
#include <sysrepo.h>
1212
#include <sysrepo/netconf_acm.h>
1313
#include <sysrepo/error_format.h>
14+
#include <sysrepo/subscribed_notifications.h>
1415
}
1516
#include <libyang-cpp/Context.hpp>
1617
#include <sysrepo-cpp/Connection.hpp>
@@ -522,6 +523,123 @@ Subscription Session::onNotification(
522523
return sub;
523524
}
524525

526+
/**
527+
* Subscribe for receiving notifications according to 'ietf-yang-push' YANG periodic subscriptions.
528+
*
529+
* Wraps `srsn_yang_push_periodic`.
530+
*
531+
* @param xpathFilter Optional XPath that filters received notification.
532+
* @param periodTime Notification period.
533+
* @param anchorTime Optional anchor time for the period. Anchor time acts as a reference point for the period.
534+
* @param stopTime Optional stop time ending the notification subscription.
535+
*
536+
* @return A YangPushSubscription handle.
537+
*/
538+
DynamicSubscription Session::yangPushPeriodic(
539+
const std::optional<std::string>& xpathFilter,
540+
std::chrono::milliseconds periodTime,
541+
const std::optional<NotificationTimeStamp>& anchorTime,
542+
const std::optional<NotificationTimeStamp>& stopTime)
543+
{
544+
int fd;
545+
uint32_t subId;
546+
auto stopSpec = stopTime ? std::optional{toTimespec(*stopTime)} : std::nullopt;
547+
auto anchorSpec = anchorTime ? std::optional{toTimespec(*anchorTime)} : std::nullopt;
548+
auto res = srsn_yang_push_periodic(m_sess.get(),
549+
toDatastore(activeDatastore()),
550+
xpathFilter ? xpathFilter->c_str() : nullptr,
551+
periodTime.count(),
552+
anchorSpec ? &anchorSpec.value() : nullptr,
553+
stopSpec ? &stopSpec.value() : nullptr,
554+
&fd,
555+
&subId);
556+
throwIfError(res, "Couldn't create yang-push periodic subscription", m_sess.get());
557+
558+
return {m_sess, fd, subId};
559+
}
560+
561+
/**
562+
* Subscribe for receiving notifications according to 'ietf-yang-push' YANG on-change subscriptions.
563+
*
564+
* Wraps `srsn_yang_push_on_change`.
565+
*
566+
* @param xpathFilter Optional XPath that filters received notification.
567+
* @param dampeningPeriod Optional dampening period.
568+
* @param syncOnStart Whether to start with a notification of the current state.
569+
* @param stopTime Optional stop time ending the notification subscription.
570+
*
571+
* @return A YangPushSubscription handle.
572+
*/
573+
DynamicSubscription Session::yangPushOnChange(
574+
const std::optional<std::string>& xpathFilter,
575+
const std::optional<std::chrono::milliseconds>& dampeningPeriod,
576+
SyncOnStart syncOnStart,
577+
const std::optional<NotificationTimeStamp>& stopTime)
578+
{
579+
int fd;
580+
uint32_t subId;
581+
auto stopSpec = stopTime ? std::optional{toTimespec(*stopTime)} : std::nullopt;
582+
auto res = srsn_yang_push_on_change(m_sess.get(),
583+
toDatastore(activeDatastore()),
584+
xpathFilter ? xpathFilter->c_str() : nullptr,
585+
dampeningPeriod ? dampeningPeriod->count() : 0,
586+
syncOnStart == SyncOnStart::Yes,
587+
nullptr,
588+
stopSpec ? &stopSpec.value() : nullptr,
589+
0,
590+
nullptr,
591+
&fd,
592+
&subId);
593+
throwIfError(res, "Couldn't create yang-push on-change subscription", m_sess.get());
594+
595+
return {m_sess, fd, subId};
596+
}
597+
598+
/**
599+
* Subscribe for receiving notifications according to 'ietf-subscribed-notifications'
600+
*
601+
* Wraps `srsn_subscribe.
602+
*
603+
* @param xpathFilter Optional XPath that filters received notification.
604+
* @param stream Optional stream to subscribe to.
605+
* @param stopTime Optional stop time ending the subscription.
606+
* @param startTime Optional start time of the subscription, used for replaying stored notifications.
607+
*
608+
* @return A YangPushSubscription handle.
609+
*/
610+
DynamicSubscription Session::subscribeNotifications(
611+
const std::optional<std::string>& xpathFilter,
612+
const std::optional<std::string>& stream,
613+
const std::optional<NotificationTimeStamp>& stopTime,
614+
const std::optional<NotificationTimeStamp>& startTime)
615+
{
616+
int fd;
617+
uint32_t subId;
618+
auto stopSpec = stopTime ? std::optional{toTimespec(*stopTime)} : std::nullopt;
619+
auto startSpec = startTime ? std::optional{toTimespec(*startTime)} : std::nullopt;
620+
struct timespec replayStartSpec;
621+
622+
auto res = srsn_subscribe(m_sess.get(),
623+
stream ? stream->c_str() : nullptr,
624+
xpathFilter ? xpathFilter->c_str() : nullptr,
625+
stopSpec ? &stopSpec.value() : nullptr,
626+
startSpec ? &startSpec.value() : nullptr,
627+
false,
628+
nullptr,
629+
&replayStartSpec,
630+
&fd,
631+
&subId);
632+
633+
throwIfError(res, "Couldn't create notification subscription", m_sess.get());
634+
635+
std::optional<NotificationTimeStamp> replayStart;
636+
if (replayStartSpec.tv_sec != 0) {
637+
replayStart = toTimePoint(replayStartSpec);
638+
}
639+
640+
return {m_sess, fd, subId, replayStart};
641+
}
642+
525643
/**
526644
* Returns a collection of changes based on an `xpath`. Use "//." to get a full change subtree.
527645
*

src/Subscription.cpp

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
extern "C" {
1212
#include <sysrepo.h>
1313
#include <sysrepo/netconf_acm.h>
14+
#include <sysrepo/subscribed_notifications.h>
1415
}
1516
#include "utils/enum.hpp"
1617
#include "utils/exception.hpp"
@@ -410,4 +411,104 @@ bool ChangeIterator::operator==(const ChangeIterator& other) const
410411
// And then either both contain nothing or contain the same thing.
411412
(!this->m_current.has_value() || this->m_current->node == other.m_current->node);
412413
}
414+
415+
416+
struct DynamicSubscription::Data {
417+
std::shared_ptr<sr_session_ctx_s> sess;
418+
int fd;
419+
uint64_t subId;
420+
std::optional<NotificationTimeStamp> m_replayStartTime;
421+
bool m_terminated;
422+
423+
Data(std::shared_ptr<sr_session_ctx_s> sess, int fd, uint64_t subId, const std::optional<NotificationTimeStamp>& replayStartTime, bool terminated);
424+
~Data();
425+
void terminate(const std::optional<std::string>& reason = std::nullopt);
426+
};
427+
428+
DynamicSubscription::DynamicSubscription(std::shared_ptr<sr_session_ctx_s> sess, int fd, uint64_t subId, const std::optional<NotificationTimeStamp>& replayStartTime)
429+
: m_data(std::make_unique<Data>(std::move(sess), fd, subId, replayStartTime, false))
430+
{
431+
}
432+
433+
DynamicSubscription::DynamicSubscription(DynamicSubscription&&) noexcept = default;
434+
DynamicSubscription& DynamicSubscription::operator=(DynamicSubscription&&) noexcept = default;
435+
DynamicSubscription::~DynamicSubscription() = default;
436+
437+
/** @brief Returns the file descriptor associated with this subscription. */
438+
int DynamicSubscription::fd() const
439+
{
440+
return m_data->fd;
441+
}
442+
443+
/** @brief Returns the subscription ID associated with this subscription. */
444+
uint64_t DynamicSubscription::subscriptionId() const
445+
{
446+
return m_data->subId;
447+
}
448+
449+
/** @brief Returns the actual start time of replayed notification subscription, if available. */
450+
std::optional<NotificationTimeStamp> DynamicSubscription::replayStartTime() const
451+
{
452+
return m_data->m_replayStartTime;
453+
}
454+
455+
/** @brief Terminates the subscription.
456+
*
457+
* Wraps `srsn_terminate`.
458+
*/
459+
void DynamicSubscription::terminate(const std::optional<std::string>& reason)
460+
{
461+
m_data->terminate(reason);
462+
}
463+
464+
/** @brief Processes a single event associated with this subscription.
465+
*
466+
* Invoke only when the file descriptor associated with this subscription is ready for reading.
467+
* Otherwise, the function blocks unless the FD is set to non-blocking.
468+
*
469+
* Wraps `srsn_read_notif`.
470+
*/
471+
void DynamicSubscription::processEvent(YangPushNotifCb cb) const
472+
{
473+
struct timespec timestamp;
474+
struct lyd_node* tree;
475+
auto ctx = std::unique_ptr<const ly_ctx, std::function<void(const ly_ctx*)>>(sr_session_acquire_context(m_data->sess.get()), [&](const ly_ctx*) { sr_session_release_context(m_data->sess.get()); });
476+
477+
auto err = srsn_read_notif(fd(), ctx.get(), &timestamp, &tree);
478+
throwIfError(err, "Couldn't read yang-push notification");
479+
480+
const auto wrappedNotification = tree ? std::optional{libyang::wrapRawNode(tree)} : std::nullopt;
481+
482+
// If we see subscription-terminated notification, the subscription is terminated by sysrepo (e.g. because of stop-time)
483+
// This is not the correct notification as per RFC to use when reaching stop-time, see https://github.com/sysrepo/sysrepo/issues/3525.
484+
if (wrappedNotification && wrappedNotification->path() == "/ietf-subscribed-notifications:subscription-terminated") {
485+
m_data->m_terminated = true;
486+
}
487+
488+
cb(wrappedNotification, toTimePoint(timestamp));
489+
}
490+
491+
DynamicSubscription::Data::Data(std::shared_ptr<sr_session_ctx_s> sess, int fd, uint64_t subId, const std::optional<NotificationTimeStamp>& replayStartTime, bool terminated)
492+
: sess(std::move(sess))
493+
, fd(fd)
494+
, subId(subId)
495+
, m_replayStartTime(replayStartTime)
496+
, m_terminated(terminated)
497+
{
498+
}
499+
500+
DynamicSubscription::Data::~Data()
501+
{
502+
if (!m_terminated) {
503+
terminate();
504+
}
505+
close(fd);
506+
}
507+
508+
void DynamicSubscription::Data::terminate(const std::optional<std::string>& reason)
509+
{
510+
auto err = srsn_terminate(subId, reason ? reason->c_str() : nullptr);
511+
throwIfError(err, "Couldn't terminate yang-push subscription with id " + std::to_string(subId));
512+
m_terminated = true;
513+
}
413514
}

0 commit comments

Comments
 (0)