Enable batched multi-source reading of JSONL files with large records #16687

Merged

Changes from 1 commit

fixed bad test
shrshi committed Aug 30, 2024
commit b8e985ec248af9b9c8f0fbc5a529c7b4caa08741
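
For context, the tests touched by this commit exercise byte-range reading of JSON Lines input: the reader is handed a window into the input and returns only the records that begin inside that window, so consecutive non-overlapping windows partition the records. A minimal sketch of that pattern against the public cudf::io API follows; the helper name, loop, and parameters are illustrative, not the test utility used in the diff below.

#include <cudf/concatenate.hpp>
#include <cudf/io/json.hpp>
#include <cudf/table/table.hpp>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Read a JSONL file in fixed-size byte ranges and stitch the chunks together.
// total_size is the file size in bytes; chunk_size is the window width.
std::unique_ptr<cudf::table> read_jsonl_in_chunks(std::string const& path,
                                                  std::size_t total_size,
                                                  std::size_t chunk_size)
{
  std::vector<cudf::io::table_with_metadata> chunks;
  for (std::size_t offset = 0; offset < total_size; offset += chunk_size) {
    auto opts = cudf::io::json_reader_options::builder(cudf::io::source_info{path})
                  .lines(true)
                  .byte_range_offset(offset)
                  .byte_range_size(chunk_size)
                  .build();
    chunks.push_back(cudf::io::read_json(opts));
  }
  std::vector<cudf::table_view> views;
  for (auto& chunk : chunks) { views.push_back(chunk.tbl->view()); }
  // Concatenating the per-chunk tables should match a single full read.
  return cudf::concatenate(views);
}
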
74 changes: 0 additions & 74 deletions cpp/tests/io/json/json_chunked_reader.cu
@@ -172,77 +172,3 @@ TEST_F(JsonReaderTest, ByteRange_MultiSource)
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(current_reader_table.tbl->view(), result->view());
}
}

TEST_F(JsonReaderTest, ByteRangeWithRealloc_MultiSource)
{
std::string long_string = "haha";
std::size_t log_repetitions = 10;
long_string.reserve(long_string.size() * log_repetitions);
for (std::size_t i = 0; i < log_repetitions; i++) {
long_string += long_string;
}

auto json_string = [&long_string]() {
std::string json_string = R"(
{ "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
{ "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 }
{ "a": { "y" : 6}, "b" : [6 ], "c": 13 }
{ "a": { "y" : 6}, "b" : [7 ], "c": 14 })";
std::string replace_chars = "c";
std::size_t pos = json_string.find(replace_chars);
while (pos != std::string::npos) {
// Replace the substring with the specified string
json_string.replace(pos, replace_chars.size(), long_string);

// Find the next occurrence of the substring
pos = json_string.find(replace_chars, pos + long_string.size());
}
return json_string;
}();

auto filename = temp_env->get_temp_dir() + "ParseInRangeIntegers.json";
{
std::ofstream outfile(filename, std::ofstream::out);
outfile << json_string;
}

constexpr int num_sources = 5;
std::vector<std::string> filepaths(num_sources, filename);

// Initialize parsing options (reading json lines)
cudf::io::json_reader_options json_lines_options =
cudf::io::json_reader_options::builder(cudf::io::source_info{filepaths})
.lines(true)
.compression(cudf::io::compression_type::NONE)
.recovery_mode(cudf::io::json_recovery_mode_t::FAIL);

// Read full test data via existing, nested JSON lines reader
cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options);

auto file_paths = json_lines_options.get_source().filepaths();
std::vector<std::unique_ptr<cudf::io::datasource>> datasources;
for (auto& fp : file_paths) {
datasources.emplace_back(cudf::io::datasource::create(fp));
}

// Test for different chunk sizes
std::vector<int> chunk_sizes{7, 10, 15, 20, 40, 50, 100, 200, 500, 1000, 2000};
for (auto chunk_size : chunk_sizes) {
auto const tables = split_byte_range_reading(datasources,
json_lines_options,
chunk_size,
cudf::get_default_stream(),
rmm::mr::get_current_device_resource());

auto table_views = std::vector<cudf::table_view>(tables.size());
std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) {
return table.tbl->view();
});
auto result = cudf::concatenate(table_views);

// Verify that the data read via chunked reader matches the data read via nested JSON reader
// cannot use EQUAL due to concatenate removing null mask
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(current_reader_table.tbl->view(), result->view());
break;
}
}
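
The multi-source variant removed above built its source_info from several copies of one file path. For reference, passing multiple paths in a single source_info makes the JSON lines reader consume them as one concatenated stream of records; a small illustration with made-up file names:

std::vector<std::string> paths{"part1.jsonl", "part2.jsonl"};  // illustrative names
auto opts = cudf::io::json_reader_options::builder(cudf::io::source_info{paths})
              .lines(true)
              .build();
auto result = cudf::io::read_json(opts);  // rows from all sources, in input order
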
44 changes: 44 additions & 0 deletions cpp/tests/io/json/json_test.cpp
@@ -680,6 +680,50 @@ TEST_F(JsonReaderTest, JsonLinesByteRange)
CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int64_wrapper{{3000, 4000, 5000}});
}

TEST_F(JsonReaderTest, JsonLinesByteRangeWithRealloc)
{
std::string long_string = "haha";
std::size_t log_repetitions = 12;
long_string.reserve(long_string.size() * (1UL << log_repetitions));
@vuule (Contributor) commented on Aug 30, 2024:
this is a bit better than std::pow (that I suggested) :D
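
Both spellings compute the same quantity; the shift simply stays in integer arithmetic, while std::pow round-trips through double and needs a cast back. A quick illustration:

#include <cmath>
#include <cstddef>

// Both evaluate 4 * 2^12 = 16384 for the reservation size.
std::size_t n        = 12;
std::size_t by_shift = std::size_t{4} * (std::size_t{1} << n);          // exact integer math
auto        by_pow   = static_cast<std::size_t>(4 * std::pow(2.0, n));  // via double
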

for (std::size_t i = 0; i < log_repetitions; i++) {
long_string += long_string;
}

auto json_string = [&long_string]() {
std::string json_string = R"(
{ "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
{ "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 }
{ "a": { "y" : 6}, "b" : [6 ], "c": 13 }
{ "a": { "y" : 6}, "b" : [7 ], "c": 14 })";
std::string replace_chars = "c";
std::size_t pos = json_string.find(replace_chars);
while (pos != std::string::npos) {
// Replace the substring with the specified string
json_string.replace(pos, replace_chars.size(), long_string);

// Find the next occurrence of the substring
pos = json_string.find(replace_chars, pos + long_string.size());
}
return json_string;
}();

// Initialize parsing options (reading JSON lines); the byte range offset and size
// are chosen so that only the second row of the input falls in the range
cudf::io::json_reader_options json_lines_options =
  cudf::io::json_reader_options::builder(
    cudf::io::source_info{cudf::host_span<std::byte>(
      reinterpret_cast<std::byte*>(json_string.data()), json_string.size())})
.lines(true)
.compression(cudf::io::compression_type::NONE)
.recovery_mode(cudf::io::json_recovery_mode_t::FAIL)
.byte_range_offset(16430)
.byte_range_size(30);

// Read the selected byte range via the JSON lines reader
cudf::io::table_with_metadata result = cudf::io::read_json(json_lines_options);

EXPECT_EQ(result.tbl->num_columns(), 3);
EXPECT_EQ(result.tbl->num_rows(), 1);
EXPECT_EQ(result.metadata.schema_info[2].name, long_string);
}
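
The byte range hard-coded above begins just past the first record (each record is dominated by the 16384-character key: 4 bytes doubled log_repetitions = 12 times), so only the second row starts inside the window. If the magic number ever needed to be avoided, the offset could be derived from the buffer itself; a sketch with illustrative variable names:

// Locate the start of the second JSONL record rather than hard-coding 16430.
// json_string begins with a newline, so record 1 starts at index 1 and ends at
// the next '\n'; the record after that newline is row 2.
std::size_t const end_of_row1      = json_string.find('\n', 1);
std::size_t const second_row_start = end_of_row1 + 1;  // candidate for .byte_range_offset(...)
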

TEST_F(JsonReaderTest, JsonLinesMultipleFilesByteRange_AcrossFiles)
{
const std::string file1 = temp_env->get_temp_dir() + "JsonLinesMultipleFilesByteRangeTest1.json";