Skip to content

Commit

Permalink
fixup! Split unpublish request into multiple request
Browse files Browse the repository at this point in the history
  • Loading branch information
Danielius1922 committed Oct 28, 2023
1 parent 02f7d04 commit 86fb6c8
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 108 deletions.
203 changes: 117 additions & 86 deletions api/cloud/rd_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "oc_rep.h"
#include "rd_client_internal.h"
#include "util/oc_buffer_internal.h"
#include "util/oc_macros_internal.h"
#include "util/oc_secure_string_internal.h"

#include <assert.h>
Expand Down Expand Up @@ -150,38 +151,29 @@ rd_publish(const oc_link_t *links, const oc_endpoint_t *endpoint, size_t device,
return status;
}

static oc_write_buffer_t
rd_prepare_write_buffer(char *buffer, size_t buffer_size, oc_string_view_t id)
{
assert(buffer_size > 3 + id.length);
oc_write_buffer_t wb = {
.buffer = buffer,
.buffer_size = buffer_size,
.total = 0,
};
memcpy(wb.buffer, "di=", 3);
wb.buffer = wb.buffer + 3;
wb.buffer_size = wb.buffer_size - 3;
wb.total = wb.total + 3;
memcpy(wb.buffer, id.data, id.length);
wb.buffer = wb.buffer + id.length;
wb.buffer_size = wb.buffer_size - id.length;
wb.total = wb.total + id.length;
return wb;
}

static bool
rd_delete_do_request(const oc_endpoint_t *endpoint, const char *buffer,
oc_response_handler_t handler, oc_qos_t qos,
void *user_data)
rd_prepare_write_buffer(oc_write_buffer_t *wb, char *buffer, size_t buffer_size,
oc_string_view_t id)
{
OC_DBG("Unpublishing links: %s", buffer);
if (oc_do_delete(OC_RSRVD_RD_URI, endpoint, buffer, handler, qos,
user_data)) {
return true;
// enough room for at least one link
// di={id}&ins={$instanceID} + \0
if (buffer_size <=
/*di=*/3 + id.length + /*&ins=*/5 + /*min instanceID*/ 1 + /*\0*/ 1) {
OC_ERR("buffer too small");
return false;
}
OC_ERR("failed to unpublish links: %s", buffer);
return false;
wb->buffer = buffer;
wb->buffer_size = buffer_size;
wb->total = 0;
memcpy(wb->buffer, "di=", OC_CHAR_ARRAY_LEN("di="));
wb->buffer += OC_CHAR_ARRAY_LEN("di=");
wb->buffer_size -= OC_CHAR_ARRAY_LEN("di=");
wb->total += OC_CHAR_ARRAY_LEN("di=");
memcpy(wb->buffer, id.data, id.length);
wb->buffer += id.length;
wb->buffer_size -= id.length;
wb->total += id.length;
return true;
}

static coap_packet_t
Expand All @@ -208,78 +200,112 @@ rd_delete_check_packet_size(coap_packet_t *packet, const char *query,
return coap_check_header_size(hdr.size, 0);
}

typedef struct
{
oc_response_handler_t handler;
oc_qos_t qos;
void *user_data;
} rd_delete_packet_t;

static bool
rd_delete_with_device_id(oc_link_t *links, const oc_endpoint_t *endpoint,
oc_string_view_t id, oc_response_handler_t handler,
oc_qos_t qos, void *user_data,
rd_links_partition_t *links_partition)
rd_delete_send_packet(const oc_endpoint_t *endpoint, oc_string_view_t query,
void *data)
{
rd_delete_packet_t *pkt = (rd_delete_packet_t *)data;
OC_DBG("Unpublishing links (query=%s)", query.data);
if (!oc_do_delete(OC_RSRVD_RD_URI, endpoint, query.data, pkt->handler,
pkt->qos, pkt->user_data)) {
OC_ERR("failed to unpublish links (query=%s)", query.data);
return false;
}
return true;
}

// split the linked lists into two parts, the first part contains the
// links that were sent, the second part contains the links that were
// not sent
static void
rd_partition_links(rd_links_partition_t *partition, oc_link_t *links,
oc_link_t *deleted_tail)
{
if (deleted_tail == NULL) {
partition->deleted = NULL;
partition->not_deleted = links;
return;
}
partition->deleted = links;
partition->not_deleted = deleted_tail->next;
deleted_tail->next = NULL;
}

rd_delete_result_t
rd_delete_iterate_links(oc_link_t *links, const oc_endpoint_t *endpoint,
oc_string_view_t id, char *buffer, size_t buffer_size,
rd_delete_on_packet_ready_t on_packet_ready,
void *on_packet_ready_data,
rd_links_partition_t *links_partition)
{
assert(links != NULL);
assert(endpoint != NULL);
assert(handler != NULL);
assert(buffer != NULL);
assert(on_packet_ready != NULL);
assert(links_partition != NULL);

links_partition->deleted = links;

// TODO set the buffer size based on COAP_MAX_HEADER_SIZE
char buffer[COAP_MAX_HEADER_SIZE] = { 0 };
size_t buffer_size = OC_ARRAY_SIZE(buffer);
oc_write_buffer_t wb_start = rd_prepare_write_buffer(buffer, buffer_size, id);
oc_write_buffer_t wb = wb_start;
oc_link_t *prev_chunk_tail = NULL;
oc_link_t *chunk_head = links;
oc_link_t *link = links;
oc_link_t *prev_link = NULL;
oc_write_buffer_t wb_init;
if (!rd_prepare_write_buffer(&wb_init, buffer, buffer_size, id)) {
return RD_DELETE_ERROR;
}
oc_write_buffer_t wb = wb_init;
oc_link_t *deleted_tail = NULL;
coap_packet_t packet = rd_delete_packet(endpoint);
for (oc_link_t *link = links; link != NULL;) {
while (true) {
// written buffer up to this point
size_t written = wb.total;
if (oc_buffer_write(&wb, "&ins=%" PRId64 "", link->ins) > 0) {
if (!rd_delete_check_packet_size(&packet, buffer, wb.total)) {
buffer[written] = '\0';
if (!rd_delete_do_request(endpoint, buffer, handler, qos, user_data)) {
if (prev_chunk_tail != NULL) {
prev_chunk_tail->next = NULL;
}
links_partition->not_deleted = chunk_head;
return false;
}
// start a new chunk of links to unpublish
prev_chunk_tail = link;
chunk_head = link->next;
wb = wb_start;
continue;
}
// the buffer is full, it contains the truncated query so must take the
// previously written data only
bool buffer_full = oc_buffer_write(&wb, "&ins=%" PRId64 "", link->ins) < 0;
// we can't fit the query into the packet, send the packet with the
// previously written data only
bool packet_full =
!buffer_full && !rd_delete_check_packet_size(&packet, buffer, wb.total);

if (link->next == NULL) {
buffer[wb.total] = '\0';
if (!rd_delete_do_request(endpoint, buffer, handler, qos, user_data)) {
if (prev_chunk_tail != NULL) {
prev_chunk_tail->next = NULL;
}
links_partition->not_deleted = chunk_head;
return false;
}
break;
if (buffer_full || packet_full) {
buffer[written] = '\0';
oc_string_view_t query = oc_string_view(buffer, written);
if (!on_packet_ready(endpoint, query, on_packet_ready_data)) {
rd_partition_links(links_partition, links, deleted_tail);
return RD_DELETE_PARTIAL;
}
link = link->next;
// update the tail of the sent chunk
deleted_tail = prev_link;
// reinitialize the write buffer with the "di={id}" part only
wb = wb_init;
// don't advance the link pointer, we need to send the current link in the
// next packet
continue;
}

buffer[written] = '\0';
if (!rd_delete_do_request(endpoint, buffer, handler, qos, user_data)) {
if (prev_chunk_tail != NULL) {
prev_chunk_tail->next = NULL;
if (link->next == NULL) {
// this is the last link, send the packet with the query
buffer[wb.total] = '\0';
oc_string_view_t query = oc_string_view(buffer, wb.total);
if (!on_packet_ready(endpoint, query, on_packet_ready_data)) {
rd_partition_links(links_partition, links, deleted_tail);
return RD_DELETE_PARTIAL;
}
links_partition->not_deleted = chunk_head;
return false;
break;
}

// start a new chunk of links to unpublish
prev_chunk_tail = link;
chunk_head = link->next;
wb = wb_start;
// advance the link pointers
prev_link = link;
link = link->next;
}

return true;
links_partition->deleted = links;
links_partition->not_deleted = NULL;
return RD_DELETE_ALL;
}

rd_delete_result_t
Expand All @@ -297,10 +323,15 @@ rd_delete(oc_link_t *links, const oc_endpoint_t *endpoint, size_t device,
oc_uuid_to_str_v1(&device_info->di, uuid_buf, OC_ARRAY_SIZE(uuid_buf));
assert(uuid_len > 0);
oc_string_view_t uuid = oc_string_view(uuid_buf, (size_t)uuid_len);
return rd_delete_with_device_id(links, endpoint, uuid, handler, qos,
user_data, links_partition)
? RD_DELETE_ALL
: RD_DELETE_PARTIAL;
char buffer[COAP_MAX_HEADER_SIZE] = { 0 };
rd_delete_packet_t pkt = {
.handler = handler,
.qos = qos,
.user_data = user_data,
};
return rd_delete_iterate_links(links, endpoint, uuid, buffer,
OC_ARRAY_SIZE(buffer), rd_delete_send_packet,
&pkt, links_partition);
}

#endif /* OC_CLOUD */
28 changes: 27 additions & 1 deletion api/cloud/rd_client_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ typedef struct
{
oc_link_t *deleted; /// Linked list of deleted resource links.
oc_link_t *not_deleted; /// Linked list of not deleted resource links.
bool success; /// True if all resource links were deleted successfully.
} rd_links_partition_t;

typedef enum {
Expand All @@ -81,6 +80,33 @@ typedef enum {
RD_DELETE_ERROR = -1,
} rd_delete_result_t;

typedef bool (*rd_delete_on_packet_ready_t)(const oc_endpoint_t *endpoint,
oc_string_view_t query, void *data);

/**
* @brief Iterate resource links, write query to buffer and invoke
* on_packet_ready when the buffer or packet is full.
*
* @param links List of resource links which to iterate (cannot be NULL).
* @param endpoint The endpoint of the RD (cannot be NULL).
* @param id The id of the device to delete.
* @param buffer The buffer to write the query to (cannot be NULL).
* @param buffer_size The size of the buffer.
* @param on_packet_ready The callback to invoke when the buffer or packet is
* full (cannot be NULL).
* @param on_packet_ready_data The data to pass to the callback.
* @param links_partition The partition of links into deleted and not deleted
* (cannot be NULL). The input links list is split into two lists, one for
* deleted and one for not deleted links.
*
* @return rd_delete_result_t Result of the delete operation.
*/
rd_delete_result_t rd_delete_iterate_links(
oc_link_t *links, const oc_endpoint_t *endpoint, oc_string_view_t id,
char *buffer, size_t buffer_size, rd_delete_on_packet_ready_t on_packet_ready,
void *on_packet_ready_data, rd_links_partition_t *links_partition)
OC_NONNULL(1, 2, 4, 6, 8);

/**
@brief Delete RD resource from Resource Directory.
Expand Down
Loading

0 comments on commit 86fb6c8

Please sign in to comment.