Skip to content

Commit 861b5da

Browse files
lidavidmpitrou
authored andcommitted
ARROW-12760: [C++][Python][R] Allow setting I/O thread pool size
This adds functions to change the pool size at runtime, but doesn't adjust the default size of 8. Closes #10316 from lidavidm/arrow-12760 Authored-by: David Li <li.davidm96@gmail.com> Signed-off-by: Antoine Pitrou <antoine@python.org>
1 parent 943d2bd commit 861b5da

File tree

16 files changed

+189
-1
lines changed

16 files changed

+189
-1
lines changed

cpp/src/arrow/io/interfaces.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include "arrow/buffer.h"
3131
#include "arrow/io/concurrency.h"
32+
#include "arrow/io/type_fwd.h"
3233
#include "arrow/io/util_internal.h"
3334
#include "arrow/result.h"
3435
#include "arrow/status.h"
@@ -53,6 +54,12 @@ IOContext::IOContext(MemoryPool* pool, StopToken stop_token)
5354

5455
const IOContext& default_io_context() { return g_default_io_context; }
5556

57+
int GetIOThreadPoolCapacity() { return internal::GetIOThreadPool()->GetCapacity(); }
58+
59+
Status SetIOThreadPoolCapacity(int threads) {
60+
return internal::GetIOThreadPool()->SetCapacity(threads);
61+
}
62+
5663
FileInterface::~FileInterface() = default;
5764

5865
Status FileInterface::Abort() { return Close(); }

cpp/src/arrow/io/memory_test.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -861,5 +861,15 @@ TEST(CacheOptions, Basics) {
861861
check(CacheOptions::MakeFromNetworkMetrics(5, 500, .75, 5), 2.5, 5);
862862
}
863863

864+
TEST(IOThreadPool, Capacity) {
865+
// Simple sanity check
866+
auto pool = internal::GetIOThreadPool();
867+
int capacity = pool->GetCapacity();
868+
ASSERT_GT(capacity, 0);
869+
ASSERT_EQ(GetIOThreadPoolCapacity(), capacity);
870+
ASSERT_OK(SetIOThreadPoolCapacity(capacity + 1));
871+
ASSERT_EQ(GetIOThreadPoolCapacity(), capacity + 1);
872+
}
873+
864874
} // namespace io
865875
} // namespace arrow

cpp/src/arrow/io/type_fwd.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#pragma once
1919

20+
#include "arrow/type_fwd.h"
2021
#include "arrow/util/visibility.h"
2122

2223
namespace arrow {
@@ -33,6 +34,23 @@ struct CacheOptions;
3334
ARROW_EXPORT
3435
const IOContext& default_io_context();
3536

37+
/// \brief Get the capacity of the global I/O thread pool
38+
///
39+
/// Return the number of worker threads in the thread pool to which
40+
/// Arrow dispatches various I/O-bound tasks. This is an ideal number,
41+
/// not necessarily the exact number of threads at a given point in time.
42+
///
43+
/// You can change this number using SetIOThreadPoolCapacity().
44+
ARROW_EXPORT int GetIOThreadPoolCapacity();
45+
46+
/// \brief Set the capacity of the global I/O thread pool
47+
///
48+
/// Set the number of worker threads in the thread pool to which
49+
/// Arrow dispatches various I/O-bound tasks.
50+
///
51+
/// The current number is returned by GetIOThreadPoolCapacity().
52+
ARROW_EXPORT Status SetIOThreadPoolCapacity(int threads);
53+
3654
class FileInterface;
3755
class Seekable;
3856
class Writable;

python/pyarrow/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ def parse_git(root, **kwargs):
6767
from pyarrow.lib import (BuildInfo, RuntimeInfo, VersionInfo,
6868
cpp_build_info, cpp_version, cpp_version_info,
6969
runtime_info, cpu_count, set_cpu_count,
70-
enable_signal_handlers)
70+
enable_signal_handlers,
71+
io_thread_count, set_io_thread_count)
7172

7273

7374
def show_versions():

python/pyarrow/includes/libarrow.pxd

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,6 +1163,8 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
11631163
CIOContext(CMemoryPool*, CStopToken)
11641164

11651165
CIOContext c_default_io_context "arrow::io::default_io_context"()
1166+
int GetIOThreadPoolCapacity()
1167+
CStatus SetIOThreadPoolCapacity(int threads)
11661168

11671169
cdef cppclass FileStatistics:
11681170
int64_t size

python/pyarrow/io.pxi

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,46 @@ cdef extern from "Python.h":
4242
char *v, Py_ssize_t len) except NULL
4343

4444

45+
def io_thread_count():
46+
"""
47+
Return the number of threads to use for I/O operations.
48+
49+
Many operations, such as scanning a dataset, will implicitly make
50+
use of this pool. The number of threads is set to a fixed value at
51+
startup. It can be modified at runtime by calling
52+
:func:`set_io_thread_count()`.
53+
54+
See Also
55+
--------
56+
set_io_thread_count : Modify the size of this pool.
57+
cpu_count : The analogous function for the CPU thread pool.
58+
"""
59+
return GetIOThreadPoolCapacity()
60+
61+
62+
def set_io_thread_count(int count):
63+
"""
64+
Set the number of threads to use for I/O operations.
65+
66+
Many operations, such as scanning a dataset, will implicitly make
67+
use of this pool.
68+
69+
Parameters
70+
----------
71+
count : int
72+
The max number of threads that may be used for I/O.
73+
Must be positive.
74+
75+
See Also
76+
--------
77+
io_thread_count : Get the size of this pool.
78+
set_cpu_count : The analogous function for the CPU thread pool.
79+
"""
80+
if count < 1:
81+
raise ValueError("IO thread count must be strictly positive")
82+
check_status(SetIOThreadPoolCapacity(count))
83+
84+
4585
cdef class NativeFile(_Weakrefable):
4686
"""
4787
The base class for all Arrow streams.

python/pyarrow/lib.pyx

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,23 @@ def cpu_count():
4848
If neither is present, it will default to the number of hardware threads
4949
on the system. It can be modified at runtime by calling
5050
:func:`set_cpu_count()`.
51+
52+
See Also
53+
--------
54+
set_cpu_count : Modify the size of this pool.
55+
io_thread_count : The analogous function for the I/O thread pool.
5156
"""
5257
return GetCpuThreadPoolCapacity()
5358

5459

5560
def set_cpu_count(int count):
5661
"""
5762
Set the number of threads to use in parallel operations.
63+
64+
See Also
65+
--------
66+
cpu_count : Get the size of this pool.
67+
set_io_thread_count : The analogous function for the I/O thread pool.
5868
"""
5969
if count < 1:
6070
raise ValueError("CPU count must be strictly positive")

python/pyarrow/tests/test_misc.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ def test_cpu_count():
4545
pa.set_cpu_count(n)
4646

4747

48+
def test_io_thread_count():
49+
n = pa.io_thread_count()
50+
assert n > 0
51+
try:
52+
pa.set_io_thread_count(n + 5)
53+
assert pa.io_thread_count() == n + 5
54+
finally:
55+
pa.set_io_thread_count(n)
56+
57+
4858
def test_build_info():
4959
assert isinstance(pa.cpp_build_info, pa.BuildInfo)
5060
assert isinstance(pa.cpp_version_info, pa.VersionInfo)

r/NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ export(int16)
229229
export(int32)
230230
export(int64)
231231
export(int8)
232+
export(io_thread_count)
232233
export(is_in)
233234
export(large_binary)
234235
export(large_list_of)
@@ -261,6 +262,7 @@ export(record_batch)
261262
export(s3_bucket)
262263
export(schema)
263264
export(set_cpu_count)
265+
export(set_io_thread_count)
264266
export(starts_with)
265267
export(string)
266268
export(struct)

r/R/arrowExports.R

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)