- 
                Notifications
    
You must be signed in to change notification settings  - Fork 8k
 
Make FFI callbacks thread safe #12823
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 13 commits
16be72b
              1de9669
              56c24d4
              6423d3a
              98eb079
              d762cfe
              29d6550
              ba483e6
              bce091d
              083cc9e
              34353aa
              9164b16
              20dd9b1
              e20a564
              af12a99
              261a1d3
              2f71fa3
              391ca94
              e8b2b5f
              e2fce83
              dc5496f
              37ee7de
              d2b2192
              38f88d0
              7e5697e
              6f6a737
              9d2e24a
              bd953ba
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| 
          
            
          
           | 
    @@ -227,6 +227,10 @@ static ZEND_COLD void zend_ffi_return_unsupported(zend_ffi_type *type); | |
| static ZEND_COLD void zend_ffi_pass_unsupported(zend_ffi_type *type); | ||
| static ZEND_COLD void zend_ffi_assign_incompatible(zval *arg, zend_ffi_type *type); | ||
| static bool zend_ffi_is_compatible_type(zend_ffi_type *dst_type, zend_ffi_type *src_type); | ||
| static void zend_ffi_wait_cond( | ||
| pthread_mutex_t *mutex, pthread_cond_t *cond, | ||
| zend_atomic_bool *flag, bool wanted_value, bool release | ||
| ); | ||
| 
     | 
||
| #if FFI_CLOSURES | ||
| static void *zend_ffi_create_callback(zend_ffi_type *type, zval *value); | ||
| 
          
            
          
           | 
    @@ -935,9 +939,28 @@ static void zend_ffi_callback_hash_dtor(zval *zv) /* {{{ */ | |
| } | ||
| /* }}} */ | ||
| 
     | 
||
| static void zend_ffi_callback_trampoline(ffi_cif* cif, void* ret, void** args, void* data) /* {{{ */ | ||
| { | ||
| zend_ffi_callback_data *callback_data = (zend_ffi_callback_data*)data; | ||
| static void (*orig_interrupt_function)(zend_execute_data *execute_data); | ||
| 
     | 
||
| static void zend_ffi_dispatch_callback_end(void){ /* {{{ */ | ||
| zend_atomic_bool_store_ex(&FFI_G(callback_in_progress), false); | ||
| bool is_main_thread = FFI_G(callback_tid) == FFI_G(main_tid); | ||
| if(!is_main_thread){ | ||
| // unlock interrupt handler | ||
| pthread_cond_broadcast(&FFI_G(vm_unlock)); | ||
| pthread_mutex_unlock(&FFI_G(vm_request_lock)); | ||
| } | ||
| } | ||
| /* }}} */ | ||
| 
     | 
||
| static void zend_ffi_dispatch_callback(void){ /* {{{ */ | ||
| // this function must always run on the main thread | ||
| //ZEND_ASSERT(pthread_self() == FFI_G(main_tid)); | ||
| 
     | 
||
| if (!zend_atomic_bool_load_ex(&FFI_G(callback_in_progress))) { | ||
| return; | ||
| } | ||
| 
     | 
||
| zend_ffi_callback_data *callback_data = FFI_G(callback_data).data; | ||
| zend_fcall_info fci; | ||
| zend_ffi_type *ret_type; | ||
| zval retval; | ||
| 
        
          
        
         | 
    @@ -951,13 +974,14 @@ static void zend_ffi_callback_trampoline(ffi_cif* cif, void* ret, void** args, v | |
| fci.param_count = callback_data->arg_count; | ||
| fci.named_params = NULL; | ||
| 
     | 
||
| 
     | 
||
| if (callback_data->type->func.args) { | ||
| int n = 0; | ||
| zend_ffi_type *arg_type; | ||
| 
     | 
||
| ZEND_HASH_PACKED_FOREACH_PTR(callback_data->type->func.args, arg_type) { | ||
| arg_type = ZEND_FFI_TYPE(arg_type); | ||
| zend_ffi_cdata_to_zval(NULL, args[n], arg_type, BP_VAR_R, &fci.params[n], (zend_ffi_flags)(arg_type->attr & ZEND_FFI_ATTR_CONST), 0, 0); | ||
| zend_ffi_cdata_to_zval(NULL, FFI_G(callback_data).args[n], arg_type, BP_VAR_R, &fci.params[n], (zend_ffi_flags)(arg_type->attr & ZEND_FFI_ATTR_CONST), 0, 0); | ||
| n++; | ||
| } ZEND_HASH_FOREACH_END(); | ||
| } | ||
| 
        
          
        
         | 
    @@ -977,18 +1001,113 @@ static void zend_ffi_callback_trampoline(ffi_cif* cif, void* ret, void** args, v | |
| free_alloca(fci.params, use_heap); | ||
| 
     | 
||
| if (EG(exception)) { | ||
| // we're about to do a hard exit. unlock all mutexes | ||
| zend_ffi_dispatch_callback_end(); | ||
| pthread_mutex_unlock(&FFI_G(vm_request_lock)); | ||
| zend_error_noreturn(E_ERROR, "Throwing from FFI callbacks is not allowed"); | ||
| } | ||
| 
     | 
||
| ret_type = ZEND_FFI_TYPE(callback_data->type->func.ret_type); | ||
| if (ret_type->kind != ZEND_FFI_TYPE_VOID) { | ||
| zend_ffi_zval_to_cdata(ret, ret_type, &retval); | ||
| zend_ffi_zval_to_cdata(FFI_G(callback_data).ret, ret_type, &retval); | ||
| } | ||
| 
     | 
||
| zval_ptr_dtor(&retval); | ||
| } | ||
| /* }}} */ | ||
| 
     | 
||
| static void zend_ffi_interrupt_function(zend_execute_data *execute_data){ /* {{{ */ | ||
| pthread_mutex_lock(&FFI_G(vm_request_lock)); | ||
| if (!zend_atomic_bool_load_ex(&FFI_G(callback_in_progress))) { | ||
| goto end; | ||
| } | ||
| 
     | 
||
| bool is_main_tid = FFI_G(callback_tid) == FFI_G(main_tid); | ||
| if(!is_main_tid){ | ||
| 
     | 
||
| // notify calling thread and release | ||
| pthread_cond_broadcast(&FFI_G(vm_ack)); | ||
| 
     | 
||
| // release mutex and wait for the unlock signal | ||
| pthread_cond_wait(&FFI_G(vm_unlock), &FFI_G(vm_request_lock)); | ||
| } | ||
| 
     | 
||
| end: | ||
| pthread_mutex_unlock(&FFI_G(vm_request_lock)); | ||
| if (orig_interrupt_function) { | ||
| orig_interrupt_function(execute_data); | ||
| } | ||
| } | ||
| /* }}} */ | ||
| 
     | 
||
| static void zend_ffi_wait_cond( | ||
| pthread_mutex_t *mutex, pthread_cond_t *cond, | ||
| zend_atomic_bool *flag, bool wanted_value, bool release | ||
| ){ /* {{{ */ | ||
| // get lock, first | ||
| pthread_mutex_lock(mutex); | ||
| 
     | 
||
| // if we acquired the lock before the request could be serviced | ||
| // unlock it and wait for the flag | ||
| if(flag == NULL){ | ||
| pthread_cond_wait(cond, mutex); | ||
| } else { | ||
| while(zend_atomic_bool_load_ex(flag) != wanted_value){ | ||
| pthread_cond_wait(cond, mutex); | ||
| } | ||
| } | ||
| 
     | 
||
| if(release){ | ||
| pthread_mutex_unlock(mutex); | ||
| } | ||
| } | ||
| /* }}} */ | ||
| 
     | 
||
| static void zend_ffi_wait_request_barrier(bool release){ /* {{{ */ | ||
| zend_ffi_wait_cond(&FFI_G(vm_request_lock), &FFI_G(vm_unlock), &FFI_G(callback_in_progress), false, release); | ||
| } | ||
| /* }}} */ | ||
| 
     | 
||
| static void zend_ffi_callback_trampoline(ffi_cif* cif, void* ret, void** args, void* data) /* {{{ */ | ||
| { | ||
| // wait for a previously initiated request to complete | ||
| zend_ffi_wait_request_barrier(false); | ||
| 
         
      Comment on lines
    
      +1038
     to 
      +1041
    
   
  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This may a regular in-main-thread callback, that doesn't require any locks. Ideally, we should make distinct between regular and "thread" callback $libc->pthread_create(
		FFI::addr($tid), NULL,
		FFI::thread_callback($thread_func), FFI::addr($arg)
	); | 
||
| { | ||
| // mutex is now locked, and request is not pending. | ||
| // start a new one | ||
| zend_atomic_bool_store_ex(&FFI_G(callback_in_progress), true); | ||
| 
     | 
||
| zend_ffi_call_data call_data = { | ||
| .cif = cif, | ||
| .ret = ret, | ||
| .args = args, | ||
| .data = (zend_ffi_callback_data *)data | ||
| }; | ||
| FFI_G(callback_data) = call_data; | ||
| 
     | 
||
| FFI_G(callback_tid) = pthread_self(); | ||
| bool is_main_thread = FFI_G(callback_tid) == FFI_G(main_tid); | ||
| 
     | 
||
| if(!is_main_thread){ | ||
| // post interrupt request to synchronize with the main thread | ||
| zend_atomic_bool_store_ex(&EG(vm_interrupt), true); | ||
| 
     | 
||
| // release mutex and wait for ack | ||
| pthread_cond_wait(&FFI_G(vm_ack), &FFI_G(vm_request_lock)); | ||
| 
     | 
||
| // prepare the stack call info/limits for the current thread | ||
| zend_call_stack_init(); | ||
| } | ||
| 
     | 
||
| // dispatch the callback | ||
| zend_ffi_dispatch_callback(); | ||
| 
     | 
||
| zend_ffi_dispatch_callback_end(); | ||
| } | ||
| pthread_mutex_unlock(&FFI_G(vm_request_lock)); | ||
| } | ||
| /* }}} */ | ||
| 
     | 
||
| static void *zend_ffi_create_callback(zend_ffi_type *type, zval *value) /* {{{ */ | ||
| { | ||
| zend_fcall_info_cache fcc; | ||
| 
          
            
          
           | 
    @@ -5518,13 +5637,25 @@ ZEND_MINIT_FUNCTION(ffi) | |
| return zend_ffi_preload(FFI_G(preload)); | ||
| } | ||
| 
     | 
||
| FFI_G(main_tid) = pthread_self(); | ||
| 
     | 
||
| zend_atomic_bool_store_ex(&FFI_G(callback_in_progress), false); | ||
| orig_interrupt_function = zend_interrupt_function; | ||
| zend_interrupt_function = zend_ffi_interrupt_function; | ||
| 
     | 
||
| pthread_mutex_init(&FFI_G(vm_request_lock), NULL); | ||
| pthread_cond_init(&FFI_G(vm_ack), NULL); | ||
| pthread_cond_init(&FFI_G(vm_unlock), NULL); | ||
| 
     | 
||
| return SUCCESS; | ||
| } | ||
| /* }}} */ | ||
| 
     | 
||
| /* {{{ ZEND_RSHUTDOWN_FUNCTION */ | ||
| ZEND_RSHUTDOWN_FUNCTION(ffi) | ||
| { | ||
| zend_ffi_wait_request_barrier(true); | ||
| 
     | 
||
| if (FFI_G(callbacks)) { | ||
| zend_hash_destroy(FFI_G(callbacks)); | ||
| efree(FFI_G(callbacks)); | ||
| 
          
            
          
           | 
    @@ -5636,6 +5767,7 @@ static ZEND_GINIT_FUNCTION(ffi) | |
| /* {{{ ZEND_GINIT_FUNCTION */ | ||
| static ZEND_GSHUTDOWN_FUNCTION(ffi) | ||
| { | ||
| zend_ffi_wait_request_barrier(true); | ||
| if (ffi_globals->scopes) { | ||
| zend_hash_destroy(ffi_globals->scopes); | ||
| free(ffi_globals->scopes); | ||
| 
          
            
          
           | 
    ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| 
          
            
          
           | 
    @@ -17,6 +17,9 @@ | |
| #ifndef PHP_FFI_H | ||
| #define PHP_FFI_H | ||
| 
     | 
||
| #include <ffi.h> | ||
| #include <pthread.h> | ||
                
       | 
||
| 
     | 
||
| extern zend_module_entry ffi_module_entry; | ||
| #define phpext_ffi_ptr &ffi_module_entry | ||
| 
     | 
||
| 
        
          
        
         | 
    @@ -27,6 +30,15 @@ typedef enum _zend_ffi_api_restriction { | |
| } zend_ffi_api_restriction; | ||
| 
     | 
||
| typedef struct _zend_ffi_type zend_ffi_type; | ||
| typedef struct _zend_ffi_callback_data zend_ffi_callback_data; | ||
| 
     | 
||
| 
     | 
||
| typedef struct _zend_ffi_call_data { | ||
| ffi_cif* cif; | ||
| void* ret; | ||
| void** args; | ||
| zend_ffi_callback_data* data; | ||
| } zend_ffi_call_data; | ||
| 
     | 
||
| ZEND_BEGIN_MODULE_GLOBALS(ffi) | ||
| zend_ffi_api_restriction restriction; | ||
| 
        
          
        
         | 
    @@ -35,6 +47,15 @@ ZEND_BEGIN_MODULE_GLOBALS(ffi) | |
| /* predefined ffi_types */ | ||
| HashTable types; | ||
| 
     | 
||
| zend_atomic_bool callback_in_progress; | ||
| 
     | 
||
| pthread_mutex_t vm_request_lock; | ||
| pthread_cond_t vm_ack; | ||
| pthread_cond_t vm_unlock; | ||
| pthread_t callback_tid; | ||
| pthread_t main_tid; | ||
| zend_ffi_call_data callback_data; | ||
| 
     | 
||
| /* preloading */ | ||
| char *preload; | ||
| HashTable *scopes; /* list of preloaded scopes */ | ||
| 
          
            
          
           | 
    ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| --TEST-- | ||
| FFI Thread safe callbacks | ||
| --EXTENSIONS-- | ||
| ffi | ||
| --SKIPIF-- | ||
| <?php | ||
| try { | ||
| $libc = FFI::cdef(' | ||
| int pthread_create(void *restrict thread, | ||
| const void *restrict attr, | ||
| void *(*start_routine)(void *), | ||
| void *arg); | ||
| ', 'libc.so.6'); | ||
| } catch(Throwable $_){ | ||
| die('skip libc.so.6 not available'); | ||
| } | ||
| ?> | ||
| --INI-- | ||
| ffi.enable=1 | ||
| --FILE-- | ||
| <?php | ||
| $libc = FFI::cdef(' | ||
| typedef uint64_t pthread_t; | ||
| int pthread_create(pthread_t *thread, | ||
| const void* attr, | ||
| void *(*start_routine)(void *), | ||
| void *arg); | ||
| int pthread_detach(pthread_t thread); | ||
| ', 'libc.so.6'); | ||
| 
     | 
||
| $tid = $libc->new($libc->type('pthread_t')); | ||
| $accum = 0; | ||
| $thread_func = function($arg) use($libc, &$accum){ | ||
| //$v = $libc->cast('int *', $arg)[0]; | ||
| FFI::free($arg); | ||
| usleep(10 * 1000); | ||
| $accum++; | ||
| //print("."); | ||
| }; | ||
| 
     | 
||
| for($i=0; $i<100; $i++){ | ||
| $arg = $libc->new('int', false); | ||
| $arg->cdata = $i; | ||
| 
     | 
||
| $libc->pthread_create( | ||
| FFI::addr($tid), NULL, | ||
| $thread_func, FFI::addr($arg) | ||
| ); | ||
| $libc->pthread_detach($tid->cdata); | ||
| } | ||
| 
     | 
||
| while($accum != 100){ | ||
| //print("w"); | ||
| usleep(1000); | ||
| } | ||
| print($accum); | ||
| ?> | ||
| --EXPECT-- | ||
| 100 | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should check if this is FFI related interrupt. This may be a POSIX signal or something else...