diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..ccc3329 --- /dev/null +++ b/.clang-format @@ -0,0 +1,4 @@ +BasedOnStyle: Mozilla +AllowShortFunctionsOnASingleLine: Empty +IndentWidth: 2 +ColumnLimit: 110 diff --git a/README.md b/README.md index ec06c2e..21725c0 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,8 @@ where `sync-prefix` is `/ndn/svs` for the example application. ## Contributing Contributions are welcome through GitHub. +Use `clang-format` to format the code before submitting a pull request. +The VS Code extension for `clang-format` is recommended to format the code automatically. ## License diff --git a/examples/chat-pubsub.cpp b/examples/chat-pubsub.cpp index 1bfad0d..563d7f5 100644 --- a/examples/chat-pubsub.cpp +++ b/examples/chat-pubsub.cpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2023 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -51,22 +51,19 @@ class Program opts.maxPubAge = ndn::time::seconds(10); // Create the Pub/Sub instance - m_svsps = std::make_shared( - ndn::Name(m_options.prefix), - ndn::Name(m_options.m_id), - face, - std::bind(&Program::onMissingData, this, _1), - opts, - secOpts); + m_svsps = std::make_shared(ndn::Name(m_options.prefix), + ndn::Name(m_options.m_id), + face, + std::bind(&Program::onMissingData, this, _1), + opts, + secOpts); std::cout << "SVS client starting: " << m_options.m_id << std::endl; // Subscribe to all data packets with prefix /chat (the "topic") - m_svsps->subscribe(ndn::Name("/chat"), [] (const auto& subData) - { + m_svsps->subscribe(ndn::Name("/chat"), [](const auto& subData) { std::string content(reinterpret_cast(subData.data.data()), subData.data.size()); - std::cout << subData.producerPrefix << " [" << subData.seqNo << "] : " << - subData.name << " : "; + std::cout << subData.producerPrefix << " [" << subData.seqNo << "] : " << subData.name << " : "; if (content.length() > 200) { std::cout << "[LONG] " << content.length() << " bytes" << " [" << std::hash{}(content) << "]"; @@ -77,8 +74,7 @@ class Program }); } - void - run() + void run() { // Begin processing face events in a separate thread. std::thread svsThread([this] { face.processEvents(); }); @@ -101,11 +97,10 @@ class Program protected: /** * Callback on receving a new State Vector from another node. - * This will be called regardless of whether the missing data contains any topics - * or producers that we are subscribed to. + * This will be called regardless of whether the missing data contains any + * topics or producers that we are subscribed to. */ - void - onMissingData(const std::vector&) + void onMissingData(const std::vector&) { // Ignore any other missing data for this example } @@ -113,8 +108,7 @@ class Program /** * Publish a string message to the group */ - void - publishMsg(const std::string& msg) + void publishMsg(const std::string& msg) { // Message to send std::string content = msg; @@ -129,14 +123,15 @@ class Program for (auto& c : content) c = std::rand() % 26 + 'a'; - std::cout << "> Sending random message with hash [" << std::hash{}(content) << "]" << std::endl; + std::cout << "> Sending random message with hash [" << std::hash{}(content) << "]" + << std::endl; } // Note that unlike SVSync, names can be arbitrary, // and need not be prefixed with the producer prefix. - ndn::Name name("chat"); // topic of publication - name.append(m_options.m_id); // who sent this - name.appendTimestamp(); // and when + ndn::Name name("chat"); // topic of publication + name.append(m_options.m_id); // who sent this + name.appendTimestamp(); // and when m_svsps->publish(name, ndn::make_span(reinterpret_cast(content.data()), content.size())); } diff --git a/examples/chat.cpp b/examples/chat.cpp index 9904690..36d4eca 100644 --- a/examples/chat.cpp +++ b/examples/chat.cpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2023 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -41,17 +41,18 @@ class Program // Create the SVSync instance m_svs = std::make_shared( - ndn::Name(m_options.prefix), // Sync prefix, common for all nodes in the group - ndn::Name(m_options.m_id), // Unique data prefix for this node - face, // Shared NDN face - std::bind(&Program::onMissingData, this, _1), // Callback on learning new sequence numbers from SVS - securityOptions); // Security configuration + ndn::Name(m_options.prefix), // Sync prefix, common for all nodes in the group + ndn::Name(m_options.m_id), // Unique data prefix for this node + face, // Shared NDN face + std::bind(&Program::onMissingData, + this, + _1), // Callback on learning new sequence numbers from SVS + securityOptions); // Security configuration std::cout << "SVS client starting: " << m_options.m_id << std::endl; } - void - run() + void run() { // Begin processing face events in a separate thread. std::thread svsThread([this] { face.processEvents(); }); @@ -75,19 +76,15 @@ class Program /** * Callback on receving a new State Vector from another node */ - void - onMissingData(const std::vector& v) + void onMissingData(const std::vector& v) { // Iterate over the entire difference set - for (size_t i = 0; i < v.size(); i++) - { + for (size_t i = 0; i < v.size(); i++) { // Iterate over each new sequence number that we learned - for (ndn::svs::SeqNo s = v[i].low; s <= v[i].high; ++s) - { + for (ndn::svs::SeqNo s = v[i].low; s <= v[i].high; ++s) { // Request a single data packet using the SVSync API ndn::svs::NodeID nid = v[i].nodeId; - m_svs->fetchData(nid, s, [nid] (const auto& data) - { + m_svs->fetchData(nid, s, [nid](const auto& data) { std::string content(reinterpret_cast(data.getContent().value()), data.getContent().value_size()); std::cout << data.getName() << " : " << content << std::endl; @@ -99,10 +96,10 @@ class Program /** * Publish a string message to the SVSync group */ - void - publishMsg(std::string_view msg) + void publishMsg(std::string_view msg) { - // Encode the message into a Content TLV block, which is what the SVSync API expects + // Encode the message into a Content TLV block, which is what the SVSync API + // expects auto block = ndn::encoding::makeStringBlock(ndn::tlv::Content, msg); m_svs->publishData(block, ndn::time::seconds(1)); } diff --git a/examples/core.cpp b/examples/core.cpp new file mode 100644 index 0000000..9735409 --- /dev/null +++ b/examples/core.cpp @@ -0,0 +1,104 @@ +/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2012-2025 University of California, Los Angeles + * + * This file is part of ndn-svs, synchronization library for distributed realtime + * applications for NDN. + * + * ndn-svs library is free software: you can redistribute it and/or modify it under the + * terms of the GNU Lesser General Public License as published by the Free Software + * Foundation, in version 2.1 of the License. + * + * ndn-svs library is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. + */ + +#include +#include +#include +#include +#include + +#include + +struct Options +{ + std::string prefix; + std::string m_id; +}; + +class Program +{ +public: + Program(const Options& options) + : m_options(options) + { + // This is a usage example of the low level SvSyncCore API. + + // Create the SVSyncCore instance + m_svs = std::make_shared( + face, // Shared NDN face + ndn::Name(m_options.prefix), // Sync prefix, common for all nodes in the group + std::bind(&Program::onUpdate, this, _1), // Callback on learning new sequence numbers from SVS + ndn::svs::SecurityOptions::DEFAULT, // Security configuration + ndn::Name(m_options.m_id) // Unique prefix for this node + ); + + std::cout << "SVS client starting: " << m_options.m_id << std::endl; + } + + void run() + { + // Begin processing face events in a separate thread. + std::thread svsThread([this] { face.processEvents(); }); + + // Increment our sequence number every 3 seconds + while (true) { + auto seq = m_svs->getSeqNo() + 1; + m_svs->updateSeqNo(seq); + std::cout << "Published sequence number: " << m_options.m_id << "=" << seq << std::endl; + std::this_thread::sleep_for(std::chrono::seconds(3)); + } + + // Wait for the SVSync thread to finish on exit. + svsThread.join(); + } + +protected: + /** + * Callback on receving a new State Vector from another node + */ + void onUpdate(const std::vector& v) + { + for (size_t i = 0; i < v.size(); i++) { + for (ndn::svs::SeqNo s = v[i].low; s <= v[i].high; ++s) { + std::cout << "Received update: " << v[i].nodeId << "=" << s << std::endl; + } + } + } + +public: + const Options m_options; + ndn::Face face; + std::shared_ptr m_svs; + ndn::KeyChain m_keyChain; +}; + +int +main(int argc, char** argv) +{ + if (argc != 2) { + std::cerr << "Usage: " << argv[0] << " " << std::endl; + return 1; + } + + Options opt; + opt.prefix = "/ndn/svs"; + opt.m_id = argv[1]; + + Program program(opt); + program.run(); + + return 0; +} diff --git a/ndn-svs/common.hpp b/ndn-svs/common.hpp index b85737d..477207d 100644 --- a/ndn-svs/common.hpp +++ b/ndn-svs/common.hpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2022 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -44,7 +44,7 @@ using SeqNo = uint64_t; using ndn::security::ValidationError; using DataValidatedCallback = std::function; -using DataValidationErrorCallback = std::function ; +using DataValidationErrorCallback = std::function; } // namespace ndn::svs diff --git a/ndn-svs/core.cpp b/ndn-svs/core.cpp index 92e09ae..a22fa4d 100644 --- a/ndn-svs/core.cpp +++ b/ndn-svs/core.cpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2023 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -27,8 +27,8 @@ #ifdef NDN_SVS_COMPRESSION #include #include -#include #include +#include #endif namespace ndn::svs { @@ -58,7 +58,7 @@ SVSyncCore::SVSyncCore(ndn::Face& face, m_face.setInterestFilter(syncPrefix, std::bind(&SVSyncCore::onSyncInterest, this, _2), std::bind(&SVSyncCore::sendInitialInterest, this), - [] (auto&&...) { NDN_THROW(Error("Failed to register sync prefix")); }); + [](auto&&...) { NDN_THROW(Error("Failed to register sync prefix")); }); } static inline int @@ -70,9 +70,9 @@ suppressionCurve(int constFactor, int value) // Increasing the curve factor makes the curve steeper => // better for more nodes, but worse for fewer nodes. - double c = constFactor; - double v = value; - double f = 10.0; // curve factor + float c = constFactor; + float v = value; + float f = 10.0; // curve factor return static_cast(c * (1.0 - std::exp((v - c) / (c / f)))); } @@ -91,14 +91,14 @@ SVSyncCore::sendInitialInterest() void SVSyncCore::onSyncInterest(const Interest& interest) { - switch (m_securityOptions.interestSigner->signingInfo.getSignerType()) - { + switch (m_securityOptions.interestSigner->signingInfo.getSignerType()) { case security::SigningInfo::SIGNER_TYPE_NULL: onSyncInterestValidated(interest); return; case security::SigningInfo::SIGNER_TYPE_HMAC: - if (security::verifySignature(interest, m_keyChainMem.getTpm(), + if (security::verifySignature(interest, + m_keyChainMem.getTpm(), m_securityOptions.interestSigner->signingInfo.getSignerName(), DigestAlgorithm::SHA256)) onSyncInterestValidated(interest); @@ -106,9 +106,8 @@ SVSyncCore::onSyncInterest(const Interest& interest) default: if (m_securityOptions.validator) - m_securityOptions.validator->validate(interest, - std::bind(&SVSyncCore::onSyncInterestValidated, this, _1), - nullptr); + m_securityOptions.validator->validate( + interest, std::bind(&SVSyncCore::onSyncInterestValidated, this, _1), nullptr); else onSyncInterestValidated(interest); return; @@ -118,9 +117,7 @@ SVSyncCore::onSyncInterest(const Interest& interest) void SVSyncCore::onSyncInterestValidated(const Interest& interest) { - const auto& n = interest.getName(); - - // Get incoming face + // Get incoming face (this is needed by NLSR) uint64_t incomingFace = 0; { auto tag = interest.getTag(); @@ -129,58 +126,66 @@ SVSyncCore::onSyncInterestValidated(const Interest& interest) } } - // Get state vector - std::shared_ptr vvOther; - try - { - ndn::Block vvBlock = n.get(-2); + // Check for invalid Interest + if (!interest.hasApplicationParameters()) { + return; + } + + // Decode state parameters + ndn::Block params = interest.getApplicationParameters(); + params.parse(); - // Decompress if necessary - if (vvBlock.type() == tlv::StateVectorLzma) { #ifdef NDN_SVS_COMPRESSION - boost::iostreams::filtering_istreambuf in; - in.push(boost::iostreams::lzma_decompressor()); - in.push(boost::iostreams::array_source(reinterpret_cast(vvBlock.value()), vvBlock.value_size())); - ndn::OBufferStream decompressed; - boost::iostreams::copy(in, decompressed); - auto inner = ndn::Block::fromBuffer(decompressed.buf()); - if (!std::get<0>(inner)) { - NDN_THROW(ndn::tlv::Error("Failed to decode inner block")); - } - vvBlock = std::get<1>(inner); -#else - NDN_THROW(ndn::tlv::Error("SVS was compiled without compression support")); -#endif + // Decompress if necessary. The spec requires that if an LZMA block is + // present, then no other blocks are present (everything is compressed + // together) + if (params.find(tlv::LzmaBlock) != params.elements_end()) { + auto lzmaBlock = params.get(tlv::LzmaBlock); + + boost::iostreams::filtering_istreambuf in; + in.push(boost::iostreams::lzma_decompressor()); + in.push(boost::iostreams::array_source(reinterpret_cast(lzmaBlock.value()), + lzmaBlock.value_size())); + ndn::OBufferStream decompressed; + boost::iostreams::copy(in, decompressed); + + auto parsed = ndn::Block::fromBuffer(decompressed.buf()); + if (!std::get<0>(parsed)) { + // TODO: log error parsing inner block + return; } - vvOther = std::make_shared(vvBlock); + params = std::get<1>(parsed); + params.parse(); } - catch (ndn::tlv::Error&) - { +#endif + + // Get state vector + std::shared_ptr vvOther; + try { + vvOther = std::make_shared(params.get(tlv::StateVector)); + } catch (ndn::tlv::Error&) { // TODO: log error return; } - if (m_recvExtraBlock && interest.hasApplicationParameters()) - { + // Read extra mapping blocks + if (m_recvExtraBlock) { try { - m_recvExtraBlock(interest.getApplicationParameters().blockFromValue(), *vvOther); + m_recvExtraBlock(params.get(tlv::MappingData), *vvOther); + } catch (std::exception&) { + // TODO: log error but continue } - catch (std::exception&) {} } // Merge state vector auto result = mergeStateVector(*vvOther); - bool myVectorNew = std::get<0>(result); - auto missingData = std::get<2>(result); - // Callback if missing data found - if (!missingData.empty()) - { - for (auto& e : missingData) + if (!result.missingInfo.empty()) { + for (auto& e : result.missingInfo) e.incomingFace = incomingFace; - m_onUpdate(missingData); + m_onUpdate(result.missingInfo); } // Try to record; the call will check if in suppression state @@ -189,12 +194,9 @@ SVSyncCore::onSyncInterestValidated(const Interest& interest) // If incoming state identical/newer to local vector, reset timer // If incoming state is older, send sync interest immediately - if (!myVectorNew) - { + if (!result.myVectorNew) { retxSyncInterest(false, 0); - } - else - { + } else { enterSuppressionState(*vvOther); // Check how much time is left on the timer, // reset to ~m_intrReplyDist if more than that. @@ -204,8 +206,7 @@ SVSyncCore::onSyncInterestValidated(const Interest& interest) // TODO: efficient curve depends on number of active nodes delay = suppressionCurve(m_maxSuppressionTime.count(), delay); - if (getCurrentTime() + delay * 1000 < m_nextSyncInterest) - { + if (getCurrentTime() + delay * 1000 < m_nextSyncInterest) { retxSyncInterest(false, delay); } } @@ -214,13 +215,12 @@ SVSyncCore::onSyncInterestValidated(const Interest& interest) void SVSyncCore::retxSyncInterest(bool send, unsigned int delay) { - if (send) - { + if (send) { std::lock_guard lock(m_recordedVvMutex); // Only send interest if in steady state or local vector has newer state // than recorded interests - if (!m_recordedVv || std::get<0>(mergeStateVector(*m_recordedVv))) + if (!m_recordedVv || mergeStateVector(*m_recordedVv).myVectorNew) sendSyncInterest(); m_recordedVv = nullptr; } @@ -234,8 +234,7 @@ SVSyncCore::retxSyncInterest(bool send, unsigned int delay) // Store the scheduled time m_nextSyncInterest = getCurrentTime() + 1000 * delay; - m_retxEvent = m_scheduler.schedule(time::milliseconds(delay), - [this] { retxSyncInterest(true, 0); }); + m_retxEvent = m_scheduler.schedule(time::milliseconds(delay), [this] { retxSyncInterest(true, 0); }); } } @@ -245,40 +244,43 @@ SVSyncCore::sendSyncInterest() if (!m_initialized) return; - Interest interest; - interest.setApplicationParameters(span{'0'}); - - ndn::Block vvWire; + // Build app parameters + ndn::encoding::EncodingBuffer enc; { std::lock_guard lock(m_vvMutex); - vvWire = m_vv.encode(); + size_t length = 0; - // Add parameters digest + // Add extra mapping blocks if (m_getExtraBlock) - { - interest.setApplicationParameters(m_getExtraBlock(m_vv)); - } + length += ndn::encoding::prependBlock(enc, m_getExtraBlock(m_vv)); + + // Add state vector + length += ndn::encoding::prependBlock(enc, m_vv.encode()); + + // Add length and ApplicationParameters type + enc.prependVarNumber(length); + enc.prependVarNumber(ndn::tlv::ApplicationParameters); } - // Create sync interest name - Name syncName(m_syncPrefix); + ndn::Block wire = enc.block(); + wire.encode(); #ifdef NDN_SVS_COMPRESSION - vvWire.encode(); boost::iostreams::filtering_istreambuf in; in.push(boost::iostreams::lzma_compressor()); - in.push(boost::iostreams::array_source(reinterpret_cast(vvWire.data()), vvWire.size())); + in.push(boost::iostreams::array_source(reinterpret_cast(wire.data()), wire.size())); ndn::OBufferStream compressed; boost::iostreams::copy(in, compressed); - vvWire = ndn::Block(tlv::StateVectorLzma, compressed.buf()); + wire = ndn::Block(tlv::LzmaBlock, compressed.buf()); + wire.encode(); #endif - syncName.append(Name::Component(vvWire)); - interest.setName(syncName); + // Create Sync Interest + Interest interest(Name(m_syncPrefix).appendVersion(2)); + interest.setApplicationParameters(wire); interest.setInterestLifetime(1_ms); - switch (m_securityOptions.interestSigner->signingInfo.getSignerType()) - { + switch (m_securityOptions.interestSigner->signingInfo.getSignerType()) { case security::SigningInfo::SIGNER_TYPE_NULL: break; @@ -294,50 +296,45 @@ SVSyncCore::sendSyncInterest() m_face.expressInterest(interest, nullptr, nullptr, nullptr); } -std::tuple> +SVSyncCore::MergeResult SVSyncCore::mergeStateVector(const VersionVector& vvOther) { std::lock_guard lock(m_vvMutex); - - bool myVectorNew = false, - otherVectorNew = false; - - // New data found in vvOther - std::vector missingData; + SVSyncCore::MergeResult result; // Check if other vector has newer state - for (const auto& entry : vvOther) - { + for (const auto& entry : vvOther) { NodeID nidOther = entry.first; SeqNo seqOther = entry.second; SeqNo seqCurrent = m_vv.get(nidOther); - if (seqCurrent < seqOther) - { - otherVectorNew = true; + if (seqCurrent < seqOther) { + result.otherVectorNew = true; SeqNo startSeq = m_vv.get(nidOther) + 1; - missingData.push_back({nidOther, startSeq, seqOther, 0}); + result.missingInfo.push_back({ nidOther, startSeq, seqOther, 0 }); m_vv.set(nidOther, seqOther); } } // Check if I have newer state - for (const auto& entry : m_vv) - { + for (const auto& entry : m_vv) { NodeID nid = entry.first; SeqNo seq = entry.second; SeqNo seqOther = vvOther.get(nid); - if (seqOther < seq) - { - myVectorNew = true; + // Ignore this node if it was last updated within network RTT + if (time::system_clock::now() - m_vv.getLastUpdate(nid) < m_maxSuppressionTime) + continue; + + if (seqOther < seq) { + result.myVectorNew = true; break; } } - return {myVectorNew, otherVectorNew, missingData}; + return result; } void @@ -374,8 +371,7 @@ SVSyncCore::getNodeIds() const { std::lock_guard lock(m_vvMutex); std::set sessionNames; - for (const auto& nid : m_vv) - { + for (const auto& nid : m_vv) { sessionNames.insert(nid.first); } return sessionNames; @@ -385,7 +381,8 @@ long SVSyncCore::getCurrentTime() const { return std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()).count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); } bool @@ -398,14 +395,12 @@ SVSyncCore::recordVector(const VersionVector& vvOther) std::lock_guard lock1(m_vvMutex); - for (const auto& entry : vvOther) - { + for (const auto& entry : vvOther) { NodeID nidOther = entry.first; SeqNo seqOther = entry.second; SeqNo seqCurrent = m_recordedVv->get(nidOther); - if (seqCurrent < seqOther) - { + if (seqCurrent < seqOther) { m_recordedVv->set(nidOther, seqOther); } } diff --git a/ndn-svs/core.hpp b/ndn-svs/core.hpp index 43b88bd..7229b6e 100644 --- a/ndn-svs/core.hpp +++ b/ndn-svs/core.hpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2023 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -81,18 +81,17 @@ class SVSyncCore : noncopyable /** * @brief Reset the sync tree (and restart synchronization again) * - * @param isOnInterest a flag that tells whether the reset is called by reset interest. + * @param isOnInterest a flag that tells whether the reset is called by reset + * interest. */ - void - reset(bool isOnInterest = false); + void reset(bool isOnInterest = false); /** * @brief Get the node ID of the local session. * * @param prefix prefix of the node */ - const NodeID& - getNodeId() + const NodeID& getNodeId() { return m_id; } @@ -105,8 +104,7 @@ class SVSyncCore : noncopyable * * @param prefix prefix of the node */ - SeqNo - getSeqNo(const NodeID& nid = EMPTY_NODE_ID) const; + SeqNo getSeqNo(const NodeID& nid = EMPTY_NODE_ID) const; /** * @brief Update the seqNo of the local session @@ -116,12 +114,10 @@ class SVSyncCore : noncopyable * @param seq The new seqNo. * @param nid The NodeID of node to update. */ - void - updateSeqNo(const SeqNo& seq, const NodeID& nid = EMPTY_NODE_ID); + void updateSeqNo(const SeqNo& seq, const NodeID& nid = EMPTY_NODE_ID); /// @brief Get all the nodeIDs - std::set - getNodeIds() const; + std::set getNodeIds() const; using GetExtraBlockCallback = std::function; using RecvExtraBlockCallback = std::function; @@ -132,8 +128,7 @@ class SVSyncCore : noncopyable * The version vector will be locked during the duration of this callback, * so it must return FAST! */ - void - setGetExtraBlockCallback(const GetExtraBlockCallback& callback) + void setGetExtraBlockCallback(const GetExtraBlockCallback& callback) { m_getExtraBlock = callback; } @@ -142,47 +137,40 @@ class SVSyncCore : noncopyable * @brief Callback on receiving extra data in a sync interest. * Will be called BEFORE the interest is processed. */ - void - setRecvExtraBlockCallback(const RecvExtraBlockCallback& callback) + void setRecvExtraBlockCallback(const RecvExtraBlockCallback& callback) { m_recvExtraBlock = callback; } /// @brief Get current version vector - VersionVector& - getState() + VersionVector& getState() { return m_vv; } /// @brief Get human-readable representation of version vector - std::string - getStateStr() const + std::string getStateStr() const { return m_vv.toStr(); } -NDN_SVS_PUBLIC_WITH_TESTS_ELSE_PRIVATE: - void - onSyncInterest(const Interest& interest); + NDN_SVS_PUBLIC_WITH_TESTS_ELSE_PRIVATE : void onSyncInterest(const Interest& interest); - void - onSyncInterestValidated(const Interest& interest); + void onSyncInterestValidated(const Interest& interest); /** * @brief Mark the instance as initialized and send the first interest */ - void - sendInitialInterest(); + void sendInitialInterest(); /** * @brief sendSyncInterest and schedule a new retxSyncInterest event. * * @param send Send a sync interest immediately - * @param delay Delay in milliseconds to schedule next interest (0 for default). + * @param delay Delay in milliseconds to schedule next interest (0 for + * default). */ - void - retxSyncInterest(bool send, unsigned int delay); + void retxSyncInterest(bool send, unsigned int delay); /** * @brief Add one sync interest to queue. @@ -190,52 +178,49 @@ class SVSyncCore : noncopyable * Called by retxSyncInterest(), or after increasing a sequence * number with updateSeqNo() */ - void - sendSyncInterest(); + void sendSyncInterest(); + + struct MergeResult + { + /// @brief If the local state vector has newer entries + bool myVectorNew; + /// @brief If the incoming state vector has newer entries + bool otherVectorNew; + /// @brief Newly learned missing information from incoming state vector + std::vector missingInfo; + }; /** * @brief Merge state vector into the current - * - * Also adds missing data interests to data interest queue. - * * @param vvOther state vector to merge in - * - * @returns a tuple of representing: - * . + * @details Also adds missing data interests to data interest queue. */ - std::tuple> - mergeStateVector(const VersionVector& vvOther); + MergeResult mergeStateVector(const VersionVector& vvOther); /** * @brief Record vector by merging it into m_recordedVv - * * @param vvOther state vector to merge in * @returns if recorded successfully */ - bool - recordVector(const VersionVector& vvOther); + bool recordVector(const VersionVector& vvOther); /** * @brief Enter suppression state by setting - * m_recording to True and initializing m_recordedVv to vvOther - * + * m_recording to True and initializing m_recordedVv to vvOther. * Does nothing if already in suppression state * * @param vvOther first vector to record */ - void - enterSuppressionState(const VersionVector& vvOther); + void enterSuppressionState(const VersionVector& vvOther); /// @brief Reference to scheduler - ndn::Scheduler& - getScheduler() + ndn::Scheduler& getScheduler() { return m_scheduler; } /// @brief Get the current time in microseconds with arbitrary reference - long - getCurrentTime() const; + long getCurrentTime() const; public: static inline const NodeID EMPTY_NODE_ID; diff --git a/ndn-svs/fetcher.cpp b/ndn-svs/fetcher.cpp index 19e37eb..59253b5 100644 --- a/ndn-svs/fetcher.cpp +++ b/ndn-svs/fetcher.cpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2023 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -19,12 +19,12 @@ namespace ndn::svs { -Fetcher::Fetcher(Face& face, - const SecurityOptions& securityOptions) +Fetcher::Fetcher(Face& face, const SecurityOptions& securityOptions) : m_face(face) , m_scheduler(face.getIoContext()) , m_securityOptions(securityOptions) -{} +{ +} void Fetcher::expressInterest(const ndn::Interest& interest, @@ -36,9 +36,14 @@ Fetcher::expressInterest(const ndn::Interest& interest, { uint64_t id = ++m_interestIdCounter; m_interestQueue.push({ - id, interest, afterSatisfied, afterNacked, - afterTimeout, nRetries, - m_securityOptions.nRetriesOnValidationFail, afterValidationFailed, + id, + interest, + afterSatisfied, + afterNacked, + afterTimeout, + nRetries, + m_securityOptions.nRetriesOnValidationFail, + afterValidationFailed, }); processQueue(); } @@ -60,41 +65,33 @@ Fetcher::expressInterest(const QueuedInterest& qi) void Fetcher::processQueue() { - while (!m_interestQueue.empty() && m_pendingInterests.size() < m_windowSize) - { + while (!m_interestQueue.empty() && m_pendingInterests.size() < m_windowSize) { QueuedInterest i = m_interestQueue.front(); m_interestQueue.pop(); - m_pendingInterests[i.id] = - m_face.expressInterest(i.interest, - std::bind(&Fetcher::onData, this, _1, _2, i), - std::bind(&Fetcher::onNack, this, _1, _2, i), - std::bind(&Fetcher::onTimeout, this, _1, i)); + m_pendingInterests[i.id] = m_face.expressInterest(i.interest, + std::bind(&Fetcher::onData, this, _1, _2, i), + std::bind(&Fetcher::onNack, this, _1, _2, i), + std::bind(&Fetcher::onTimeout, this, _1, i)); } } void -Fetcher::onData(const Interest& interest, const Data& data, - const QueuedInterest& qi) +Fetcher::onData(const Interest& interest, const Data& data, const QueuedInterest& qi) { m_pendingInterests.erase(qi.id); processQueue(); - if (m_securityOptions.validator == nullptr) - { + if (m_securityOptions.validator == nullptr) { // No validator provided qi.afterSatisfied(interest, data); - } - else - { - auto onDataValidated = [qi] (const Data& data) { - qi.afterSatisfied(qi.interest, data); - }; + } else { + auto onDataValidated = [qi](const Data& data) { qi.afterSatisfied(qi.interest, data); }; - auto onValidationFailed = [this, qi] (const Data& data, const ValidationError& error) { + auto onValidationFailed = [this, qi](const Data& data, const ValidationError& error) { if (qi.nRetriesOnValidationFail > 0) { - this->m_scheduler.schedule(ndn::time::milliseconds(this->m_securityOptions.millisBeforeRetryOnValidationFail), - [this, qi] { + this->m_scheduler.schedule( + ndn::time::milliseconds(this->m_securityOptions.millisBeforeRetryOnValidationFail), [this, qi] { QueuedInterest qiNew(qi); qiNew.nRetriesOnValidationFail--; this->expressInterest(qiNew); @@ -112,8 +109,7 @@ Fetcher::onData(const Interest& interest, const Data& data, } void -Fetcher::onNack(const ndn::Interest& interest, const ndn::lp::Nack& nack, - const QueuedInterest& qi) +Fetcher::onNack(const ndn::Interest& interest, const ndn::lp::Nack& nack, const QueuedInterest& qi) { m_pendingInterests.erase(qi.id); processQueue(); @@ -121,13 +117,11 @@ Fetcher::onNack(const ndn::Interest& interest, const ndn::lp::Nack& nack, } void -Fetcher::onTimeout(const Interest& interest, - const QueuedInterest& qi) +Fetcher::onTimeout(const Interest& interest, const QueuedInterest& qi) { m_pendingInterests.erase(qi.id); - if (qi.nRetries == 0) - { + if (qi.nRetries == 0) { processQueue(); return qi.afterTimeout(interest); } diff --git a/ndn-svs/fetcher.hpp b/ndn-svs/fetcher.hpp index eb82dbc..5473fd4 100644 --- a/ndn-svs/fetcher.hpp +++ b/ndn-svs/fetcher.hpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2022 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -31,34 +31,25 @@ class Fetcher public: Fetcher(Face& face, const SecurityOptions& securityOptions); - void - expressInterest(const ndn::Interest& interest, - const ndn::DataCallback& afterSatisfied, - const ndn::NackCallback& afterNacked, - const ndn::TimeoutCallback& afterTimeout, - int nRetries = 0, - const ndn::security::DataValidationFailureCallback& afterValidationFailed = nullptr); + void expressInterest(const ndn::Interest& interest, + const ndn::DataCallback& afterSatisfied, + const ndn::NackCallback& afterNacked, + const ndn::TimeoutCallback& afterTimeout, + int nRetries = 0, + const ndn::security::DataValidationFailureCallback& afterValidationFailed = nullptr); private: struct QueuedInterest; - void - expressInterest(const QueuedInterest& qi); + void expressInterest(const QueuedInterest& qi); - void - onData(const Interest& interest, const Data& data, - const QueuedInterest& qi); + void onData(const Interest& interest, const Data& data, const QueuedInterest& qi); - void - onNack(const ndn::Interest& interest, const ndn::lp::Nack& nack, - const QueuedInterest& qi); + void onNack(const ndn::Interest& interest, const ndn::lp::Nack& nack, const QueuedInterest& qi); - void - onTimeout(const Interest& interest, - const QueuedInterest& qi); + void onTimeout(const Interest& interest, const QueuedInterest& qi); - void - processQueue(); + void processQueue(); private: Face& m_face; diff --git a/ndn-svs/mapping-provider.cpp b/ndn-svs/mapping-provider.cpp index 6895967..7d47150 100644 --- a/ndn-svs/mapping-provider.cpp +++ b/ndn-svs/mapping-provider.cpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2023 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -23,21 +23,20 @@ MappingList::MappingList() = default; MappingList::MappingList(const NodeID& nid) : nodeId(nid) -{} +{ +} MappingList::MappingList(const Block& block) { block.parse(); for (auto it = block.elements_begin(); it != block.elements_end(); it++) { - if (it->type() == ndn::tlv::Name) - { + if (it->type() == ndn::tlv::Name) { nodeId = NodeID(*it); continue; } - if (it->type() == tlv::MappingEntry) - { + if (it->type() == tlv::MappingEntry) { it->parse(); // SeqNo and ApplicationName @@ -61,8 +60,7 @@ MappingList::encode() const ndn::encoding::EncodingBuffer enc; size_t totalLength = 0; - for (const auto& [seq, mapping] : pairs) - { + for (const auto& [seq, mapping] : pairs) { size_t entryLength = 0; // Additional blocks @@ -97,10 +95,9 @@ MappingProvider::MappingProvider(const Name& syncPrefix, , m_fetcher(face, securityOptions) , m_securityOptions(securityOptions) { - m_registeredPrefix = - m_face.setInterestFilter(Name(m_id).append(m_syncPrefix).append("MAPPING"), - std::bind(&MappingProvider::onMappingQuery, this, _2), - [] (auto&&...) {}); + m_registeredPrefix = m_face.setInterestFilter(Name(m_id).append(m_syncPrefix).append("MAPPING"), + std::bind(&MappingProvider::onMappingQuery, this, _2), + [](auto&&...) {}); } void @@ -121,13 +118,11 @@ MappingProvider::onMappingQuery(const Interest& interest) MissingDataInfo query = parseMappingQueryDataName(interest.getName()); MappingList queryResponse(query.nodeId); - for (SeqNo i = query.low; i <= std::max(query.high, query.low); i++) - { + for (SeqNo i = query.low; i <= std::max(query.high, query.low); i++) { try { auto mapping = getMapping(query.nodeId, i); queryResponse.pairs.emplace_back(i, mapping); - } - catch (const std::exception&) { + } catch (const std::exception&) { // TODO: don't give up if not everything is found // Instead return whatever we have and let the client request // the remaining mappings again @@ -151,7 +146,7 @@ MappingProvider::fetchNameMapping(const MissingDataInfo& info, const MappingListCallback& onValidated, int nRetries) { - TimeoutCallback onTimeout = [] (auto&&...) {}; + TimeoutCallback onTimeout = [](auto&&...) {}; return fetchNameMapping(info, onValidated, onTimeout, nRetries); } @@ -167,8 +162,7 @@ MappingProvider::fetchNameMapping(const MissingDataInfo& info, interest.setMustBeFresh(false); interest.setInterestLifetime(2_s); - auto onDataValidated = [this, onValidated, info] (const Data& data) - { + auto onDataValidated = [this, onValidated, info](const Data& data) { Block block = data.getContent().blockFromValue(); MappingList list(block); @@ -176,8 +170,7 @@ MappingProvider::fetchNameMapping(const MissingDataInfo& info, for (const auto& [seq, mapping] : list.pairs) { try { getMapping(info.nodeId, seq); - } - catch (const std::exception&) { + } catch (const std::exception&) { insertMapping(info.nodeId, seq, mapping); } } @@ -188,17 +181,19 @@ MappingProvider::fetchNameMapping(const MissingDataInfo& info, m_fetcher.expressInterest(interest, std::bind(onDataValidated, _2), std::bind(onTimeout, _1), // Nack - onTimeout, nRetries, - [] (auto&&...) {}); + onTimeout, + nRetries, + [](auto&&...) {}); } Name MappingProvider::getMappingQueryDataName(const MissingDataInfo& info) { - return Name(info.nodeId).append(m_syncPrefix) - .append("MAPPING") - .appendNumber(info.low) - .appendNumber(info.high); + return Name(info.nodeId) + .append(m_syncPrefix) + .append("MAPPING") + .appendNumber(info.low) + .appendNumber(info.high); } MissingDataInfo diff --git a/ndn-svs/mapping-provider.hpp b/ndn-svs/mapping-provider.hpp index 73490d6..7c99812 100644 --- a/ndn-svs/mapping-provider.hpp +++ b/ndn-svs/mapping-provider.hpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2022 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -34,16 +34,13 @@ class MappingList public: MappingList(); - explicit - MappingList(const NodeID& nid); + explicit MappingList(const NodeID& nid); /// @brief Decode from Block - explicit - MappingList(const Block& block); + explicit MappingList(const Block& block); /// @brief Encode to Block - Block - encode() const; + Block encode() const; public: NodeID nodeId; @@ -61,24 +58,21 @@ class MappingProvider : noncopyable ndn::Face& face, const SecurityOptions& securityOptions); - virtual - ~MappingProvider() = default; + virtual ~MappingProvider() = default; using MappingListCallback = std::function; /** * @brief Insert a mapping entry into the store */ - void - insertMapping(const NodeID& nodeId, const SeqNo& seqNo, const MappingEntryPair& entry); + void insertMapping(const NodeID& nodeId, const SeqNo& seqNo, const MappingEntryPair& entry); /** * @brief Get a mapping and throw if not found * * @returns Corresponding application name */ - MappingEntryPair - getMapping(const NodeID& nodeId, const SeqNo& seqNo); + MappingEntryPair getMapping(const NodeID& nodeId, const SeqNo& seqNo); /** * @brief Retrieve the data mappings for encapsulated data packets @@ -86,10 +80,9 @@ class MappingProvider : noncopyable * @param info Query info * @param onValidated Callback when mapping is fetched and validated */ - void - fetchNameMapping(const MissingDataInfo& info, - const MappingListCallback& onValidated, - int nRetries = 0); + void fetchNameMapping(const MissingDataInfo& info, + const MappingListCallback& onValidated, + int nRetries = 0); /** * @brief Retrieve the data mappings for encapsulated data packets @@ -98,27 +91,23 @@ class MappingProvider : noncopyable * @param onValidated Callback when mapping is fetched and validated * @param onTimeout Callback when mapping is not retrieved */ - void - fetchNameMapping(const MissingDataInfo& info, - const MappingListCallback& onValidated, - const TimeoutCallback& onTimeout, - int nRetries = 0); + void fetchNameMapping(const MissingDataInfo& info, + const MappingListCallback& onValidated, + const TimeoutCallback& onTimeout, + int nRetries = 0); private: /** * @brief Return data name for mapping query */ - Name - getMappingQueryDataName(const MissingDataInfo& info); + Name getMappingQueryDataName(const MissingDataInfo& info); /** * @brief Return the query from mapping data name */ - MissingDataInfo - parseMappingQueryDataName(const Name& name); + MissingDataInfo parseMappingQueryDataName(const Name& name); - void - onMappingQuery(const Interest& interest); + void onMappingQuery(const Interest& interest); private: const Name m_syncPrefix; diff --git a/ndn-svs/security-options.cpp b/ndn-svs/security-options.cpp index 59c2852..2927675 100644 --- a/ndn-svs/security-options.cpp +++ b/ndn-svs/security-options.cpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2022 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -18,7 +18,7 @@ namespace ndn::svs { -const SecurityOptions SecurityOptions::DEFAULT{SecurityOptions::DEFAULT_KEYCHAIN}; +const SecurityOptions SecurityOptions::DEFAULT{ SecurityOptions::DEFAULT_KEYCHAIN }; BaseSigner::~BaseSigner() = default; diff --git a/ndn-svs/security-options.hpp b/ndn-svs/security-options.hpp index 53517fb..ac34621 100644 --- a/ndn-svs/security-options.hpp +++ b/ndn-svs/security-options.hpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2022 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -27,18 +27,16 @@ namespace ndn::svs { class BaseValidator : noncopyable { public: - virtual - ~BaseValidator() = default; + virtual ~BaseValidator() = default; /** * @brief Asynchronously validate @p data * * @note @p successCb and @p failureCb must not be nullptr */ - virtual void - validate(const Data& data, - const ndn::security::DataValidationSuccessCallback& successCb, - const ndn::security::DataValidationFailureCallback& failureCb) + virtual void validate(const Data& data, + const ndn::security::DataValidationSuccessCallback& successCb, + const ndn::security::DataValidationFailureCallback& failureCb) { successCb(data); } @@ -48,10 +46,9 @@ class BaseValidator : noncopyable * * @note @p successCb and @p failureCb must not be nullptr */ - virtual void - validate(const Interest& interest, - const ndn::security::InterestValidationSuccessCallback& successCb, - const ndn::security::InterestValidationFailureCallback& failureCb) + virtual void validate(const Interest& interest, + const ndn::security::InterestValidationSuccessCallback& successCb, + const ndn::security::InterestValidationFailureCallback& failureCb) { successCb(interest); } @@ -63,14 +60,11 @@ class BaseValidator : noncopyable class BaseSigner : noncopyable { public: - virtual - ~BaseSigner(); + virtual ~BaseSigner(); - virtual void - sign(Interest& interest) const {} + virtual void sign(Interest& interest) const {} - virtual void - sign(Data& data) const {} + virtual void sign(Data& data) const {} public: security::SigningInfo signingInfo; @@ -82,16 +76,14 @@ class BaseSigner : noncopyable class KeyChainSigner : public BaseSigner { public: - explicit - KeyChainSigner(KeyChain& keyChain) + explicit KeyChainSigner(KeyChain& keyChain) : m_keyChain(keyChain) - {} + { + } - void - sign(Interest& interest) const override; + void sign(Interest& interest) const override; - void - sign(Data& data) const override; + void sign(Data& data) const override; private: KeyChain& m_keyChain; @@ -103,8 +95,7 @@ class KeyChainSigner : public BaseSigner class SecurityOptions { public: - explicit - SecurityOptions(KeyChain& keyChain); + explicit SecurityOptions(KeyChain& keyChain); public: /** Signing options for sync interests */ diff --git a/ndn-svs/store-memory.hpp b/ndn-svs/store-memory.hpp index 1483c8e..3c4c7f4 100644 --- a/ndn-svs/store-memory.hpp +++ b/ndn-svs/store-memory.hpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2022 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -26,14 +26,12 @@ namespace ndn::svs { class MemoryDataStore : public DataStore { public: - std::shared_ptr - find(const Interest& interest) override + std::shared_ptr find(const Interest& interest) override { return m_ims.find(interest); } - void - insert(const Data& data) override + void insert(const Data& data) override { return m_ims.insert(data); } diff --git a/ndn-svs/store.hpp b/ndn-svs/store.hpp index 04eba07..cd0fa9d 100644 --- a/ndn-svs/store.hpp +++ b/ndn-svs/store.hpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2022 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -24,14 +24,11 @@ namespace ndn::svs { class DataStore : noncopyable { public: - virtual - ~DataStore() = default; + virtual ~DataStore() = default; - virtual std::shared_ptr - find(const Interest& interest) = 0; + virtual std::shared_ptr find(const Interest& interest) = 0; - virtual void - insert(const Data& data) = 0; + virtual void insert(const Data& data) = 0; }; } // namespace ndn::svs diff --git a/ndn-svs/svspubsub.cpp b/ndn-svs/svspubsub.cpp index 723c890..edcd003 100644 --- a/ndn-svs/svspubsub.cpp +++ b/ndn-svs/svspubsub.cpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2021-2023 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -34,9 +34,12 @@ SVSPubSub::SVSPubSub(const Name& syncPrefix, , m_onUpdate(std::move(updateCallback)) , m_opts(options) , m_securityOptions(securityOptions) - , m_svsync(syncPrefix, nodePrefix, face, + , m_svsync(syncPrefix, + nodePrefix, + face, std::bind(&SVSPubSub::updateCallbackInternal, this, _1), - securityOptions, options.dataStore) + securityOptions, + options.dataStore) , m_mappingProvider(syncPrefix, nodePrefix, face, securityOptions) { m_svsync.getCore().setGetExtraBlockCallback(std::bind(&SVSPubSub::onGetExtraData, this, _1)); @@ -44,8 +47,10 @@ SVSPubSub::SVSPubSub(const Name& syncPrefix, } SeqNo -SVSPubSub::publish(const Name& name, span value, - const Name& nodePrefix, time::milliseconds freshnessPeriod, +SVSPubSub::publish(const Name& name, + span value, + const Name& nodePrefix, + time::milliseconds freshnessPeriod, std::vector mappingBlocks) { // Segment the data if larger than MAX_DATA_SIZE @@ -56,8 +61,7 @@ SVSPubSub::publish(const Name& name, span value, NodeID nid = nodePrefix == EMPTY_NAME ? m_dataPrefix : nodePrefix; SeqNo seqNo = m_svsync.getCore().getSeqNo(nid) + 1; - for (size_t i = 0; i < nSegments; i++) - { + for (size_t i = 0; i < nSegments; i++) { // Create encapsulated segment auto segmentName = Name(name).appendVersion(0).appendSegment(i); auto segment = Data(segmentName); @@ -71,17 +75,15 @@ SVSPubSub::publish(const Name& name, span value, m_securityOptions.dataSigner->sign(segment); // Insert outer segment - m_svsync.insertDataSegment(segment.wireEncode(), freshnessPeriod, - nid, seqNo, i, finalBlock, ndn::tlv::Data); + m_svsync.insertDataSegment( + segment.wireEncode(), freshnessPeriod, nid, seqNo, i, finalBlock, ndn::tlv::Data); } // Insert mapping and manually update the sequence number insertMapping(nid, seqNo, name, mappingBlocks); m_svsync.getCore().updateSeqNo(seqNo, nid); return seqNo; - } - else - { + } else { ndn::Data data(name); data.setContent(value); data.setFreshnessPeriod(freshnessPeriod); @@ -91,8 +93,7 @@ SVSPubSub::publish(const Name& name, span value, } SeqNo -SVSPubSub::publishPacket(const Data& data, const Name& nodePrefix, - std::vector mappingBlocks) +SVSPubSub::publishPacket(const Data& data, const Name& nodePrefix, std::vector mappingBlocks) { NodeID nid = nodePrefix == EMPTY_NAME ? m_dataPrefix : nodePrefix; SeqNo seqNo = m_svsync.publishData(data.wireEncode(), data.getFreshnessPeriod(), nid, ndn::tlv::Data); @@ -101,17 +102,16 @@ SVSPubSub::publishPacket(const Data& data, const Name& nodePrefix, } void -SVSPubSub::insertMapping(const NodeID& nid, SeqNo seqNo, const Name& name, - std::vector additional) +SVSPubSub::insertMapping(const NodeID& nid, SeqNo seqNo, const Name& name, std::vector additional) { // additional is a copy deliberately // this way we can add well-known mappings to the list // add timestamp block if (m_opts.useTimestamp) { - unsigned long now = - std::chrono::duration_cast - (std::chrono::system_clock::now().time_since_epoch()).count(); + unsigned long now = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); auto timestamp = Name::Component::fromNumber(now, tlv::TimestampNameComponent); additional.push_back(timestamp); } @@ -120,8 +120,7 @@ SVSPubSub::insertMapping(const NodeID& nid, SeqNo seqNo, const Name& name, MappingEntryPair entry = { name, additional }; // notify subscribers in next sync interest - if (m_notificationMappingList.nodeId == EMPTY_NAME || m_notificationMappingList.nodeId == nid) - { + if (m_notificationMappingList.nodeId == EMPTY_NAME || m_notificationMappingList.nodeId == nid) { m_notificationMappingList.nodeId = nid; m_notificationMappingList.pairs.push_back({ seqNo, entry }); } @@ -140,8 +139,10 @@ SVSPubSub::subscribe(const Name& prefix, const SubscriptionCallback& callback, b } uint32_t -SVSPubSub::subscribeToProducer(const Name& nodePrefix, const SubscriptionCallback& callback, - bool prefetch, bool packets) +SVSPubSub::subscribeToProducer(const Name& nodePrefix, + const SubscriptionCallback& callback, + bool prefetch, + bool packets) { uint32_t handle = ++m_subscriptionCount; Subscription sub = { handle, nodePrefix, callback, packets, prefetch }; @@ -152,12 +153,9 @@ SVSPubSub::subscribeToProducer(const Name& nodePrefix, const SubscriptionCallbac void SVSPubSub::unsubscribe(uint32_t handle) { - auto unsub = [handle](std::vector& subs) - { - for (auto it = subs.begin(); it != subs.end(); ++it) - { - if (it->id == handle) - { + auto unsub = [handle](std::vector& subs) { + for (auto it = subs.begin(); it != subs.end(); ++it) { + if (it->id == handle) { subs.erase(it); return; } @@ -171,68 +169,60 @@ SVSPubSub::unsubscribe(uint32_t handle) void SVSPubSub::updateCallbackInternal(const std::vector& info) { - for (const auto& stream : info) - { + for (const auto& stream : info) { Name streamName(stream.nodeId); // Producer subscriptions - for (const auto& sub : m_producerSubscriptions) - { - if (sub.prefix.isPrefixOf(streamName)) - { + for (const auto& sub : m_producerSubscriptions) { + if (sub.prefix.isPrefixOf(streamName)) { // Add to fetching queue for (SeqNo i = stream.low; i <= stream.high; i++) m_fetchMap[std::pair(stream.nodeId, i)].push_back(sub); // Prefetch next available data if (sub.prefetch) - m_svsync.fetchData(stream.nodeId, stream.high + 1, [] (auto&&...) {}); // do nothing with prefetch + m_svsync.fetchData(stream.nodeId, stream.high + 1, [](auto&&...) {}); // do nothing with prefetch } } // Fetch all mappings if we have prefix subscription(s) - if (!m_prefixSubscriptions.empty()) - { + if (!m_prefixSubscriptions.empty()) { MissingDataInfo remainingInfo = stream; // Attemt to find what we already know about mapping // This typically refers to the Sync Interest mapping optimization, // where the Sync Interest contains the notification mapping list - for (SeqNo i = remainingInfo.low; i <= remainingInfo.high; i++) - { - try - { + for (SeqNo i = remainingInfo.low; i <= remainingInfo.high; i++) { + try { // throws if mapping not found this->processMapping(stream.nodeId, i); remainingInfo.low++; - } - catch (const std::exception&) - { + } catch (const std::exception&) { break; } } // Find from network what we don't yet know - while (remainingInfo.high >= remainingInfo.low) - { + while (remainingInfo.high >= remainingInfo.low) { // Fetch a max of 10 entries per request // This is to ensure the mapping response does not overflow // TODO: implement a better solution to this issue MissingDataInfo truncatedRemainingInfo = remainingInfo; - if (truncatedRemainingInfo.high - truncatedRemainingInfo.low > 10) - { + if (truncatedRemainingInfo.high - truncatedRemainingInfo.low > 10) { truncatedRemainingInfo.high = truncatedRemainingInfo.low + 10; } - m_mappingProvider.fetchNameMapping(truncatedRemainingInfo, - [this, remainingInfo, streamName] (const MappingList& list) { + m_mappingProvider.fetchNameMapping( + truncatedRemainingInfo, + [this, remainingInfo, streamName](const MappingList& list) { bool queued = false; for (const auto& [seq, mapping] : list.pairs) queued |= this->processMapping(streamName, seq); if (queued) this->fetchAll(); - }, -1); + }, + -1); remainingInfo.low += 11; } @@ -250,18 +240,16 @@ SVSPubSub::processMapping(const NodeID& nodeId, SeqNo seqNo) auto mapping = m_mappingProvider.getMapping(nodeId, seqNo); // check if timestamp is too old - if (m_opts.maxPubAge > 0_ms) - { + if (m_opts.maxPubAge > 0_ms) { // look for the additional timestamp block // if no timestamp block is present, we just skip this step - for (const auto& block : mapping.second) - { + for (const auto& block : mapping.second) { if (block.type() != tlv::TimestampNameComponent) continue; - unsigned long now = - std::chrono::duration_cast - (std::chrono::system_clock::now().time_since_epoch()).count(); + unsigned long now = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); unsigned long pubTime = Name::Component(block).toNumber(); unsigned long maxAge = time::microseconds(m_opts.maxPubAge).count(); @@ -273,10 +261,8 @@ SVSPubSub::processMapping(const NodeID& nodeId, SeqNo seqNo) // check if known mapping matches subscription bool queued = false; - for (const auto& sub : m_prefixSubscriptions) - { - if (sub.prefix.isPrefixOf(mapping.first)) - { + for (const auto& sub : m_prefixSubscriptions) { + if (sub.prefix.isPrefixOf(mapping.first)) { m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); queued = true; } @@ -288,8 +274,7 @@ SVSPubSub::processMapping(const NodeID& nodeId, SeqNo seqNo) void SVSPubSub::fetchAll() { - for (const auto& pair : m_fetchMap) - { + for (const auto& pair : m_fetchMap) { // Check if already fetching this publication auto key = pair.first; if (m_fetchingMap.find(key) != m_fetchingMap.end()) @@ -317,37 +302,31 @@ SVSPubSub::onSyncData(const Data& firstData, const std::pair& publi // Return data to packet subscriptions SubscriptionData subData = { - innerData.getName(), - innerContent.value_bytes(), - publication.first, - publication.second, - innerData, + innerData.getName(), innerContent.value_bytes(), publication.first, publication.second, innerData, }; // Function to return data to subscriptions - auto returnData = [this, firstData, subData, publication] () - { + auto returnData = [this, firstData, subData, publication]() { bool hasFinalBlock = subData.packet.value().getFinalBlock().has_value(); bool hasBlobSubcriptions = false; - for (const auto& sub : this->m_fetchMap[publication]) - { + for (const auto& sub : this->m_fetchMap[publication]) { if (sub.isPacketSubscription || !hasFinalBlock) sub.callback(subData); hasBlobSubcriptions |= !sub.isPacketSubscription; } - // If there are blob subscriptions and a final block, we need to fetch remaining segments - if (hasBlobSubcriptions && hasFinalBlock && firstData.getName().size() > 2) - { + // If there are blob subscriptions and a final block, we need to fetch + // remaining segments + if (hasBlobSubcriptions && hasFinalBlock && firstData.getName().size() > 2) { // Fetch remaining segments auto pubName = firstData.getName().getPrefix(-2); Interest interest(pubName); // strip off version and segment number ndn::SegmentFetcher::Options opts; auto fetcher = ndn::SegmentFetcher::start(m_face, interest, m_nullValidator, opts); - fetcher->onComplete.connectSingleShot([this, publication] (const ndn::ConstBufferPtr& data) { + fetcher->onComplete.connectSingleShot([this, publication](const ndn::ConstBufferPtr& data) { try { // Binary BLOB to return to app auto finalBuffer = std::make_shared>(std::vector(data->size())); @@ -370,35 +349,30 @@ SVSPubSub::onSyncData(const Data& firstData, const std::pair& publi auto innerName = Data(block.elements()[0]).getName().getPrefix(-2); // Function to send final buffer to subscriptions if possible - auto sendFinalBuffer = [this, innerName, publication, finalBuffer, bufSize, numFailed, numValidated, numElem] - { - if (*numValidated + *numFailed != numElem) - return; - - if (*numFailed > 0) // abort - return this->cleanUpFetch(publication); - - // Resize buffer to actual size - finalBuffer->resize(*bufSize); - - // Return data to packet subscriptions - SubscriptionData subData = { - innerName, - *finalBuffer, - publication.first, - publication.second, - std::nullopt, - }; + auto sendFinalBuffer = + [this, innerName, publication, finalBuffer, bufSize, numFailed, numValidated, numElem] { + if (*numValidated + *numFailed != numElem) + return; + + if (*numFailed > 0) // abort + return this->cleanUpFetch(publication); + + // Resize buffer to actual size + finalBuffer->resize(*bufSize); - for (const auto& sub : this->m_fetchMap[publication]) - if (!sub.isPacketSubscription) - sub.callback(subData); + // Return data to packet subscriptions + SubscriptionData subData = { + innerName, *finalBuffer, publication.first, publication.second, std::nullopt, + }; - this->cleanUpFetch(publication); - }; + for (const auto& sub : this->m_fetchMap[publication]) + if (!sub.isPacketSubscription) + sub.callback(subData); - for (size_t i = 0; i < numElem; i++) - { + this->cleanUpFetch(publication); + }; + + for (size_t i = 0; i < numElem; i++) { Data innerData(block.elements()[i]); // Copy actual binary data to final buffer @@ -408,12 +382,13 @@ SVSPubSub::onSyncData(const Data& firstData, const std::pair& publi // Validate inner data if (hasValidator) { - this->m_securityOptions.encapsulatedDataValidator->validate(innerData, - [sendFinalBuffer, numValidated] (auto&&...) { + this->m_securityOptions.encapsulatedDataValidator->validate( + innerData, + [sendFinalBuffer, numValidated](auto&&...) { *numValidated += 1; sendFinalBuffer(); }, - [sendFinalBuffer, numFailed] (auto&&...) { + [sendFinalBuffer, numFailed](auto&&...) { *numFailed += 1; sendFinalBuffer(); }); @@ -423,15 +398,12 @@ SVSPubSub::onSyncData(const Data& firstData, const std::pair& publi } sendFinalBuffer(); - } - catch (const std::exception&) { + } catch (const std::exception&) { cleanUpFetch(publication); } }); fetcher->onError.connectSingleShot(std::bind(&SVSPubSub::cleanUpFetch, this, publication)); - } - else - { + } else { cleanUpFetch(publication); } }; @@ -439,11 +411,8 @@ SVSPubSub::onSyncData(const Data& firstData, const std::pair& publi // Validate encapsulated packet if (m_securityOptions.encapsulatedDataValidator) { m_securityOptions.encapsulatedDataValidator->validate( - innerData, - [&] (auto&&...) { returnData(); }, - [] (auto&&...) {}); - } - else { + innerData, [&](auto&&...) { returnData(); }, [](auto&&...) {}); + } else { returnData(); } } @@ -466,15 +435,13 @@ SVSPubSub::onGetExtraData(const VersionVector&) void SVSPubSub::onRecvExtraData(const Block& block) { - try - { + try { MappingList list(block); - for (const auto& p : list.pairs) - { + for (const auto& p : list.pairs) { m_mappingProvider.insertMapping(list.nodeId, p.first, p.second); } + } catch (const std::exception&) { } - catch (const std::exception&) {} } } // namespace ndn::svs diff --git a/ndn-svs/svspubsub.hpp b/ndn-svs/svspubsub.hpp index 941d4da..c34686f 100644 --- a/ndn-svs/svspubsub.hpp +++ b/ndn-svs/svspubsub.hpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2021-2023 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -19,8 +19,8 @@ #include "core.hpp" #include "mapping-provider.hpp" -#include "store.hpp" #include "security-options.hpp" +#include "store.hpp" #include "svsync.hpp" #include @@ -77,8 +77,7 @@ class SVSPubSub : noncopyable const SVSPubSubOptions& options = {}, const SecurityOptions& securityOptions = SecurityOptions::DEFAULT); - virtual - ~SVSPubSub() = default; + virtual ~SVSPubSub() = default; struct SubscriptionData { @@ -108,13 +107,14 @@ class SVSPubSub : noncopyable * @param value data buffer * @param nodePrefix Name to publish the data under * @param freshnessPeriod freshness period for the data - * @param mappingBlocks Additional blocks to be published with the mapping (use sparingly) + * @param mappingBlocks Additional blocks to be published with the mapping + * (use sparingly) */ - SeqNo - publish(const Name& name, span value, - const Name& nodePrefix = EMPTY_NAME, - time::milliseconds freshnessPeriod = FRESH_FOREVER, - std::vector mappingBlocks = {}); + SeqNo publish(const Name& name, + span value, + const Name& nodePrefix = EMPTY_NAME, + time::milliseconds freshnessPeriod = FRESH_FOREVER, + std::vector mappingBlocks = {}); /** * @brief Subscribe to a application name prefix. @@ -125,8 +125,7 @@ class SVSPubSub : noncopyable * * @returns Handle to the subscription */ - uint32_t - subscribe(const Name& prefix, const SubscriptionCallback& callback, bool packets = false); + uint32_t subscribe(const Name& prefix, const SubscriptionCallback& callback, bool packets = false); /** * @brief Subscribe to a data producer @@ -138,17 +137,17 @@ class SVSPubSub : noncopyable * * @returns Handle to the subscription */ - uint32_t - subscribeToProducer(const Name& nodePrefix, const SubscriptionCallback& callback, - bool prefetch = false, bool packets = false); + uint32_t subscribeToProducer(const Name& nodePrefix, + const SubscriptionCallback& callback, + bool prefetch = false, + bool packets = false); /** * @brief Unsubscribe from a stream using a handle * * @param handle Handle received during subscription */ - void - unsubscribe(uint32_t handle); + void unsubscribe(uint32_t handle); /** * @brief Publish a encapsulated Data packet in the session and trigger @@ -159,16 +158,15 @@ class SVSPubSub : noncopyable * * @param data Data packet to publish * @param nodePrefix Name to publish the data under - * @param mappingBlocks Additional blocks to be published with the mapping (use sparingly) + * @param mappingBlocks Additional blocks to be published with the mapping + * (use sparingly) */ - SeqNo - publishPacket(const Data& data, - const Name& nodePrefix = EMPTY_NAME, - std::vector mappingBlocks = {}); + SeqNo publishPacket(const Data& data, + const Name& nodePrefix = EMPTY_NAME, + std::vector mappingBlocks = {}); /** @brief Get the underlying sync */ - SVSync& - getSVSync() + SVSync& getSVSync() { return m_svsync; } @@ -183,36 +181,27 @@ class SVSPubSub : noncopyable bool prefetch; }; - void - onSyncData(const Data& syncData, const std::pair& publication); + void onSyncData(const Data& syncData, const std::pair& publication); - void - updateCallbackInternal(const std::vector& info); + void updateCallbackInternal(const std::vector& info); - Block - onGetExtraData(const VersionVector& vv); + Block onGetExtraData(const VersionVector& vv); - void - onRecvExtraData(const Block& block); + void onRecvExtraData(const Block& block); /// @brief Insert a mapping entry into the store - void - insertMapping(const NodeID& nid, SeqNo seqNo, const Name& name, - std::vector additional); + void insertMapping(const NodeID& nid, SeqNo seqNo, const Name& name, std::vector additional); /** * @brief Get and process mapping from store. * @returns true if new publications were queued for fetch * @throws std::exception error if mapping is not found */ - bool - processMapping(const NodeID& nodeId, SeqNo seqNo); + bool processMapping(const NodeID& nodeId, SeqNo seqNo); - void - fetchAll(); + void fetchAll(); - void - cleanUpFetch(const std::pair& publication); + void cleanUpFetch(const std::pair& publication); public: static inline const Name EMPTY_NAME; diff --git a/ndn-svs/svsync-base.cpp b/ndn-svs/svsync-base.cpp index c5dda36..0f2f4b8 100644 --- a/ndn-svs/svsync-base.cpp +++ b/ndn-svs/svsync-base.cpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2023 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -43,22 +43,24 @@ SVSyncBase::SVSyncBase(const Name& syncPrefix, m_dataStore = std::make_shared(); // Register data prefix - m_registeredDataPrefix = - m_face.setInterestFilter(m_dataPrefix, - std::bind(&SVSyncBase::onDataInterest, this, _2), - [] (auto&&...) {}); + m_registeredDataPrefix = m_face.setInterestFilter( + m_dataPrefix, std::bind(&SVSyncBase::onDataInterest, this, _2), [](auto&&...) {}); } SeqNo -SVSyncBase::publishData(const uint8_t* buf, size_t len, const ndn::time::milliseconds& freshness, +SVSyncBase::publishData(const uint8_t* buf, + size_t len, + const ndn::time::milliseconds& freshness, const NodeID& nid) { - return publishData(ndn::encoding::makeBinaryBlock(ndn::tlv::Content, {buf, len}), freshness, nid); + return publishData(ndn::encoding::makeBinaryBlock(ndn::tlv::Content, { buf, len }), freshness, nid); } SeqNo -SVSyncBase::publishData(const Block& content, const ndn::time::milliseconds& freshness, - const NodeID& id, uint32_t contentType) +SVSyncBase::publishData(const Block& content, + const ndn::time::milliseconds& freshness, + const NodeID& id, + uint32_t contentType) { NodeID pubId = id != EMPTY_NODE_ID ? id : m_id; SeqNo newSeq = m_core.getSeqNo(pubId) + 1; @@ -79,9 +81,13 @@ SVSyncBase::publishData(const Block& content, const ndn::time::milliseconds& fre } void -SVSyncBase::insertDataSegment(const Block& content, const ndn::time::milliseconds& freshness, - const NodeID& nid, const SeqNo seq, const size_t segNo, - const Name::Component& finalBlock, uint32_t contentType) +SVSyncBase::insertDataSegment(const Block& content, + const ndn::time::milliseconds& freshness, + const NodeID& nid, + const SeqNo seq, + const size_t segNo, + const Name::Component& finalBlock, + uint32_t contentType) { Name dataName = getDataName(nid, seq).appendVersion(0).appendSegment(segNo); auto data = std::make_shared(dataName); @@ -102,17 +108,20 @@ SVSyncBase::onDataInterest(const Interest& interest) } void -SVSyncBase::fetchData(const NodeID& nid, const SeqNo& seqNo, - const DataValidatedCallback& onValidated, int nRetries) +SVSyncBase::fetchData(const NodeID& nid, + const SeqNo& seqNo, + const DataValidatedCallback& onValidated, + int nRetries) { DataValidationErrorCallback onValidationFailed = std::bind(&SVSyncBase::onDataValidationFailed, this, _1, _2); - TimeoutCallback onTimeout = [] (auto&&...) {}; + TimeoutCallback onTimeout = [](auto&&...) {}; fetchData(nid, seqNo, onValidated, onValidationFailed, onTimeout, nRetries); } void -SVSyncBase::fetchData(const NodeID& nid, const SeqNo& seqNo, +SVSyncBase::fetchData(const NodeID& nid, + const SeqNo& seqNo, const DataValidatedCallback& onValidated, const DataValidationErrorCallback& onValidationFailed, const TimeoutCallback& onTimeout, @@ -126,12 +135,13 @@ SVSyncBase::fetchData(const NodeID& nid, const SeqNo& seqNo, m_fetcher.expressInterest(interest, std::bind(&SVSyncBase::onDataValidated, this, _2, onValidated), std::bind(onTimeout, _1), // Nack - onTimeout, nRetries, onValidationFailed); + onTimeout, + nRetries, + onValidationFailed); } void -SVSyncBase::onDataValidated(const Data& data, - const DataValidatedCallback& dataCallback) +SVSyncBase::onDataValidated(const Data& data, const DataValidatedCallback& dataCallback) { if (shouldCache(data)) m_dataStore->insert(data); @@ -140,8 +150,7 @@ SVSyncBase::onDataValidated(const Data& data, } void -SVSyncBase::onDataValidationFailed(const Data& data, - const ValidationError& error) +SVSyncBase::onDataValidationFailed(const Data& data, const ValidationError& error) { } diff --git a/ndn-svs/svsync-base.hpp b/ndn-svs/svsync-base.hpp index 5e22a25..8daa8fd 100644 --- a/ndn-svs/svsync-base.hpp +++ b/ndn-svs/svsync-base.hpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2023 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -53,11 +53,11 @@ class SVSyncBase : noncopyable const SecurityOptions& securityOptions = SecurityOptions::DEFAULT, std::shared_ptr dataStore = DEFAULT_DATASTORE); - virtual - ~SVSyncBase() = default; + virtual ~SVSyncBase() = default; /** - * @brief Publish a data packet in the session and trigger synchronization updates + * @brief Publish a data packet in the session and trigger synchronization + * updates * * This method will create a data packet with the supplied content. * The packet name is the local session + seqNo. @@ -70,12 +70,14 @@ class SVSyncBase : noncopyable * * @returns Sequence number of the published data packet */ - SeqNo - publishData(const uint8_t* buf, size_t len, const ndn::time::milliseconds& freshness, - const NodeID& id = EMPTY_NODE_ID); + SeqNo publishData(const uint8_t* buf, + size_t len, + const ndn::time::milliseconds& freshness, + const NodeID& id = EMPTY_NODE_ID); /** - * @brief Publish a data packet in the session and trigger synchronization updates + * @brief Publish a data packet in the session and trigger synchronization + * updates * * This method will create a data packet with the supplied content. * The packet name is the local session + seqNo. @@ -87,10 +89,10 @@ class SVSyncBase : noncopyable * * @returns Sequence number of the published data packet */ - SeqNo - publishData(const Block& content, const ndn::time::milliseconds& freshness, - const NodeID& nid = EMPTY_NODE_ID, - uint32_t contentType = ndn::tlv::Content); + SeqNo publishData(const Block& content, + const ndn::time::milliseconds& freshness, + const NodeID& nid = EMPTY_NODE_ID, + uint32_t contentType = ndn::tlv::Content); /** * Insert segment into the store without changing the sequence number. @@ -103,52 +105,55 @@ class SVSyncBase : noncopyable * @param finalBlock FinalBlockId of the data packet * @param contentType Content type of the data packet */ - void - insertDataSegment(const Block& content, const ndn::time::milliseconds& freshness, - const NodeID& nid, const SeqNo seq, const size_t segNo, - const Name::Component& finalBlock, - uint32_t contentType = ndn::tlv::Content); + void insertDataSegment(const Block& content, + const ndn::time::milliseconds& freshness, + const NodeID& nid, + const SeqNo seq, + const size_t segNo, + const Name::Component& finalBlock, + uint32_t contentType = ndn::tlv::Content); /** * @brief Retrive a data packet with a particular seqNo from a session * * @param nid The name of the target node * @param seq The seqNo of the data packet. - * @param onValidated The callback when the retrieved packet has been validated. + * @param onValidated The callback when the retrieved packet has been + * validated. * @param nRetries The number of retries. */ - void - fetchData(const NodeID& nid, const SeqNo& seq, - const DataValidatedCallback& onValidated, - int nRetries = 0); + void fetchData(const NodeID& nid, + const SeqNo& seq, + const DataValidatedCallback& onValidated, + int nRetries = 0); /** * @brief Retrive a data packet with a particular seqNo from a session * * @param nid The name of the target node * @param seq The seqNo of the data packet. - * @param onValidated The callback when the retrieved packet has been validated. - * @param onValidationFailed The callback when the retrieved packet failed validation. + * @param onValidated The callback when the retrieved packet has been + * validated. + * @param onValidationFailed The callback when the retrieved packet failed + * validation. * @param onTimeout The callback when data is not retrieved. * @param nRetries The number of retries. */ - void - fetchData(const NodeID& nid, const SeqNo& seq, - const DataValidatedCallback& onValidated, - const DataValidationErrorCallback& onValidationFailed, - const TimeoutCallback& onTimeout, - int nRetries = 0); + void fetchData(const NodeID& nid, + const SeqNo& seq, + const DataValidatedCallback& onValidated, + const DataValidationErrorCallback& onValidationFailed, + const TimeoutCallback& onTimeout, + int nRetries = 0); /** @brief Get the underlying data store */ - DataStore& - getDataStore() + DataStore& getDataStore() { return *m_dataStore; } /** @brief Get the underlying SVS core */ - SVSyncCore& - getCore() + SVSyncCore& getCore() { return m_core; } @@ -162,32 +167,25 @@ class SVSyncBase : noncopyable * data prefix for proper functionality, or the application must * independently produce data under the prefix. */ - virtual Name - getDataName(const NodeID& nid, const SeqNo& seqNo) = 0; + virtual Name getDataName(const NodeID& nid, const SeqNo& seqNo) = 0; public: static inline const NodeID EMPTY_NODE_ID; static inline const std::shared_ptr DEFAULT_DATASTORE; private: - void - onDataInterest(const Interest& interest); + void onDataInterest(const Interest& interest); - void - onDataValidated(const Data& data, - const DataValidatedCallback& dataCallback); + void onDataValidated(const Data& data, const DataValidatedCallback& dataCallback); - void - onDataValidationFailed(const Data& data, - const ValidationError& error); + void onDataValidationFailed(const Data& data, const ValidationError& error); /** * Determines whether a particular data packet is to be cached * Can be used to cache data packets from other nodes when * using multicast data interests. */ - virtual bool - shouldCache(const Data& data) const + virtual bool shouldCache(const Data& data) const { return false; } diff --git a/ndn-svs/svsync-shared.hpp b/ndn-svs/svsync-shared.hpp index 89a3e70..4a38331 100644 --- a/ndn-svs/svsync-shared.hpp +++ b/ndn-svs/svsync-shared.hpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2022 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -40,25 +40,27 @@ class SVSyncShared : public SVSyncBase std::shared_ptr dataStore = DEFAULT_DATASTORE) : SVSyncBase(Name(grpPrefix).append("s"), Name(grpPrefix).append("d"), - id, face, updateCallback, securityOptions, std::move(dataStore)) - {} + id, + face, + updateCallback, + securityOptions, + std::move(dataStore)) + { + } - Name - getDataName(const NodeID& nid, const SeqNo& seqNo) override + Name getDataName(const NodeID& nid, const SeqNo& seqNo) override { return Name(m_dataPrefix).append(nid).appendNumber(seqNo); } /** @brief Set whether data of other nodes is also cached and served */ - void - setCacheAll(bool val) + void setCacheAll(bool val) { m_cacheAll = val; } private: - bool - shouldCache(const Data&) const override + bool shouldCache(const Data&) const override { return m_cacheAll; } diff --git a/ndn-svs/svsync.hpp b/ndn-svs/svsync.hpp index 8e88de5..c48b77a 100644 --- a/ndn-svs/svsync.hpp +++ b/ndn-svs/svsync.hpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2022 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -37,12 +37,17 @@ class SVSync : public SVSyncBase const UpdateCallback& updateCallback, const SecurityOptions& securityOptions = SecurityOptions::DEFAULT, std::shared_ptr dataStore = DEFAULT_DATASTORE) - : SVSyncBase(syncPrefix, Name(nodePrefix).append(syncPrefix), nodePrefix, - face, updateCallback, securityOptions, std::move(dataStore)) - {} + : SVSyncBase(syncPrefix, + Name(nodePrefix).append(syncPrefix), + nodePrefix, + face, + updateCallback, + securityOptions, + std::move(dataStore)) + { + } - Name - getDataName(const NodeID& nid, const SeqNo& seqNo) override + Name getDataName(const NodeID& nid, const SeqNo& seqNo) override { return Name(nid).append(m_syncPrefix).appendNumber(seqNo); } diff --git a/ndn-svs/tlv.hpp b/ndn-svs/tlv.hpp index 64eb9d1..2794b59 100644 --- a/ndn-svs/tlv.hpp +++ b/ndn-svs/tlv.hpp @@ -1,3 +1,19 @@ +/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2012-2025 University of California, Los Angeles + * + * This file is part of ndn-svs, synchronization library for distributed realtime + * applications for NDN. + * + * ndn-svs library is free software: you can redistribute it and/or modify it under the + * terms of the GNU Lesser General Public License as published by the Free Software + * Foundation, in version 2.1 of the License. + * + * ndn-svs library is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. + */ + #ifndef NDN_SVS_TLV_HPP #define NDN_SVS_TLV_HPP @@ -5,13 +21,14 @@ namespace ndn::svs::tlv { -enum : uint32_t { - StateVector = 201, - StateVectorEntry = 202, - SeqNo = 204, - MappingData = 205, - MappingEntry = 206, - StateVectorLzma = 211, +enum : uint32_t +{ + StateVector = 201, + StateVectorEntry = 202, + SeqNo = 204, + MappingData = 205, + MappingEntry = 206, + LzmaBlock = 211, }; } // namespace ndn::svs::tlv diff --git a/ndn-svs/version-vector.cpp b/ndn-svs/version-vector.cpp index 96d3f7d..141527e 100644 --- a/ndn-svs/version-vector.cpp +++ b/ndn-svs/version-vector.cpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2022 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -44,8 +44,7 @@ VersionVector::encode() const ndn::encoding::EncodingBuffer enc; size_t totalLength = 0; - for (auto it = m_map.rbegin(); it != m_map.rend(); it++) - { + for (auto it = m_map.rbegin(); it != m_map.rend(); it++) { // SeqNo size_t entryLength = ndn::encoding::prependNonNegativeIntegerBlock(enc, tlv::SeqNo, it->second); @@ -66,8 +65,7 @@ std::string VersionVector::toStr() const { std::ostringstream stream; - for (const auto& elem : m_map) - { + for (const auto& elem : m_map) { stream << elem.first << ":" << elem.second << " "; } return stream.str(); diff --git a/ndn-svs/version-vector.hpp b/ndn-svs/version-vector.hpp index 7a25839..c31761e 100644 --- a/ndn-svs/version-vector.hpp +++ b/ndn-svs/version-vector.hpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2022 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -37,51 +37,51 @@ class VersionVector VersionVector() = default; /** Decode a version vector from ndn::Block */ - explicit - VersionVector(const ndn::Block& encoded); + explicit VersionVector(const ndn::Block& encoded); /** Encode the version vector to a string */ - ndn::Block - encode() const; + ndn::Block encode() const; /** Get a human-readable representation */ - std::string - toStr() const; + std::string toStr() const; - SeqNo - set(const NodeID& nid, SeqNo seqNo) + SeqNo set(const NodeID& nid, SeqNo seqNo) { m_map[nid] = seqNo; + m_lastUpdate[nid] = time::system_clock::now(); return seqNo; } - SeqNo - get(const NodeID& nid) const + SeqNo get(const NodeID& nid) const { auto elem = m_map.find(nid); return elem == m_map.end() ? 0 : elem->second; } - const_iterator - begin() const noexcept + time::system_clock::time_point getLastUpdate(const NodeID& nid) const + { + auto elem = m_lastUpdate.find(nid); + return elem == m_lastUpdate.end() ? time::system_clock::time_point::min() : elem->second; + } + + const_iterator begin() const noexcept { return m_map.begin(); } - const_iterator - end() const noexcept + const_iterator end() const noexcept { return m_map.end(); } - bool - has(const NodeID& nid) const + bool has(const NodeID& nid) const { return m_map.find(nid) != end(); } private: std::map m_map; + std::map m_lastUpdate; }; } // namespace ndn::svs diff --git a/tests/boost-test.hpp b/tests/boost-test.hpp index 7449f38..2650874 100644 --- a/tests/boost-test.hpp +++ b/tests/boost-test.hpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2021 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. diff --git a/tests/main.cpp b/tests/main.cpp index ead1da6..82927e8 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2021 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. diff --git a/tests/unit-tests/core.t.cpp b/tests/unit-tests/core.t.cpp index ec576db..75f2de1 100644 --- a/tests/unit-tests/core.t.cpp +++ b/tests/unit-tests/core.t.cpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2023 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -27,7 +27,7 @@ class CoreFixture protected: CoreFixture() : m_syncPrefix("/ndn/test") - , m_core(m_face, m_syncPrefix, [] (auto&&...) {}) + , m_core(m_face, m_syncPrefix, [](auto&&...) {}) { } @@ -41,40 +41,40 @@ BOOST_FIXTURE_TEST_SUITE(TestCore, CoreFixture) BOOST_AUTO_TEST_CASE(MergeStateVector) { - std::vector missingData; + std::vector missingInfo; VersionVector v = m_core.getState(); BOOST_CHECK_EQUAL(v.get("one"), 0); BOOST_CHECK_EQUAL(v.get("two"), 0); BOOST_CHECK_EQUAL(v.get("three"), 0); - BOOST_CHECK_EQUAL(missingData.size(), 0); + BOOST_CHECK_EQUAL(missingInfo.size(), 0); VersionVector v1; v1.set("one", 1); v1.set("two", 2); - missingData = std::get<2>(m_core.mergeStateVector(v1)); + missingInfo = m_core.mergeStateVector(v1).missingInfo; v = m_core.getState(); BOOST_CHECK_EQUAL(v.get("one"), 1); BOOST_CHECK_EQUAL(v.get("two"), 2); BOOST_CHECK_EQUAL(v.get("three"), 0); - BOOST_CHECK_EQUAL(missingData.size(), 2); + BOOST_CHECK_EQUAL(missingInfo.size(), 2); VersionVector v2; v2.set("one", 1); v2.set("two", 1); v2.set("three", 3); - missingData = std::get<2>(m_core.mergeStateVector(v2)); + missingInfo = m_core.mergeStateVector(v2).missingInfo; v = m_core.getState(); BOOST_CHECK_EQUAL(v.get("one"), 1); BOOST_CHECK_EQUAL(v.get("two"), 2); BOOST_CHECK_EQUAL(v.get("three"), 3); - BOOST_REQUIRE_EQUAL(missingData.size(), 1); - BOOST_CHECK_EQUAL(missingData[0].nodeId, "three"); - BOOST_CHECK_EQUAL(missingData[0].low, 1); - BOOST_CHECK_EQUAL(missingData[0].high, 3); + BOOST_REQUIRE_EQUAL(missingInfo.size(), 1); + BOOST_CHECK_EQUAL(missingInfo[0].nodeId, "three"); + BOOST_CHECK_EQUAL(missingInfo[0].low, 1); + BOOST_CHECK_EQUAL(missingInfo[0].high, 3); } BOOST_AUTO_TEST_SUITE_END() diff --git a/tests/unit-tests/version-vector.t.cpp b/tests/unit-tests/version-vector.t.cpp index f4fd111..2b8d967 100644 --- a/tests/unit-tests/version-vector.t.cpp +++ b/tests/unit-tests/version-vector.t.cpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2023 University of California, Los Angeles + * Copyright (c) 2012-2025 University of California, Los Angeles * * This file is part of ndn-svs, synchronization library for distributed realtime * applications for NDN. @@ -54,8 +54,7 @@ BOOST_AUTO_TEST_CASE(Set) BOOST_AUTO_TEST_CASE(Iterate) { std::unordered_map umap; - for (auto elem : v) - { + for (auto elem : v) { umap[elem.first] = elem.second; } @@ -77,8 +76,8 @@ BOOST_AUTO_TEST_CASE(EncodeDecode) BOOST_AUTO_TEST_CASE(DecodeStatic) { // Hex: CA0A070508036F6E65CC0101CA0A0705080374776FCC0102 - constexpr std::string_view encoded{"\xCA\x0A\x07\x05\x08\x03\x6F\x6E\x65\xCC\x01\x01" - "\xCA\x0A\x07\x05\x08\x03\x74\x77\x6F\xCC\x01\x02"}; + constexpr std::string_view encoded{ "\xCA\x0A\x07\x05\x08\x03\x6F\x6E\x65\xCC\x01\x01" + "\xCA\x0A\x07\x05\x08\x03\x74\x77\x6F\xCC\x01\x02" }; VersionVector dv(ndn::encoding::makeStringBlock(svs::tlv::StateVector, encoded)); BOOST_CHECK_EQUAL(dv.get("one"), 1); BOOST_CHECK_EQUAL(dv.get("two"), 2);