Skip to content

[SYCL] Add implementation of host-task with interop_handle argument #1749

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions sycl/include/CL/sycl/detail/cg.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,16 @@ class CGInteropTask : public CG {
class CGHostTask : public CG {
public:
std::unique_ptr<HostTask> MHostTask;
// queue for host-interop task
shared_ptr_class<detail::queue_impl> MQueue;
// context for host-interop task
shared_ptr_class<detail::context_impl> MContext;
vector_class<ArgDesc> MArgs;

CGHostTask(std::unique_ptr<HostTask> HostTask, vector_class<ArgDesc> Args,
CGHostTask(std::unique_ptr<HostTask> HostTask,
std::shared_ptr<detail::queue_impl> Queue,
std::shared_ptr<detail::context_impl> Context,
vector_class<ArgDesc> Args,
std::vector<std::vector<char>> ArgsStorage,
std::vector<detail::AccessorImplPtr> AccStorage,
std::vector<std::shared_ptr<const void>> SharedPtrStorage,
Expand All @@ -319,7 +326,8 @@ class CGHostTask : public CG {
: CG(Type, std::move(ArgsStorage), std::move(AccStorage),
std::move(SharedPtrStorage), std::move(Requirements),
std::move(Events), std::move(loc)),
MHostTask(std::move(HostTask)), MArgs(std::move(Args)) {}
MHostTask(std::move(HostTask)), MQueue(Queue), MContext(Context),
MArgs(std::move(Args)) {}
};

class CGBarrier : public CG {
Expand Down
16 changes: 16 additions & 0 deletions sycl/include/CL/sycl/handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <CL/sycl/detail/os_util.hpp>
#include <CL/sycl/event.hpp>
#include <CL/sycl/id.hpp>
#include <CL/sycl/interop_handle.hpp>
#include <CL/sycl/item.hpp>
#include <CL/sycl/kernel.hpp>
#include <CL/sycl/nd_item.hpp>
Expand Down Expand Up @@ -870,6 +871,21 @@ class __SYCL_EXPORT handler {
MCGType = detail::CG::CODEPLAY_HOST_TASK;
}

template <typename FuncT>
typename std::enable_if<
detail::check_fn_signature<typename std::remove_reference<FuncT>::type,
void(interop_handle)>::value>::type
codeplay_host_task(FuncT &&Func) {
throwIfActionIsCreated();

MNDRDesc.set(range<1>(1));
MArgs = std::move(MAssociatedAccesors);

MHostTask.reset(new detail::HostTask(std::move(Func)));

MCGType = detail::CG::CODEPLAY_HOST_TASK;
}

/// Defines and invokes a SYCL kernel function for the specified range and
/// offset.
///
Expand Down
1 change: 1 addition & 0 deletions sycl/source/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ set(SYCL_SOURCES
"sampler.cpp"
"stream.cpp"
"spirv_ops.cpp"
"interop_handle.cpp"
"$<$<PLATFORM_ID:Windows>:detail/windows_pi.cpp>"
"$<$<OR:$<PLATFORM_ID:Linux>,$<PLATFORM_ID:Darwin>>:detail/posix_pi.cpp>"
)
Expand Down
20 changes: 17 additions & 3 deletions sycl/source/detail/scheduler/commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ getPiEvents(const std::vector<EventImplPtr> &EventImpls) {

class DispatchHostTask {
ExecCGCommand *MThisCmd;
std::vector<interop_handle::ReqToMem> MReqToMem;

void waitForEvents() const {
std::map<const detail::plugin *, std::vector<EventImplPtr>>
Expand Down Expand Up @@ -187,7 +188,9 @@ class DispatchHostTask {
}

public:
DispatchHostTask(ExecCGCommand *ThisCmd) : MThisCmd{ThisCmd} {}
DispatchHostTask(ExecCGCommand *ThisCmd,
std::vector<interop_handle::ReqToMem> ReqToMem)
: MThisCmd{ThisCmd} {}

void operator()() const {
waitForEvents();
Expand All @@ -197,7 +200,17 @@ class DispatchHostTask {
CGHostTask &HostTask = static_cast<CGHostTask &>(MThisCmd->getCG());

// we're ready to call the user-defined lambda now
HostTask.MHostTask->call();
if (HostTask.MHostTask->isInteropTask()) {
auto Queue = HostTask.MQueue->get();
auto DeviceId = HostTask.MQueue->get_device().get();
auto Context = HostTask.MQueue->get_context().get();

interop_handle IH{MReqToMem, Queue, DeviceId, Context};

HostTask.MHostTask->call(IH);
} else
HostTask.MHostTask->call();

HostTask.MHostTask.reset();

// unblock user empty command here
Expand Down Expand Up @@ -1943,7 +1956,8 @@ cl_int ExecCGCommand::enqueueImp() {
}
}

MQueue->getThreadPool().submit<DispatchHostTask>(DispatchHostTask(this));
MQueue->getThreadPool().submit<DispatchHostTask>(DispatchHostTask(
this, std::move(ReqToMem)));

MShouldCompleteEventIfPossible = false;

Expand Down
9 changes: 5 additions & 4 deletions sycl/source/detail/scheduler/graph_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -927,10 +927,11 @@ void Scheduler::GraphBuilder::connectDepEvent(Command *const Cmd,
{
std::unique_ptr<detail::HostTask> HT(new detail::HostTask);
std::unique_ptr<detail::CG> ConnectCG(new detail::CGHostTask(
std::move(HT), /* Args = */ {}, /* ArgsStorage = */ {},
/* AccStorage = */ {}, /* SharedPtrStorage = */ {},
/* Requirements = */ {}, /* DepEvents = */ {DepEvent},
CG::CODEPLAY_HOST_TASK, /* Payload */ {}));
std::move(HT), /* Queue = */ {}, /* Context = */ {}, /* Args = */ {},
/* ArgsStorage = */ {}, /* AccStorage = */ {},
/* SharedPtrStorage = */ {}, /* Requirements = */ {},
/* DepEvents = */ {DepEvent}, CG::CODEPLAY_HOST_TASK,
/* Payload */ {}));
ConnectCmd = new ExecCGCommand(
std::move(ConnectCG), Scheduler::getInstance().getDefaultHostQueue());
}
Expand Down
7 changes: 4 additions & 3 deletions sycl/source/handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ event handler::finalize() {
break;
case detail::CG::CODEPLAY_HOST_TASK:
CommandGroup.reset(new detail::CGHostTask(
std::move(MHostTask), std::move(MArgs), std::move(MArgsStorage),
std::move(MAccStorage), std::move(MSharedPtrStorage),
std::move(MRequirements), std::move(MEvents), MCGType, MCodeLoc));
std::move(MHostTask), MQueue, MQueue->getContextImplPtr(),
std::move(MArgs), std::move(MArgsStorage), std::move(MAccStorage),
std::move(MSharedPtrStorage), std::move(MRequirements),
std::move(MEvents), MCGType, MCodeLoc));
break;
case detail::CG::BARRIER:
case detail::CG::BARRIER_WAITLIST:
Expand Down
202 changes: 202 additions & 0 deletions sycl/test/host-interop-task/interop-task-dependency.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// RUN: %clangxx -fsycl -fsycl-targets=%sycl_triple %s -o %t.out %threads_lib
// RUN: %CPU_RUN_PLACEHOLDER SYCL_PI_TRACE=-1 %t.out 2>&1 %CPU_CHECK_PLACEHOLDER
// RUN: %GPU_RUN_PLACEHOLDER SYCL_PI_TRACE=-1 %t.out 2>&1 %GPU_CHECK_PLACEHOLDER
// RUN: %ACC_RUN_PLACEHOLDER SYCL_PI_TRACE=-1 %t.out 2>&1 %ACC_CHECK_PLACEHOLDER

#include <atomic>
#include <condition_variable>
#include <future>
#include <mutex>
#include <thread>

#include <CL/sycl.hpp>

namespace S = cl::sycl;

struct Context {
std::atomic_bool Flag;
S::queue &Queue;
S::buffer<int, 1> Buf1;
S::buffer<int, 1> Buf2;
S::buffer<int, 1> Buf3;
std::mutex Mutex;
std::condition_variable CV;
};

void Thread1Fn(Context *Ctx) {
// 0. initialize resulting buffer with apriori wrong result
{
S::accessor<int, 1, S::access::mode::write,
S::access::target::host_buffer>
Acc(Ctx->Buf2);

for (size_t Idx = 0; Idx < Acc.get_count(); ++Idx)
Acc[Idx] = -1;
}

{
S::accessor<int, 1, S::access::mode::write,
S::access::target::host_buffer>
Acc(Ctx->Buf2);

for (size_t Idx = 0; Idx < Acc.get_count(); ++Idx)
Acc[Idx] = -2;
}

{
S::accessor<int, 1, S::access::mode::write,
S::access::target::host_buffer>
Acc(Ctx->Buf3);

for (size_t Idx = 0; Idx < Acc.get_count(); ++Idx)
Acc[Idx] = -3;
}

// 1. submit task writing to buffer 1
Ctx->Queue.submit([&](S::handler &CGH) {
S::accessor<int, 1, S::access::mode::write,
S::access::target::global_buffer>
GeneratorAcc(Ctx->Buf1, CGH);

auto GeneratorKernel = [GeneratorAcc]() {
for (size_t Idx = 0; Idx < GeneratorAcc.get_count(); ++Idx)
GeneratorAcc[Idx] = Idx;
};

CGH.single_task<class GeneratorTask>(GeneratorKernel);
});

// 2. submit host task writing from buf 1 to buf 2
auto HostTaskEvent = Ctx->Queue.submit([&](S::handler &CGH) {
S::accessor<int, 1, S::access::mode::read,
S::access::target::host_buffer>
CopierSrcAcc(Ctx->Buf1, CGH);
S::accessor<int, 1, S::access::mode::write,
S::access::target::host_buffer>
CopierDstAcc(Ctx->Buf2, CGH);

auto CopierHostTask = [CopierSrcAcc, CopierDstAcc, &Ctx](S::interop_handle IH) {
// TODO write through interop handle objects
//(void)IH.get_native_mem(CopierSrcAcc);
//(void)IH.get_native_mem(CopierDstAcc);
(void)IH.get_native_queue();
(void)IH.get_native_device();
(void)IH.get_native_context();
for (size_t Idx = 0; Idx < CopierDstAcc.get_count(); ++Idx)
CopierDstAcc[Idx] = CopierSrcAcc[Idx];

bool Expected = false;
bool Desired = true;
assert(Ctx->Flag.compare_exchange_strong(Expected, Desired));

{
std::lock_guard<std::mutex> Lock(Ctx->Mutex);
Ctx->CV.notify_all();
}
};

CGH.codeplay_host_task(CopierHostTask);
});

// 3. submit simple task to move data between two buffers
Ctx->Queue.submit([&](S::handler &CGH) {
S::accessor<int, 1, S::access::mode::read,
S::access::target::global_buffer>
SrcAcc(Ctx->Buf2, CGH);
S::accessor<int, 1, S::access::mode::write,
S::access::target::global_buffer>
DstAcc(Ctx->Buf3, CGH);

CGH.depends_on(HostTaskEvent);

auto CopierKernel = [SrcAcc, DstAcc]() {
for (size_t Idx = 0; Idx < DstAcc.get_count(); ++Idx)
DstAcc[Idx] = SrcAcc[Idx];
};

CGH.single_task<class CopierTask>(CopierKernel);
});

// 4. check data in buffer #3
{
S::accessor<int, 1, S::access::mode::read,
S::access::target::host_buffer>
Acc(Ctx->Buf3);

bool Failure = false;

for (size_t Idx = 0; Idx < Acc.get_count(); ++Idx) {
fprintf(stderr, "Third buffer [%3zu] = %i\n", Idx, Acc[Idx]);

Failure |= (Acc[Idx] != Idx);
}

assert(!Failure && "Invalid data in third buffer");
}
}

void Thread2Fn(Context *Ctx) {
std::unique_lock<std::mutex> Lock(Ctx->Mutex);

// T2.1. Wait until flag F is set eq true.
Ctx->CV.wait(Lock, [&Ctx] { return Ctx->Flag.load(); });

assert(Ctx->Flag.load());
}

void test() {
auto EH = [](S::exception_list EL) {
for (const std::exception_ptr &E : EL) {
throw E;
}
};

S::queue Queue(EH);

Context Ctx{{false}, Queue, {10}, {10}, {10}, {}, {}};

// 0. setup: thread 1 T1: exec smth; thread 2 T2: waits; init flag F = false
auto A1 = std::async(std::launch::async, Thread1Fn, &Ctx);
auto A2 = std::async(std::launch::async, Thread2Fn, &Ctx);

A1.get();
A2.get();

assert(Ctx.Flag.load());

// 3. check via host accessor that buf 2 contains valid data
{
S::accessor<int, 1, S::access::mode::read,
S::access::target::host_buffer>
ResultAcc(Ctx.Buf2);

bool failure = false;
for (size_t Idx = 0; Idx < ResultAcc.get_count(); ++Idx) {
fprintf(stderr, "Second buffer [%3zu] = %i\n", Idx, ResultAcc[Idx]);

failure |= (ResultAcc[Idx] != Idx);
}

assert(!failure && "Invalid data in result buffer");
}
}

int main() {
test();

return 0;
}

// launch of GeneratorTask kernel
// CHECK:---> piKernelCreate(
// CHECK: GeneratorTask
// CHECK:---> piEnqueueKernelLaunch(
// prepare for host task
// CHECK:---> piEnqueueMemBufferMap(
// launch of CopierTask kernel
// CHECK:---> piKernelCreate(
// CHECK: CopierTask
// CHECK:---> piEnqueueKernelLaunch(
// TODO need to check for piEventsWait as "wait on dependencies of host task".
// At the same time this piEventsWait may occur anywhere after
// piEnqueueMemBufferMap ("prepare for host task").