Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: refactor event callbacks #458

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
fixup! api: refactor event callbacks
  • Loading branch information
Danielius1922 committed Jun 6, 2023
commit 350f558ee55fec333abd8c29a8f6dd9708991fc9
1 change: 1 addition & 0 deletions api/oc_event_callback_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ size_t oc_periodic_observe_callback_count(void);

#endif /* OC_SERVER */

/** @brief Process for timed events. */
OC_PROCESS_NAME(oc_timed_callback_events);

#ifdef __cplusplus
Expand Down
134 changes: 76 additions & 58 deletions api/oc_ri.c
Original file line number Diff line number Diff line change
Expand Up @@ -735,19 +735,6 @@ oc_ri_free_resource_properties(oc_resource_t *resource)
}
}

#ifdef OC_SERVER
static oc_event_callback_retval_t
oc_observe_notification_resource_defaults_delayed(void *data)
{
oc_resource_defaults_data_t *resource_defaults_data =
(oc_resource_defaults_data_t *)data;
notify_resource_defaults_observer(resource_defaults_data->resource,
resource_defaults_data->iface_mask, NULL);
oc_ri_dealloc_resource_defaults(resource_defaults_data);
return OC_EVENT_DONE;
}
#endif /* OC_SERVER */

oc_interface_mask_t
oc_ri_get_interface_mask(const char *iface, size_t iface_len)
{
Expand Down Expand Up @@ -950,6 +937,43 @@ ri_get_ocf_version_from_header(const coap_packet_t *request)

#ifdef OC_SERVER

#ifdef OC_COLLECTIONS
static bool
ri_add_collection_observation(oc_collection_t *collection,
const oc_endpoint_t *endpoint, bool is_batch)
{
oc_link_t *links = (oc_link_t *)oc_list_head(collection->links);
#ifdef OC_SECURITY
for (; links != NULL; links = links->next) {
if (links->resource == NULL ||
(links->resource->properties & OC_OBSERVABLE) == 0 ||
oc_sec_check_acl(OC_GET, links->resource, endpoint)) {
continue;
}
return false;
}
#else /* !OC_SECURITY */
(void)endpoint;
#endif /* OC_SECURITY */
if (is_batch) {
links = (oc_link_t *)oc_list_head(collection->links);
for (; links != NULL; links = links->next) {
if (links->resource == NULL ||
(links->resource->properties & OC_PERIODIC) == 0) {
continue;
}
if (!oc_periodic_observe_callback_add(links->resource)) {
// TODO: shouldn't we remove the periodic observe of links added by this
// call?
return false;
}
}
}
return true;
}

#endif /* OC_COLLECTIONS */

static bool
ri_add_observation(const coap_packet_t *request, const coap_packet_t *response,
oc_resource_t *resource, bool resource_is_collection,
Expand All @@ -971,27 +995,11 @@ ri_add_observation(const coap_packet_t *request, const coap_packet_t *response,
#ifdef OC_COLLECTIONS
if (resource_is_collection) {
oc_collection_t *collection = (oc_collection_t *)resource;
oc_link_t *links = (oc_link_t *)oc_list_head(collection->links);
#ifdef OC_SECURITY
for (; links != NULL; links = links->next) {
if (links->resource == NULL ||
(links->resource->properties & OC_OBSERVABLE) == 0 ||
oc_sec_check_acl(OC_GET, links->resource, endpoint)) {
continue;
}
if (!ri_add_collection_observation(collection, endpoint,
iface_query == OC_IF_B)) {
// TODO: shouldn't we remove the periodic observe callback here?
return false;
}
#endif /* OC_SECURITY */
if (iface_query == OC_IF_B) {
links = (oc_link_t *)oc_list_head(collection->links);
for (; links != NULL; links = links->next) {
if (links->resource != NULL &&
(links->resource->properties & OC_PERIODIC) != 0) {
oc_periodic_observe_callback_add(links->resource);
}
}
}
}
#else /* !OC_COLLECTIONS */
(void)resource_is_collection;
Expand Down Expand Up @@ -1070,6 +1078,18 @@ ri_handle_observation(const coap_packet_t *request, coap_packet_t *response,
}
return 2;
}

static oc_event_callback_retval_t
oc_observe_notification_resource_defaults_delayed(void *data)
{
oc_resource_defaults_data_t *resource_defaults_data =
(oc_resource_defaults_data_t *)data;
notify_resource_defaults_observer(resource_defaults_data->resource,
resource_defaults_data->iface_mask, NULL);
oc_ri_dealloc_resource_defaults(resource_defaults_data);
return OC_EVENT_DONE;
}

#endif /* OC_SERVER */

typedef struct
Expand All @@ -1096,7 +1116,8 @@ static void
ri_invoke_coap_entity_set_response(coap_packet_t *response,
ri_invoke_coap_entity_set_response_ctx_t ctx)
{
oc_response_buffer_t *response_buffer = ctx.response_obj->response_buffer;
const oc_response_buffer_t *response_buffer =
ctx.response_obj->response_buffer;

#ifdef OC_SERVER
oc_response_t *response_obj = ctx.response_obj;
Expand Down Expand Up @@ -1365,31 +1386,29 @@ oc_ri_invoke_coap_entity_handler(const coap_packet_t *request,
bool response_state_allocated = false;
bool enable_realloc_rep = false;
#endif /* OC_DYNAMIC_ALLOCATION */
if (cur_resource && !bad_request) {
if (!(*ctx.response_state)) {
OC_DBG("creating new block-wise response state");
*ctx.response_state = oc_blockwise_alloc_response_buffer(
uri_path, uri_path_len, endpoint, method, OC_BLOCKWISE_SERVER,
OC_MIN_APP_DATA_SIZE);
if (!(*ctx.response_state)) {
OC_ERR("failure to alloc response state");
bad_request = true;
} else {
if (cur_resource && !bad_request && *ctx.response_state == NULL) {
OC_DBG("creating new block-wise response state");
*ctx.response_state = oc_blockwise_alloc_response_buffer(
uri_path, uri_path_len, endpoint, method, OC_BLOCKWISE_SERVER,
OC_MIN_APP_DATA_SIZE);
if (*ctx.response_state == NULL) {
OC_ERR("failure to alloc response state");
bad_request = true;
} else {
#ifdef OC_DYNAMIC_ALLOCATION
#ifdef OC_APP_DATA_BUFFER_POOL
if (!request_buffer->block)
if (!request_buffer->block)
#endif /* OC_APP_DATA_BUFFER_POOL */
{
response_state_allocated = true;
}
{
response_state_allocated = true;
}
#endif /* OC_DYNAMIC_ALLOCATION */
if (uri_query_len > 0) {
oc_new_string(&(*ctx.response_state)->uri_query, uri_query,
uri_query_len);
}
response_buffer.buffer = (*ctx.response_state)->buffer;
response_buffer.buffer_size = OC_MIN_APP_DATA_SIZE;
if (uri_query_len > 0) {
oc_new_string(&(*ctx.response_state)->uri_query, uri_query,
uri_query_len);
}
response_buffer.buffer = (*ctx.response_state)->buffer;
response_buffer.buffer_size = OC_MIN_APP_DATA_SIZE;
}
}
#else /* OC_BLOCK_WISE */
Expand Down Expand Up @@ -1510,14 +1529,13 @@ oc_ri_invoke_coap_entity_handler(const coap_packet_t *request,
int32_t observe = 2;
if (success && response_buffer.code < oc_status_code(OC_STATUS_BAD_REQUEST)) {
#ifdef OC_BLOCK_WISE
observe = ri_handle_observation(request, response, cur_resource,
resource_is_collection, ctx.block2_size,
endpoint, iface_query);
uint16_t block2_size = ctx.block2_size;
#else /* !OC_BLOCK_WISE */
observe =
ri_handle_observation(request, response, cur_resource,
resource_is_collection, 0, endpoint, iface_query);
uint16_t block2_size = 0;
#endif /* OC_BLOCK_WISE */
observe = ri_handle_observation(request, response, cur_resource,
resource_is_collection, block2_size,
endpoint, iface_query);
}
#endif /* OC_SERVER */

Expand Down
1 change: 1 addition & 0 deletions api/oc_ri_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ typedef struct oc_ri_invoke_coap_entity_handler_ctx_t
#endif /* OC_BLOCK_WISE */
} oc_ri_invoke_coap_entity_handler_ctx_t;

/** @brief Handle a coap request. */
bool oc_ri_invoke_coap_entity_handler(
const coap_packet_t *request, coap_packet_t *response,
oc_endpoint_t *endpoint, oc_ri_invoke_coap_entity_handler_ctx_t ctx)
Expand Down
23 changes: 13 additions & 10 deletions api/unittest/eventcallbacktest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ class TestObserveCallbackWithServer : public testing::Test {
#if defined(OC_SERVER) && defined(OC_COLLECTIONS)
static oc_collection_t *CreateSwitchesCollection(const std::string &uri)
{
oc_collection_t *col = reinterpret_cast<oc_collection_t *>(
auto *col = reinterpret_cast<oc_collection_t *>(
oc_new_collection(nullptr, uri.c_str(), 1, 0));
oc_resource_bind_resource_type(&col->res, "oic.wk.col");
oc_collection_add_supported_rt(&col->res, "oic.r.switch.binary");
Expand All @@ -436,8 +436,6 @@ class TestObserveCallbackWithServer : public testing::Test {
oc_resource_set_access_in_RFOTM(&col->res, true, OC_PERM_RETRIEVE);
#endif /* OC_HAS_FEATURE_RESOURCE_ACCESS_IN_RFOTM */
#endif /* OC_SECURITY */
// TODO: should oc_add_resource work here?
// EXPECT_TRUE(oc_add_resource(&col->res));
oc_add_collection(&col->res);
return col;
}
Expand Down Expand Up @@ -527,7 +525,6 @@ TEST_F(TestObserveCallbackWithServer, Observe)
};
auto observe = [](oc_client_response_t *cr) {
EXPECT_EQ(OC_STATUS_OK, cr->code);
// EXPECT_EQ(0, cr->observe_option);
oc::TestDevice::Terminate();
OC_DBG("OBSERVE(%d) payload: %s", cr->observe_option,
oc::RepPool::GetJson(cr->payload).data());
Expand All @@ -553,7 +550,8 @@ TEST_F(TestObserveCallbackWithServer, PeriodicObserve)
{
auto interval = 1s;
oc_resource_set_periodic_observable(
oc_core_get_resource_by_index(OCF_P, kDeviceID), interval.count());
oc_core_get_resource_by_index(OCF_P, kDeviceID),
static_cast<uint16_t>(interval.count()));

#ifdef OC_SECURITY
oc_sec_self_own(kDeviceID);
Expand All @@ -580,8 +578,12 @@ TEST_F(TestObserveCallbackWithServer, PeriodicObserve)
ASSERT_NE(nullptr, ep);
observe_data od{};
ASSERT_TRUE(oc_do_observe("/oic/p", ep, nullptr, observe, HIGH_QOS, &od));
oc::TestDevice::PoolEventsMs(std::chrono::milliseconds(interval).count() *
2.5f);
// give enough time to receive do processing and receive the initial
// notification (observe_option == 0)
uint64_t mseconds = std::chrono::milliseconds(700).count();
// and the 2 periodic notifications
mseconds += std::chrono::milliseconds(interval).count() * 2;
oc::TestDevice::PoolEventsMs(mseconds);
EXPECT_LE(3, od.counter);
EXPECT_EQ(1, oc_periodic_observe_callback_count());

Expand Down Expand Up @@ -649,11 +651,12 @@ TEST_F(TestObserveCallbackWithServer, ObserveCollection)
std::string json = oc::RepPool::GetJson(cr->payload).data();
OC_DBG("OBSERVE(%d) payload: %s", cr->observe_option, json.c_str());
// the payload should contain all subresources of the collection
for (oc_link_t *link =
for (auto *link =
static_cast<oc_link_t *>(oc_list_head(switches_.collection->links));
link != nullptr; link = link->next) {
std::string href =
std::string("\"href\":\"") + oc_string(link->resource->uri) + "\"";
std::string href = R"("href":")";
href += oc_string(link->resource->uri);
href += R"(")";
OC_DBG("find link(%s)", href.c_str());
EXPECT_TRUE(json.find(href) != std::string::npos);
}
Expand Down
2 changes: 2 additions & 0 deletions include/oc_core_res.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ bool oc_core_is_SVR(const oc_resource_t *resource, size_t device);
/**
* @brief determine if a resource is a vertical resource
*
* @note vertical resources are mostly custom resources specific to a device
*
* @param resource the resource
* @param device the device index to which the resource belongs too
* @return true : is vertical resource
Expand Down
29 changes: 15 additions & 14 deletions messaging/coap/observe.c
Original file line number Diff line number Diff line change
Expand Up @@ -446,9 +446,10 @@ send_notification(coap_observer_t *obs, oc_response_t *response,

OC_DBG("send_notification: creating separate response for "
"notification");
uint16_t block2_size = 0;
#ifdef OC_BLOCK_WISE
block2_size = obs->block2_size;
uint16_t block2_size = obs->block2_size;
#else /* !OC_BLOCK_WISE */
uint16_t block2_size = 0;
#endif /* OC_BLOCK_WISE */
if (coap_separate_accept(req, response->separate_response, &obs->endpoint,
obs->obs_counter, block2_size) == 1) {
Expand Down Expand Up @@ -1311,18 +1312,18 @@ coap_observe_handler(const coap_packet_t *request,
uint16_t block2_size, const oc_endpoint_t *endpoint,
oc_interface_mask_t iface_mask)
{
if (request->code == COAP_GET && response->code < 128) {
if (IS_OPTION(request, COAP_OPTION_OBSERVE)) {
if (request->observe == 0) {
return add_observer(resource, block2_size, endpoint, request->token,
request->token_len, request->uri_path,
request->uri_path_len, iface_mask);
}
if (request->observe == 1) {
return coap_remove_observer_by_token(endpoint, request->token,
request->token_len);
}
}
if (request->code != COAP_GET || response->code >= 128 ||
!IS_OPTION(request, COAP_OPTION_OBSERVE)) {
return -1;
}
if (request->observe == 0) {
return add_observer(resource, block2_size, endpoint, request->token,
request->token_len, request->uri_path,
request->uri_path_len, iface_mask);
}
if (request->observe == 1) {
return coap_remove_observer_by_token(endpoint, request->token,
request->token_len);
}
return -1;
}
Expand Down