Skip to content

Commit

Permalink
Integrating partr in progress
Browse files Browse the repository at this point in the history
Added partr code. Abstracted interface to threading infrastructure.
Original threading working. Parallel task runtime compiles.
  • Loading branch information
kpamnany committed Jun 30, 2017
1 parent 32422d4 commit 28a825b
Show file tree
Hide file tree
Showing 14 changed files with 1,460 additions and 385 deletions.
1 change: 1 addition & 0 deletions Make.inc
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,7 @@ endif
# Threads
ifneq ($(JULIA_THREADS), 0)
JCPPFLAGS += -DJULIA_ENABLE_THREADING -DJULIA_NUM_THREADS=$(JULIA_THREADS)
//JCPPFLAGS += -DJULIA_ENABLE_THREADING -DJULIA_NUM_THREADS=$(JULIA_THREADS) -DJULIA_ENABLE_PARTR
endif

# Intel VTune Amplifier
Expand Down
6 changes: 3 additions & 3 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ SRCS := \
jltypes gf typemap ast builtins module interpreter symbol \
dlload sys init task array dump staticdata toplevel jl_uv datatype \
simplevector APInt-C runtime_intrinsics runtime_ccall precompile \
threadgroup threading stackwalk gc gc-debug gc-pages method \
jlapi signal-handling safepoint jloptions timing subtype rtutils \
crc32c
threadgroup threading forkjoin-ti partr multiq synctreepool \
stackwalk gc gc-debug gc-pages method jlapi signal-handling \
safepoint jloptions timing subtype rtutils crc32c

ifeq ($(USEMSVC), 1)
SRCS += getopt
Expand Down
196 changes: 196 additions & 0 deletions src/forkjoin-ti.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
// This file is a part of Julia. License is MIT: https://julialang.org/license

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>

#include "julia.h"
#include "julia_internal.h"

#ifdef __cplusplus
extern "C" {
#endif

#include "threading.h"
#include "threadgroup.h"

#ifdef JULIA_ENABLE_THREADING
#ifdef JULIA_ENABLE_FORKJOIN_TI

// thread state
enum {
TI_THREAD_INIT,
TI_THREAD_WORK
};

// passed to thread function
typedef struct {
int16_t volatile state;
ti_threadgroup_t *tg;
} ti_threadarg_t;

// work command to thread function
typedef struct {
jl_method_instance_t *mfunc;
jl_generic_fptr_t fptr;
jl_value_t **args;
uint32_t nargs;
jl_value_t *ret;
jl_module_t *current_module;
size_t world_age;
} ti_threadwork_t;

// for broadcasting work to threads
static ti_threadwork_t threadwork;

// only one thread group for now
static ti_threadgroup_t *tgworld;

void jl_init_threadinginfra(void) { }

void jl_init_threadarg(jl_threadarg_t *targ)
{
ti_threadarg_t *tiarg = (ti_threadarg_t *)malloc(sizeof (ti_threadarg_t));
tiarg->state = TI_THREAD_INIT;
targ->arg = (void *)tiarg;
}

void jl_init_started_threads(jl_threadarg_t **targs)
{
// set up the world thread group
ti_threadgroup_create(1, jl_n_threads, 1, &tgworld);
for (int i = 0; i < jl_n_threads; ++i)
ti_threadgroup_addthread(tgworld, i, NULL);

jl_ptls_t ptls = jl_get_ptls_states();
ti_threadgroup_initthread(tgworld, ptls->tid);

// give the threads the world thread group; they will block waiting for fork
for (int i = 0; i < jl_n_threads - 1; ++i) {
ti_threadarg_t *tiarg = (ti_threadarg_t *)targs[i]->arg;
tiarg->tg = tgworld;
jl_atomic_store_release(&tiarg->state, TI_THREAD_WORK);
}
}

// thread function: used by all except the main thread
void jl_threadfun(void *arg)
{
jl_ptls_t ptls = jl_get_ptls_states();
jl_threadarg_t *targ = (jl_threadarg_t *)arg;
ti_threadarg_t *tiarg = (ti_threadarg_t *)targ->arg;
ti_threadgroup_t *tg;
ti_threadwork_t *work;

// initialize this thread (set tid, create heap, etc.)
jl_init_threadtls(targ->tid);
jl_init_stack_limits(0);

// set up tasking
jl_init_root_task(ptls->stack_lo, ptls->stack_hi - ptls->stack_lo);
#ifdef COPY_STACKS
jl_set_base_ctx((char*)&arg);
#endif

// set the thread-local tid and wait for a thread group
while (jl_atomic_load_acquire(&tiarg->state) == TI_THREAD_INIT)
jl_cpu_pause();

// Assuming the functions called below doesn't contain unprotected GC
// critical region. In general, the following part of this function
// shouldn't call any managed code without calling `jl_gc_unsafe_enter`
// first.
jl_gc_state_set(ptls, JL_GC_STATE_SAFE, 0);
uv_barrier_wait(targ->barrier);

// initialize this thread in the thread group
tg = tiarg->tg;
ti_threadgroup_initthread(tg, ptls->tid);

// free the thread argument here
free(tiarg);
free(targ);

int init = 1;

// work loop
for (; ;) {
ti_threadgroup_fork(tg, ptls->tid, (void **)&work, init);
init = 0;

if (work) {
// TODO: before we support getting return value from
// the work, and after we have proper GC transition
// support in the codegen and runtime we don't need to
// enter GC unsafe region when starting the work.
int8_t gc_state = jl_gc_unsafe_enter(ptls);
// This is probably always NULL for now
jl_module_t *last_m = ptls->current_module;
size_t last_age = ptls->world_age;
JL_GC_PUSH1(&last_m);
ptls->current_module = work->current_module;
ptls->world_age = work->world_age;
jl_thread_run_fun(&work->fptr, work->mfunc, work->args, work->nargs);
ptls->current_module = last_m;
ptls->world_age = last_age;
JL_GC_POP();
jl_gc_unsafe_leave(ptls, gc_state);
}

ti_threadgroup_join(tg, ptls->tid);
}
}

// interface to user code: specialize and compile the user thread function
// and run it in all threads
JL_DLLEXPORT jl_value_t *jl_threading_run(jl_value_t *_args)
{
jl_ptls_t ptls = jl_get_ptls_states();
// GC safe
uint32_t nargs;
jl_value_t **args;
if (!jl_is_svec(_args)) {
nargs = 1;
args = &_args;
}
else {
nargs = jl_svec_len(_args);
args = jl_svec_data(_args);
}

int8_t gc_state = jl_gc_unsafe_enter(ptls);

threadwork.mfunc = jl_lookup_generic(args, nargs,
jl_int32hash_fast(jl_return_address()), ptls->world_age);
// Ignore constant return value for now.
if (jl_compile_method_internal(&threadwork.fptr, threadwork.mfunc))
return jl_nothing;
threadwork.args = args;
threadwork.nargs = nargs;
threadwork.ret = jl_nothing;
threadwork.current_module = ptls->current_module;
threadwork.world_age = ptls->world_age;

// fork the world thread group
ti_threadwork_t *tw = &threadwork;
ti_threadgroup_fork(tgworld, ptls->tid, (void **)&tw, 0);

// this thread must do work too
tw->ret = jl_thread_run_fun(&threadwork.fptr, threadwork.mfunc, args, nargs);

// wait for completion
ti_threadgroup_join(tgworld, ptls->tid);

jl_gc_unsafe_leave(ptls, gc_state);

return tw->ret;
}

#endif // JULIA_ENABLE_FORKJOIN_TI
#endif // JULIA_ENABLE_THREADING

#ifdef __cplusplus
}
#endif
18 changes: 18 additions & 0 deletions src/forkjoin-ti.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// This file is a part of Julia. License is MIT: https://julialang.org/license

#ifndef FORKJOINTI_H
#define FORKJOINTI_H

#include <stdint.h>

#ifdef __cplusplus
extern "C" {
#endif


#ifdef __cplusplus
}
#endif

#endif /* FORKJOINTI_H */

57 changes: 57 additions & 0 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,63 @@ JL_DLLEXPORT void JL_NORETURN jl_rethrow_other(jl_value_t *e);
JL_DLLEXPORT void JL_NORETURN jl_no_exc_handler(jl_value_t *e);

#ifdef JULIA_ENABLE_THREADING
#ifdef JULIA_ENABLE_PARTR
/* ptask settings */
#define TASK_IS_DETACHED 0x02
/* clean up the task on completion */
#define TASK_IS_STICKY 0x04
/* task is sticky to the thread that first runs it */

typedef struct _arriver_t arriver_t;
typedef struct _reducer_t reducer_t;

typedef struct _jl_ptask_t jl_ptask_t;

struct _jl_ptask_t {
/* to link this task into queues */
jl_ptask_t *next;

/* TODO: context and stack */

/* task entry point, arguments, result, reduction function */
void *(*f)(void *, int64_t, int64_t);
void *arg, *result;
int64_t start, end;

/* reduction function, for parfors */
void *(*rf)(void *, void *);

/* parent (first) task of a parfor set */
jl_ptask_t *parent;

/* to synchronize/reduce grains of a parfor */
arriver_t *arr;
reducer_t *red;

/* parfor reduction result */
void *red_result;

/* completion queue and lock */
jl_ptask_t *cq;
int8_t cq_lock;

/* task settings */
int8_t settings;

/* tid of the thread to which this task is sticky */
int16_t sticky_tid;

/* the index of this task in the set of grains of a parfor */
int16_t grain_num;

/* for the multiqueue */
int16_t prio;

/* to manage task pools */
int16_t pool, index, next_avail;
};
#endif // JULIA_ENABLE_PARTR

static inline void jl_lock_frame_push(jl_mutex_t *lock)
{
jl_ptls_t ptls = jl_get_ptls_states();
Expand Down
40 changes: 39 additions & 1 deletion src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,6 @@ extern ssize_t jl_tls_offset;
extern const int jl_tls_elf_support;
void jl_init_threading(void);
void jl_start_threads(void);
void jl_shutdown_threading(void);

// Whether the GC is running
extern char *jl_safepoint_pages;
Expand Down Expand Up @@ -711,6 +710,45 @@ STATIC_INLINE char *jl_copy_str(char **to, const char *from)
// Returns time in nanosec
JL_DLLEXPORT uint64_t jl_hrtime(void);

#ifdef JULIA_ENABLE_PARTR
// congruential random number generator
STATIC_INLINE void seed_cong(uint64_t *seed)
{
*seed = jl_hrtime();
}
STATIC_INLINE void unbias_cong(uint64_t max, uint64_t *unbias)
{
*unbias = UINT64_MAX - ((UINT64_MAX % max)+1);
}
STATIC_INLINE uint64_t cong(uint64_t max, uint64_t unbias, uint64_t *seed)
{
while ((*seed = 69069 * (*seed) + 362437) > unbias)
;
return *seed % max;
}

// multiq
void multiq_init();
int multiq_insert(jl_ptask_t *elem, int16_t priority);
jl_ptask_t *multiq_deletemin();
int16_t multiq_minprio();

// sync trees
typedef struct _arriver_t arriver_t;
typedef struct _reducer_t reducer_t;

/* interface */
void synctreepool_init();
void synctreepool_destroy();
arriver_t *arriver_alloc();
void arriver_free(arriver_t *);
reducer_t *reducer_alloc();
void reducer_free(reducer_t *);

int last_arriver(arriver_t *, int);
void *reduce(arriver_t *, reducer_t *, void *(*rf)(void *, void *), void *, int);
#endif // JULIA_ENABLE_PARTR

// libuv stuff:
JL_DLLEXPORT extern void *jl_dl_handle;
JL_DLLEXPORT extern void *jl_RTLD_DEFAULT_handle;
Expand Down
Loading

0 comments on commit 28a825b

Please sign in to comment.