Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Experimental API for may-happen in parallel parallelism #39773

Open
wants to merge 97 commits into
base: master
Choose a base branch
from

Conversation

tkf
Copy link
Member

@tkf tkf commented Feb 21, 2021

tl;dr How about adding optimizable task parallel API to Julia?

Introduction

How to teach parallelism to the Julia compiler

(If you've already seen @vchuravy's PR #31086, maybe this part is redundant.)

How we currently implement the task parallel API in Julia introduces a couple of obstacles for supporting high-performance parallel programs. In particular, the compiler cannot analyze and optimize the child tasks in the context of the surrounding code. This limits the benefit parallel programs can obtain from existing analysis and optimizations like type inference and constant propagations. Furthermore, the notion of tasks in Julia supports complex concurrent communication mechanisms that imposes a hard limitation for the scheduler to implement an efficient scheduling strategy.

Tapir (Schardl et al., 2019) is a parallel IR that can be added to existing IR in the SSA form. They demonstrated that Tapir can be added to LLVM (aka Tapir/LLVM; a part of OpenCilk) relatively "easily" and existing programs written in Cilk can benefit from pre-existing optimizations such as loop invariant code motion (LICM), common sub-expression elimination (CSE), and others that were already written for serial programs. In principle, a similar strategy should work for any existing compiler with SSA IR developed for serial programs, including the Julia compiler. That is to say, with Tapir in the Julia IR, we should be able to unlock the optimizations in Julia for parallel programs.

Tapir enables the parallelism in the compiler by limiting its focus to parallel programs with the serial-projection property (@vchuravy and I call this type of parallelism the may-happen in parallel parallelism for clarity). Although this type of parallel programs cannot use unconstrained concurrency communication primitives (e.g., take!(channel)), it can be used for a vast majority of the parallel programs that are compute-intensive; i.e., the type of programs for which Julia is already optimized/targeting. Furthermore, having a natural way to express this type of computation can be beneficial not only for the compiler but also for the scheduler.

A strategy for optimizable task parallel API

This PR implements Tapir in the Julia compiler, in Julia. I've been working with @vchuravy and TB Schardl (@neboat) to extend and complete @vchuravy's PR #31086 that uses OpenCilk (which includes a fork of LLVM and clang) for supporting Tapir at LLVM level (Tapir/LLVM). This project still has to solve many obstacles since extracting out Julia tasks at a late stage of LLVM pass is very hard (you can see my VERY work-in-progress fork at https://github.com/cesmix-mit/julia). We observed that many benefits of Tapir can actually be realized at the level of Julia's compilation passes (see below). For example, type inference and constant propagation implemented in the Julia compiler can penetrate through @spawn code blocks with Tapir. So, I implemented Tapir in pure Julia and made it work without the dependency on OpenCilk. Since this can be done without taking on a dependency on a non-standard extension to the LLVM compilation pipeline, we think this is a good sweet spot for start adding parallelism support to Julia in a way fully integrated into the compiler.

Although there are more works to be done to turn this into a mergeable/releasable (but experimental) state, I think it'd be beneficial to open a PR at this stage because to start discussing:

  • if we want parallelism support integrated into the compiler
  • what kind of frontend API we want
  • implementation strategy

I'm very interested in receiving feedback!

Proposed API

Here is an example usage of the Tapir API implemented in this PR for the moment. Following the tradition, it computes the Fibonacci number in parallel:

using Base.Experimental: Tapir

function fib(N)
    if N <= 1
        return N
    end
    local x1, x2                      # task output
    Tapir.@sync begin                                  # -.
        Tapir.@spawn x1 = fib(N - 1)  # child task     #  | syncregion
        x2 = fib(N - 2)                                #  |
    end                                                # -'
    return x1 + x2
end
  • Tapir.@sync begin ... end denotes the syncregion in which Tapir tasks can be spawned.
  • Tapir.@spawn ... denotes the code block that may run in parallel with respect to other tasks (including the parent task; e.g., fib(N - 2) in the example above).
  • The task output variables, i.e., the variables that are accessed after the syncregion must be declared with local. In the example above, x1 and x2 are the task output. Declaring with local is required because, just like the standard @sync, Tapir.@sync creates a scope (i.e., it is a let block). Task output variables can also be initialized before Tapir.@sync.
  • The code written with Tapir is expected to have the serial projection property (see below for the motivation). In particular, the code must be valid after removing Tapir-related syntax (i.e., replacing Tapir.@sync and Tapir.@spawn with let blocks).
  • It is undefined behavior (for now 1) to assign local variables in one task and read them in another task, since such behavior leads to data races.
  • @goto into or out of the tasks is not allowed

One of the important consideration is to keep this API very minimal to let us add more optimizations like OpenCilk-based lowering at a later stage of LLVM. I've designed this API first with my experimental branch of OpenCilk-based Tapir support. My OpenCilk-based implementation was not perfect but I think this is reasonably constrained to fully implement it.

Loop-based API: Although OpenCilk and @vchuravy's original PR support parallel for loop, I propose to not include it for the initial support in Base. On one hand, it is straight forward to implement a simple parallel loop framework from just this API. On the other hand, there are a lot of consideration to be made in the design space if we want to have extensible data parallelism and I feel it's beyond the scope of this PR.


Questions

More explicit task output declaration?

Current handling of task output variables may be "too automatic" and makes reasoning about the code harder for Julia programmers. For example, consider following code that has no race for now:

function ...
    # ... hundreds of lines (no code mentioning x) ...
    Tapir.@sync begin
        Tapir.@spawn begin
            x = f()
            h(x)
        end
        x = g()
        h(x)
    end
    ...
end

After sometimes, it may be re-written as

function ...
    if ...
       x = ...
       return x
    end
    # ... hundreds of lines (no code mentioning x) ...
    Tapir.@sync begin
        Tapir.@spawn begin
            x = f()
            h(x)
        end
        x = g()
        h(x)
    end
    ...
end

This code now has a race since x is updated by the child and parent tasks. Although this particular example can be rejected by the compiler, it is impossible to do so in general. So, it may be worth considering adding a declaration of task outputs. Maybe something like

Tapir.@output a b c
Tapir.@sync begin
    Tapir.@spawn ...
    ...
end

or

Tapir.@sync (a, b, c) begin
    Tapir.@spawn ...
    ...
end

The compiler can then compare and verify that its understanding of task output variables and the variables specified by the programmer.

It would be even better if we can support a unified syntax like this for all types of tasks (@async and Threads.@spawn).

Bikeshedding the name

On one hand, the name of the module Base.Experimental.Tapir is not entirely appropriate since Tapir is the concept for IR and not the user-facing API. On the other hand, we cannot come up with a more appropriate name for this. It is mainly because there is no crisp terminology for this type of parallelism (even though Cilk has been showing the importance of this approach with multiple aspects). Another name could be Base.Experimental.Parallel. But "parallel" can also mean distributed or GPU-based parallelism. Providing @psync and @pspawn macros directly from Experimental is another approach.

Performance improvements

Demo 1: type inference

The return type of the example fib above and the simple threaded mapreduce example mapfold implemented in test/tapir.jl can be inferred even though they contain Tapir.@spawn:

julia> include("test/tapir.jl");

julia> @code_typed fib(3)
CodeInfo(
    ...
) => Int64

julia> @code_typed mapfold(x -> isodd(x) ? missing : x, +, 1:3)
CodeInfo(
    ...
) => Union{Missing, Int64}

As we all know, the improvement in type inference drastically change the performance when there is a tight loop following the parallel code (without a function boundary).

Demo 2: constant propagation

Here is another very minimal example for demonstrating performance benefit we observed. It (naively) computes the average on the sliding window in parallel:

using Base.Experimental: Tapir

@inline function avgfilter!(ys, xs, N)
    @assert axes(ys) == axes(xs)
    for offset in firstindex(xs)-1:lastindex(xs)-N
        y = zero(eltype(xs))
        for k in 1:N
            y += @inbounds xs[offset + k]
        end
        @inbounds ys[offset + 1] = y / N
    end
    return ys
end

function demo!(ys1, ys2, xs1, xs2)
    N = 32
    Tapir.@sync begin
        Tapir.@spawn avgfilter!(ys1, xs1, N)
        avgfilter!(ys2, xs2, N)
    end
    return ys1, ys2
end

For comparison, here are the same code using the current task system (Threads.@spawn) and the sequential version.:

function demo_current!(ys1, ys2, xs1, xs2)
    N = 32
    @sync begin
        Threads.@spawn avgfilter!(ys1, xs1, N)
        avgfilter!(ys2, xs2, N)
    end
    return ys1, ys2
end

function demo_seq!(ys1, ys2, xs1, xs2)
    N = 32
    avgfilter!(ys1, xs1, N)
    avgfilter!(ys2, xs2, N)
    return ys1, ys2
end

We can then run the benchmarks with

using BenchmarkTools
suite = BenchmarkGroup()
xs1 = randn(2^20)
ys1 = zero(xs1)
xs2 = randn(length(xs1))
ys2 = zero(xs2)
@assert demo!(zero(ys1), zero(ys2), xs1, xs2) == demo_current!(ys1, ys2, xs1, xs2)
@assert demo!(zero(ys1), zero(ys2), xs1, xs2) == demo_seq!(ys1, ys2, xs1, xs2)
suite["tapir"] = @benchmarkable demo!($ys1, $ys2, $xs1, $xs2)
suite["current"] = @benchmarkable demo_current!($ys1, $ys2, $xs1, $xs2)
suite["seq"] = @benchmarkable demo_seq!($ys1, $ys2, $xs1, $xs2)
results = run(suite, verbose = true)

With julia started with a single thread 2, we get

3-element BenchmarkTools.BenchmarkGroup:
  tags: []
  "tapir" => Trial(5.614 ms)
  "seq" => Trial(5.588 ms)
  "current" => Trial(23.537 ms)

i.e., Tapir and sequential programs have identical performance while the performance of the code written with Threads.@spawn (current) is much worse. This is because Julia can propagate the constant across task boundaries with Tapir. Subsequently, since LLVM can see that the innermost loop has a fixed loop count, the loop can be unrolled and vectorized.

It can be observed by introspecting the generated code:

julia> @code_typed demo!(ys1, ys2, xs1, xs2)
CodeInfo(
1 ── %1  = $(Expr(:syncregion))
│    %2  = (Base.Tapir.taskgroup)()::Channel{Any}
│    %3  = $(Expr(:new_opaque_closure, Tuple{}, false, Union{}, Any, opaque closure @0x00007fec1d9ce5e0 in Main, Core.Argument(2), Core.Ar
gument(4), :(%1)))::Any
...
) => Nothing

julia> m = Base.unsafe_pointer_to_objref(Base.reinterpret(Ptr{Cvoid}, 0x00007fec1d9ce5e0))
opaque closure @0x00007fec1d9ce5e0 in Main

julia> Base.uncompressed_ir(m)
CodeInfo(
...
│   ││┌ @ range.jl:740 within `iterate'
│   │││┌ @ promotion.jl:409 within `=='
│   ││││ %56 = (%51 === 32)::Bool
│   │││└
└
...

i.e., N = 32 is successfully propagated. On the other hand, in the current task system:

julia> @code_typed demo_current!(ys1, ys2, xs1, xs2)
CodeInfo(
...
│    %13  = π (32, Core.Const(32))
│    %14  = %new(var"#14#15"{Vector{Float64}, Vector{Float64}, Int64}, ys1, xs1, %13)::Core.PartialStruct(var"#14#15"{Vector{Float64}, Vec
tor{Float64}, Int64}, Any[Vector{Float64}, Vector{Float64}, Core.Const(32)])

i.e., N = 32 (%32) is captured as an Int64.

Indeed, the performance of the sequential parts of the code is crucial for observing the speedup. With julia -t2, we see:

3-element BenchmarkTools.BenchmarkGroup:
  tags: []
  "tapir" => Trial(2.799 ms)
  "seq" => Trial(5.583 ms)
  "current" => Trial(20.767 ms)

I think an important aspect of this example is that even a "little bit" of compiler optimizations enabled on the Julia side can be enough for triggering optimizations on the LLVM side yielding a substantial effect.


Demo 3: dead code elimination

The optimizations that can be done with forward analysis such as type inference and constant propagation are probably implementable for the current threading system in Julia with reasonable amount of effort. However, optimizations such as dead code elimination (DCE) that require backward analysis may be significantly more challenging given unconstrained concurrency of Julia's Task. In contrast, enabling Tapir at Julia IR level automatically triggers Julia's DCE (which, in turn, can trigger LLVM's DCE):

using Base.Experimental: Tapir

@inline function eliminatable_computation(xs)
    a = typemax(UInt)
    b = 0
    for x in xs
        a = (typemax(a) + a) ÷ ifelse(x == 0, 1, x)  # arbitrary computation that LLVM can eliminate
        b += x
    end
    return (a, b)
end

function demo_dce()
    local b1, b2
    Tapir.@sync begin
        Tapir.@spawn begin
            a1, b1 = eliminatable_computation(UInt(1):UInt(33554432))
        end
        a2, b2 = eliminatable_computation(UInt(3):UInt(33554432))
    end
    return b1 + b2   # a1 and a2 not used
end

In single thread julia, this takes 30 ms while an equivalent code with current threading system (i.e., replace Tapir. with Threads.) takes 250 ms.

Note that Julia only has to eliminate b1 and not the code inside eliminatable_computation. The rest of DCE can happen inside LLVM (which does not have to understand Julia's parallelism).

Motivation behind the restricted task semantics

As explained in the proposed API, the serial projection property restrict the set of programs expressible with Tapir. Although we have some degree of interoperability (see below), reasoning and guaranteeing forward progress are easy only when the programmer stick with the existing patterns. In particular, it means that we will not be able to drop Threads.@spawn or @async for expressing programs with unconstrained concurrency. However, this restricted semantics is still enough for expressing compute-oriented programs and allows more optimizations in the compiler and the scheduler.

As shown above, enabling Tapir at Julia level already unlocks some appealing set of optimizations for parallel programs. The serial projection property is useful for supporting optimizations that require backward analysis such as DCE in a straightforward manner. Once we manage to implement a proper OpenCilk integration, we expect to see the improvements from much richer set of existing optimizations at the LLVM level. This would have multiplicative effects when more LLVM-side optimizations are enabled (e.g., #36832). Furthermore, there is ongoing research on more cleverly using the parallel IR for going beyond unlocking pre-existing optimizations. For example, fusing arbitrary multiple Tasks to be executed in a single Task can introduce deadlock. However, this is not the case in Tapir tasks thanks to the serial projection property. This, in turn, let us implement more optimizations inside the compiler such as a task coalescing pass that is aware of a downstream vecotrizer pass. In addition to optimizing user programs, this type of task improves productivity tools such as race detector (ref productivity tools provided by OpenCilk).

In addition to the performance improvements on the side of the compiler, may-happen in parallel parallelism can help the parallel task runtime to handle the task scheduling cleverly. For example, having parallel IR makes it easy to implement continuation-stealing as used in Cilk (although it may not be compatible with the depth-first approach in Julia). Another possibility is to use a version of schedule that may fail to reduce contention when there are a lot of small tasks (as I discussed here). This is possible because we don't allow concurrency primitives in Tapir tasks and the task is not guaranteed to be executed in a dedicated Task. Since Tapir makes the call/task tree analyzable by the compiler, it may also help improving the depth-first scheduler.

Composability with existing task system

If we are going to have two notions of parallel tasks, it is important that the two systems can work together seamlessly. Of course, since the Tapir tasks cannot use arbitrary concurrency APIs, we can't support some task APIs inside Tapir.@spawn (e.g., take!(channel)). However, Tapir only requires the forward progress guarantee to be independent of other tasks within the same syncregion but not with respect to the code outside. Simplify put, we can invoke concurrency API as long as it does not communicate with other Tapir tasks in the same syncregion. For example, following code is valid:

@sync begin
    Threads.@spawn begin
        Tapir.@sync begin
            Tapir.@spawn begin
                put!(bounded_channel, 0)      # Thunk 1
            end
            f()                               # Thunk 2
        end
    end
    Threads.@spawn take!(bounded_channel)     # Thunk 3
end

as long as f() does not use the bounded_channel. That is to say, the author of this code guarantees the forward progress of Thunk 1 and Thunk 2 independent of each other but not with respect to Thunk 3. Therefore, the example above is a valid program.

Another class of useful composable idiom is the use of concurrency API that unconditionally guarantees forward progress. For example, put!(unbounded_channel, item) can make forward progress independent of other tasks (but this is not true for take!). This is also true for schedule(::Task). Thus, invoking Threads.@spawn inside Tapir.@sync is valid if (bot not only if 3) we do not invoke wait(task) in the Tapir.@sync. For example, the following code is valid

unbuffered_channel = Channel(0)
@sync begin
    Tapir.@sync begin
        Tapir.@spawn begin
            t1 = Threads.@spawn put!(unbuffered_channel, 0)
            # wait(t1)
        end
        t2 = Threads.@spawn take!(unbuffered_channel)
        # wait(t2)
    end
end

However, it is invalid if wait(t1) and wait(t2) are uncommented.


unbuffered_channel = Channel(0)
Tapir.@sync begin
    Tapir.@spawn begin
        @sync begin
            Threads.@spawn put!(unbuffered_channel, 0)
            take!(unbuffered_channel)
        end
    end
    do_something_else()  # does not touch `unbuffered_channel`
end

However, this is not an example of the idiom of using API that unconditionally guarantees forward progress.

Implementation strategy

Outlining

At the end of Julia's optimization passes (in run_passes) the child tasks are outlined into opaque closures and wrapped into a Task (see lower_tapir!(ir::IRCode) function). (Aside: @Keno's opaque closure was very useful for implementing outlining at a late stage! Actually, @vchuravy has been suggesting opaque closure would be the enabler for this type of transformation since the beginning of the project. But it's nice to see how things fit together in a real code.) The outlined tasks are spawned and synced using the helper functions defined in Base.Tapir.

Questions

  • We need to create a new closure at the end of Julia's optimization phase. I'm using jl_new_code_info_uninit and jl_make_opaque_closure_method for this. Are they allowed to be invoked in this part of the compiler?
  • In this PR, task outputs are handled by manual ad-hoc reg2mem-like passes (which is not very ideal). Currently (Julia 1.7-DEV), it looks like left over slots are rejected by the verifier. Would it make sense to allow slots in the SSA IR? Can code other than Tapir make use of it? For example, maybe slots can work as an alternative to Core.Box when combined with opaque closure?

Acknowledgment

Many thanks to @vchuravy for pre-reviewing the PR!

TODOs

To reviewers: please feel free to add new ones or move the things in wishlist to here

Wishlist

This is a list of nice-to-have things that are maybe not strictly required.

  • IR verifier for checking the invariance of Tapir (e.g., disallow @goto into a task)
  • Proper data flow analysis integration (do we need to use slot in the optimizer?)
  • Support nested syncregion (or at least error out in the frontend?)
  • Refine exception handling (esp. in the continuation)
  • More tests
  • Documentation (docstrings and manual/multi-threading.md)
  • Easy sub-CFG-to-opaque-closure interface? (useful for outlining of exceptions for GPUs)

Footnotes

  1. I think we can make it an error by analyzing this in the front end, in principle.

  2. Since this PR is mainly about compiler optimization and not about the scheduler, single-thread performance compared with sequential program is more informative than multi-thread performance.

  3. Note that invoking wait in a Tapir task can be valid in some cases. For example, if it is known that the set of the tasks spawned by Threads.@spawn eventually terminate independent of any forward progress in other Tapir task in the same syncregion, it is valid to wait on these tasks. For example, it is valid:

Co-authored-by: Valentin Churavy <vchuravy@mit.edu>
@ChrisRackauckas
Copy link
Member

Can IO be used inside of a Tapir task? That seems like something that requires a few more assumptions because the buffer is shared?

@vchuravy vchuravy mentioned this pull request Feb 21, 2021
8 tasks
@tkf
Copy link
Member Author

tkf commented Feb 21, 2021

I think I/O like reading or writing files from "normal" file system should be fine, unless you are doing something tricky. It's equivalent to the first example in "Composability with existing task system" section where something outside the Tapir tasks eventually guarantees the forward progress. But, it can introduce a deadlock if you do something tricky like using pipe to communicate between Tapir tasks.

@Keno
Copy link
Member

Keno commented Feb 22, 2021

I haven't reviewed this in detail, but I'm wondering whether the frontend (i.e. whatever macro is used) should generate OpaqueClosures directly. Generating them late definitely needs to be supported, but being closer to the frontend is definitely easier. Is there something here that would prevent that from working?

@tkf
Copy link
Member Author

tkf commented Feb 22, 2021

Ah, that's an interesting idea that I've never thought of! Yeah, I think it might make it easy to figure out input and output variables for tasks. One thing I'd worry about is the quality of type inference. I guess there is a lot of possible improvements over normal closure but I wonder what about the code without opaque closure? That is to say, what's the effect of replacing

$code

with

f = @opaque () -> $code
f()

in the inference, in particular, when $code re-assigns some variables?

Also, I wonder if allowing opaque closure to capture slots during inference and optimization helps this. (These slots are replaced with Ref-like objects just before LLVM passes, to avoid capturing any stack pointers.)

Edit: I think we still need to inline all opaque closures, at the beginning of the optimizations, to enable all the optimizers (unless the quality of optimization across opaque closure boundaries becomes indistinguishable from the code without opaque closures). But maybe some analysis before the optimization is easier this way.

@christophernhill
Copy link

@tkf and others I think Base.Experimental.Tapir is a good name. If not maybe Base.Experimental.Mhipp is a candidate a name, where Mhipp ~ may-happen-in-parallel parallel

@ViralBShah ViralBShah added the parallelism Parallel or distributed computation label Feb 23, 2021
@tkf tkf added the multithreading Base.Threads and related functionality label Feb 24, 2021
@JeffBezanson JeffBezanson self-assigned this Mar 1, 2021
Since optimized code is cached and used for inlining, we need to lower
Tapir after optimized code is cached, to enable Tapir for IPO (i.e.,
we want to cache the optimized code that still has `detach` etc.).
This patch moves Tapir lowering out of the standard `run_passes`
optimization phase, to avoid caching the code that includes lowered
code. Instead, lowering happens inside of `jl_emit_code` just before
emitting LLVM IR.
@tkf
Copy link
Member Author

tkf commented Mar 3, 2021

After experimenting with this patch, I found that pure-Julia Tapir has an interesting interaction with the inlining pass. Since Julia caches optimized code and use it for inlining, lowering ("targetting") Tapir to the concrete calls of the parallel runtime in the optimizer negates the benefits of Tapir for IPO. Ideally, Julia compiler can see Tapir instructions in the inlined code. I tried to address this in commit 0cff766 by moving the call to Tapir lowering to jl_emit_code C++ function. I guess this means it's better to consider Tapir lowering as "pre-LLVM pass" than "late Julia pass" (if it makes sense).

base/boot.jl Outdated Show resolved Hide resolved
base/compiler/optimize.jl Outdated Show resolved Hide resolved
base/compiler/optimize.jl Show resolved Hide resolved
base/compiler/optimize.jl Show resolved Hide resolved
base/compiler/optimize.jl Outdated Show resolved Hide resolved
base/compiler/ssair/passes.jl Outdated Show resolved Hide resolved
base/compiler/ssair/verify.jl Outdated Show resolved Hide resolved
base/compiler/tapirpasses.jl Outdated Show resolved Hide resolved
base/compiler/tapirpasses.jl Outdated Show resolved Hide resolved
base/compiler/tapirpasses.jl Outdated Show resolved Hide resolved
@tkf
Copy link
Member Author

tkf commented Mar 10, 2021

Discussed in multithreading BoF.

(1) Managing task outputs by assigning to local variables is hard to use. It would be better to have a future-like object that can be fetched/dereferenced after sync.

(2) We can merge normal @sync and Tapir.@sync. The new @sync emits the synchronization object/token for both normal task and Tapir task. It should be fairly easy to optimize out the thing that is not used (e.g., synchronization token for Tapir when @sync is used for normal tasks). We can then only have to provide a single API Tapir.@spawn.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
experimental multithreading Base.Threads and related functionality parallelism Parallel or distributed computation
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants