Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: re-enable MultiDestSendInStopPeriod test case #57

Merged
merged 2 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,3 @@ ten_extension_context_get_extension_info_by_name(
TEN_RUNTIME_PRIVATE_API bool ten_extension_context_start_extension_group(
ten_extension_context_t *self, ten_shared_ptr_t *requester,
ten_error_t *err);

TEN_RUNTIME_PRIVATE_API ten_extension_group_t *
ten_extension_context_find_extension_group_by_name(
ten_extension_context_t *self, ten_string_t *name);
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,7 @@ ten_extension_thread_get_attached_runloop(ten_extension_thread_t *self);

TEN_RUNTIME_PRIVATE_API void
ten_extension_thread_process_acquire_lock_mode_task(void *self_, void *arg);

TEN_RUNTIME_PRIVATE_API void
ten_extension_thread_stop_life_cycle_of_all_extensions(
ten_extension_thread_t *self);
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ TEN_RUNTIME_PRIVATE_API void
ten_extension_thread_on_extension_group_on_init_done(void *self_, void *arg);

TEN_RUNTIME_PRIVATE_API void
ten_extension_thread_start_life_cycle_of_all_extensions(void *self_, void *arg);
ten_extension_thread_start_life_cycle_of_all_extensions_task(void *self_,
void *arg);

TEN_RUNTIME_PRIVATE_API void
ten_extension_thread_stop_life_cycle_of_all_extensions_task(void *self,
void *arg);

TEN_RUNTIME_PRIVATE_API void
ten_extension_thread_on_extension_group_on_deinit_done(void *self_, void *arg);
Expand Down
6 changes: 3 additions & 3 deletions core/include_internal/ten_runtime/msg/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ TEN_RUNTIME_PRIVATE_API void ten_msg_clear_and_set_dest_to_extension(
TEN_RUNTIME_PRIVATE_API void ten_msg_correct_dest(ten_shared_ptr_t *msg,
ten_engine_t *engine);

inline bool ten_raw_msg_is_cmd_base(ten_msg_t *self) {
inline bool ten_raw_msg_is_cmd_and_result(ten_msg_t *self) {
TEN_ASSERT(self && ten_raw_msg_check_integrity(self), "Should not happen.");

switch (self->type) {
Expand Down Expand Up @@ -342,9 +342,9 @@ inline ten_msg_t *ten_msg_get_raw_msg(ten_shared_ptr_t *self) {
return (ten_msg_t *)ten_shared_ptr_get_data(self);
}

inline bool ten_msg_is_cmd_base(ten_shared_ptr_t *self) {
inline bool ten_msg_is_cmd_and_result(ten_shared_ptr_t *self) {
TEN_ASSERT(self && ten_msg_check_integrity(self), "Should not happen.");
return ten_raw_msg_is_cmd_base(ten_msg_get_raw_msg(self));
return ten_raw_msg_is_cmd_and_result(ten_msg_get_raw_msg(self));
}

inline bool ten_msg_is_cmd(ten_shared_ptr_t *self) {
Expand Down
1 change: 0 additions & 1 deletion core/include_internal/ten_runtime/test/extension_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

#include "ten_runtime/ten_env_proxy/ten_env_proxy.h"

// =-=-= 改名
typedef struct ten_extension_test_t {
ten_thread_t *test_app_thread;
ten_string_t test_extension_addon_name;
Expand Down
2 changes: 1 addition & 1 deletion core/src/ten_runtime/app/msg_interface/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ static void ten_app_handle_in_msgs_async(ten_app_t *self) {
void ten_app_push_to_in_msgs_queue(ten_app_t *self, ten_shared_ptr_t *msg) {
TEN_ASSERT(self && ten_app_check_integrity(self, false),
"Should not happen.");
TEN_ASSERT(msg && ten_msg_is_cmd_base(msg), "Invalid argument.");
TEN_ASSERT(msg && ten_msg_is_cmd_and_result(msg), "Invalid argument.");
TEN_ASSERT(!ten_cmd_base_cmd_id_is_empty(msg), "Invalid argument.");
TEN_ASSERT(
ten_msg_get_src_app_uri(msg) && strlen(ten_msg_get_src_app_uri(msg)),
Expand Down
2 changes: 1 addition & 1 deletion core/src/ten_runtime/connection/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ void ten_connection_on_msgs(ten_connection_t *self, ten_list_t *msgs) {
ten_list_foreach (msgs, iter) {
ten_shared_ptr_t *msg = ten_smart_ptr_listnode_get(iter.node);

if (ten_msg_is_cmd_base(msg)) {
if (ten_msg_is_cmd_and_result(msg)) {
// For a command message, remember which connection this command is coming
// from.
ten_cmd_base_set_original_connection(msg, self);
Expand Down
22 changes: 12 additions & 10 deletions core/src/ten_runtime/engine/internal/extension_interface.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,41 +88,43 @@ static void ten_engine_on_extension_msgs(ten_engine_t *self) {

ten_list_foreach (&extension_msgs_, iter) {
ten_shared_ptr_t *msg = ten_smart_ptr_listnode_get(iter.node);
TEN_ASSERT(msg && ten_msg_get_dest_cnt(msg) == 1,
"When this function is executed, there should be only one "
"destination remaining in the message's dest.");

if (ten_engine_is_closing(self) &&
!ten_msg_type_to_handle_when_closing(msg)) {
// Except some special messages, do not handle the message if the engine
// is closing.

continue;
}

ten_loc_t *dest_loc = ten_msg_get_first_dest_loc(msg);
TEN_ASSERT(dest_loc && ten_loc_check_integrity(dest_loc) &&
ten_msg_get_dest_cnt(msg) == 1,
TEN_ASSERT(dest_loc && ten_loc_check_integrity(dest_loc),
"Should not happen.");

if (!ten_string_is_equal(&dest_loc->app_uri, ten_app_get_uri(self->app))) {
TEN_ASSERT(!ten_string_is_empty(&dest_loc->app_uri),
"Should not happen.");

// Because the engine could add/remove remotes at runtime, so extension
// system would deliver those messages with remote destination to the
// engine. Therefore, we need to determine if this is the case, and route
// those messages to the specified remote.
// Since the engine dynamically adds/removes remotes, when the extension
// system needs to deliver a message to a remote, it requests the engine
// to handle it in order to avoid race conditions. Therefore, this is
// where we check if such a situation is occurring.

ten_engine_route_msg_to_remote(self, msg);
} else {
// Otherwise, enable the engine to handle the message.

ten_engine_handle_msg(self, msg);
ten_engine_dispatch_msg(self, msg);
}
}

ten_list_clear(&extension_msgs_);
}

static void ten_engine_on_extension_msgs_(void *engine_, TEN_UNUSED void *arg) {
static void ten_engine_on_extension_msgs_task(void *engine_,
TEN_UNUSED void *arg) {
ten_engine_t *engine = (ten_engine_t *)engine_;
TEN_ASSERT(engine && ten_engine_check_integrity(engine, true),
"Should not happen.");
Expand All @@ -139,7 +141,7 @@ static void ten_engine_on_extension_msgs_async(ten_engine_t *self) {
"Should not happen.");

ten_runloop_post_task_tail(ten_engine_get_attached_runloop(self),
ten_engine_on_extension_msgs_, self, NULL);
ten_engine_on_extension_msgs_task, self, NULL);
}

void ten_engine_push_to_extension_msgs_queue(ten_engine_t *self,
Expand Down
14 changes: 8 additions & 6 deletions core/src/ten_runtime/engine/msg_interface/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ static void ten_engine_handle_in_msgs_sync(ten_engine_t *self) {
TEN_ASSERT(!ten_msg_src_is_empty(msg),
"The message source should have been set.");

if (ten_msg_is_cmd_base(msg)) {
if (ten_msg_is_cmd_and_result(msg)) {
ten_connection_t *connection = ten_cmd_base_get_original_connection(msg);
if (connection) {
// If 'connection' is non-NULL, it means the command is from externally
Expand Down Expand Up @@ -187,7 +187,7 @@ void ten_engine_append_to_in_msgs_queue(ten_engine_t *self,
TEN_ASSERT(ten_engine_check_integrity(self, false),
"Invalid use of engine %p.", self);

TEN_ASSERT(cmd && ten_msg_is_cmd_base(cmd), "Should not happen.");
TEN_ASSERT(cmd && ten_msg_is_cmd_and_result(cmd), "Should not happen.");

ten_mutex_lock(self->in_msgs_lock);
ten_list_push_smart_ptr_back(&self->in_msgs, cmd);
Expand All @@ -208,7 +208,7 @@ void ten_engine_handle_msg(ten_engine_t *self, ten_shared_ptr_t *msg) {
return;
}

if (ten_msg_is_cmd_base(msg)) {
if (ten_msg_is_cmd_and_result(msg)) {
// Because the command ID is a critical information which is necessary for
// the correct handling of all command-type messages, we need to assign a
// command ID to messages which don't have one.
Expand All @@ -231,15 +231,17 @@ void ten_engine_dispatch_msg(ten_engine_t *self, ten_shared_ptr_t *msg) {
TEN_ASSERT(self && ten_engine_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(msg && ten_msg_check_integrity(msg), "Should not happen.");
TEN_ASSERT(ten_msg_get_dest_cnt(msg) == 1,
"When this function is executed, there should be only one "
"destination remaining in the message's dest.");

if (ten_engine_is_closing(self)) {
// Do not dispatch the message if the engine is closing.
return;
}

ten_loc_t *dest_loc = ten_msg_get_first_dest_loc(msg);
TEN_ASSERT(dest_loc && ten_loc_check_integrity(dest_loc) &&
ten_msg_get_dest_cnt(msg) == 1,
TEN_ASSERT(dest_loc && ten_loc_check_integrity(dest_loc),
"Should not happen.");

ten_app_t *app = self->app;
Expand All @@ -259,7 +261,7 @@ void ten_engine_dispatch_msg(ten_engine_t *self, ten_shared_ptr_t *msg) {
"The uri of the app should not be empty.");

// The message is _not_ for the current TEN app, so route the message to the
// correct TEN app.
// correct TEN app through the correct remote.
ten_engine_route_msg_to_remote(self, msg);
} else {
// The destination of the message is the current TEN app.
Expand Down
51 changes: 29 additions & 22 deletions core/src/ten_runtime/engine/on_xxx.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,13 @@ void ten_engine_find_extension_info_for_all_extensions_of_extension_thread(
"Should not happen.");

TEN_UNUSED ten_extension_thread_t *extension_thread = arg;
TEN_ASSERT(
extension_thread &&
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: this function does not access this extension_thread,
// we just check if the arg is an ten_extension_thread_t.
ten_extension_thread_check_integrity(extension_thread, false),
"Should not happen.");
TEN_ASSERT(extension_thread &&
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: this function does not access this
// extension_thread, we just check if the arg is an
// ten_extension_thread_t.
ten_extension_thread_check_integrity(extension_thread, false),
"Should not happen.");

ten_list_foreach (&extension_thread->extensions, iter) {
ten_extension_t *extension = ten_ptr_listnode_get(iter.node);
Expand All @@ -139,12 +139,19 @@ void ten_engine_find_extension_info_for_all_extensions_of_extension_thread(
ten_string_get_raw_str(&extension->name));
}

ten_engine_on_extension_thread_is_ready(self, extension_thread);

ten_runloop_post_task_tail(
ten_extension_thread_get_attached_runloop(extension_thread),
ten_extension_thread_start_life_cycle_of_all_extensions, extension_thread,
NULL);
if (extension_thread->is_close_triggered) {
ten_runloop_post_task_tail(
ten_extension_thread_get_attached_runloop(extension_thread),
ten_extension_thread_stop_life_cycle_of_all_extensions_task,
extension_thread, NULL);
} else {
ten_engine_on_extension_thread_is_ready(self, extension_thread);

ten_runloop_post_task_tail(
ten_extension_thread_get_attached_runloop(extension_thread),
ten_extension_thread_start_life_cycle_of_all_extensions_task,
extension_thread, NULL);
}
}

void ten_engine_on_extension_thread_closed(void *self_, void *arg) {
Expand All @@ -153,13 +160,13 @@ void ten_engine_on_extension_thread_closed(void *self_, void *arg) {
"Should not happen.");

ten_extension_thread_t *extension_thread = arg;
TEN_ASSERT(
extension_thread &&
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: this function does not access this extension_thread,
// we just check if the arg is an ten_extension_thread_t.
ten_extension_thread_check_integrity(extension_thread, false),
"Should not happen.");
TEN_ASSERT(extension_thread &&
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: this function does not access this
// extension_thread, we just check if the arg is an
// ten_extension_thread_t.
ten_extension_thread_check_integrity(extension_thread, false),
"Should not happen.");

TEN_LOGD("[%s] Waiting for extension thread (%p) be reclaimed.",
ten_engine_get_name(self), extension_thread);
Expand Down Expand Up @@ -202,8 +209,8 @@ void ten_engine_on_addon_create_extension_group_done(void *self_, void *arg) {
ten_extension_group_t *extension_group = info->extension_group;
TEN_ASSERT(extension_group &&
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: The extension thread has not been created yet,
// so it is thread safe
// thread-check: The extension thread has not been created
// yet, so it is thread safe
ten_extension_group_check_integrity(extension_group, false),
"Should not happen.");

Expand Down
8 changes: 4 additions & 4 deletions core/src/ten_runtime/extension/extension.c
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ static ten_list_t *ten_extension_get_msg_dests_from_graph(
TEN_ASSERT(self && ten_extension_check_integrity(self, true) && msg,
"Should not happen.");

if (ten_msg_is_cmd_base(msg)) {
if (ten_msg_is_cmd_and_result(msg)) {
return ten_extension_get_msg_dests_from_graph_internal(
&self->extension_info->msg_dest_info.cmd, msg);
} else {
Expand Down Expand Up @@ -493,7 +493,7 @@ static bool ten_extension_determine_out_msg_dest_from_graph(
"Failed to find destination of a message (%s) from graph.",
msg_name);
} else {
if (ten_msg_is_cmd_base(msg)) {
if (ten_msg_is_cmd_and_result(msg)) {
TEN_LOGE("Failed to find destination of a command (%s) from graph.",
msg_name);
} else {
Expand Down Expand Up @@ -551,7 +551,7 @@ static TEN_EXTENSION_DETERMINE_OUT_MSGS_RESULT ten_extension_determine_out_msgs(
// Need to find the destinations from 2 databases:
// - graph: all messages without the cmd results.
// - IN path table: cmd results only.
if (ten_msg_is_cmd_base(msg)) {
if (ten_msg_is_cmd_and_result(msg)) {
if (ten_msg_get_type(msg) == TEN_MSG_TYPE_CMD_RESULT) {
// Find the destinations of a cmd result from the path table.
if (!in_path) {
Expand Down Expand Up @@ -620,7 +620,7 @@ bool ten_extension_handle_out_msg(ten_extension_t *self, ten_shared_ptr_t *msg,
ten_msg_set_src_to_extension(msg, self);

bool result = true;
const bool msg_is_cmd = ten_msg_is_cmd_base(msg);
const bool msg_is_cmd = ten_msg_is_cmd_and_result(msg);
bool msg_is_cmd_result = false;

if (ten_msg_get_type(msg) == TEN_MSG_TYPE_CMD_RESULT) {
Expand Down
6 changes: 3 additions & 3 deletions core/src/ten_runtime/extension/internal/close.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ void ten_extension_on_timer_closed(ten_timer_t *timer, void *on_closed_data) {
}

void ten_extension_do_pre_close_action(ten_extension_t *self) {
TEN_ASSERT(self && ten_extension_check_integrity(self, true) &&
self->extension_thread,
"Should not happen.");
TEN_ASSERT(self, "Should not happen.");
TEN_ASSERT(ten_extension_check_integrity(self, true), "Should not happen.");
TEN_ASSERT(self->extension_thread, "Should not happen.");

// Close the timers of the path tables.
ten_list_foreach (&self->path_timers, iter) {
Expand Down
11 changes: 4 additions & 7 deletions core/src/ten_runtime/extension/msg_handling.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ void ten_extension_handle_in_msg(ten_extension_t *self, ten_shared_ptr_t *msg) {
// command sent by this extension in any time can be delivered to this
// extension before its on_start().

if (self->state < TEN_EXTENSION_STATE_ON_START &&
!ten_msg_is_cmd_result(msg)) {
bool msg_is_cmd_result = ten_msg_is_cmd_result(msg);

if (self->state < TEN_EXTENSION_STATE_ON_START && !msg_is_cmd_result) {
// The extension is not initialized, and the msg is not a cmd result, so
// cache the msg to the pending list.
ten_list_push_smart_ptr_back(&self->pending_msgs, msg);
Expand All @@ -96,14 +97,10 @@ void ten_extension_handle_in_msg(ten_extension_t *self, ten_shared_ptr_t *msg) {
goto done;
}

bool msg_is_cmd_result = false;

// Because 'commands' has 'results', TEN will perform some bookkeeping for
// 'commands' before sending it to the extension.

if (ten_msg_get_type(msg) == TEN_MSG_TYPE_CMD_RESULT) {
msg_is_cmd_result = true;

if (msg_is_cmd_result) {
// Set the cmd result to the corresponding OUT path to indicate that
// there has been a cmd result flow through that OUT path.
ten_path_t *out_path =
Expand Down
29 changes: 0 additions & 29 deletions core/src/ten_runtime/extension_context/extension_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -593,32 +593,3 @@ bool ten_extension_context_start_extension_group(
done:
return result;
}

ten_extension_group_t *ten_extension_context_find_extension_group_by_name(
ten_extension_context_t *self, ten_string_t *name) {
TEN_ASSERT(self, "Invalid argument.");
TEN_ASSERT(
ten_extension_context_check_integrity(
self,
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: The graph info is read only, so it's thread-safe.
false),
"Invalid use of extension_context %p.", self);

ten_list_foreach (&self->extension_groups, iter) {
ten_extension_group_t *extension_group = ten_ptr_listnode_get(iter.node);
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: The target-extension-group and the
// current-extension-group are different, so the extension threads
// are different, too. Because the name of the extension group can
// not be changed after they are created, so it's thread-safe.
TEN_ASSERT(extension_group &&
ten_extension_group_check_integrity(extension_group, false),
"Should not happen.");

if (ten_string_is_equal(&extension_group->name, name)) {
return extension_group;
}
}
return NULL;
}
2 changes: 1 addition & 1 deletion core/src/ten_runtime/extension_group/extension_group.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ void ten_extension_group_set_addon(ten_extension_group_t *self,

ten_shared_ptr_t *ten_extension_group_create_invalid_dest_status(
ten_shared_ptr_t *origin_cmd, ten_string_t *target_group_name) {
TEN_ASSERT(origin_cmd && ten_msg_is_cmd_base(origin_cmd),
TEN_ASSERT(origin_cmd && ten_msg_is_cmd_and_result(origin_cmd),
"Should not happen.");
TEN_ASSERT(target_group_name, "Should not happen.");

Expand Down
Loading
Loading