Skip to content

Commit

Permalink
Limit size of packets for receiving message to limit the size of
Browse files Browse the repository at this point in the history
allocated buffers. Use std::array instead of std::vector for these
packets.
  • Loading branch information
lutzbichler committed Jul 14, 2014
1 parent ecd8282 commit 9802b61
Show file tree
Hide file tree
Showing 20 changed files with 103 additions and 97 deletions.
40 changes: 20 additions & 20 deletions examples/client-sample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,37 +103,36 @@ class client_sample {
<< "/"
<< std::setw(4) << std::setfill('0') << std::hex << _response->get_session()
<< "]";
if (session_ == _response->get_session()) {
send();
} else {
std::cout << "WRONG RESPONSE!" << std::endl;
}
send();
}

void send() {
if (!be_quiet_)
{
std::unique_lock< std::mutex > its_lock(mutex_);
std::lock_guard< std::mutex > its_lock(mutex_);
blocked_ = true;
condition_.notify_one();
}
}

void run() {
while (running_) {
std::unique_lock< std::mutex > its_lock(mutex_);
condition_.wait(its_lock);
std::this_thread::sleep_for(std::chrono::milliseconds(cycle_));
app_->send(request_, true, use_tcp_);
VSOMEIP_INFO << "Client/Session ["
<< std::setw(4) << std::setfill('0') << std::hex << request_->get_client()
<< "/"
<< std::setw(4) << std::setfill('0') << std::hex << request_->get_session()
<< "] sent a request to Service ["
<< std::setw(4) << std::setfill('0') << std::hex << request_->get_service()
<< "."
<< std::setw(4) << std::setfill('0') << std::hex << request_->get_instance()
<< "]";
session_ = request_->get_session();
{
std::unique_lock< std::mutex > its_lock(mutex_);
while (!blocked_) condition_.wait(its_lock);
std::this_thread::sleep_for(std::chrono::milliseconds(cycle_));
app_->send(request_, true, use_tcp_);
VSOMEIP_INFO << "Client/Session ["
<< std::setw(4) << std::setfill('0') << std::hex << request_->get_client()
<< "/"
<< std::setw(4) << std::setfill('0') << std::hex << request_->get_session()
<< "] sent a request to Service ["
<< std::setw(4) << std::setfill('0') << std::hex << request_->get_service()
<< "."
<< std::setw(4) << std::setfill('0') << std::hex << request_->get_instance()
<< "]";
blocked_ = false;
}
}
}

Expand All @@ -148,6 +147,7 @@ class client_sample {
std::condition_variable condition_;
std::thread sender_;
bool running_;
bool blocked_;
};


Expand Down
2 changes: 1 addition & 1 deletion examples/service-sample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class service_sample {

std::shared_ptr< vsomeip::payload > its_payload = vsomeip::runtime::get()->create_payload();
std::vector< vsomeip::byte_t > its_payload_data;
for (std::size_t i = 0; i < 6; ++i) its_payload_data.push_back(i % 256);
for (std::size_t i = 0; i < 120; ++i) its_payload_data.push_back(i % 256);
its_payload->set_data(its_payload_data);
its_response->set_payload(its_payload);

Expand Down
8 changes: 6 additions & 2 deletions implementation/endpoints/include/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@
#include <array>
#include <memory>

#include <vsomeip/defines.hpp>
#include <vsomeip/primitive_types.hpp>

namespace vsomeip {

typedef std::vector< byte_t > buffer_t;
typedef std::shared_ptr< buffer_t > buffer_ptr_t;
typedef std::array< byte_t, VSOMEIP_PACKET_SIZE > packet_buffer_t;
typedef std::shared_ptr< packet_buffer_t > packet_buffer_ptr_t;

typedef std::vector< byte_t > message_buffer_t;
typedef std::shared_ptr< message_buffer_t > message_buffer_ptr_t;

} // namespace vsomeip

Expand Down
11 changes: 5 additions & 6 deletions implementation/endpoints/include/client_endpoint_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ class client_endpoint_impl
public:
void connect_cbk(boost::system::error_code const &_error);
void wait_connect_cbk(boost::system::error_code const &_error);
void send_cbk(std::shared_ptr< buffer_t >, boost::system::error_code const &_error, std::size_t _bytes);
void send_cbk(message_buffer_ptr_t _buffer, boost::system::error_code const &_error, std::size_t _bytes);
void flush_cbk(boost::system::error_code const &_error);
void receive_cbk(buffer_ptr_t _buffer, boost::system::error_code const &_error, std::size_t _bytes);
void receive_cbk(packet_buffer_ptr_t _buffer, boost::system::error_code const &_error, std::size_t _bytes);

public:
virtual void connect() = 0;
Expand All @@ -66,16 +66,15 @@ class client_endpoint_impl
bool is_connected_;

// send data
//std::deque< std::vector< byte_t > > packet_queue_;
std::shared_ptr< std::vector< byte_t > > packetizer_;
message_buffer_ptr_t packetizer_;

// receive data
std::vector< byte_t > message_;
message_buffer_t message_;

std::mutex mutex_;

uint32_t queued_;
virtual void send_queued(std::shared_ptr< std::vector< byte_t > >) = 0;
virtual void send_queued(message_buffer_ptr_t) = 0;
};

} // namespace vsomeip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class local_client_endpoint_impl

void start();

void send_queued(buffer_ptr_t _data);
void send_queued(message_buffer_ptr_t _data);

void join(const std::string &);
void leave(const std::string &);
Expand All @@ -40,7 +40,7 @@ class local_client_endpoint_impl
void receive();

void send_tag_cbk(boost::system::error_code const &_error, std::size_t _bytes);
void receive_cbk(buffer_ptr_t _buffer,
void receive_cbk(packet_buffer_ptr_t _buffer,
boost::system::error_code const &_error, std::size_t _bytes);
};

Expand Down
13 changes: 6 additions & 7 deletions implementation/endpoints/include/local_server_endpoint_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class local_server_endpoint_impl
: public local_server_endpoint_base_impl {

public:
local_server_endpoint_impl(std::shared_ptr< endpoint_host > _host, endpoint_type _local, boost::asio::io_service &_io);
local_server_endpoint_impl(std::shared_ptr< endpoint_host > _host,
endpoint_type _local, boost::asio::io_service &_io);
virtual ~local_server_endpoint_impl();

void start();
Expand All @@ -37,9 +38,7 @@ class local_server_endpoint_impl
void restart();
void receive();

const uint8_t * get_buffer() const;

void send_queued(endpoint_type _target, std::shared_ptr< buffer_t > _data);
void send_queued(endpoint_type _target, message_buffer_ptr_t _data);
endpoint_type get_remote() const;

void join(const std::string &);
Expand All @@ -57,7 +56,7 @@ class local_server_endpoint_impl

void start();

void send_queued(buffer_ptr_t _data);
void send_queued(message_buffer_ptr_t _data);

private:
connection(local_server_endpoint_impl *_owner);
Expand All @@ -68,10 +67,10 @@ class local_server_endpoint_impl
local_server_endpoint_impl *server_;

// the current message
std::vector< byte_t > message_;
message_buffer_t message_;

private:
void receive_cbk(buffer_ptr_t _buffer,
void receive_cbk(packet_buffer_ptr_t _buffer,
boost::system::error_code const &_error, std::size_t _bytes);
};

Expand Down
6 changes: 3 additions & 3 deletions implementation/endpoints/include/server_endpoint_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ class server_endpoint_impl

public:
void connect_cbk(boost::system::error_code const &_error);
void send_cbk(std::shared_ptr< buffer_t > _data,
void send_cbk(message_buffer_ptr_t _buffer,
boost::system::error_code const &_error, std::size_t _bytes);
void flush_cbk(endpoint_type _target, const boost::system::error_code &_error);

public:
virtual void send_queued(endpoint_type _target, buffer_ptr_t _buffer) = 0;
virtual void send_queued(endpoint_type _target, message_buffer_ptr_t _buffer) = 0;
virtual endpoint_type get_remote() const = 0;

protected:
std::map< endpoint_type, std::shared_ptr< buffer_t > > packetizer_;
std::map< endpoint_type, message_buffer_ptr_t > packetizer_;
std::map< client_t, std::map< session_t, endpoint_type > > clients_;

boost::asio::system_timer flush_timer_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class tcp_client_endpoint_impl
virtual ~tcp_client_endpoint_impl();

void start();
void send_queued(buffer_ptr_t _buffer);
void send_queued(message_buffer_ptr_t _buffer);

void join(const std::string &);
void leave(const std::string &);
Expand Down
12 changes: 6 additions & 6 deletions implementation/endpoints/include/tcp_server_endpoint_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ class tcp_server_endpoint_impl
: public tcp_server_endpoint_base_impl {

public:
tcp_server_endpoint_impl(std::shared_ptr< endpoint_host > _host, endpoint_type _local, boost::asio::io_service &_io);
tcp_server_endpoint_impl(std::shared_ptr< endpoint_host > _host,
endpoint_type _local, boost::asio::io_service &_io);
virtual ~tcp_server_endpoint_impl();

void start();
void stop();

void send_queued(endpoint_type _target, std::shared_ptr< buffer_t > _data);
void send_queued(endpoint_type _target, message_buffer_ptr_t _buffer);
endpoint_type get_remote() const;

void join(const std::string &);
Expand All @@ -46,7 +47,6 @@ class tcp_server_endpoint_impl
// TODO: think about a better design!
void receive();
void restart();
const uint8_t * get_buffer() const;

private:
class connection
Expand All @@ -61,7 +61,7 @@ class tcp_server_endpoint_impl
void start();
void stop();

void send_queued(buffer_ptr_t _buffer);
void send_queued(message_buffer_ptr_t _buffer);

private:
connection(tcp_server_endpoint_impl *_owner);
Expand All @@ -70,10 +70,10 @@ class tcp_server_endpoint_impl
tcp_server_endpoint_impl::socket_type socket_;
tcp_server_endpoint_impl *server_;

std::vector< byte_t > message_;
message_buffer_t message_;

private:
void receive_cbk(buffer_ptr_t _buffer,
void receive_cbk(packet_buffer_ptr_t _buffer,
boost::system::error_code const &_error, std::size_t _bytes);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class udp_client_endpoint_impl
virtual ~udp_client_endpoint_impl();

void start();
void send_queued(buffer_ptr_t _buffer);
void send_queued(message_buffer_ptr_t _buffer);

void join(const std::string &_multicast_address);
void leave(const std::string &_multicast_address);
Expand Down
9 changes: 3 additions & 6 deletions implementation/endpoints/include/udp_server_endpoint_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ class udp_server_endpoint_impl
void restart();
void receive();

const uint8_t * get_buffer() const;

void send_queued(endpoint_type _target, buffer_ptr_t _buffer);
void send_queued(endpoint_type _target, message_buffer_ptr_t _buffer);
endpoint_type get_remote() const;

void join(const std::string &_multicast_address);
Expand All @@ -52,15 +50,14 @@ class udp_server_endpoint_impl
bool is_udp() const;

public:
void receive_cbk(buffer_ptr_t _buffer,
void receive_cbk(packet_buffer_ptr_t _buffer,
boost::system::error_code const &_error, std::size_t _size);

private:
socket_type socket_;

endpoint_type remote_;

std::vector< byte_t > message_;
message_buffer_t message_;
};

} // namespace vsomeip
Expand Down
12 changes: 6 additions & 6 deletions implementation/endpoints/src/client_endpoint_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ client_endpoint_impl< Protocol, MaxBufferSize >::client_endpoint_impl(
connect_timer_(_io),
flush_timer_(_io),
remote_(_remote),
packetizer_(std::make_shared< buffer_t >()),
packetizer_(std::make_shared< message_buffer_t >()),
connect_timeout_(VSOMEIP_DEFAULT_CONNECT_TIMEOUT), // TODO: use config variable
is_connected_(false) {
}
Expand Down Expand Up @@ -72,15 +72,15 @@ bool client_endpoint_impl< Protocol, MaxBufferSize >::send(
#endif
if (packetizer_->size() + _size > MaxBufferSize) {
send_queued(packetizer_);
packetizer_ = std::make_shared< buffer_t >();
packetizer_ = std::make_shared< message_buffer_t >();
}

packetizer_->insert(packetizer_->end(), _data, _data + _size);

if (_flush) {
flush_timer_.cancel();
send_queued(packetizer_);
packetizer_ = std::make_shared< buffer_t >();
packetizer_ = std::make_shared< message_buffer_t >();
} else {
flush_timer_.expires_from_now(
std::chrono::milliseconds(VSOMEIP_DEFAULT_FLUSH_TIMEOUT)); // TODO: use config variable
Expand All @@ -102,7 +102,7 @@ bool client_endpoint_impl< Protocol, MaxBufferSize >::flush() {

if (!packetizer_->empty()) {
send_queued(packetizer_);
packetizer_ = std::make_shared< buffer_t >();
packetizer_ = std::make_shared< message_buffer_t >();
} else {
is_successful = false;
}
Expand Down Expand Up @@ -156,7 +156,7 @@ void client_endpoint_impl< Protocol, MaxBufferSize >::wait_connect_cbk(

template < typename Protocol, int MaxBufferSize >
void client_endpoint_impl< Protocol, MaxBufferSize >::send_cbk(
buffer_ptr_t _buffer,
message_buffer_ptr_t _buffer,
boost::system::error_code const &_error, std::size_t _bytes) {
#if 0
std::stringstream msg;
Expand All @@ -182,7 +182,7 @@ void client_endpoint_impl< Protocol, MaxBufferSize >::flush_cbk(

template < typename Protocol, int MaxBufferSize >
void client_endpoint_impl< Protocol, MaxBufferSize >::receive_cbk(
buffer_ptr_t _buffer,
packet_buffer_ptr_t _buffer,
boost::system::error_code const &_error, std::size_t _bytes) {

if (!_error && 0 < _bytes) {
Expand Down
14 changes: 7 additions & 7 deletions implementation/endpoints/src/local_client_endpoint_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,21 @@ void local_client_endpoint_impl::connect() {
}

void local_client_endpoint_impl::receive() {
std::shared_ptr< buffer_t > its_data
= std::make_shared< buffer_t >(VSOMEIP_MAX_LOCAL_MESSAGE_SIZE);
packet_buffer_ptr_t its_buffer
= std::make_shared< packet_buffer_t >();
socket_.async_receive(
boost::asio::buffer(*its_data),
boost::asio::buffer(*its_buffer),
std::bind(
&local_client_endpoint_impl::receive_cbk,
std::dynamic_pointer_cast< local_client_endpoint_impl >(shared_from_this()),
its_data,
its_buffer,
std::placeholders::_1,
std::placeholders::_2
)
);
}

void local_client_endpoint_impl::send_queued(buffer_ptr_t _buffer) {
void local_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) {
#if 0
std::stringstream msg;
msg << "lce<" << this << ">::sq: ";
Expand Down Expand Up @@ -122,9 +122,9 @@ void local_client_endpoint_impl::send_tag_cbk(
}

void local_client_endpoint_impl::receive_cbk(
buffer_ptr_t _buffer,
packet_buffer_ptr_t _buffer,
boost::system::error_code const &_error, std::size_t _bytes) {
VSOMEIP_ERROR << "Local endpoints must not receive messages!";
VSOMEIP_ERROR << "Local client endpoints must not receive messages!";
}

} // namespace vsomeip
Loading

0 comments on commit 9802b61

Please sign in to comment.