Skip to content

Commit

Permalink
Improve throughput performance #4 (rx defragmentation focus) (#754)
Browse files Browse the repository at this point in the history
* fix: bad nt string access in msgcodec test

* feat: wrap fragment instead of copy

* feat: remove extra wbuf init allocation

* feat: add iobuf null functions

* feat: add wbuf moved as zbuf function

* feat: add defrag buff state and allocate only as needed

* feat: move defrag buffer instead of copy

* fix: wbuf moved memory leak

* fix: memory leak on write filter

* feat: add defrag buffer changes to multicast

* fix: transport memory leak
  • Loading branch information
jean-roland authored Oct 22, 2024
1 parent 3fea89c commit b6a4599
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 115 deletions.
5 changes: 5 additions & 0 deletions include/zenoh-pico/protocol/iobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ typedef struct {
bool _is_alloc;
} _z_iosli_t;

/** Builds an all-zero `_z_iosli_t`; this is the value `_z_iosli_steal` leaves behind in its source. */
static inline _z_iosli_t _z_iosli_null(void) {
    _z_iosli_t empty = {0};
    return empty;
}
_z_iosli_t _z_iosli_make(size_t capacity);
_z_iosli_t *_z_iosli_new(size_t capacity);
_z_iosli_t _z_iosli_wrap(const uint8_t *buf, size_t length, size_t r_pos, size_t w_pos);
_z_iosli_t _z_iosli_steal(_z_iosli_t *ios);

size_t _z_iosli_readable(const _z_iosli_t *ios);
uint8_t _z_iosli_read(_z_iosli_t *ios);
Expand Down Expand Up @@ -68,6 +70,7 @@ typedef struct {
} _z_zbuf_t;

/** Returns the count reported by `_z_slice_simple_rc_count` for the slice held by `zbf`. */
static inline size_t _z_zbuf_get_ref_count(const _z_zbuf_t *zbf) {
    const size_t ref_count = _z_slice_simple_rc_count(&zbf->_slice);
    return ref_count;
}
/** Builds an all-zero `_z_zbuf_t` (every field, including the io slice and the slice rc, is zeroed). */
static inline _z_zbuf_t _z_zbuf_null(void) {
    _z_zbuf_t empty = {0};
    return empty;
}
_z_zbuf_t _z_zbuf_make(size_t capacity);
_z_zbuf_t _z_zbuf_view(_z_zbuf_t *zbf, size_t length);
/// Constructs a _borrowing_ reader on `slice`
Expand Down Expand Up @@ -106,6 +109,7 @@ typedef struct {
size_t _expansion_step;
} _z_wbuf_t;

/** Builds an all-zero `_z_wbuf_t`; used as the "no buffer allocated" value for defragmentation buffers. */
static inline _z_wbuf_t _z_wbuf_null(void) {
    _z_wbuf_t empty = {0};
    return empty;
}
_z_wbuf_t _z_wbuf_make(size_t capacity, bool is_expandable);

size_t _z_wbuf_capacity(const _z_wbuf_t *wbf);
Expand All @@ -127,6 +131,7 @@ _z_iosli_t *_z_wbuf_get_iosli(const _z_wbuf_t *wbf, size_t idx);
size_t _z_wbuf_len_iosli(const _z_wbuf_t *wbf);

_z_zbuf_t _z_wbuf_to_zbuf(const _z_wbuf_t *wbf);
_z_zbuf_t _z_wbuf_moved_as_zbuf(_z_wbuf_t *wbf);
z_result_t _z_wbuf_siphon(_z_wbuf_t *dst, _z_wbuf_t *src, size_t length);

void _z_wbuf_copy(_z_wbuf_t *dst, const _z_wbuf_t *src);
Expand Down
10 changes: 10 additions & 0 deletions include/zenoh-pico/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,17 @@
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/definitions/transport.h"

// Lifecycle of a defragmentation buffer (stored in the `_state_reliable` / `_state_best_effort` fields).
// The descriptions below reflect how the multicast RX path uses these values.
enum _z_dbuf_state_e {
// No buffer is allocated; the next fragment triggers allocation of a Z_FRAG_MAX_SIZE buffer.
_Z_DBUF_STATE_NULL = 0,
// Buffer is allocated and incoming fragment payloads are appended to it.
_Z_DBUF_STATE_INIT = 1,
// A fragment would have exceeded Z_FRAG_MAX_SIZE; further payloads are not written and the
// message is dropped (buffer cleared, state back to NULL) when the final fragment arrives.
_Z_DBUF_STATE_OVERFLOW = 2,
};

typedef struct {
#if Z_FEATURE_FRAGMENTATION == 1
// Defragmentation buffers
uint8_t _state_reliable;
uint8_t _state_best_effort;
_z_wbuf_t _dbuf_reliable;
_z_wbuf_t _dbuf_best_effort;
#endif
Expand Down Expand Up @@ -77,6 +85,8 @@ typedef struct {

#if Z_FEATURE_FRAGMENTATION == 1
// Defragmentation buffer
uint8_t _state_reliable;
uint8_t _state_best_effort;
_z_wbuf_t _dbuf_reliable;
_z_wbuf_t _dbuf_best_effort;
#endif
Expand Down
3 changes: 2 additions & 1 deletion src/net/filtering.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ z_result_t _z_write_filter_create(_z_publisher_t *pub) {

/**
 * Tears down the write filter attached to a publisher.
 *
 * If a filter context exists, the associated interest is removed and the context is freed
 * and set to NULL. The context is released even when removing the interest fails, so an
 * error never leaks it.
 *
 * @param pub Publisher whose filter is destroyed.
 * @return The result of `_z_remove_interest`, or `_Z_RES_OK` when there was no filter context.
 */
z_result_t _z_write_filter_destroy(_z_publisher_t *pub) {
    if (pub->_filter.ctx == NULL) {
        return _Z_RES_OK;
    }
    // Remove the interest exactly once and keep its result: returning early on error
    // (as `_Z_RETURN_IF_ERR` would) skips the free below and leaks the context.
    z_result_t res = _z_remove_interest(_Z_RC_IN_VAL(&pub->_zn), pub->_filter._interest_id);
    z_free(pub->_filter.ctx);
    pub->_filter.ctx = NULL;
    return res;
}
Expand Down
4 changes: 1 addition & 3 deletions src/protocol/codec/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,7 @@ z_result_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t
if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) {
ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x05);
}

_z_slice_t slice = _z_slice_alias_buf((uint8_t *)_z_zbuf_start(zbf), _z_zbuf_len(zbf));
_z_slice_copy(&msg->_payload, &slice);
msg->_payload = _z_slice_alias_buf((uint8_t *)_z_zbuf_start(zbf), _z_zbuf_len(zbf));
zbf->_ios._r_pos = zbf->_ios._w_pos;

return ret;
Expand Down
41 changes: 30 additions & 11 deletions src/protocol/iobuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ _z_iosli_t _z_iosli_wrap(const uint8_t *buf, size_t length, size_t r_pos, size_t
return ios;
}

/**
 * Moves the contents of `ios` out to the caller.
 *
 * The returned value is a bitwise copy of `*ios`; `*ios` itself is overwritten with
 * `_z_iosli_null()`, so only the returned value still refers to the buffer.
 */
_z_iosli_t _z_iosli_steal(_z_iosli_t *ios) {
    _z_iosli_t stolen;

    stolen = *ios;
    *ios = _z_iosli_null();
    return stolen;
}

void __z_iosli_init(_z_iosli_t *ios, size_t capacity) {
ios->_r_pos = 0;
ios->_w_pos = 0;
Expand Down Expand Up @@ -168,7 +174,7 @@ _z_iosli_t *_z_iosli_clone(const _z_iosli_t *src) {

/*------------------ ZBuf ------------------*/
_z_zbuf_t _z_zbuf_make(size_t capacity) {
_z_zbuf_t zbf = {0};
_z_zbuf_t zbf = _z_zbuf_null();
zbf._ios = _z_iosli_make(capacity);
if (_z_zbuf_capacity(&zbf) == 0) {
return zbf;
Expand Down Expand Up @@ -284,15 +290,8 @@ size_t _z_wbuf_len_iosli(const _z_wbuf_t *wbf) { return _z_iosli_vec_len(&wbf->_

_z_wbuf_t _z_wbuf_make(size_t capacity, bool is_expandable) {
_z_wbuf_t wbf;
if (is_expandable == true) {
// Preallocate 4 slots, this is usually what we expect
// when fragmenting a zenoh data message with attachment
wbf._ioss = _z_iosli_vec_make(4);
_z_wbuf_add_iosli(&wbf, __z_wbuf_new_iosli(capacity));
} else {
wbf._ioss = _z_iosli_vec_make(1);
_z_wbuf_add_iosli(&wbf, __z_wbuf_new_iosli(capacity));
}
wbf._ioss = _z_iosli_vec_make(1);
_z_wbuf_add_iosli(&wbf, __z_wbuf_new_iosli(capacity));
wbf._w_idx = 0; // This __must__ come after adding ioslices to reset w_idx
wbf._r_idx = 0;
wbf._expansion_step = is_expandable ? capacity : 0;
Expand Down Expand Up @@ -541,6 +540,23 @@ _z_zbuf_t _z_wbuf_to_zbuf(const _z_wbuf_t *wbf) {
return zbf;
}

/**
 * Converts a single-slice wbuf into a zbuf by moving its buffer instead of copying it.
 *
 * On success the returned zbuf refers to the buffer through a ref-counted slice created with
 * the default delete context, and `wbf` is cleared. On failure (the slice rc could not be
 * created) the buffer is handed back to `wbf` before it is cleared, and a null zbuf is
 * returned so that callers checking `_z_zbuf_capacity(&zbf) == 0` detect the error.
 *
 * @param wbf Write buffer to move from; must hold exactly one io slice. Cleared on return.
 * @return The zbuf now referring to the buffer, or `_z_zbuf_null()` on failure.
 */
_z_zbuf_t _z_wbuf_moved_as_zbuf(_z_wbuf_t *wbf) {
    // Can only move single buffer wbuf
    assert(_z_iosli_vec_len(&wbf->_ioss) == 1);

    _z_zbuf_t zbf = _z_zbuf_null();
    _z_iosli_t *ios = _z_wbuf_get_iosli(wbf, 0);
    zbf._ios = _z_iosli_steal(ios);
    _z_slice_t s = _z_slice_from_buf_custom_deleter(zbf._ios._buf, zbf._ios._capacity, _z_delete_context_default());
    zbf._slice = _z_slice_simple_rc_new_from_val(&s);
    if (_Z_RC_IS_NULL(&zbf._slice)) {
        _Z_ERROR("slice rc creation failed");
        // Undo the steal so the wbuf is again the one referring to the buffer when it is cleared,
        // and return a null zbuf instead of one with a valid capacity but no slice.
        // NOTE(review): assumes a failed rc creation does not take ownership of the buffer — confirm.
        *ios = zbf._ios;
        _z_wbuf_clear(wbf);
        return _z_zbuf_null();
    }
    // The ref-counted slice now carries the deleter, so the io slice must not be treated as allocated
    zbf._ios._is_alloc = false;
    // The wbuf slot was emptied by the steal; clear the wbuf itself
    _z_wbuf_clear(wbf);
    return zbf;
}

z_result_t _z_wbuf_siphon(_z_wbuf_t *dst, _z_wbuf_t *src, size_t length) {
z_result_t ret = _Z_RES_OK;
size_t llength = length;
Expand Down Expand Up @@ -593,7 +609,10 @@ void _z_wbuf_reset(_z_wbuf_t *wbf) {
}
}

/**
 * Clears the io slice vector of `wbf` and resets the whole wbuf to `_z_wbuf_null()`.
 *
 * @param wbf Write buffer to clear; left as an all-zero wbuf.
 */
void _z_wbuf_clear(_z_wbuf_t *wbf) {
    _z_iosli_vec_clear(&wbf->_ioss);
    // Zero every field so that no stale read/write index or expansion step survives the clear
    *wbf = _z_wbuf_null();
}

void _z_wbuf_free(_z_wbuf_t **wbf) {
_z_wbuf_t *ptr = *wbf;
Expand Down
1 change: 1 addition & 0 deletions src/transport/multicast/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ void *_zp_multicast_read_task(void *ztm_arg) {
_z_slice_clear(&addr);
} else {
_Z_ERROR("Dropping message due to processing error: %d", ret);
_z_slice_clear(&addr);
continue;
}
} else {
Expand Down
96 changes: 61 additions & 35 deletions src/transport/multicast/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm,
case _Z_MID_T_FRAME: {
_Z_DEBUG("Received _Z_FRAME message");
if (entry == NULL) {
_Z_INFO("Dropping _Z_FRAME from unknown peer");
break;
}
// Note that we receive data from peer
entry->_received = true;

// Check if the SN is correct
Expand All @@ -148,6 +150,7 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm,
entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._frame._sn;
} else {
#if Z_FEATURE_FRAGMENTATION == 1
entry->_state_reliable = _Z_DBUF_STATE_NULL;
_z_wbuf_clear(&entry->_dbuf_reliable);
#endif
_Z_INFO("Reliable message dropped because it is out of order");
Expand All @@ -159,6 +162,7 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm,
entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._frame._sn;
} else {
#if Z_FEATURE_FRAGMENTATION == 1
entry->_state_best_effort = _Z_DBUF_STATE_NULL;
_z_wbuf_clear(&entry->_dbuf_best_effort);
#endif
_Z_INFO("Best effort message dropped because it is out of order");
Expand All @@ -184,48 +188,78 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm,
_Z_DEBUG("Received Z_FRAGMENT message");
#if Z_FEATURE_FRAGMENTATION == 1
if (entry == NULL) {
_Z_INFO("Dropping Z_FRAGMENT from unknown peer");
break;
}
// Note that we receive data from the peer
entry->_received = true;

_z_wbuf_t *dbuf = _Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R)
? &entry->_dbuf_reliable
: &entry->_dbuf_best_effort; // Select the right defragmentation buffer

bool drop = false;
if ((_z_wbuf_len(dbuf) + t_msg->_body._fragment._payload.len) > Z_FRAG_MAX_SIZE) {
// Filling the wbuf capacity as a way to signaling the last fragment to reset the dbuf
// Otherwise, last (smaller) fragments can be understood as a complete message
_z_wbuf_write_bytes(dbuf, t_msg->_body._fragment._payload.start, 0, _z_wbuf_space_left(dbuf));
drop = true;
_z_wbuf_t *dbuf;
uint8_t *dbuf_state;
// Select the right defragmentation buffer
if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R)) {
dbuf = &entry->_dbuf_reliable;
dbuf_state = &entry->_state_reliable;
} else {
_z_wbuf_write_bytes(dbuf, t_msg->_body._fragment._payload.start, 0,
t_msg->_body._fragment._payload.len);
dbuf = &entry->_dbuf_best_effort;
dbuf_state = &entry->_state_best_effort;
}

// Allocate buffer if needed
if (*dbuf_state == _Z_DBUF_STATE_NULL) {
*dbuf = _z_wbuf_make(Z_FRAG_MAX_SIZE, false);
if (_z_wbuf_capacity(dbuf) != Z_FRAG_MAX_SIZE) {
_Z_ERROR("Not enough memory to allocate peer defragmentation buffer");
ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY;
break;
}
*dbuf_state = _Z_DBUF_STATE_INIT;
}
// Process fragment data
if (*dbuf_state == _Z_DBUF_STATE_INIT) {
// Check overflow
if ((_z_wbuf_len(dbuf) + t_msg->_body._fragment._payload.len) > Z_FRAG_MAX_SIZE) {
*dbuf_state = _Z_DBUF_STATE_OVERFLOW;
} else {
// Fill buffer
_z_wbuf_write_bytes(dbuf, t_msg->_body._fragment._payload.start, 0,
t_msg->_body._fragment._payload.len);
}
}
// Process final fragment
if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_M) == false) {
if (drop == true) { // Drop message if it exceeds the fragmentation size
_z_wbuf_reset(dbuf);
// Drop message if it exceeds the fragmentation size
if (*dbuf_state == _Z_DBUF_STATE_OVERFLOW) {
_Z_INFO("Fragment dropped because defragmentation buffer has overflown");
_z_wbuf_clear(dbuf);
*dbuf_state = _Z_DBUF_STATE_NULL;
break;
}

_z_zbuf_t zbf = _z_wbuf_to_zbuf(dbuf); // Convert the defragmentation buffer into a decoding buffer

// Convert the defragmentation buffer into a decoding buffer
_z_zbuf_t zbf = _z_wbuf_moved_as_zbuf(dbuf);
if (_z_zbuf_capacity(&zbf) == 0) {
_Z_ERROR("Failed to convert defragmentation buffer into a decoding buffer!");
_z_wbuf_clear(dbuf);
*dbuf_state = _Z_DBUF_STATE_NULL;
ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY;
break;
}
// Decode message
_z_zenoh_message_t zm;
ret = _z_network_message_decode(&zm, &zbf);
zm._reliability = _z_t_msg_get_reliability(t_msg);
if (ret == _Z_RES_OK) {
uint16_t mapping = entry->_peer_id;
_z_msg_fix_mapping(&zm, mapping);
_z_handle_network_message(ztm->_session, &zm, mapping);
_z_msg_clear(&zm); // Clear must be explicitly called for fragmented zenoh messages. Non-fragmented
// zenoh messages are released when their transport message is released.
} else {
_Z_INFO("Failed to decode defragmented message");
ret = _Z_ERR_MESSAGE_DESERIALIZATION_FAILED;
}

// Fragmented messages must be cleared. Non-fragmented messages are released with their transport.
_z_msg_clear(&zm);
// Free the decoding buffer
_z_zbuf_clear(&zbf);
// Reset the defragmentation buffer
_z_wbuf_reset(dbuf);
*dbuf_state = _Z_DBUF_STATE_NULL;
}
#else
_Z_INFO("Fragment dropped because fragmentation feature is deactivated");
Expand Down Expand Up @@ -280,18 +314,10 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm,
_z_conduit_sn_list_decrement(entry->_sn_res, &entry->_sn_rx_sns);

#if Z_FEATURE_FRAGMENTATION == 1
#if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 1
entry->_dbuf_reliable = _z_wbuf_make(0, true);
entry->_dbuf_best_effort = _z_wbuf_make(0, true);
#else
entry->_dbuf_reliable = _z_wbuf_make(Z_FRAG_MAX_SIZE, false);
entry->_dbuf_best_effort = _z_wbuf_make(Z_FRAG_MAX_SIZE, false);

if ((_z_wbuf_capacity(&entry->_dbuf_reliable) != Z_FRAG_MAX_SIZE) ||
(_z_wbuf_capacity(&entry->_dbuf_best_effort) != Z_FRAG_MAX_SIZE)) {
_Z_ERROR("Not enough memory to allocate peer defragmentation buffers!");
}
#endif
entry->_state_reliable = _Z_DBUF_STATE_NULL;
entry->_state_best_effort = _Z_DBUF_STATE_NULL;
entry->_dbuf_reliable = _z_wbuf_null();
entry->_dbuf_best_effort = _z_wbuf_null();
#endif
// Update lease time (set as ms during)
entry->_lease = t_msg->_body._join._lease;
Expand Down
2 changes: 2 additions & 0 deletions src/transport/peer_entry.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ void _z_transport_peer_entry_clear(_z_transport_peer_entry_t *src) {

void _z_transport_peer_entry_copy(_z_transport_peer_entry_t *dst, const _z_transport_peer_entry_t *src) {
#if Z_FEATURE_FRAGMENTATION == 1
dst->_state_reliable = src->_state_reliable;
dst->_state_best_effort = src->_state_best_effort;
_z_wbuf_copy(&dst->_dbuf_reliable, &src->_dbuf_reliable);
_z_wbuf_copy(&dst->_dbuf_best_effort, &src->_dbuf_best_effort);
#endif
Expand Down
Loading

0 comments on commit b6a4599

Please sign in to comment.