Skip to content

Commit 9d81220

Browse files
committed
add subtree filtering to notifications subscription
Change-Id: Ie547b5f478cc8e3b09ea2f324b62854576787e1b
1 parent 415c401 commit 9d81220

File tree

4 files changed

+213
-26
lines changed

4 files changed

+213
-26
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ if(BUILD_TESTING)
8282
--install ${CMAKE_CURRENT_SOURCE_DIR}/tests/yang/ietf-interfaces@2018-02-20.yang
8383
--install ${CMAKE_CURRENT_SOURCE_DIR}/tests/yang/ietf-ip@2018-02-22.yang
8484
--install ${CMAKE_CURRENT_SOURCE_DIR}/tests/yang/ietf-network-instance@2019-01-21.yang
85-
--install ${CMAKE_CURRENT_SOURCE_DIR}/tests/yang/ietf-subscribed-notifications@2019-09-09.yang -e replay
85+
--install ${CMAKE_CURRENT_SOURCE_DIR}/tests/yang/ietf-subscribed-notifications@2019-09-09.yang -e replay -e subtree
8686
--install ${CMAKE_CURRENT_SOURCE_DIR}/tests/yang/ietf-yang-push@2019-09-09.yang -e on-change
8787
)
8888

include/sysrepo-cpp/Session.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,17 +137,17 @@ class Session {
137137
const std::optional<FDHandling>& callbacks = std::nullopt);
138138

139139
[[nodiscard]] DynamicSubscription yangPushPeriodic(
140-
const std::optional<std::string>& xpathFilter,
140+
const std::optional<std::variant<std::string, libyang::DataNodeAny>>& filter,
141141
std::chrono::milliseconds periodTime,
142142
const std::optional<NotificationTimeStamp>& anchorTime = std::nullopt,
143143
const std::optional<NotificationTimeStamp>& stopTime = std::nullopt);
144144
[[nodiscard]] DynamicSubscription yangPushOnChange(
145-
const std::optional<std::string>& xpathFilter,
145+
const std::optional<std::variant<std::string, libyang::DataNodeAny>>& filter,
146146
const std::optional<std::chrono::milliseconds>& dampeningPeriod = std::nullopt,
147147
SyncOnStart syncOnStart = SyncOnStart::No,
148148
const std::optional<NotificationTimeStamp>& stopTime = std::nullopt);
149149
[[nodiscard]] DynamicSubscription subscribeNotifications(
150-
const std::optional<std::string>& xpathFilter,
150+
const std::optional<std::variant<std::string, libyang::DataNodeAny>>& filter,
151151
const std::optional<std::string>& stream = std::nullopt,
152152
const std::optional<NotificationTimeStamp>& stopTime = std::nullopt,
153153
const std::optional<NotificationTimeStamp>& startTime = std::nullopt);

src/Session.cpp

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,37 @@ libyang::DataNode wrapSrData(std::shared_ptr<sr_session_ctx_s> sess, sr_data_t*
4242
sr_release_data(data);
4343
}));
4444
}
45+
46+
std::optional<std::string> constructXPathFilter(const std::optional<std::variant<std::string, libyang::DataNodeAny>>& filter)
47+
{
48+
if (!filter) {
49+
return std::nullopt;
50+
}
51+
52+
if (std::holds_alternative<std::string>(*filter)) {
53+
return std::get<std::string>(*filter);
54+
}
55+
56+
auto node = std::get<libyang::DataNodeAny>(*filter);
57+
auto value = node.releaseValue();
58+
59+
if (!value) {
60+
return "/"; // select nothing, RFC 6241, 6.4.2
61+
}
62+
63+
if (std::holds_alternative<libyang::DataNode>(*value)) {
64+
char* str;
65+
66+
auto filterTree = std::get<libyang::DataNode>(*value);
67+
auto res = srsn_filter_subtree2xpath(libyang::getRawNode(filterTree), nullptr, &str);
68+
std::unique_ptr<char, decltype([](auto* p) constexpr { std::free(p); })> strDeleter(str); // pass ownership of c-string to the deleter
69+
70+
throwIfError(res, "Unable to convert subtree filter to xpath");
71+
return str;
72+
}
73+
74+
throw Error("Subtree filter anydata node must contain (possibly empty) libyang tree");
75+
}
4576
}
4677

4778
/**
@@ -526,17 +557,17 @@ Subscription Session::onNotification(
526557
/**
527558
* Subscribe for receiving notifications according to 'ietf-yang-push' YANG periodic subscriptions.
528559
*
529-
* Wraps `srsn_yang_push_periodic`.
560+
* Wraps `srsn_subscribe` and `srsn_filter_subtree2xpath` for subtree filters.
530561
*
531-
* @param xpathFilter Optional XPath that filters received notification.
562+
* @param filter Optional filter for received notification, xpath filter for string type, subtree filter for libyang::DataNodeAny
532563
* @param periodTime Notification period.
533564
* @param anchorTime Optional anchor time for the period. Anchor time acts as a reference point for the period.
534565
* @param stopTime Optional stop time ending the notification subscription.
535566
*
536567
* @return A YangPushSubscription handle.
537568
*/
538569
DynamicSubscription Session::yangPushPeriodic(
539-
const std::optional<std::string>& xpathFilter,
570+
const std::optional<std::variant<std::string, libyang::DataNodeAny>>& filter,
540571
std::chrono::milliseconds periodTime,
541572
const std::optional<NotificationTimeStamp>& anchorTime,
542573
const std::optional<NotificationTimeStamp>& stopTime)
@@ -545,6 +576,8 @@ DynamicSubscription Session::yangPushPeriodic(
545576
uint32_t subId;
546577
auto stopSpec = stopTime ? std::optional{toTimespec(*stopTime)} : std::nullopt;
547578
auto anchorSpec = anchorTime ? std::optional{toTimespec(*anchorTime)} : std::nullopt;
579+
auto xpathFilter = constructXPathFilter(filter);
580+
548581
auto res = srsn_yang_push_periodic(m_sess.get(),
549582
toDatastore(activeDatastore()),
550583
xpathFilter ? xpathFilter->c_str() : nullptr,
@@ -561,24 +594,26 @@ DynamicSubscription Session::yangPushPeriodic(
561594
/**
562595
* Subscribe for receiving notifications according to 'ietf-yang-push' YANG on-change subscriptions.
563596
*
564-
* Wraps `srsn_yang_push_on_change`.
597+
* Wraps `srsn_subscribe` and `srsn_filter_subtree2xpath` for subtree filters.
565598
*
566-
* @param xpathFilter Optional XPath that filters received notification.
599+
* @param filter Optional filter for received notification, xpath filter for string type, subtree filter for libyang::DataNodeAny
567600
* @param dampeningPeriod Optional dampening period.
568601
* @param syncOnStart Whether to start with a notification of the current state.
569602
* @param stopTime Optional stop time ending the notification subscription.
570603
*
571604
* @return A YangPushSubscription handle.
572605
*/
573606
DynamicSubscription Session::yangPushOnChange(
574-
const std::optional<std::string>& xpathFilter,
607+
const std::optional<std::variant<std::string, libyang::DataNodeAny>>& filter,
575608
const std::optional<std::chrono::milliseconds>& dampeningPeriod,
576609
SyncOnStart syncOnStart,
577610
const std::optional<NotificationTimeStamp>& stopTime)
578611
{
579612
int fd;
580613
uint32_t subId;
581614
auto stopSpec = stopTime ? std::optional{toTimespec(*stopTime)} : std::nullopt;
615+
auto xpathFilter = constructXPathFilter(filter);
616+
582617
auto res = srsn_yang_push_on_change(m_sess.get(),
583618
toDatastore(activeDatastore()),
584619
xpathFilter ? xpathFilter->c_str() : nullptr,
@@ -596,19 +631,19 @@ DynamicSubscription Session::yangPushOnChange(
596631
}
597632

598633
/**
599-
* Subscribe for receiving notifications according to 'ietf-subscribed-notifications'
634+
* Subscribe for receiving notifications according to 'ietf-subscribed-notifications'.
600635
*
601-
* Wraps `srsn_subscribe.
636+
* Wraps `srsn_subscribe` and `srsn_filter_subtree2xpath` for subtree filters.
602637
*
603-
* @param xpathFilter Optional XPath that filters received notification.
638+
* @param filter Optional filter for received notification, xpath filter for string type, subtree filter for libyang::DataNodeAny
604639
* @param stream Optional stream to subscribe to.
605640
* @param stopTime Optional stop time ending the subscription.
606641
* @param startTime Optional start time of the subscription, used for replaying stored notifications.
607642
*
608643
* @return A YangPushSubscription handle.
609644
*/
610645
DynamicSubscription Session::subscribeNotifications(
611-
const std::optional<std::string>& xpathFilter,
646+
const std::optional<std::variant<std::string, libyang::DataNodeAny>>& filter,
612647
const std::optional<std::string>& stream,
613648
const std::optional<NotificationTimeStamp>& stopTime,
614649
const std::optional<NotificationTimeStamp>& startTime)
@@ -618,10 +653,11 @@ DynamicSubscription Session::subscribeNotifications(
618653
auto stopSpec = stopTime ? std::optional{toTimespec(*stopTime)} : std::nullopt;
619654
auto startSpec = startTime ? std::optional{toTimespec(*startTime)} : std::nullopt;
620655
struct timespec replayStartSpec;
656+
auto xpathFilter = constructXPathFilter(filter);
621657

622658
auto res = srsn_subscribe(m_sess.get(),
623659
stream ? stream->c_str() : nullptr,
624-
xpathFilter ? xpathFilter->c_str() : nullptr,
660+
xpathFilter ? xpathFilter->data() : nullptr,
625661
stopSpec ? &stopSpec.value() : nullptr,
626662
startSpec ? &startSpec.value() : nullptr,
627663
false,

tests/subscriptions-dynamic.cpp

Lines changed: 162 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
#define REQUIRE_NOTIFICATION(SUBSCRIPTION, NOTIFICATION) \
2929
TROMPELOEIL_REQUIRE_CALL(rec, recordNotification(NOTIFICATION)).IN_SEQUENCE(seq);
3030

31+
#define REQUIRE_NAMED_NOTIFICATION(SUBSCRIPTION, NOTIFICATION) \
32+
expectations.emplace_back(TROMPELOEIL_NAMED_REQUIRE_CALL(rec, recordNotification(NOTIFICATION)).IN_SEQUENCE(seq));
33+
3134
#define READ_NOTIFICATION(SUBSCRIPTION) \
3235
REQUIRE(pipeStatus((SUBSCRIPTION).fd()) == PipeStatus::DataReady); \
3336
(SUBSCRIPTION).processEvent(cbNotif);
@@ -276,6 +279,89 @@ TEST_CASE("Dynamic subscriptions")
276279
const auto excMessage = "Couldn't terminate yang-push subscription with id " + std::to_string(sub->subscriptionId()) + ": SR_ERR_NOT_FOUND";
277280
REQUIRE_THROWS_WITH_AS(sub->terminate(), excMessage.c_str(), sysrepo::ErrorWithCode);
278281
}
282+
283+
DOCTEST_SUBCASE("Filtering")
284+
{
285+
std::optional<sysrepo::DynamicSubscription> sub;
286+
std::vector<std::unique_ptr<trompeloeil::expectation>> expectations;
287+
288+
DOCTEST_SUBCASE("xpath filter")
289+
{
290+
sub = sess.subscribeNotifications("/test_module:ping");
291+
292+
REQUIRE_NAMED_NOTIFICATION(sub, notifications[0]);
293+
}
294+
295+
DOCTEST_SUBCASE("subtree filter")
296+
{
297+
libyang::CreatedNodes createdNodes;
298+
299+
DOCTEST_SUBCASE("filter a node")
300+
{
301+
DOCTEST_SUBCASE("XML")
302+
{
303+
createdNodes = sess.getContext().newPath2(
304+
"/ietf-subscribed-notifications:establish-subscription/stream-subtree-filter",
305+
libyang::XML{"<ping xmlns='urn:ietf:params:xml:ns:yang:test_module' />"});
306+
}
307+
308+
DOCTEST_SUBCASE("JSON")
309+
{
310+
createdNodes = sess.getContext().newPath2(
311+
"/ietf-subscribed-notifications:establish-subscription/stream-subtree-filter",
312+
libyang::JSON{R"({"test_module:ping": {}})"});
313+
}
314+
315+
REQUIRE_NAMED_NOTIFICATION(sub, notifications[0]);
316+
}
317+
318+
DOCTEST_SUBCASE("filter more top level nodes")
319+
{
320+
DOCTEST_SUBCASE("XML")
321+
{
322+
createdNodes = sess.getContext().newPath2(
323+
"/ietf-subscribed-notifications:establish-subscription/stream-subtree-filter",
324+
libyang::XML{"<ping xmlns='urn:ietf:params:xml:ns:yang:test_module' />"
325+
"<silent-ping xmlns='urn:ietf:params:xml:ns:yang:test_module' />"});
326+
}
327+
328+
DOCTEST_SUBCASE("JSON")
329+
{
330+
createdNodes = sess.getContext().newPath2(
331+
"/ietf-subscribed-notifications:establish-subscription/stream-subtree-filter",
332+
libyang::JSON{R"({
333+
"test_module:ping": {},
334+
"test_module:silent-ping": {}
335+
})"});
336+
}
337+
338+
REQUIRE_NAMED_NOTIFICATION(sub, notifications[0]);
339+
REQUIRE_NAMED_NOTIFICATION(sub, notifications[1]);
340+
}
341+
342+
DOCTEST_SUBCASE("empty filter selects nothing")
343+
{
344+
createdNodes = sess.getContext().newPath2(
345+
"/ietf-subscribed-notifications:establish-subscription/stream-subtree-filter",
346+
std::nullopt);
347+
}
348+
349+
sub = sess.subscribeNotifications(createdNodes.createdNode->asAny());
350+
}
351+
352+
CLIENT_SEND_NOTIFICATION(notifications[0]);
353+
CLIENT_SEND_NOTIFICATION(notifications[1]);
354+
355+
// read as many notifications as we expect
356+
for (size_t i = 0; i < expectations.size(); ++i) {
357+
READ_NOTIFICATION_BLOCKING(*sub);
358+
}
359+
360+
sub->terminate();
361+
362+
// ensure no more notifications were sent
363+
REQUIRE_PIPE_HANGUP(*sub);
364+
}
279365
}
280366

281367
DOCTEST_SUBCASE("YANG Push on change")
@@ -285,9 +371,73 @@ TEST_CASE("Dynamic subscriptions")
285371
* between writing to sysrepo and reading the notifications.
286372
*/
287373

288-
auto sub = sess.yangPushOnChange(std::nullopt, std::nullopt, sysrepo::SyncOnStart::Yes);
374+
DOCTEST_SUBCASE("Filters")
375+
{
376+
std::optional<sysrepo::DynamicSubscription> sub;
377+
378+
DOCTEST_SUBCASE("XPath filter")
379+
{
380+
sub = sess.yangPushOnChange("/test_module:leafInt32 | /test_module:popelnice/content/trash[name='asd']");
381+
}
382+
383+
DOCTEST_SUBCASE("Subtree filter")
384+
{
385+
auto createdNodes = sess.getContext().newPath2(
386+
"/ietf-subscribed-notifications:establish-subscription/ietf-yang-push:datastore-subtree-filter",
387+
libyang::XML{"<leafInt32 xmlns='http://example.com/' />"
388+
"<popelnice xmlns='http://example.com/'><content><trash><name>asd</name></trash></content></popelnice>"});
389+
sub = sess.yangPushOnChange(createdNodes.createdNode->asAny());
390+
}
391+
392+
client.setItem("/test_module:leafInt32", "42");
393+
client.setItem("/test_module:popelnice/s", "asd");
394+
client.setItem("/test_module:popelnice/content/trash[name='asd']", std::nullopt);
395+
client.applyChanges();
289396

290-
REQUIRE_YANG_PUSH_UPDATE(sub, R"({
397+
client.deleteItem("/test_module:popelnice/s");
398+
client.applyChanges();
399+
400+
REQUIRE_YANG_PUSH_UPDATE(*sub, R"({
401+
"ietf-yang-push:push-change-update": {
402+
"datastore-changes": {
403+
"yang-patch": {
404+
"patch-id": "patch-1",
405+
"edit": [
406+
{
407+
"edit-id": "edit-1",
408+
"operation": "create",
409+
"target": "/test_module:leafInt32",
410+
"value": {
411+
"test_module:leafInt32": 42
412+
}
413+
},
414+
{
415+
"edit-id": "edit-2",
416+
"operation": "create",
417+
"target": "/test_module:popelnice/content/trash[name='asd']",
418+
"value": {
419+
"test_module:trash": {
420+
"name": "asd"
421+
}
422+
}
423+
}
424+
]
425+
}
426+
}
427+
}
428+
}
429+
)");
430+
READ_YANG_PUSH_UPDATE(*sub);
431+
432+
sub->terminate();
433+
REQUIRE_PIPE_HANGUP(*sub);
434+
}
435+
436+
DOCTEST_SUBCASE("Sync on start")
437+
{
438+
auto sub = sess.yangPushOnChange(std::nullopt, std::nullopt, sysrepo::SyncOnStart::Yes);
439+
440+
REQUIRE_YANG_PUSH_UPDATE(sub, R"({
291441
"ietf-yang-push:push-update": {
292442
"datastore-contents": {
293443
"test_module:values": [
@@ -298,14 +448,14 @@ TEST_CASE("Dynamic subscriptions")
298448
}
299449
}
300450
)");
301-
READ_YANG_PUSH_UPDATE(sub);
451+
READ_YANG_PUSH_UPDATE(sub);
302452

303-
client.setItem("/test_module:leafInt32", "123");
304-
client.setItem("/test_module:values[.='5']", std::nullopt);
305-
client.deleteItem("/test_module:values[.='3']");
306-
client.applyChanges();
453+
client.setItem("/test_module:leafInt32", "123");
454+
client.setItem("/test_module:values[.='5']", std::nullopt);
455+
client.deleteItem("/test_module:values[.='3']");
456+
client.applyChanges();
307457

308-
REQUIRE_YANG_PUSH_UPDATE(sub, R"({
458+
REQUIRE_YANG_PUSH_UPDATE(sub, R"({
309459
"ietf-yang-push:push-change-update": {
310460
"datastore-changes": {
311461
"yang-patch": {
@@ -342,10 +492,11 @@ TEST_CASE("Dynamic subscriptions")
342492
}
343493
}
344494
)");
345-
READ_YANG_PUSH_UPDATE(sub);
495+
READ_YANG_PUSH_UPDATE(sub);
346496

347-
sub.terminate();
348-
REQUIRE_PIPE_HANGUP(sub);
497+
sub.terminate();
498+
REQUIRE_PIPE_HANGUP(sub);
499+
}
349500
}
350501

351502
DOCTEST_SUBCASE("YANG Push periodic")

0 commit comments

Comments
 (0)