Skip to content

Commit

Permalink
Merge pull request #26 from named-data/svsv2
Browse files Browse the repository at this point in the history
v2: move state vector to App Parameter
  • Loading branch information
pulsejet authored Jan 6, 2025
2 parents e502c2a + e472352 commit 7fa0af0
Show file tree
Hide file tree
Showing 29 changed files with 663 additions and 642 deletions.
4 changes: 4 additions & 0 deletions .clang-format
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
BasedOnStyle: Mozilla
AllowShortFunctionsOnASingleLine: Empty
IndentWidth: 2
ColumnLimit: 110
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ where `sync-prefix` is `/ndn/svs` for the example application.
## Contributing

Contributions are welcome through GitHub.
Use `clang-format` to format the code before submitting a pull request.
The VS Code extension for `clang-format` is recommended to format the code automatically.

## License

Expand Down
43 changes: 19 additions & 24 deletions examples/chat-pubsub.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2012-2023 University of California, Los Angeles
* Copyright (c) 2012-2025 University of California, Los Angeles
*
* This file is part of ndn-svs, synchronization library for distributed realtime
* applications for NDN.
Expand Down Expand Up @@ -51,22 +51,19 @@ class Program
opts.maxPubAge = ndn::time::seconds(10);

// Create the Pub/Sub instance
m_svsps = std::make_shared<SVSPubSub>(
ndn::Name(m_options.prefix),
ndn::Name(m_options.m_id),
face,
std::bind(&Program::onMissingData, this, _1),
opts,
secOpts);
m_svsps = std::make_shared<SVSPubSub>(ndn::Name(m_options.prefix),
ndn::Name(m_options.m_id),
face,
std::bind(&Program::onMissingData, this, _1),
opts,
secOpts);

std::cout << "SVS client starting: " << m_options.m_id << std::endl;

// Subscribe to all data packets with prefix /chat (the "topic")
m_svsps->subscribe(ndn::Name("/chat"), [] (const auto& subData)
{
m_svsps->subscribe(ndn::Name("/chat"), [](const auto& subData) {
std::string content(reinterpret_cast<const char*>(subData.data.data()), subData.data.size());
std::cout << subData.producerPrefix << " [" << subData.seqNo << "] : " <<
subData.name << " : ";
std::cout << subData.producerPrefix << " [" << subData.seqNo << "] : " << subData.name << " : ";
if (content.length() > 200) {
std::cout << "[LONG] " << content.length() << " bytes"
<< " [" << std::hash<std::string>{}(content) << "]";
Expand All @@ -77,8 +74,7 @@ class Program
});
}

void
run()
void run()
{
// Begin processing face events in a separate thread.
std::thread svsThread([this] { face.processEvents(); });
Expand All @@ -101,20 +97,18 @@ class Program
protected:
/**
* Callback on receving a new State Vector from another node.
* This will be called regardless of whether the missing data contains any topics
* or producers that we are subscribed to.
* This will be called regardless of whether the missing data contains any
* topics or producers that we are subscribed to.
*/
void
onMissingData(const std::vector<MissingDataInfo>&)
void onMissingData(const std::vector<MissingDataInfo>&)
{
// Ignore any other missing data for this example
}

/**
* Publish a string message to the group
*/
void
publishMsg(const std::string& msg)
void publishMsg(const std::string& msg)
{
// Message to send
std::string content = msg;
Expand All @@ -129,14 +123,15 @@ class Program
for (auto& c : content)
c = std::rand() % 26 + 'a';

std::cout << "> Sending random message with hash [" << std::hash<std::string>{}(content) << "]" << std::endl;
std::cout << "> Sending random message with hash [" << std::hash<std::string>{}(content) << "]"
<< std::endl;
}

// Note that unlike SVSync, names can be arbitrary,
// and need not be prefixed with the producer prefix.
ndn::Name name("chat"); // topic of publication
name.append(m_options.m_id); // who sent this
name.appendTimestamp(); // and when
ndn::Name name("chat"); // topic of publication
name.append(m_options.m_id); // who sent this
name.appendTimestamp(); // and when

m_svsps->publish(name, ndn::make_span(reinterpret_cast<const uint8_t*>(content.data()), content.size()));
}
Expand Down
35 changes: 16 additions & 19 deletions examples/chat.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2012-2023 University of California, Los Angeles
* Copyright (c) 2012-2025 University of California, Los Angeles
*
* This file is part of ndn-svs, synchronization library for distributed realtime
* applications for NDN.
Expand Down Expand Up @@ -41,17 +41,18 @@ class Program

// Create the SVSync instance
m_svs = std::make_shared<ndn::svs::SVSync>(
ndn::Name(m_options.prefix), // Sync prefix, common for all nodes in the group
ndn::Name(m_options.m_id), // Unique data prefix for this node
face, // Shared NDN face
std::bind(&Program::onMissingData, this, _1), // Callback on learning new sequence numbers from SVS
securityOptions); // Security configuration
ndn::Name(m_options.prefix), // Sync prefix, common for all nodes in the group
ndn::Name(m_options.m_id), // Unique data prefix for this node
face, // Shared NDN face
std::bind(&Program::onMissingData,
this,
_1), // Callback on learning new sequence numbers from SVS
securityOptions); // Security configuration

std::cout << "SVS client starting: " << m_options.m_id << std::endl;
}

void
run()
void run()
{
// Begin processing face events in a separate thread.
std::thread svsThread([this] { face.processEvents(); });
Expand All @@ -75,19 +76,15 @@ class Program
/**
* Callback on receving a new State Vector from another node
*/
void
onMissingData(const std::vector<ndn::svs::MissingDataInfo>& v)
void onMissingData(const std::vector<ndn::svs::MissingDataInfo>& v)
{
// Iterate over the entire difference set
for (size_t i = 0; i < v.size(); i++)
{
for (size_t i = 0; i < v.size(); i++) {
// Iterate over each new sequence number that we learned
for (ndn::svs::SeqNo s = v[i].low; s <= v[i].high; ++s)
{
for (ndn::svs::SeqNo s = v[i].low; s <= v[i].high; ++s) {
// Request a single data packet using the SVSync API
ndn::svs::NodeID nid = v[i].nodeId;
m_svs->fetchData(nid, s, [nid] (const auto& data)
{
m_svs->fetchData(nid, s, [nid](const auto& data) {
std::string content(reinterpret_cast<const char*>(data.getContent().value()),
data.getContent().value_size());
std::cout << data.getName() << " : " << content << std::endl;
Expand All @@ -99,10 +96,10 @@ class Program
/**
* Publish a string message to the SVSync group
*/
void
publishMsg(std::string_view msg)
void publishMsg(std::string_view msg)
{
// Encode the message into a Content TLV block, which is what the SVSync API expects
// Encode the message into a Content TLV block, which is what the SVSync API
// expects
auto block = ndn::encoding::makeStringBlock(ndn::tlv::Content, msg);
m_svs->publishData(block, ndn::time::seconds(1));
}
Expand Down
104 changes: 104 additions & 0 deletions examples/core.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2012-2025 University of California, Los Angeles
*
* This file is part of ndn-svs, synchronization library for distributed realtime
* applications for NDN.
*
* ndn-svs library is free software: you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free Software
* Foundation, in version 2.1 of the License.
*
* ndn-svs library is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
*/

#include <iostream>
#include <string>
#include <string_view>
#include <thread>
#include <vector>

#include <ndn-svs/svsync.hpp>

struct Options
{
std::string prefix;
std::string m_id;
};

class Program
{
public:
Program(const Options& options)
: m_options(options)
{
// This is a usage example of the low level SvSyncCore API.

// Create the SVSyncCore instance
m_svs = std::make_shared<ndn::svs::SVSyncCore>(
face, // Shared NDN face
ndn::Name(m_options.prefix), // Sync prefix, common for all nodes in the group
std::bind(&Program::onUpdate, this, _1), // Callback on learning new sequence numbers from SVS
ndn::svs::SecurityOptions::DEFAULT, // Security configuration
ndn::Name(m_options.m_id) // Unique prefix for this node
);

std::cout << "SVS client starting: " << m_options.m_id << std::endl;
}

void run()
{
// Begin processing face events in a separate thread.
std::thread svsThread([this] { face.processEvents(); });

// Increment our sequence number every 3 seconds
while (true) {
auto seq = m_svs->getSeqNo() + 1;
m_svs->updateSeqNo(seq);
std::cout << "Published sequence number: " << m_options.m_id << "=" << seq << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
}

// Wait for the SVSync thread to finish on exit.
svsThread.join();
}

protected:
/**
* Callback on receving a new State Vector from another node
*/
void onUpdate(const std::vector<ndn::svs::MissingDataInfo>& v)
{
for (size_t i = 0; i < v.size(); i++) {
for (ndn::svs::SeqNo s = v[i].low; s <= v[i].high; ++s) {
std::cout << "Received update: " << v[i].nodeId << "=" << s << std::endl;
}
}
}

public:
const Options m_options;
ndn::Face face;
std::shared_ptr<ndn::svs::SVSyncCore> m_svs;
ndn::KeyChain m_keyChain;
};

int
main(int argc, char** argv)
{
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <prefix>" << std::endl;
return 1;
}

Options opt;
opt.prefix = "/ndn/svs";
opt.m_id = argv[1];

Program program(opt);
program.run();

return 0;
}
4 changes: 2 additions & 2 deletions ndn-svs/common.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2012-2022 University of California, Los Angeles
* Copyright (c) 2012-2025 University of California, Los Angeles
*
* This file is part of ndn-svs, synchronization library for distributed realtime
* applications for NDN.
Expand Down Expand Up @@ -44,7 +44,7 @@ using SeqNo = uint64_t;
using ndn::security::ValidationError;

using DataValidatedCallback = std::function<void(const Data&)>;
using DataValidationErrorCallback = std::function<void(const Data&, const ValidationError&)> ;
using DataValidationErrorCallback = std::function<void(const Data&, const ValidationError&)>;

} // namespace ndn::svs

Expand Down
Loading

0 comments on commit 7fa0af0

Please sign in to comment.