-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
issue #21 #28
base: master
Are you sure you want to change the base?
issue #21 #28
Changes from 2 commits
a2e587b
c25ee67
ec07ab3
d314b59
43c7c07
2ac6180
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ | ||
/* | ||
* Copyright (c) 2012-2023 University of California, Los Angeles | ||
* | ||
* This file is part of ndn-svs, synchronization library for distributed realtime | ||
* applications for NDN. | ||
* | ||
* ndn-svs library is free software: you can redistribute it and/or modify it under the | ||
* terms of the GNU Lesser General Public License as published by the Free Software | ||
* Foundation, in version 2.1 of the License. | ||
* | ||
* ndn-svs library is distributed in the hope that it will be useful, but WITHOUT ANY | ||
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A | ||
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. | ||
*/ | ||
|
||
#include <ctime> | ||
#include <functional> | ||
#include <iostream> | ||
#include <string> | ||
#include <thread> | ||
#include <vector> | ||
|
||
#include <ndn-svs/svspubsub.hpp> | ||
|
||
using namespace ndn::svs; | ||
|
||
struct Options | ||
{ | ||
std::string prefix; | ||
std::string m_id; | ||
}; | ||
|
||
class Program | ||
{ | ||
public: | ||
Program(const Options& options) | ||
: m_options(options) | ||
{ | ||
// Use HMAC signing for Sync Interests | ||
// Note: this is not generally recommended, but is used here for simplicity | ||
SecurityOptions secOpts(m_keyChain); | ||
secOpts.interestSigner->signingInfo.setSigningHmacKey("dGhpcyBpcyBhIHNlY3JldCBtZXNzYWdl"); | ||
|
||
// Sign data packets using SHA256 (for simplicity) | ||
secOpts.dataSigner->signingInfo.setSha256Signing(); | ||
|
||
// Do not fetch publications older than 10 seconds | ||
SVSPubSubOptions opts; | ||
opts.useTimestamp = true; | ||
opts.maxPubAge = ndn::time::seconds(10); | ||
|
||
// Create the Pub/Sub instance | ||
m_svsps = std::make_shared<SVSPubSub>( | ||
ndn::Name(m_options.prefix), | ||
ndn::Name(m_options.m_id), | ||
face, | ||
std::bind(&Program::onMissingData, this, _1), | ||
opts, | ||
secOpts); | ||
|
||
std::cout << "SVS client starting: " << m_options.m_id << std::endl; | ||
|
||
// Subscribe to all data packets with prefix /chat (the "topic") | ||
m_svsps->subscribeWithRegex(ndn::Regex("^<chat>"), [] (const auto& subData) | ||
{ | ||
std::string content(reinterpret_cast<const char*>(subData.data.data()), subData.data.size()); | ||
std::cout << subData.producerPrefix << " [" << subData.seqNo << "] : " << | ||
subData.name << " : "; | ||
if (content.length() > 200) { | ||
std::cout << "[LONG] " << content.length() << " bytes" | ||
<< " [" << std::hash<std::string>{}(content) << "]"; | ||
} else { | ||
std::cout << content; | ||
} | ||
std::cout << std::endl; | ||
}); | ||
} | ||
|
||
void | ||
run() | ||
{ | ||
// Begin processing face events in a separate thread. | ||
std::thread svsThread([this] { face.processEvents(); }); | ||
|
||
// Announce our presence. | ||
// Note that the SVS-PS instance is thread-safe. | ||
publishMsg("User " + m_options.m_id + " has joined the groupchat"); | ||
|
||
// Read from stdin and publish messages. | ||
std::string userInput; | ||
while (true) { | ||
std::getline(std::cin, userInput); | ||
publishMsg(userInput); | ||
} | ||
|
||
// Wait for the SVS-PS thread to finish. | ||
svsThread.join(); | ||
} | ||
|
||
protected: | ||
/** | ||
* Callback on receving a new State Vector from another node. | ||
* This will be called regardless of whether the missing data contains any topics | ||
* or producers that we are subscribed to. | ||
*/ | ||
void | ||
onMissingData(const std::vector<MissingDataInfo>&) | ||
{ | ||
// Ignore any other missing data for this example | ||
} | ||
|
||
/** | ||
* Publish a string message to the group | ||
*/ | ||
void | ||
publishMsg(const std::string& msg) | ||
{ | ||
// Message to send | ||
std::string content = msg; | ||
|
||
// If the message starts with "SEND " generate a new message | ||
// with random content with length after send | ||
if (msg.length() > 5 && msg.substr(0, 5) == "SEND ") { | ||
auto len = std::stoi(msg.substr(5)); | ||
|
||
content = std::string(len, 'a'); | ||
std::srand(std::time(nullptr)); | ||
for (auto& c : content) | ||
c = std::rand() % 26 + 'a'; | ||
|
||
std::cout << "> Sending random message with hash [" << std::hash<std::string>{}(content) << "]" << std::endl; | ||
} | ||
|
||
// Note that unlike SVSync, names can be arbitrary, | ||
// and need not be prefixed with the producer prefix. | ||
ndn::Name name("chat"); // topic of publication | ||
name.append(m_options.m_id); // who sent this | ||
name.appendTimestamp(); // and when | ||
|
||
m_svsps->publish(name, ndn::make_span(reinterpret_cast<const uint8_t*>(content.data()), content.size())); | ||
} | ||
|
||
private: | ||
const Options m_options; | ||
ndn::Face face; | ||
std::shared_ptr<SVSPubSub> m_svsps; | ||
ndn::KeyChain m_keyChain; | ||
}; | ||
|
||
int | ||
main(int argc, char** argv) | ||
{ | ||
if (argc != 2) { | ||
std::cerr << "Usage: " << argv[0] << " <prefix>" << std::endl; | ||
return 1; | ||
} | ||
|
||
Options opt; | ||
opt.prefix = "/ndn/svs"; | ||
opt.m_id = argv[1]; | ||
|
||
Program program(opt); | ||
program.run(); | ||
|
||
return 0; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -90,6 +90,24 @@ SVSPubSub::publish(const Name& name, span<const uint8_t> value, | |
} | ||
} | ||
|
||
|
||
SeqNo | ||
SVSPubSub::publish(const Name& name, | ||
const Name& nodePrefix, time::milliseconds freshnessPeriod, | ||
std::vector<Block> mappingBlocks) | ||
{ | ||
// Segment the data if larger than MAX_DATA_SIZE | ||
|
||
NodeID nid = nodePrefix == EMPTY_NAME ? m_dataPrefix : nodePrefix; | ||
SeqNo seqNo = m_svsync.getCore().getSeqNo(nid) + 1; | ||
|
||
// Insert mapping and manually update the sequence number | ||
insertMapping(nid, seqNo, name, mappingBlocks); | ||
m_svsync.getCore().updateSeqNo(seqNo, nid); | ||
|
||
return seqNo; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation looks wrong |
||
} | ||
|
||
SeqNo | ||
SVSPubSub::publishPacket(const Data& data, const Name& nodePrefix, | ||
std::vector<Block> mappingBlocks) | ||
|
@@ -139,6 +157,15 @@ SVSPubSub::subscribe(const Name& prefix, const SubscriptionCallback& callback, b | |
return handle; | ||
} | ||
|
||
uint32_t | ||
SVSPubSub::subscribeWithRegex(const Regex ®ex, const SubscriptionCallback &callback,bool autofetch, bool packets) | ||
{ | ||
uint32_t handle = ++m_subscriptionCount; | ||
Subscription sub = { handle, ndn::Name(), callback, packets, false, make_shared<Regex>(regex), autofetch}; | ||
m_regexSubscriptions.push_back(sub); | ||
return handle; | ||
} | ||
|
||
uint32_t | ||
SVSPubSub::subscribeToProducer(const Name& nodePrefix, const SubscriptionCallback& callback, | ||
bool prefetch, bool packets) | ||
|
@@ -190,8 +217,8 @@ SVSPubSub::updateCallbackInternal(const std::vector<MissingDataInfo>& info) | |
} | ||
} | ||
|
||
// Fetch all mappings if we have prefix subscription(s) | ||
if (!m_prefixSubscriptions.empty()) | ||
// Fetch all mappings if we have prefix subscription(s) or regex subscription(s) | ||
if (!m_prefixSubscriptions.empty() or !m_regexSubscriptions.empty()) | ||
{ | ||
MissingDataInfo remainingInfo = stream; | ||
|
||
|
@@ -275,11 +302,41 @@ SVSPubSub::processMapping(const NodeID& nodeId, SeqNo seqNo) | |
bool queued = false; | ||
for (const auto& sub : m_prefixSubscriptions) | ||
{ | ||
if (sub.prefix.isPrefixOf(mapping.first)) | ||
if (sub.prefix.isPrefixOf(mapping.first) and sub.autofetch) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please use |
||
{ | ||
m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); | ||
queued = true; | ||
} | ||
else if (sub.prefix.isPrefixOf(mapping.first) and !sub.autofetch) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. refactor the if/else to avoid repeating the conditions |
||
{ | ||
SubscriptionData subData = { | ||
mapping.first, | ||
ndn::span<const uint8_t>{}, | ||
nodeId, | ||
seqNo, | ||
ndn::Data() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't love this "passing an empty Data" to be honest... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you have any suggestion? Thanks! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not at the moment... I didn't do too much thinking to be honest. But maybe this should be a new/separate API instead of shoehorning it into the existing API with a bool as discriminator. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah... But it's better to be compatible with old APIs because of existing apps... |
||
}; | ||
sub.callback(subData); | ||
} | ||
} | ||
for (const auto& sub : m_regexSubscriptions) | ||
{ | ||
if (sub.regex->match(mapping.first) and sub.autofetch) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ... |
||
{ | ||
m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub); | ||
queued = true; | ||
} | ||
else if (sub.regex->match(mapping.first) and !sub.autofetch) | ||
{ | ||
SubscriptionData subData = { | ||
mapping.first, | ||
ndn::span<const uint8_t>{}, | ||
nodeId, | ||
seqNo, | ||
ndn::Data() | ||
}; | ||
sub.callback(subData); | ||
} | ||
} | ||
|
||
return queued; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -116,6 +116,20 @@ class SVSPubSub : noncopyable | |
time::milliseconds freshnessPeriod = FRESH_FOREVER, | ||
std::vector<Block> mappingBlocks = {}); | ||
|
||
/** | ||
* @brief Publish data names only on the pub/sub group. | ||
* | ||
* @param name name for the publication | ||
* @param nodePrefix Name to publish the data under | ||
* @param freshnessPeriod freshness period for the data | ||
* @param mappingBlocks Additional blocks to be published with the mapping (use sparingly) | ||
*/ | ||
SeqNo | ||
publish(const Name& name, | ||
const Name& nodePrefix = EMPTY_NAME, | ||
time::milliseconds freshnessPeriod = FRESH_FOREVER, | ||
std::vector<Block> mappingBlocks = {}); | ||
|
||
/** | ||
* @brief Subscribe to a application name prefix. | ||
* | ||
|
@@ -128,6 +142,18 @@ class SVSPubSub : noncopyable | |
uint32_t | ||
subscribe(const Name& prefix, const SubscriptionCallback& callback, bool packets = false); | ||
|
||
/** | ||
* @brief Subscribe with a regex to name. | ||
* | ||
* @param regex regex of the application data | ||
* @param callback Callback when new data is received | ||
* @param packets Subscribe to the raw Data packets instead of BLOBs | ||
* | ||
* @returns Handle to the subscription | ||
*/ | ||
uint32_t | ||
subscribeWithRegex(const Regex& regex, const SubscriptionCallback& callback, bool autofetch = true, bool packets = false); | ||
|
||
/** | ||
* @brief Subscribe to a data producer | ||
* | ||
|
@@ -181,6 +207,8 @@ class SVSPubSub : noncopyable | |
SubscriptionCallback callback; | ||
bool isPacketSubscription; | ||
bool prefetch; | ||
std::shared_ptr<Regex> regex = make_shared<Regex>("^<>+$"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why a |
||
bool autofetch = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This wastes space in the struct, please move the bool next to the others above to minimize padding. Even better, the bools and the |
||
}; | ||
|
||
void | ||
|
@@ -241,6 +269,7 @@ class SVSPubSub : noncopyable | |
uint32_t m_subscriptionCount; | ||
std::vector<Subscription> m_producerSubscriptions; | ||
std::vector<Subscription> m_prefixSubscriptions; | ||
std::vector<Subscription> m_regexSubscriptions; | ||
|
||
// Queue of publications to fetch | ||
std::map<std::pair<Name, SeqNo>, std::vector<Subscription>> m_fetchMap; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please include this in the files where it's actually needed, not in
common.hpp
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your review and I will update them accordingly