Skip to content

Commit

Permalink
Update z_pull examples with the channels
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Aug 7, 2024
1 parent 38fd099 commit 076ed28
Show file tree
Hide file tree
Showing 8 changed files with 297 additions and 235 deletions.
84 changes: 43 additions & 41 deletions examples/arduino/z_pull.ino
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,10 @@

#define KEYEXPR "demo/example/**"

// @TODO
// z_owned_pull_subscriber_t sub;
const size_t INTERVAL = 5000;
const size_t SIZE = 3;

// @TODO
// void data_handler(const z_loaned_sample_t *sample, void *arg) {
// z_view_string_t keystr;
// z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr);
// std::string val((const char *)sample->payload.start, sample->payload.len);

// Serial.print(" >> [Subscription listener] Received (");
// Serial.print(z_string_data(z_view_string_loan(&keystr)));
// Serial.print(", ");
// Serial.print(val.c_str());
// Serial.println(")");

// }
z_owned_ring_handler_sample_t handler;

void setup() {
// Initialize Serial for debug
Expand Down Expand Up @@ -86,37 +74,51 @@ void setup() {
}
Serial.println("OK");

// Start the receive and the session lease loop for zenoh-pico
zp_start_read_task(z_session_loan_mut(&s), NULL);
zp_start_lease_task(z_session_loan_mut(&s), NULL);

// Declare Zenoh subscriber
Serial.print("Declaring Subscriber on ");
Serial.print(KEYEXPR);
Serial.println(" ...");
// @TODO
// z_owned_closure_sample_t callback;
// z_closure_sample(&callback, data_handler, NULL, NULL);
// @TODO
// z_view_keyexpr_t ke;
// z_view_keyexpr_from_str_unchecked(&ke, KEYEXPR);
// sub = z_declare_pull_subscriber(z_session_loan(&s), z_view_keyexpr_loan(&ke), z_closure_sample_move(&callback),
// NULL); if (!z_pull_subscriber_check(&sub)) {
// Serial.println("Unable to declare subscriber.");
// while (1) {
// ;
// }
// }
// Serial.println("OK");
// Serial.println("Zenoh setup finished!");
Serial.println("Pull Subscriber not supported... exiting");
if (zp_start_read_task(z_session_loan_mut(&s), NULL) < 0 || zp_start_lease_task(z_session_loan_mut(&s), NULL) < 0) {
printf("Unable to start read and lease tasks\n");
z_close(z_session_move(&s));
return;
}

printf("Declaring Subscriber on '%s'...\n", KEYEXPR);
z_owned_closure_sample_t closure;
z_ring_channel_sample_new(&closure, &handler, SIZE);
z_owned_subscriber_t sub;
z_view_keyexpr_t ke;
z_view_keyexpr_from_str(&ke, KEYEXPR);
if (z_declare_subscriber(&sub, z_session_loan(&s), z_view_keyexpr_loan(&ke), z_closure_sample_move(&closure),
NULL) < 0) {
Serial.println("Unable to declare subscriber.");
return;
}

Serial.println("OK");
Serial.println("Zenoh setup finished!");

delay(300);
}

void loop() {
delay(5000);
// z_subscriber_pull(z_pull_subscriber_loan(&sub));
z_owned_sample_t sample;
z_result_t res;
for (res = z_ring_handler_sample_try_recv(z_ring_handler_sample_loan(&handler), &sample); res == Z_OK;
res = z_ring_handler_sample_try_recv(z_ring_handler_sample_loan(&handler), &sample)) {
z_view_string_t keystr;
z_keyexpr_as_view_string(z_sample_keyexpr(z_sample_loan(&sample)), &keystr);
z_owned_string_t value;
z_bytes_deserialize_into_string(z_sample_payload(z_sample_loan(&sample)), &value);
Serial.print(">> [Subscriber] Pulled (");
Serial.print(z_string_data(z_view_string_loan(&keystr)));
Serial.print(": ");
Serial.print(z_string_data(z_string_loan(&value)));
Serial.println(")");

z_string_drop(z_string_move(&value));
z_sample_drop(z_sample_move(&sample));
}
if (res == Z_CHANNEL_NODATA) {
delay(INTERVAL);
}
}
#else
void setup() {
Expand Down
82 changes: 46 additions & 36 deletions examples/espidf/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ static int s_retry_count = 0;

#define KEYEXPR "demo/example/**"

const size_t INTERVAL = 5000;
const size_t SIZE = 3;

static void event_handler(void* arg, esp_event_base_t event_base, int32_t event_id, void* event_data) {
if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_START) {
esp_wifi_connect();
Expand Down Expand Up @@ -100,15 +103,6 @@ void wifi_init_sta(void) {
vEventGroupDelete(s_event_group_handler);
}

// @TODO
// void data_handler(const z_loaned_sample_t* sample, void* arg) {
// z_view_string_t keystr;
// z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr);
// printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_string_data(z_loan(keystr)),
// (int)sample->payload.len,
// sample->payload.start);
// }

void app_main() {
esp_err_t ret = nvs_flash_init();
if (ret == ESP_ERR_NVS_NO_FREE_PAGES || ret == ESP_ERR_NVS_NEW_VERSION_FOUND) {
Expand Down Expand Up @@ -143,33 +137,49 @@ void app_main() {
}
printf("OK\n");

// Start the receive and the session lease loop for zenoh-pico
zp_start_read_task(z_loan_mut(s), NULL);
zp_start_lease_task(z_loan_mut(s), NULL);

// @TODO
// z_owned_closure_sample_t callback;
// z_closure(&callback, data_handler);
printf("Declaring Subscriber on '%s'...", KEYEXPR);
// @TODO
// z_view_keyexpr_t ke;
// z_view_keyexpr_from_str_unchecked(&ke, KEYEXPR);
// z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_loan(ke), z_move(callback), NULL);
// if (!z_check(sub)) {
// printf("Unable to declare subscriber.\n");
// exit(-1);
// }
// printf("OK!\n");

// while (1) {
// sleep(5);
// printf("Pulling data from '%s'...\n", KEYEXPR);
// z_subscriber_pull(z_loan(sub));
// }

// printf("Closing Zenoh Session...");
// z_undeclare_pull_subscriber(z_move(sub));
printf("Pull Subscriber not supported... exiting\n");
// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) {
printf("Unable to start read and lease tasks\n");
z_close(z_session_move(&s));
exit(-1);
}

printf("Declaring Subscriber on '%s'...\n", KEYEXPR);
z_owned_closure_sample_t closure;
z_owned_ring_handler_sample_t handler;
z_ring_channel_sample_new(&closure, &handler, SIZE);
z_owned_subscriber_t sub;
z_view_keyexpr_t ke;
z_view_keyexpr_from_str(&ke, KEYEXPR);
if (z_declare_subscriber(&sub, z_loan(s), z_loan(ke), z_move(closure), NULL) < 0) {
printf("Unable to declare subscriber.\n");
exit(-1);
}

printf("Pulling data every %zu ms... Ring size: %zd\n", INTERVAL, SIZE);
z_owned_sample_t sample;
while (true) {
z_result_t res;
for (res = z_try_recv(z_loan(handler), &sample); res == Z_OK; res = z_try_recv(z_loan(handler), &sample)) {
z_view_string_t keystr;
z_keyexpr_as_view_string(z_sample_keyexpr(z_loan(sample)), &keystr);
z_owned_string_t value;
z_bytes_deserialize_into_string(z_sample_payload(z_loan(sample)), &value);
printf(">> [Subscriber] Pulled ('%s': '%s')\n", z_string_data(z_loan(keystr)),
z_string_data(z_loan(value)));
z_drop(z_move(value));
z_drop(z_move(sample));
}
if (res == Z_CHANNEL_NODATA) {
printf(">> [Subscriber] Nothing to pull... sleep for %zu ms\n", INTERVAL);
z_sleep_ms(INTERVAL);
} else {
break;
}
}

z_undeclare_subscriber(z_move(sub));
z_drop(z_move(handler));

z_close(z_move(s));
printf("OK!\n");
Expand Down
65 changes: 37 additions & 28 deletions examples/freertos_plus_tcp/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#include <zenoh-pico.h>

#if Z_FEATURE_PUBLICATION == 1
#if Z_FEATURE_SUBSCRIPTION == 1
#define CLIENT_OR_PEER 0 // 0: Client mode; 1: Peer mode
#if CLIENT_OR_PEER == 0
#define MODE "client"
Expand All @@ -28,19 +28,13 @@

#define KEYEXPR "demo/example/**"

// @TODO
// void data_handler(const z_loaned_sample_t *sample, void *ctx) {
// (void)(ctx);
// z_view_string_t keystr;
// z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr);
// printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_string_data(z_loan(keystr)), (int)sample->payload.len,
// sample->payload.start);
// }
const size_t INTERVAL = 5000;
const size_t SIZE = 3;

void app_main(void) {
z_owned_config_t config;
z_config_default(&config);
zp_config_insert(z_config_loan_mut(&config), Z_CONFIG_MODE_KEY, MODE);
zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, MODE);
if (strcmp(CONNECT, "") != 0) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, CONNECT);
}
Expand All @@ -59,27 +53,42 @@ void app_main(void) {
return;
}

// @TODO
// z_owned_closure_sample_t callback;
// z_closure(&callback, data_handler);
printf("Declaring Subscriber on '%s'...\n", KEYEXPR);
// @TODO
// z_view_keyexpr_t ke;
// z_view_keyexpr_from_str_unchecked(&ke, KEYEXPR);
// z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_loan(ke), z_move(callback), NULL);
// if (!z_check(sub)) {
// printf("Unable to declare subscriber.\n");
// return;
// }
z_owned_closure_sample_t closure;
z_owned_ring_handler_sample_t handler;
z_ring_channel_sample_new(&closure, &handler, SIZE);
z_owned_subscriber_t sub;
z_view_keyexpr_t ke;
z_view_keyexpr_from_str(&ke, KEYEXPR);
if (z_declare_subscriber(&sub, z_loan(s), z_loan(ke), z_move(closure), NULL) < 0) {
printf("Unable to declare subscriber.\n");
return;
}

// while (1) {
// z_sleep_s(5);
// printf("Pulling data from '%s'...\n", KEYEXPR);
// z_subscriber_pull(z_loan(sub));
// }
printf("Pulling data every %zu ms... Ring size: %zd\n", INTERVAL, SIZE);
z_owned_sample_t sample;
while (true) {
z_result_t res;
for (res = z_try_recv(z_loan(handler), &sample); res == Z_OK; res = z_try_recv(z_loan(handler), &sample)) {
z_view_string_t keystr;
z_keyexpr_as_view_string(z_sample_keyexpr(z_loan(sample)), &keystr);
z_owned_string_t value;
z_bytes_deserialize_into_string(z_sample_payload(z_loan(sample)), &value);
printf(">> [Subscriber] Pulled ('%s': '%s')\n", z_string_data(z_loan(keystr)),
z_string_data(z_loan(value)));
z_drop(z_move(value));
z_drop(z_move(sample));
}
if (res == Z_CHANNEL_NODATA) {
printf(">> [Subscriber] Nothing to pull... sleep for %zu ms\n", INTERVAL);
z_sleep_ms(INTERVAL);
} else {
break;
}
}

// z_undeclare_pull_subscriber(z_move(sub));
printf("Pull Subscriber not supported... exiting\n");
z_undeclare_subscriber(z_move(sub));
z_drop(z_move(handler));

z_close(z_move(s));
}
Expand Down
75 changes: 44 additions & 31 deletions examples/mbed/z_pull.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,8 @@

#define KEYEXPR "demo/example/**"

// @TODO
// void data_handler(const z_loaned_sample_t *sample, void *arg) {
// z_view_string_t keystr;
// z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr);
// printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_string_data(z_view_string_loan(&keystr)),
// (int)sample->payload.len,
// sample->payload.start);
// }
const size_t INTERVAL = 5000;
const size_t SIZE = 3;

int main(int argc, char **argv) {
randLIB_seed_random();
Expand All @@ -63,32 +57,51 @@ int main(int argc, char **argv) {
}
printf("OK\n");

// Start the receive and the session lease loop for zenoh-pico
zp_start_read_task(z_session_loan_mut(&s), NULL);
zp_start_lease_task(z_session_loan_mut(&s), NULL);
// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_session_loan_mut(&s), NULL) < 0 || zp_start_lease_task(z_session_loan_mut(&s), NULL) < 0) {
printf("Unable to start read and lease tasks\n");
z_close(z_session_move(&s));
return -1;
}

// @TODO
// z_owned_closure_sample_t callback;
// z_closure_sample(&callback, data_handler, NULL, NULL);
printf("Declaring Subscriber on '%s'...", KEYEXPR);
// @TODO
// z_owned_pull_subscriber_t sub =
// z_declare_pull_subscriber(z_session_loan(&s), z_loan(ke), z_closure_sample_move(&callback), NULL);
// if (!z_pull_subscriber_check(&sub)) {
// printf("Unable to declare subscriber.\n");
// exit(-1);
// }
// printf("OK!\n");
printf("Declaring Subscriber on '%s'...\n", KEYEXPR);
z_owned_closure_sample_t closure;
z_owned_ring_handler_sample_t handler;
z_ring_channel_sample_new(&closure, &handler, SIZE);
z_owned_subscriber_t sub;
z_view_keyexpr_t ke;
z_view_keyexpr_from_str(&ke, KEYEXPR);
if (z_declare_subscriber(&sub, z_session_loan(&s), z_view_keyexpr_loan(&ke), z_closure_sample_move(&closure),
NULL) < 0) {
printf("Unable to declare subscriber.\n");
return -1;
}

// while (1) {
// z_sleep_s(5);
// printf("Pulling data from '%s'...\n", KEYEXPR);
// z_subscriber_pull(z_pull_subscriber_loan(&sub));
// }
printf("Pulling data every %zu ms... Ring size: %zd\n", INTERVAL, SIZE);
z_owned_sample_t sample;
while (true) {
z_result_t res;
for (res = z_ring_handler_sample_try_recv(z_ring_handler_sample_loan(&handler), &sample); res == Z_OK;
res = z_ring_handler_sample_try_recv(z_ring_handler_sample_loan(&handler), &sample)) {
z_view_string_t keystr;
z_keyexpr_as_view_string(z_sample_keyexpr(z_sample_loan(&sample)), &keystr);
z_owned_string_t value;
z_bytes_deserialize_into_string(z_sample_payload(z_sample_loan(&sample)), &value);
printf(">> [Subscriber] Pulled ('%s': '%s')\n", z_string_data(z_view_string_loan(&keystr)),
z_string_data(z_string_loan(&value)));
z_string_drop(z_string_move(&value));
z_sample_drop(z_sample_move(&sample));
}
if (res == Z_CHANNEL_NODATA) {
printf(">> [Subscriber] Nothing to pull... sleep for %zu ms\n", INTERVAL);
z_sleep_ms(INTERVAL);
} else {
break;
}
}

// printf("Closing Zenoh Session...");
// z_undeclare_pull_subscriber(z_pull_subscriber_move(&sub));
printf("Pull Subscriber not supported... exiting\n");
z_undeclare_subscriber(z_subscriber_move(&sub));
z_ring_handler_sample_drop(z_ring_handler_sample_move(&handler));

z_close(z_session_move(&s));
printf("OK!\n");
Expand Down
Loading

0 comments on commit 076ed28

Please sign in to comment.