Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use session to create timestamp #530

Merged
merged 12 commits into from
Jul 11, 2024
9 changes: 9 additions & 0 deletions examples/unix/c11/z_pub_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <zenoh-pico.h>

Expand Down Expand Up @@ -144,6 +145,11 @@ int main(int argc, char **argv) {
// Create encoding
z_owned_encoding_t encoding;

// Create timestamp
z_timestamp_t ts;
time_t now = time(NULL);
z_timestamp_new(&ts, z_loan(s), (uint64_t)now);

// Publish data
printf("Press CTRL-C to quit...\n");
char buf[256];
Expand All @@ -167,6 +173,9 @@ int main(int argc, char **argv) {
z_encoding_from_str(&encoding, "zenoh/string;utf8");
options.encoding = z_move(encoding);

// Add timestamp
options.timestamp = &ts;

z_publisher_put(z_loan(pub), z_move(payload), &options);
}
// Clean up
Expand Down
7 changes: 7 additions & 0 deletions examples/unix/c11/z_sub_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//

#include <ctype.h>
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
Expand Down Expand Up @@ -79,6 +80,12 @@ void data_handler(const z_loaned_sample_t *sample, void *ctx) {

printf(">> [Subscriber] Received ('%s': '%s')\n", z_string_data(z_loan(keystr)), z_string_data(z_loan(value)));
printf(" with encoding: %s\n", z_string_data(z_loan(encoding)));

// Check timestamp
const z_timestamp_t *ts = z_sample_timestamp(sample);
if (ts != NULL) {
printf(" with timestamp: %" PRIu64 "\n", z_timestamp_npt64_time(ts));
}
// Check attachment
kv_pairs_t kvp = {.current_idx = 0, .len = KVP_LEN, .data = (kv_pair_t *)malloc(KVP_LEN * sizeof(kv_pair_t))};
parse_attachment(&kvp, z_sample_attachment(sample));
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -948,13 +948,13 @@ int8_t z_bytes_writer_write(z_loaned_bytes_writer_t *writer, const uint8_t *src,
*
* Parameters:
* ts: An uninitialized :c:type:`z_timestamp_t`.
* zs: Pointer to a :c:type:`z_loaned_session_t` to get the id from.
* npt64_time: NPT64 time.
* zid: id associated with this timestamp
*
* Return:
* ``0`` if encode successful, ``negative value`` otherwise.
*/
int8_t z_timestamp_new(z_timestamp_t *ts, const z_id_t *zid, uint64_t npt64_time);
int8_t z_timestamp_new(z_timestamp_t *ts, const z_loaned_session_t *zs, uint64_t npt64_time);

/**
* Returns NPT64 time associated with this timestamp.
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void _z_sample_free(_z_sample_t **sample);
int8_t _z_sample_copy(_z_sample_t *dst, const _z_sample_t *src);
_z_sample_t _z_sample_duplicate(const _z_sample_t *src);

_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, _z_timestamp_t timestamp,
_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t *timestamp,
_z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos,
const _z_bytes_t attachment);

Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ _z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t

_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub);
int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
_z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
_z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t *timestamp,
const _z_n_qos_t qos, const _z_bytes_t attachment);
void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub);
void _z_flush_subscriptions(_z_session_t *zn);
Expand Down
4 changes: 2 additions & 2 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -623,8 +623,8 @@ int8_t z_bytes_writer_write(z_loaned_bytes_writer_t *writer, const uint8_t *src,
return _z_bytes_writer_write(writer, src, len);
}

int8_t z_timestamp_new(z_timestamp_t *ts, const z_id_t *zid, uint64_t npt64_time) {
ts->id = *zid;
int8_t z_timestamp_new(z_timestamp_t *ts, const z_loaned_session_t *zs, uint64_t npt64_time) {
ts->id = zs->in->val._local_zid;
ts->time = npt64_time;
return _Z_RES_OK;
}
Expand Down
6 changes: 3 additions & 3 deletions src/net/sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,21 @@ _z_sample_t _z_sample_duplicate(const _z_sample_t *src) {
}

#if Z_FEATURE_SUBSCRIPTION == 1
_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t timestamp,
_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t *timestamp,
_z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos,
const _z_bytes_t attachment) {
_z_sample_t s = _z_sample_null();
s.keyexpr = _z_keyexpr_steal(key);
s.kind = kind;
s.timestamp = timestamp;
s.timestamp = _z_timestamp_duplicate(timestamp);
s.qos = qos;
_z_bytes_copy(&s.payload, &payload);
_z_bytes_copy(&s.attachment, &attachment);
_z_encoding_move(&s.encoding, encoding);
return s;
}
#else
_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t timestamp,
_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t *timestamp,
_z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos,
const _z_bytes_t attachment) {
_ZP_UNUSED(key);
Expand Down
60 changes: 28 additions & 32 deletions src/protocol/codec/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ int8_t _z_id_encode_as_slice(_z_wbuf_t *wbf, const _z_id_t *id) {
uint8_t len = _z_id_len(*id);

if (len != 0) {
printf("ZIDLEN: %d\n", len);
_z_slice_t buf = _z_slice_wrap(id->id, len);
ret = _z_slice_encode(wbf, &buf);
} else {
Expand Down Expand Up @@ -325,42 +324,39 @@ int8_t _z_push_body_decode_extensions(_z_msg_ext_t *extension, void *ctx) {

int8_t _z_push_body_decode(_z_push_body_t *pshb, _z_zbuf_t *zbf, uint8_t header) {
int8_t ret = _Z_RES_OK;
if (ret == _Z_RES_OK) {
switch (_Z_MID(header)) {
case _Z_MID_Z_PUT: {
pshb->_is_put = true;
pshb->_body._put = (_z_msg_put_t){0};
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_P_T)) {
_Z_RETURN_IF_ERR(_z_timestamp_decode(&pshb->_body._put._commons._timestamp, zbf));
}
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_Z_P_E)) {
_Z_RETURN_IF_ERR(_z_encoding_decode(&pshb->_body._put._encoding, zbf));
}
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_push_body_decode_extensions, pshb));
}
if (ret == _Z_RES_OK) {
_Z_RETURN_IF_ERR(_z_bytes_decode(&pshb->_body._put._payload, zbf));
}
break;
switch (_Z_MID(header)) {
case _Z_MID_Z_PUT: {
pshb->_is_put = true;
pshb->_body._put = (_z_msg_put_t){0};
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_P_T)) {
_Z_RETURN_IF_ERR(_z_timestamp_decode(&pshb->_body._put._commons._timestamp, zbf));
}
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_Z_P_E)) {
_Z_RETURN_IF_ERR(_z_encoding_decode(&pshb->_body._put._encoding, zbf));
}
case _Z_MID_Z_DEL: {
pshb->_is_put = false;
pshb->_body._del = (_z_msg_del_t){0};
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_D_T)) {
_Z_RETURN_IF_ERR(_z_timestamp_decode(&pshb->_body._put._commons._timestamp, zbf));
}
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_push_body_decode_extensions, pshb));
}
break;
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_push_body_decode_extensions, pshb));
}
default: {
ret = _Z_ERR_MESSAGE_ZENOH_UNKNOWN;
if (ret == _Z_RES_OK) {
_Z_RETURN_IF_ERR(_z_bytes_decode(&pshb->_body._put._payload, zbf));
}
break;
}
case _Z_MID_Z_DEL: {
pshb->_is_put = false;
pshb->_body._del = (_z_msg_del_t){0};
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_D_T)) {
_Z_RETURN_IF_ERR(_z_timestamp_decode(&pshb->_body._put._commons._timestamp, zbf));
}
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_push_body_decode_extensions, pshb));
}
break;
}
default: {
ret = _Z_ERR_MESSAGE_ZENOH_UNKNOWN;
}
}

return ret;
}

Expand Down
4 changes: 2 additions & 2 deletions src/session/push.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) {
if (push->_body._is_put) {
ret =
_z_trigger_subscriptions(zn, push->_key, push->_body._body._put._payload, &push->_body._body._put._encoding,
kind, push->_timestamp, push->_qos, push->_body._body._put._attachment);
kind, &push->_timestamp, push->_qos, push->_body._body._put._attachment);
} else {
_z_encoding_t encoding = _z_encoding_null();
_z_bytes_t payload = _z_bytes_null();
ret = _z_trigger_subscriptions(zn, push->_key, payload, &encoding, kind, push->_timestamp, push->_qos,
ret = _z_trigger_subscriptions(zn, push->_key, payload, &encoding, kind, &push->_timestamp, push->_qos,
push->_body._body._put._attachment);
}
#else
Expand Down
4 changes: 2 additions & 2 deletions src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ int8_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *msg,
#if Z_FEATURE_SUBSCRIPTION == 1
_z_msg_put_t put = req->_body._put;
ret = _z_trigger_subscriptions(zn, req->_key, put._payload, &put._encoding, Z_SAMPLE_KIND_PUT,
put._commons._timestamp, req->_ext_qos, put._attachment);
&put._commons._timestamp, req->_ext_qos, put._attachment);
#endif
if (ret == _Z_RES_OK) {
_z_network_message_t final = _z_n_msg_make_response_final(req->_rid);
Expand All @@ -114,7 +114,7 @@ int8_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *msg,
_z_msg_del_t del = req->_body._del;
_z_encoding_t encoding = _z_encoding_null();
ret = _z_trigger_subscriptions(zn, req->_key, _z_bytes_null(), &encoding, Z_SAMPLE_KIND_DELETE,
del._commons._timestamp, req->_ext_qos, del._attachment);
&del._commons._timestamp, req->_ext_qos, del._attachment);
#endif
if (ret == _Z_RES_OK) {
_z_network_message_t final = _z_n_msg_make_response_final(req->_rid);
Expand Down
7 changes: 4 additions & 3 deletions src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,14 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca
void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_n_qos_t qos, const _z_bytes_t attachment) {
_z_encoding_t encoding = _z_encoding_null();
int8_t ret = _z_trigger_subscriptions(zn, keyexpr, payload, &encoding, Z_SAMPLE_KIND_PUT, _z_timestamp_null(), qos,
attachment);
_z_timestamp_t timestamp = _z_timestamp_null();
int8_t ret =
_z_trigger_subscriptions(zn, keyexpr, payload, &encoding, Z_SAMPLE_KIND_PUT, &timestamp, qos, attachment);
(void)ret;
}

int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
_z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
_z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t *timestamp,
const _z_n_qos_t qos, const _z_bytes_t attachment) {
int8_t ret = _Z_RES_OK;

Expand Down
6 changes: 1 addition & 5 deletions src/session/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ void _z_timestamp_clear(_z_timestamp_t *tstamp) {
tstamp->time = 0;
}

_Bool _z_timestamp_check(const _z_timestamp_t *stamp) {
for (uint8_t i = 0; i < sizeof(_z_id_t); ++i)
if (stamp->id.id[i]) return true;
return false;
}
_Bool _z_timestamp_check(const _z_timestamp_t *stamp) { return _z_id_check(stamp->id); }

int8_t _z_session_generate_zid(_z_id_t *bs, uint8_t size) {
int8_t ret = _Z_RES_OK;
Expand Down
Loading