Skip to content

Commit 6508c5a

Browse files
committed
Rework concurrency in test, use Arrow Future
1 parent ec9b054 commit 6508c5a

File tree

1 file changed

+28
-14
lines changed

1 file changed

+28
-14
lines changed

cpp/src/arrow/dataset/file_parquet_encryption_test.cc

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
#include <future>
1918
#include <string_view>
2019

2120
#include "gtest/gtest.h"
@@ -33,6 +32,8 @@
3332
#include "arrow/testing/gtest_util.h"
3433
#include "arrow/testing/random.h"
3534
#include "arrow/type.h"
35+
#include "arrow/util/future.h"
36+
#include "arrow/util/thread_pool.h"
3637
#include "parquet/arrow/reader.h"
3738
#include "parquet/encryption/crypto_factory.h"
3839
#include "parquet/encryption/encryption_internal.h"
@@ -166,22 +167,35 @@ class DatasetEncryptionTestBase : public testing::TestWithParam<CompressionParam
166167
// Create the dataset
167168
ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish());
168169

169-
std::vector<std::future<Result<std::shared_ptr<Table>>>> threads;
170+
if (concurrently) {
171+
// start with a single thread so we are more likely to build up a queue of jobs
172+
ASSERT_OK_AND_ASSIGN(auto pool, arrow::internal::ThreadPool::Make(1));
173+
std::vector<Future<std::shared_ptr<Table>>> threads;
174+
175+
// Read dataset above multiple times concurrently to see that is thread-safe.
176+
for (size_t i = 0; i < 100; ++i) {
177+
threads.push_back(
178+
DeferNotOk(pool->Submit(DatasetEncryptionTestBase::read, dataset)));
179+
}
170180

171-
// Read dataset above multiple times concurrently to see that is thread-safe.
172-
// Reuse the dataset above to scan it twice to make sure decryption works correctly.
173-
const size_t attempts = concurrently ? 1000 : 2;
174-
for (size_t i = 0; i < attempts; ++i) {
175-
if (concurrently) {
176-
threads.push_back(std::async(DatasetEncryptionTestBase::read, dataset));
177-
} else {
178-
ASSERT_OK_AND_ASSIGN(auto read_table, read(dataset));
181+
// ramp up parallelism
182+
ASSERT_OK(pool->SetCapacity(16));
183+
// ensure there are sufficient jobs to see concurrent processing
184+
ASSERT_GT(pool->GetNumTasks(), 16);
185+
printf("%d", pool->GetNumTasks());
186+
187+
// wait for all jobs to finish
188+
pool->WaitForIdle();
189+
190+
// assert correctness of jobs
191+
for (auto& thread : threads) {
192+
ASSERT_OK_AND_ASSIGN(auto read_table, thread.result());
179193
AssertTablesEqual(*read_table, *table_);
180194
}
181-
}
182-
if (concurrently) {
183-
for (auto& thread : threads) {
184-
ASSERT_OK_AND_ASSIGN(auto read_table, thread.get());
195+
} else {
196+
// Reuse the dataset above to scan it twice to make sure decryption works correctly.
197+
for (size_t i = 0; i < 2; ++i) {
198+
ASSERT_OK_AND_ASSIGN(auto read_table, read(dataset));
185199
AssertTablesEqual(*read_table, *table_);
186200
}
187201
}

0 commit comments

Comments
 (0)