Skip to content

Commit

Permalink
Disable positive ACKs QoS [5042] (eProsima#477)
Browse files Browse the repository at this point in the history
* Refs eProsima#4995 Code formatting

* Refs eProsima#4939 Added TimedCallback class and made changes to participant classes to be able to get the resource event

* Refs eProsima#4939 Sorting history cache changes by timestamp

* Refs eProsima#4939 Changes to subscriber side

* Refs eProsima#4939 Changing signature of ReaderListener::onNewCacheChangeAdded so that the cache change can be removed from the subscriber if is received after it has expired

* Refs eProsima#4939 Changes to subscriber side

* Refs eProsima#4939 Adding two blackbox tests for lifespan

* Refs eProsima#4939 Adding a C++ example

* Refs eProsima#4939 Mutexing SubscriberImpl::lifespan_expired and calling destroy() in TimedCallback destructor

* Refs eProsima#4939 Fixing CI builds

* Refs eProsima#4939 Fixing some tests that were failing

* Refs eProsima#4939 Taking into account the case where the earliest change in the history is not the one that started the timer. This can happen for instance if a change starts the timer but it is later removed from the history

* Refs eProsima#4992 Applying coding style to lifespan c++ example files

* Refs eProsima#4992 Moving lifespan example readme file to markdown

* Refs eProsima#4992 Moving TimedCallback to a new timedevent/ directory

* Refs #Adding boolean to Writer and Reader Attributes + some format changes

* Refs eProsima#4995 More format changes

* Refs eProsima#4995 Implementing disable positive ACKs QoS

* Refs eProsima#4995 Some fixes and format changes + adding a new QoS

* Refs eProsima#4995 Adding C++ example

* Refs eProsima#4995 Allowing users to turn on/off disable positive ACKs QoS in the C++ example and updating example documentation accordingly

* Refs eProsima#4995 Adding two blackbox tests (very similar to lifespan QoS tests)

* Refs eProsima#5042 Changes to C++ example

* Refs eProsima#5042 Addressing some of the review suggestions

* Refs eProsima#4995 Always using source timestamp in info TS

* Refs eProsima#4995 Expired changes are no longer removed from the history so logic when ack timer expires has to be changed

* Refs eProsima#5042 Fixing compiler warnings

* Refs eProsima#5042 Fixing matching reader-writer

* Refs eProsima#5042 Adding functionality to XML parser

* Refs eProsima#5042 Adding command line options to C++ examples + updating example documentation
  • Loading branch information
raquelalvarezbanos authored and richiware committed Apr 24, 2019
1 parent 2e8ae32 commit f809cdf
Show file tree
Hide file tree
Showing 75 changed files with 2,400 additions and 562 deletions.
2 changes: 2 additions & 0 deletions examples/C++/DisablePositiveACKs/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
bin
output
51 changes: 51 additions & 0 deletions examples/C++/DisablePositiveACKs/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cmake_minimum_required(VERSION 2.8.12)

if(NOT CMAKE_VERSION VERSION_LESS 3.0)
cmake_policy(SET CMP0048 NEW)
endif()

project(LifespanQoSExample)

# Find requirements
if(NOT fastcdr_FOUND)
find_package(fastcdr REQUIRED)
endif()

if(NOT fastrtps_FOUND)
find_package(fastrtps REQUIRED)
endif()

# Set C++11
include(CheckCXXCompilerFlag)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_COMPILER_IS_CLANG OR
CMAKE_CXX_COMPILER_ID MATCHES "Clang")
check_cxx_compiler_flag(-std=c++11 SUPPORTS_CXX11)
if(SUPPORTS_CXX11)
add_compile_options(-std=c++11)
else()
message(FATAL_ERROR "Compiler doesn't support C++11")
endif()
endif()

message(STATUS "Configuring HelloWorld example...")
file(GLOB POSITIVEACKS_EXAMPLE_SOURCES_CXX "*.cxx")
file(GLOB POSITIVEACKS_EXAMPLE_SOURCES_CPP "*.cpp")

add_executable(DisablePositiveACKsQoS ${POSITIVEACKS_EXAMPLE_SOURCES_CXX} ${POSITIVEACKS_EXAMPLE_SOURCES_CPP})
target_link_libraries(DisablePositiveACKsQoS fastrtps fastcdr)
install(TARGETS DisablePositiveACKsQoS
RUNTIME DESTINATION examples/C++/DisablePositiveACKsQoS/${BIN_INSTALL_DIR})
188 changes: 188 additions & 0 deletions examples/C++/DisablePositiveACKs/DisablePositiveACKS_main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright 2019 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file DisablePositiveACKs_main.cpp
*
*/

#include "DisablePositiveACKsPublisher.h"
#include "DisablePositiveACKsSubscriber.h"

#include <fastrtps/Domain.h>

#include <fastrtps/utils/eClock.h>
#include <fastrtps/log/Log.h>

using namespace eprosima;
using namespace fastrtps;
using namespace rtps;

/**
* @brief Parses command line arguments
* @param argc Number of command line arguments
* @param argv Array of command line arguments
* @param type Publisher or subscriber
* @param enabled A positive number to enable this Qos or zero to disable it
* @param duration_ms Duration before setting samples to acknowledged
* @param sleep_ms Writer sleep read from command line arguments (populated if specified)
* @param samples Number of samples read from command line arguments (populated if specified
* @return True if command line arguments were parsed succesfully and execution can continue
*/
bool parse_arguments(
int argc,
char** argv,
int& type,
long& enabled,
long& duration_ms,
long& sleep_ms,
long& samples)
{
if (argc == 1)
{
// No arguments provided
return false;
}

for (int i=0; i<argc; i++)
{
if (!strcmp(argv[i], "--help"))
{
// --help command found
return false;
}
}

if (strcmp(argv[1], "publisher") == 0)
{
type = 1;

int count = 2;
while (count < argc)
{
if (!strcmp(argv[count], "--disable"))
{
enabled = 1;
count = count + 1;
}
else if (!strcmp(argv[count], "--keep_duration"))
{
duration_ms = atoi(argv[count + 1]);
count = count + 2;
}
else if (!strcmp(argv[count], "--sleep"))
{
sleep_ms = atoi(argv[count + 1]);
count = count + 2;
}
else if (!strcmp(argv[count], "--samples"))
{
samples = atoi(argv[count + 1]);
count = count + 2;
}
else
{
std::cout << "Unknown command line option " << argv[count] << " for publisher" << std::endl;
return false;
}
}
return true;
}

if (strcmp(argv[1], "subscriber") == 0)
{
type = 2;

int count = 2;
while (count < argc)
{
if (!strcmp(argv[count], "--disable"))
{
enabled = 1;
count = count + 1;
}
else
{
std::cout << "Unknown command line option " << argv[2] << " for publisher" << std::endl;
return false;
}
}
return true;
}

return false;
}

int main(int argc, char** argv)
{
int type = 1;

// >0 to use disable positive acks
long use_disable_positive_acks = 0;
// Keep duration in milliseconds
long keep_duration_ms = 5000;
// Sleep time between samples
long writer_sleep_ms = 1000;
// Number of samples to send
long count = 20;

if (!parse_arguments(
argc,
argv,
type,
use_disable_positive_acks,
keep_duration_ms,
writer_sleep_ms,
count))
{
std::cout << "Usage: " << std::endl;
std::cout << argv[0] << " publisher ";
std::cout << "[--disable]" ;
std::cout << "[--keep_duration <duration_ms>] ";
std::cout << "[--sleep <writer_sleep_ms>] ";
std::cout << "[--samples <samples>]" << std::endl;

std::cout << "OR" << std::endl;
std::cout << argv[0] << " subscriber ";
std::cout << "[--disable]" << std::endl;

Log::Reset();
return 0;
}

switch(type)
{
case 1:
{
DisablePositiveACKsPublisher mypub;
if (mypub.init(use_disable_positive_acks > 0, keep_duration_ms))
{
mypub.run(count, writer_sleep_ms);
}
break;
}
case 2:
{
DisablePositiveACKsSubscriber mysub;
if (mysub.init(use_disable_positive_acks > 0))
{
mysub.run(count);
}
break;
}
}
Domain::stopAll();
Log::Reset();
return 0;
}
126 changes: 126 additions & 0 deletions examples/C++/DisablePositiveACKs/DisablePositiveACKsPublisher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file DisablePositiveACKsPublisher.cpp
*
*/

#include "DisablePositiveACKsPublisher.h"
#include <fastrtps/participant/Participant.h>
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/attributes/PublisherAttributes.h>
#include <fastrtps/publisher/Publisher.h>
#include <fastrtps/Domain.h>
#include <fastrtps/utils/eClock.h>

#include <thread>

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;

DisablePositiveACKsPublisher::DisablePositiveACKsPublisher()
: participant_(nullptr)
, publisher_(nullptr)
{
}

bool DisablePositiveACKsPublisher::init(
bool disable_positive_acks,
uint32_t keep_duration_ms)
{
hello_.index(0);
hello_.message("DisablePositiveACKs");

ParticipantAttributes PParam;
PParam.rtps.setName("Participant_pub");
participant_ = Domain::createParticipant(PParam);
if( participant_ == nullptr )
{
return false;
}

Domain::registerType(participant_,&type_);

PublisherAttributes Wparam;
Wparam.topic.topicKind = NO_KEY;
Wparam.topic.topicDataType = "Topic";
Wparam.topic.topicName = "DisablePositiveACKsTopic";
Wparam.topic.historyQos.kind = KEEP_ALL_HISTORY_QOS;
Wparam.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
Wparam.qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
Wparam.qos.m_disablePositiveACKs.enabled = disable_positive_acks;
Wparam.qos.m_disablePositiveACKs.duration = rtps::Duration_t(keep_duration_ms * 1e-3);
publisher_ = Domain::createPublisher(participant_, Wparam, (PublisherListener*) &listener);
if( publisher_ == nullptr )
{
return false;
}

return true;
}

DisablePositiveACKsPublisher::~DisablePositiveACKsPublisher()
{
Domain::removeParticipant(participant_);
}

void DisablePositiveACKsPublisher::PubListener::onPublicationMatched(Publisher* /*pub*/, MatchingInfo& info)
{
if(info.status == MATCHED_MATCHING)
{
n_matched++;
std::cout << "Publisher matched"<<std::endl;
}
else
{
n_matched--;
std::cout << "Publisher unmatched"<<std::endl;
}
}

void DisablePositiveACKsPublisher::run(
uint32_t samples,
uint32_t write_sleep_ms)
{
std::cout << "Publisher running" << std::endl;

samples = ( samples == 0 ) ? 10 : samples;
for( uint32_t i = 0; i < samples; ++i )
{
if( !publish() )
{
--i;
}
else
{
std::cout << "Message with index: " << hello_.index() << " SENT" << std::endl;
}
eClock::my_sleep(write_sleep_ms);
}

std::cout << "Please press enter to stop the Publisher" << std::endl;
std::cin.ignore();
}

bool DisablePositiveACKsPublisher::publish(bool waitForListener)
{
if(!waitForListener || listener.n_matched>0)
{
hello_.index(hello_.index()+1);
publisher_->write((void*)&hello_);
return true;
}
return false;
}
Loading

0 comments on commit f809cdf

Please sign in to comment.