Skip to content

Commit 66c885c

Browse files
Merge pull request #751 from IntelPython/feature/waitless-SyclEvent_deleter
SyclEvent deleter no longer waits
2 parents 05a5abd + 8effe77 commit 66c885c

File tree

5 files changed

+199
-5
lines changed

5 files changed

+199
-5
lines changed

dpctl/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,8 @@ foreach(_cy_file ${_cython_sources})
161161
build_dpctl_ext(${_trgt} ${_cy_file} "dpctl")
162162
endforeach()
163163

164+
target_include_directories(_sycl_queue PRIVATE ${CMAKE_CURRENT_SOURCE_DIR})
165+
164166
add_subdirectory(program)
165167
add_subdirectory(memory)
166168
add_subdirectory(tensor)

dpctl/_host_task_util.hpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
//===--- _host_tasl_util.hpp - Implements async DECREF =//
2+
//
3+
// Data Parallel Control (dpctl)
4+
//
5+
// Copyright 2020-2021 Intel Corporation
6+
//
7+
// Licensed under the Apache License, Version 2.0 (the "License");
8+
// you may not use this file except in compliance with the License.
9+
// You may obtain a copy of the License at
10+
//
11+
// http://www.apache.org/licenses/LICENSE-2.0
12+
//
13+
// Unless required by applicable law or agreed to in writing, software
14+
// distributed under the License is distributed on an "AS IS" BASIS,
15+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
// See the License for the specific language governing permissions and
17+
// limitations under the License.
18+
//
19+
//===----------------------------------------------------------------------===//
20+
///
21+
/// \file
22+
/// This file implements a utility function to schedule host task to a sycl
23+
/// queue depending on given array of sycl events to decrement reference counts
24+
/// for the given array of Python objects.
25+
///
26+
/// N.B.: The host task attempts to acquire GIL, so queue wait, event wait and
27+
/// other synchronization mechanisms should be called after releasing the GIL to
28+
/// avoid deadlocks.
29+
///
30+
//===----------------------------------------------------------------------===//
31+
32+
#include "Python.h"
33+
#include "syclinterface/dpctl_data_types.h"
34+
#include <CL/sycl.hpp>
35+
36+
int async_dec_ref(DPCTLSyclQueueRef QRef,
37+
PyObject **obj_array,
38+
size_t obj_array_size,
39+
DPCTLSyclEventRef *ERefs,
40+
size_t nERefs)
41+
{
42+
43+
sycl::queue *q = reinterpret_cast<sycl::queue *>(QRef);
44+
45+
std::vector<PyObject *> obj_vec;
46+
obj_vec.reserve(obj_array_size);
47+
for (size_t obj_id = 0; obj_id < obj_array_size; ++obj_id) {
48+
obj_vec.push_back(obj_array[obj_id]);
49+
}
50+
51+
try {
52+
q->submit([&](sycl::handler &cgh) {
53+
for (size_t ev_id = 0; ev_id < nERefs; ++ev_id) {
54+
cgh.depends_on(
55+
*(reinterpret_cast<sycl::event *>(ERefs[ev_id])));
56+
}
57+
cgh.host_task([obj_array_size, obj_vec]() {
58+
{
59+
PyGILState_STATE gstate;
60+
gstate = PyGILState_Ensure();
61+
for (size_t i = 0; i < obj_array_size; ++i) {
62+
Py_DECREF(obj_vec[i]);
63+
}
64+
PyGILState_Release(gstate);
65+
}
66+
});
67+
});
68+
} catch (const std::exception &e) {
69+
return 1;
70+
}
71+
72+
return 0;
73+
}

dpctl/_sycl_event.pyx

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ cdef class _SyclEvent:
9898

9999
def __dealloc__(self):
100100
if (self._event_ref):
101-
with nogil: DPCTLEvent_Wait(self._event_ref)
102101
DPCTLEvent_Delete(self._event_ref)
103102
self._event_ref = NULL
104103
self.args = None

dpctl/_sycl_queue.pyx

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,17 @@ import ctypes
6565
from .enum_types import backend_type
6666

6767
from cpython cimport pycapsule
68+
from cpython.ref cimport Py_DECREF, Py_INCREF, PyObject
6869
from libc.stdlib cimport free, malloc
6970

7071
import collections.abc
7172
import logging
7273

74+
75+
cdef extern from "_host_task_util.hpp":
76+
int async_dec_ref(DPCTLSyclQueueRef, PyObject **, size_t, DPCTLSyclEventRef *, size_t) nogil
77+
78+
7379
__all__ = [
7480
"SyclQueue",
7581
"SyclKernelInvalidRangeError",
@@ -714,12 +720,14 @@ cdef class SyclQueue(_SyclQueue):
714720
cdef _arg_data_type *kargty = NULL
715721
cdef DPCTLSyclEventRef *depEvents = NULL
716722
cdef DPCTLSyclEventRef Eref = NULL
717-
cdef int ret
723+
cdef int ret = 0
718724
cdef size_t gRange[3]
719725
cdef size_t lRange[3]
720726
cdef size_t nGS = len(gS)
721727
cdef size_t nLS = len(lS) if lS is not None else 0
722728
cdef size_t nDE = len(dEvents) if dEvents is not None else 0
729+
cdef PyObject **arg_objects = NULL
730+
cdef ssize_t i = 0
723731

724732
# Allocate the arrays to be sent to DPCTLQueue_Submit
725733
kargs = <void**>malloc(len(args) * sizeof(void*))
@@ -820,8 +828,24 @@ cdef class SyclQueue(_SyclQueue):
820828
raise SyclKernelSubmitError(
821829
"Kernel submission to Sycl queue failed."
822830
)
823-
824-
return SyclEvent._create(Eref, args)
831+
# increment reference counts to each argument
832+
arg_objects = <PyObject **>malloc(len(args) * sizeof(PyObject *))
833+
for i in range(len(args)):
834+
arg_objects[i] = <PyObject *>(args[i])
835+
Py_INCREF(<object> arg_objects[i])
836+
837+
# schedule decrement
838+
if async_dec_ref(self.get_queue_ref(), arg_objects, len(args), &Eref, 1):
839+
# async task submission failed, decrement ref counts and wait
840+
for i in range(len(args)):
841+
arg_objects[i] = <PyObject *>(args[i])
842+
Py_DECREF(<object> arg_objects[i])
843+
with nogil: DPCTLEvent_Wait(Eref)
844+
845+
# free memory
846+
free(arg_objects)
847+
848+
return SyclEvent._create(Eref, [])
825849

826850
cpdef void wait(self):
827851
with nogil: DPCTLQueue_Wait(self._queue_ref)

dpctl/tests/test_sycl_kernel_submit.py

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import dpctl
2626
import dpctl.memory as dpctl_mem
2727
import dpctl.program as dpctl_prog
28+
import dpctl.tensor as dpt
2829

2930

3031
@pytest.mark.parametrize(
@@ -107,4 +108,99 @@ def test_create_program_from_source(ctype_str, dtype, ctypes_ctor):
107108
ref_c = a * np.array(d, dtype=dtype) + b
108109
host_dt, device_dt = timer.dt
109110
assert type(host_dt) is float and type(device_dt) is float
110-
assert np.allclose(c, ref_c), "Faled for {}, {}".formatg(r, lr)
111+
assert np.allclose(c, ref_c), "Failed for {}, {}".formatg(r, lr)
112+
113+
114+
def test_async_submit():
115+
try:
116+
q = dpctl.SyclQueue("opencl")
117+
except dpctl.SyclQueueCreationError:
118+
pytest.skip("OpenCL queue could not be created")
119+
oclSrc = (
120+
"kernel void kern1(global unsigned int *res, unsigned int mod) {"
121+
" size_t index = get_global_id(0);"
122+
" int ri = (index % mod);"
123+
" res[index] = (ri * ri) % mod;"
124+
"}"
125+
" "
126+
"kernel void kern2(global unsigned int *res, unsigned int mod) {"
127+
" size_t index = get_global_id(0);"
128+
" int ri = (index % mod);"
129+
" int ri2 = (ri * ri) % mod;"
130+
" res[index] = (ri2 * ri) % mod;"
131+
"}"
132+
" "
133+
"kernel void kern3("
134+
" global unsigned int *res, global unsigned int *arg1, "
135+
" global unsigned int *arg2)"
136+
"{"
137+
" size_t index = get_global_id(0);"
138+
" res[index] = "
139+
" (arg1[index] < arg2[index]) ? arg1[index] : arg2[index];"
140+
"}"
141+
)
142+
prog = dpctl_prog.create_program_from_source(q, oclSrc)
143+
kern1Kernel = prog.get_sycl_kernel("kern1")
144+
kern2Kernel = prog.get_sycl_kernel("kern2")
145+
kern3Kernel = prog.get_sycl_kernel("kern3")
146+
147+
assert isinstance(kern1Kernel, dpctl_prog.SyclKernel)
148+
assert isinstance(kern2Kernel, dpctl_prog.SyclKernel)
149+
assert isinstance(kern2Kernel, dpctl_prog.SyclKernel)
150+
151+
n = 1024 * 1024
152+
X = dpt.empty((3, n), dtype="u4", usm_type="device", sycl_queue=q)
153+
first_row = dpctl_mem.as_usm_memory(X[0])
154+
second_row = dpctl_mem.as_usm_memory(X[1])
155+
third_row = dpctl_mem.as_usm_memory(X[2])
156+
157+
e1 = q.submit(
158+
kern1Kernel,
159+
[
160+
first_row,
161+
ctypes.c_uint(17),
162+
],
163+
[
164+
n,
165+
],
166+
)
167+
e2 = q.submit(
168+
kern2Kernel,
169+
[
170+
second_row,
171+
ctypes.c_uint(27),
172+
],
173+
[
174+
n,
175+
],
176+
)
177+
e3 = q.submit(
178+
kern3Kernel,
179+
[third_row, first_row, second_row],
180+
[
181+
n,
182+
],
183+
None,
184+
[e1, e2],
185+
)
186+
status_complete = dpctl.event_status_type.complete
187+
assert not all(
188+
[
189+
e == status_complete
190+
for e in (
191+
e1.execution_status,
192+
e2.execution_status,
193+
e3.execution_status,
194+
)
195+
]
196+
)
197+
198+
e3.wait()
199+
Xnp = dpt.asnumpy(X)
200+
Xref = np.empty((3, n), dtype="u4")
201+
for i in range(n):
202+
Xref[0, i] = (i * i) % 17
203+
Xref[1, i] = (i * i * i) % 27
204+
Xref[2, i] = min(Xref[0, i], Xref[1, i])
205+
206+
assert np.array_equal(Xnp, Xref)

0 commit comments

Comments
 (0)