From c1d8fbc8212b27055feb63d53ceea6950c5ae823 Mon Sep 17 00:00:00 2001 From: shivammonaka Date: Thu, 11 Jan 2024 18:37:11 +0530 Subject: [PATCH] Introduced callback to Pthread, Win32 and OpenMP backend --- cblas.h | 5 + common_interface.h | 5 + driver/others/CMakeLists.txt | 1 + driver/others/Makefile | 5 +- driver/others/blas_server.c | 267 ++++++++++++++++----------- driver/others/blas_server_callback.c | 12 ++ driver/others/blas_server_omp.c | 30 +-- driver/others/blas_server_win32.c | 226 ++++++++++++++--------- 8 files changed, 342 insertions(+), 209 deletions(-) create mode 100644 driver/others/blas_server_callback.c diff --git a/cblas.h b/cblas.h index beaa32cc2b..ab8b7dcde4 100644 --- a/cblas.h +++ b/cblas.h @@ -26,6 +26,11 @@ char* openblas_get_config(void); /*Get the CPU corename on runtime.*/ char* openblas_get_corename(void); +/*Set the threading backend to a custom callback.*/ +typedef void (*openblas_dojob_callback)(int thread_num, void *jobdata, int dojob_data); +typedef void (*openblas_threads_callback)(int sync, openblas_dojob_callback dojob, int numjobs, size_t jobdata_elsize, void *jobdata, int dojob_data); +void openblas_set_threads_callback_function(openblas_threads_callback callback); + #ifdef OPENBLAS_OS_LINUX /* Sets thread affinity for OpenBLAS threads. `thread_idx` is in [0, openblas_get_num_threads()-1]. */ int openblas_setaffinity(int thread_idx, size_t cpusetsize, cpu_set_t* cpu_set); diff --git a/common_interface.h b/common_interface.h index 5a2e1654c9..efd3c6649d 100644 --- a/common_interface.h +++ b/common_interface.h @@ -47,6 +47,11 @@ int BLASFUNC(xerbla)(char *, blasint *info, blasint); void openblas_set_num_threads_(int *); +/*Set the threading backend to a custom callback.*/ +typedef void (*openblas_dojob_callback)(int thread_num, void *jobdata, int dojob_data); +typedef void (*openblas_threads_callback)(int sync, openblas_dojob_callback dojob, int numjobs, size_t jobdata_elsize, void *jobdata, int dojob_data); +extern openblas_threads_callback openblas_threads_callback_; + FLOATRET BLASFUNC(sdot) (blasint *, float *, blasint *, float *, blasint *); FLOATRET BLASFUNC(sdsdot)(blasint *, float *, float *, blasint *, float *, blasint *); diff --git a/driver/others/CMakeLists.txt b/driver/others/CMakeLists.txt index 1a38740a32..192bf2d8fc 100644 --- a/driver/others/CMakeLists.txt +++ b/driver/others/CMakeLists.txt @@ -25,6 +25,7 @@ if (USE_THREAD) ${BLAS_SERVER} divtable.c # TODO: Makefile has -UDOUBLE blas_l1_thread.c + blas_server_callback.c ) if (NOT NO_AFFINITY) diff --git a/driver/others/Makefile b/driver/others/Makefile index e4e9ee108f..ff7e3e96db 100644 --- a/driver/others/Makefile +++ b/driver/others/Makefile @@ -6,7 +6,7 @@ COMMONOBJS = memory.$(SUFFIX) xerbla.$(SUFFIX) c_abs.$(SUFFIX) z_abs.$(SUFFIX) #COMMONOBJS += slamch.$(SUFFIX) slamc3.$(SUFFIX) dlamch.$(SUFFIX) dlamc3.$(SUFFIX) ifdef SMP -COMMONOBJS += blas_server.$(SUFFIX) divtable.$(SUFFIX) blasL1thread.$(SUFFIX) +COMMONOBJS += blas_server.$(SUFFIX) divtable.$(SUFFIX) blasL1thread.$(SUFFIX) blas_server_callback.$(SUFFIX) ifneq ($(NO_AFFINITY), 1) COMMONOBJS += init.$(SUFFIX) endif @@ -140,6 +140,9 @@ memory.$(SUFFIX) : $(MEMORY) ../../common.h ../../param.h blas_server.$(SUFFIX) : $(BLAS_SERVER) ../../common.h ../../common_thread.h ../../param.h $(CC) $(CFLAGS) -c $< -o $(@F) +blas_server_callback.$(SUFFIX) : blas_server_callback.c ../../common.h + $(CC) $(CFLAGS) -c $< -o $(@F) + openblas_set_num_threads.$(SUFFIX) : openblas_set_num_threads.c $(CC) $(CFLAGS) -c $< -o $(@F) diff --git a/driver/others/blas_server.c b/driver/others/blas_server.c index 2531c57e9a..0a9ded6c1c 100644 --- a/driver/others/blas_server.c +++ b/driver/others/blas_server.c @@ -115,6 +115,8 @@ int blas_server_avail __attribute__((aligned(ATTRIBUTE_SIZE))) = 0; int blas_omp_threads_local = 1; +static void * blas_thread_buffer[MAX_CPU_NUMBER]; + /* Local Variables */ #if defined(USE_PTHREAD_LOCK) static pthread_mutex_t server_lock = PTHREAD_MUTEX_INITIALIZER; @@ -190,6 +192,10 @@ static int main_status[MAX_CPU_NUMBER]; BLASLONG exit_time[MAX_CPU_NUMBER]; #endif +//Prototypes +static void exec_threads(int , blas_queue_t *, int); +static void adjust_thread_buffers(); + static void legacy_exec(void *func, int mode, blas_arg_t *args, void *sb){ if (!(mode & BLAS_COMPLEX)){ @@ -375,7 +381,6 @@ static void* blas_thread_server(void *arg){ /* Thread identifier */ BLASLONG cpu = (BLASLONG)arg; unsigned int last_tick; - void *buffer, *sa, *sb; blas_queue_t *queue; blas_queue_t *tscq; @@ -395,8 +400,6 @@ blas_queue_t *tscq; main_status[cpu] = MAIN_ENTER; #endif - buffer = blas_memory_alloc(2); - #ifdef SMP_DEBUG fprintf(STDERR, "Server[%2ld] Thread has just been spawned!\n", cpu); #endif @@ -456,109 +459,7 @@ blas_queue_t *tscq; start = rpcc(); #endif - if (queue) { - int (*routine)(blas_arg_t *, void *, void *, void *, void *, BLASLONG) = (int (*)(blas_arg_t *, void *, void *, void *, void *, BLASLONG))queue -> routine; - - atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)1); - - sa = queue -> sa; - sb = queue -> sb; - -#ifdef SMP_DEBUG - if (queue -> args) { - fprintf(STDERR, "Server[%2ld] Calculation started. Mode = 0x%03x M = %3ld N=%3ld K=%3ld\n", - cpu, queue->mode, queue-> args ->m, queue->args->n, queue->args->k); - } -#endif - -#ifdef CONSISTENT_FPCSR -#ifdef __aarch64__ - __asm__ __volatile__ ("msr fpcr, %0" : : "r" (queue -> sse_mode)); -#else - __asm__ __volatile__ ("ldmxcsr %0" : : "m" (queue -> sse_mode)); - __asm__ __volatile__ ("fldcw %0" : : "m" (queue -> x87_mode)); -#endif -#endif - -#ifdef MONITOR - main_status[cpu] = MAIN_RUNNING1; -#endif - - if (sa == NULL) sa = (void *)((BLASLONG)buffer + GEMM_OFFSET_A); - - if (sb == NULL) { - if (!(queue -> mode & BLAS_COMPLEX)){ -#ifdef EXPRECISION - if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE){ - sb = (void *)(((BLASLONG)sa + ((QGEMM_P * QGEMM_Q * sizeof(xdouble) - + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); - } else -#endif - if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE) { -#ifdef BUILD_DOUBLE - sb = (void *)(((BLASLONG)sa + ((DGEMM_P * DGEMM_Q * sizeof(double) - + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); -#endif - } else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) { -#ifdef BUILD_SINGLE - sb = (void *)(((BLASLONG)sa + ((SGEMM_P * SGEMM_Q * sizeof(float) - + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); -#endif - } else { - /* Other types in future */ - } - } else { -#ifdef EXPRECISION - if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE){ - sb = (void *)(((BLASLONG)sa + ((XGEMM_P * XGEMM_Q * 2 * sizeof(xdouble) - + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); - } else -#endif - if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE){ -#ifdef BUILD_COMPLEX16 - sb = (void *)(((BLASLONG)sa + ((ZGEMM_P * ZGEMM_Q * 2 * sizeof(double) - + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); -#endif - } else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) { -#ifdef BUILD_COMPLEX - sb = (void *)(((BLASLONG)sa + ((CGEMM_P * CGEMM_Q * 2 * sizeof(float) - + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); -#endif - } else { - /* Other types in future */ - } - } - queue->sb=sb; - } - -#ifdef MONITOR - main_status[cpu] = MAIN_RUNNING2; -#endif - - if (queue -> mode & BLAS_LEGACY) { - legacy_exec(routine, queue -> mode, queue -> args, sb); - } else - if (queue -> mode & BLAS_PTHREAD) { - void (*pthreadcompat)(void *) = (void(*)(void*))queue -> routine; - (pthreadcompat)(queue -> args); - } else - (routine)(queue -> args, queue -> range_m, queue -> range_n, sa, sb, queue -> position); - -#ifdef SMP_DEBUG - fprintf(STDERR, "Server[%2ld] Calculation finished!\n", cpu); -#endif - -#ifdef MONITOR - main_status[cpu] = MAIN_FINISH; -#endif - - // arm: make sure all results are written out _before_ - // thread is marked as done and other threads use them - MB; - atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)0); - - - } + exec_threads(cpu, queue, 0); #ifdef MONITOR main_status[cpu] = MAIN_DONE; @@ -580,8 +481,6 @@ blas_queue_t *tscq; fprintf(STDERR, "Server[%2ld] Shutdown!\n", cpu); #endif - blas_memory_free(buffer); - //pthread_exit(NULL); return NULL; @@ -663,6 +562,9 @@ int blas_thread_init(void){ LOCK_COMMAND(&server_lock); + // Adjust thread buffers + adjust_thread_buffers(); + if (!blas_server_avail){ thread_timeout_env=openblas_thread_timeout(); @@ -893,6 +795,18 @@ int exec_blas(BLASLONG num, blas_queue_t *queue){ fprintf(STDERR, "Exec_blas is called. Number of executing threads : %ld\n", num); #endif +//Redirect to caller's callback routine +if (openblas_threads_callback_) { + int buf_index = 0, i = 0; +#ifndef USE_SIMPLE_THREADED_LEVEL3 + for (i = 0; i < num; i ++) + queue[i].position = i; +#endif + openblas_threads_callback_(1, (openblas_dojob_callback) exec_threads, num, sizeof(blas_queue_t), (void*) queue, buf_index); + return 0; + } + + #ifdef __ELF__ if (omp_in_parallel && (num > 1)) { if (omp_in_parallel() > 0) { @@ -1066,6 +980,14 @@ int BLASFUNC(blas_thread_shutdown)(void){ LOCK_COMMAND(&server_lock); + //Free buffers allocated for threads + for(i=0; i routine; + + atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)1); + + void *buffer = blas_thread_buffer[cpu]; + void *sa = queue -> sa; + void *sb = queue -> sb; + + #ifdef SMP_DEBUG + if (queue -> args) { + fprintf(STDERR, "Server[%2ld] Calculation started. Mode = 0x%03x M = %3ld N=%3ld K=%3ld\n", + cpu, queue->mode, queue-> args ->m, queue->args->n, queue->args->k); + } + #endif + + #ifdef CONSISTENT_FPCSR + #ifdef __aarch64__ + __asm__ __volatile__ ("msr fpcr, %0" : : "r" (queue -> sse_mode)); + #else + __asm__ __volatile__ ("ldmxcsr %0" : : "m" (queue -> sse_mode)); + __asm__ __volatile__ ("fldcw %0" : : "m" (queue -> x87_mode)); + #endif + #endif + + #ifdef MONITOR + main_status[cpu] = MAIN_RUNNING1; + #endif + + if (sa == NULL) sa = (void *)((BLASLONG)buffer + GEMM_OFFSET_A); + + if (sb == NULL) { + if (!(queue -> mode & BLAS_COMPLEX)){ + #ifdef EXPRECISION + if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE){ + sb = (void *)(((BLASLONG)sa + ((QGEMM_P * QGEMM_Q * sizeof(xdouble) + + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); + } else + #endif + if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE) { + #ifdef BUILD_DOUBLE + sb = (void *)(((BLASLONG)sa + ((DGEMM_P * DGEMM_Q * sizeof(double) + + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); + #endif + } else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) { + #ifdef BUILD_SINGLE + sb = (void *)(((BLASLONG)sa + ((SGEMM_P * SGEMM_Q * sizeof(float) + + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); + #endif + } else { + /* Other types in future */ + } + } else { + #ifdef EXPRECISION + if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE){ + sb = (void *)(((BLASLONG)sa + ((XGEMM_P * XGEMM_Q * 2 * sizeof(xdouble) + + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); + } else + #endif + if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE){ + #ifdef BUILD_COMPLEX16 + sb = (void *)(((BLASLONG)sa + ((ZGEMM_P * ZGEMM_Q * 2 * sizeof(double) + + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); + #endif + } else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) { + #ifdef BUILD_COMPLEX + sb = (void *)(((BLASLONG)sa + ((CGEMM_P * CGEMM_Q * 2 * sizeof(float) + + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); + #endif + } else { + /* Other types in future */ + } + } + queue->sb=sb; + } + + #ifdef MONITOR + main_status[cpu] = MAIN_RUNNING2; + #endif + + if (queue -> mode & BLAS_LEGACY) { + legacy_exec(routine, queue -> mode, queue -> args, sb); + } else + if (queue -> mode & BLAS_PTHREAD) { + void (*pthreadcompat)(void *) = (void(*)(void*))queue -> routine; + (pthreadcompat)(queue -> args); + } else + (routine)(queue -> args, queue -> range_m, queue -> range_n, sa, sb, queue -> position); + + #ifdef SMP_DEBUG + fprintf(STDERR, "Server[%2ld] Calculation finished!\n", cpu); + #endif + + #ifdef MONITOR + main_status[cpu] = MAIN_FINISH; + #endif + + // arm: make sure all results are written out _before_ + // thread is marked as done and other threads use them + MB; + atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)0); + + + } + +} +#endif \ No newline at end of file diff --git a/driver/others/blas_server_callback.c b/driver/others/blas_server_callback.c new file mode 100644 index 0000000000..48cf3541a8 --- /dev/null +++ b/driver/others/blas_server_callback.c @@ -0,0 +1,12 @@ +#include "common.h" + +/* global variable to change threading backend from openblas-managed to caller-managed */ +openblas_threads_callback openblas_threads_callback_ = 0; + +/* non-threadsafe function should be called before any other + openblas function to change how threads are managed */ + +void openblas_set_threads_callback_function(openblas_threads_callback callback) +{ + openblas_threads_callback_ = callback; +} \ No newline at end of file diff --git a/driver/others/blas_server_omp.c b/driver/others/blas_server_omp.c index 6f2ea8623d..2d06c1c705 100644 --- a/driver/others/blas_server_omp.c +++ b/driver/others/blas_server_omp.c @@ -285,7 +285,7 @@ static void legacy_exec(void *func, int mode, blas_arg_t *args, void *sb){ } } -static void exec_threads(blas_queue_t *queue, int buf_index){ +static void exec_threads(int thread_num, blas_queue_t *queue, int buf_index){ void *buffer, *sa, *sb; int pos=0, release_flag=0; @@ -305,7 +305,7 @@ static void exec_threads(blas_queue_t *queue, int buf_index){ if ((sa == NULL) && (sb == NULL) && ((queue -> mode & BLAS_PTHREAD) == 0)) { - pos = omp_get_thread_num(); + pos= thread_num; buffer = blas_thread_buffer[buf_index][pos]; //fallback @@ -420,18 +420,25 @@ while (true) { break; } } - if (i != MAX_PARALLEL_NUMBER) - break; -} -if (openblas_omp_adaptive_env() != 0) { -#pragma omp parallel for num_threads(num) schedule(OMP_SCHED) - for (i = 0; i < num; i ++) { + if(i != MAX_PARALLEL_NUMBER) + break; + } + /*For caller-managed threading, if caller has registered the callback, pass exec_thread as callback function*/ + if (openblas_threads_callback_) { +#ifndef USE_SIMPLE_THREADED_LEVEL3 + for (i = 0; i < num; i ++) + queue[i].position = i; +#endif + openblas_threads_callback_(1, (openblas_dojob_callback) exec_threads, num, sizeof(blas_queue_t), (void*) queue, buf_index); + } else { + if (openblas_omp_adaptive_env() != 0) { + #pragma omp parallel for num_threads(num) schedule(OMP_SCHED) + for (i = 0; i < num; i ++) { #ifndef USE_SIMPLE_THREADED_LEVEL3 queue[i].position = i; #endif - - exec_threads(&queue[i], buf_index); + exec_threads(omp_get_thread_num(), &queue[i], buf_index); } } else { #pragma omp parallel for schedule(OMP_SCHED) @@ -441,9 +448,10 @@ if (openblas_omp_adaptive_env() != 0) { queue[i].position = i; #endif - exec_threads(&queue[i], buf_index); + exec_threads(omp_get_thread_num(), &queue[i], buf_index); } } +} #ifdef HAVE_C11 atomic_store(&blas_buffer_inuse[buf_index], false); diff --git a/driver/others/blas_server_win32.c b/driver/others/blas_server_win32.c index 2ad8b8c5fd..5710b904bc 100644 --- a/driver/others/blas_server_win32.c +++ b/driver/others/blas_server_win32.c @@ -1,3 +1,4 @@ + /*********************************************************************/ /* Copyright 2009, 2010 The University of Texas at Austin. */ /* All rights reserved. */ @@ -67,6 +68,8 @@ int blas_server_avail = 0; int blas_omp_threads_local = 1; +static void * blas_thread_buffer[MAX_CPU_NUMBER]; + /* Local Variables */ static BLASULONG server_lock = 0; @@ -74,6 +77,10 @@ static HANDLE blas_threads [MAX_CPU_NUMBER]; static DWORD blas_threads_id[MAX_CPU_NUMBER]; static volatile int thread_target; // target num of live threads, volatile for cross-thread reads +//Prototypes +static void exec_threads(int , blas_queue_t *, int); +static void adjust_thread_buffers(); + // // Legacy code path // @@ -207,13 +214,9 @@ static DWORD WINAPI blas_thread_server(void *arg) { /* Thread identifier */ BLASLONG cpu = (BLASLONG)arg; - - void *buffer, *sa, *sb; + blas_queue_t *queue; - /* Each server needs each buffer */ - buffer = blas_memory_alloc(2); - MT_TRACE("Server[%2ld] Thread is started!\n", cpu); while (1) { @@ -240,87 +243,8 @@ static DWORD WINAPI blas_thread_server(void *arg) { LeaveCriticalSection(&queue_lock); - if (queue) { - int (*routine)(blas_arg_t *, void *, void *, void *, void *, BLASLONG) = queue -> routine; - - sa = queue -> sa; - sb = queue -> sb; - - #ifdef CONSISTENT_FPCSR - __asm__ __volatile__ ("ldmxcsr %0" : : "m" (queue -> sse_mode)); - __asm__ __volatile__ ("fldcw %0" : : "m" (queue -> x87_mode)); - #endif - - MT_TRACE("Server[%2ld] Started. Mode = 0x%03x M = %3ld N=%3ld K=%3ld\n", - cpu, queue->mode, queue-> args ->m, queue->args->n, queue->args->k); - - // fprintf(stderr, "queue start[%ld]!!!\n", cpu); - - #ifdef MONITOR - main_status[cpu] = MAIN_RUNNING1; - #endif - - if (sa == NULL) - sa = (void *)((BLASLONG)buffer + GEMM_OFFSET_A); - - if (sb == NULL) { - if (!(queue -> mode & BLAS_COMPLEX)) { -#ifdef EXPRECISION - if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE) { - sb = (void *)(((BLASLONG)sa + ((XGEMM_P * XGEMM_Q * sizeof(xdouble) - + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); - } else -#endif - if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE) { -#ifdef BUILD_DOUBLE - sb = (void *)(((BLASLONG)sa + ((DGEMM_P * DGEMM_Q * sizeof(double) - + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); -#endif - } else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) { -#ifdef BUILD_SINGLE - sb = (void *)(((BLASLONG)sa + ((SGEMM_P * SGEMM_Q * sizeof(float) - + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); -#endif - } else { - /* Other types in future */ - } - } else { -#ifdef EXPRECISION - if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE){ - sb = (void *)(((BLASLONG)sa + ((XGEMM_P * XGEMM_Q * 2 * sizeof(xdouble) - + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); - } else -#endif - if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE){ -#ifdef BUILD_COMPLEX16 - sb = (void *)(((BLASLONG)sa + ((ZGEMM_P * ZGEMM_Q * 2 * sizeof(double) - + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); -#endif - } else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) { -#ifdef BUILD_COMPLEX - sb = (void *)(((BLASLONG)sa + ((CGEMM_P * CGEMM_Q * 2 * sizeof(float) - + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); -#endif - } else { - /* Other types in future */ - } - } - queue->sb=sb; - } - - #ifdef MONITOR - main_status[cpu] = MAIN_RUNNING2; - #endif - - if (!(queue -> mode & BLAS_LEGACY)) { - (routine)(queue -> args, queue -> range_m, queue -> range_n, sa, sb, queue -> position); - } else { - legacy_exec(routine, queue -> mode, queue -> args, sb); - } - } else { - continue; //if queue == NULL - } - + exec_threads(cpu, queue, 0); + MT_TRACE("Server[%2ld] Finished!\n", cpu); queue->finished = 1; @@ -330,8 +254,6 @@ static DWORD WINAPI blas_thread_server(void *arg) { MT_TRACE("Server[%2ld] Shutdown!\n", cpu); - blas_memory_free(buffer); - return 0; } @@ -345,6 +267,8 @@ int blas_thread_init(void) { LOCK_COMMAND(&server_lock); + adjust_thread_buffers(); + MT_TRACE("Initializing Thread(Num. threads = %d)\n", blas_cpu_number); if (!blas_server_avail) { @@ -473,6 +397,17 @@ int exec_blas(BLASLONG num, blas_queue_t *queue) { if ((num <= 0) || (queue == NULL)) return 0; + //Redirect to caller's callback routine + if (openblas_threads_callback_) { + int buf_index = 0, i = 0; +#ifndef USE_SIMPLE_THREADED_LEVEL3 + for (i = 0; i < num; i ++) + queue[i].position = i; +#endif + openblas_threads_callback_(1, (openblas_dojob_callback) exec_threads, num, sizeof(blas_queue_t), (void*) queue, buf_index); + return 0; + } + if ((num > 1) && queue -> next) exec_blas_async(1, queue -> next); @@ -507,6 +442,14 @@ int BLASFUNC(blas_thread_shutdown)(void) { LOCK_COMMAND(&server_lock); + //Free buffers allocated for threads + for(i=0; i sa; + sb = queue -> sb; + + int (*routine)(blas_arg_t *, void *, void *, void *, void *, BLASLONG) = queue -> routine; + + #ifdef CONSISTENT_FPCSR + __asm__ __volatile__ ("ldmxcsr %0" : : "m" (queue -> sse_mode)); + __asm__ __volatile__ ("fldcw %0" : : "m" (queue -> x87_mode)); + #endif + + MT_TRACE("Server[%2ld] Started. Mode = 0x%03x M = %3ld N=%3ld K=%3ld\n", + cpu, queue->mode, queue-> args ->m, queue->args->n, queue->args->k); + + // fprintf(stderr, "queue start[%ld]!!!\n", cpu); + + #ifdef MONITOR + main_status[cpu] = MAIN_RUNNING1; + #endif + + if (sa == NULL) + sa = (void *)((BLASLONG)buffer + GEMM_OFFSET_A); + + if (sb == NULL) { + if (!(queue -> mode & BLAS_COMPLEX)) { + #ifdef EXPRECISION + if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE) { + sb = (void *)(((BLASLONG)sa + ((XGEMM_P * XGEMM_Q * sizeof(xdouble) + + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); + } else + #endif + if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE) { + #ifdef BUILD_DOUBLE + sb = (void *)(((BLASLONG)sa + ((DGEMM_P * DGEMM_Q * sizeof(double) + + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); + #endif + } else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) { + #ifdef BUILD_SINGLE + sb = (void *)(((BLASLONG)sa + ((SGEMM_P * SGEMM_Q * sizeof(float) + + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); + #endif + } else { + /* Other types in future */ + } + } else { + #ifdef EXPRECISION + if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE){ + sb = (void *)(((BLASLONG)sa + ((XGEMM_P * XGEMM_Q * 2 * sizeof(xdouble) + + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); + } else + #endif + if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE){ + #ifdef BUILD_COMPLEX16 + sb = (void *)(((BLASLONG)sa + ((ZGEMM_P * ZGEMM_Q * 2 * sizeof(double) + + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); + #endif + } else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) { + #ifdef BUILD_COMPLEX + sb = (void *)(((BLASLONG)sa + ((CGEMM_P * CGEMM_Q * 2 * sizeof(float) + + GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B); + #endif + } else { + /* Other types in future */ + } + } + queue->sb=sb; + } + + #ifdef MONITOR + main_status[cpu] = MAIN_RUNNING2; + #endif + + if (!(queue -> mode & BLAS_LEGACY)) { + (routine)(queue -> args, queue -> range_m, queue -> range_n, sa, sb, queue -> position); + } else { + legacy_exec(routine, queue -> mode, queue -> args, sb); + } + + } + +} \ No newline at end of file