Skip to content

Commit

Permalink
BF: CS-676 Improve qmaster shutdown performance for scenarios where t…
Browse files Browse the repository at this point in the history
…he master has more than 128 threads
  • Loading branch information
ernst-bablick committed Oct 10, 2024
1 parent a73d075 commit 45bcee1
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 11 deletions.
27 changes: 21 additions & 6 deletions source/daemons/qmaster/sge_thread_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
#include "uti/sge_os.h"
#include "uti/sge_profiling.h"
#include "uti/sge_rmon_macros.h"
#include "uti/sge_time.h"

#include "sgeobj/ocs_DataStore.h"

#include "sge_thread_ctrl.h"

#ifdef OBSERVE
# include "cull/cull_observe.h"
#endif
Expand Down Expand Up @@ -143,7 +146,7 @@ sge_reader_main(void *arg) {

// init monitoring
cl_thread_func_startup(thread_config);
sge_monitor_init(p_monitor, thread_config->thread_name, GDI_EXT, MT_WARNING, MT_ERROR);
sge_monitor_init(p_monitor, thread_config->thread_name, GDI_EXT, RT_WARNING, RT_ERROR);
sge_qmaster_thread_init(QMASTER, READER_THREAD, true);

/* register at profiling module */
Expand All @@ -164,7 +167,8 @@ sge_reader_main(void *arg) {

MONITOR_SET_QLEN(p_monitor, sge_tq_get_task_count(ReaderRequestQueue));

if (packet != nullptr) {
// handle the packet only if it is not nullptr and the shutdown has not started
if (packet != nullptr && !sge_thread_has_shutdown_started()) {
sge_gdi_task_class_t *task;
bool is_only_read_request = true;

Expand Down Expand Up @@ -216,7 +220,7 @@ sge_reader_main(void *arg) {

// handle the request (GDI/Report/Ack ...)
if (packet->request_type == PACKET_GDI_REQUEST) {
// sge_usleep(3000000);
//sge_usleep(1000000);

task = packet->first_task;
while (task != nullptr) {
Expand Down Expand Up @@ -298,14 +302,25 @@ sge_reader_main(void *arg) {
thread_output_profiling("reader thread profiling summary:\n", &next_prof_output);

sge_monitor_output(p_monitor);
} else {
int execute = 0;
}

// pass the cancellation point at least once or stay here if shutdown was triggered
bool shutdown_started = false;
do {
// pthread cancellation point
int execute = 0;
pthread_cleanup_push(sge_reader_cleanup_monitor, static_cast<void *>(p_monitor));
cl_thread_func_testcancel(thread_config);
pthread_cleanup_pop(execute); // cleanup monitor
}

// shutdown in progress?
shutdown_started = sge_thread_has_shutdown_started();

// if we have to wait here, then do not eat up all CPU time
if (shutdown_started) {
sge_usleep(25000);
}
} while (shutdown_started);
}

// Don't add cleanup code here. It will never be executed. Instead, register a cleanup function with
Expand Down
25 changes: 20 additions & 5 deletions source/daemons/qmaster/sge_thread_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@
#include "uti/sge_os.h"
#include "uti/sge_profiling.h"
#include "uti/sge_rmon_macros.h"
#include "uti/sge_time.h"

#include "sgeobj/ocs_DataStore.h"

#include "sge_thread_ctrl.h"

#ifdef OBSERVE
# include "cull/cull_observe.h"
#endif
Expand Down Expand Up @@ -193,7 +196,7 @@ sge_worker_main(void *arg) {

// init monitoring
cl_thread_func_startup(thread_config);
sge_monitor_init(p_monitor, thread_config->thread_name, GDI_EXT, MT_WARNING, MT_ERROR);
sge_monitor_init(p_monitor, thread_config->thread_name, GDI_EXT, WT_WARNING, WT_ERROR);
sge_qmaster_thread_init(QMASTER, WORKER_THREAD, true);

/* register at profiling module */
Expand All @@ -214,7 +217,8 @@ sge_worker_main(void *arg) {

MONITOR_SET_QLEN(p_monitor, sge_tq_get_task_count(GlobalRequestQueue));

if (packet != nullptr) {
// handle the packet only if it is not nullptr and the shutdown has not started
if (packet != nullptr && !sge_thread_has_shutdown_started()) {
sge_gdi_task_class_t *task;
bool is_only_read_request = true;

Expand Down Expand Up @@ -346,14 +350,25 @@ sge_worker_main(void *arg) {
thread_output_profiling("worker thread profiling summary:\n", &next_prof_output);

sge_monitor_output(p_monitor);
} else {
int execute = 0;
}

// pass the cancellation point at least once or stay here if shutdown was triggered
bool shutdown_started = false;
do {
// pthread cancellation point
int execute = 0;
pthread_cleanup_push(sge_worker_cleanup_monitor, static_cast<void *>(p_monitor));
cl_thread_func_testcancel(thread_config);
pthread_cleanup_pop(execute); // cleanup monitor
}

// shutdown in progress?
shutdown_started = sge_thread_has_shutdown_started();

// if we have to wait here, then do not eat up all CPU time
if (shutdown_started) {
sge_usleep(25000);
}
} while (shutdown_started);
}

// Don't add cleanup code here. It will never be executed. Instead, register a cleanup function with
Expand Down

0 comments on commit 45bcee1

Please sign in to comment.