Description
TL;DR: PR 35087 introduced an explicit failure in large joins with Acero when the key data is larger than 4 GB (solving the problem reported in issue 34474). However, I think (but I am not sure) that this quick fix is too restrictive, because the size condition is applied to the total size of the tables being joined rather than to the size of the key data. As a consequence, Acero fails when trying to merge large tables even when the key data is well below 4 GB.
EDIT: It looks like this bug is actually related to the size of the right table, but unrelated to the size of the left table (see here).
In this issue I proceed in four steps. First, I show that the test introduced by PR 35087 erroneously applies to the total size of the data processed in the join, rather than to the size of the key data. Second, I try to find the root cause of this behavior. Third, I discuss whether this is really a bug or the expected behavior of Acero. Fourth, I sketch a possible solution.
Description of the problem
PR 35087 introduced an explicit failure in large joins with Acero when the key data is larger than 4 GB (here). However, I discovered that the error disappears when I reduce the number of columns in the Arrow Tables I am trying to merge.
In the following reproducible example, I generate a large Arrow Table and merge it with itself, increasing the number of columns taken from the right table. The merge works fine for a limited number of columns, then the error appears once the number of columns reaches a threshold (8 in my case).
As a consequence, Acero throws an error whenever I try to merge large Arrow Tables, even when the key data is significantly smaller than 4 GB.
Reproducible example in R
library(stringi)
library(arrow)
library(dplyr)
# Generate a large number of rows with ONE heavy key column
n_rows <- 2e7
length_id <- 20
ids <- stringi::stri_rand_strings(n_rows, length = length_id)
# Create a large Arrow Table with heavy payloads
data <- data.frame(
  id = ids
) |> 
  as_arrow_table() |>
  mutate(
    # Create payload variables
    variable1   = id,
    variable2   = id,
    variable3   = id,
    variable4   = id,
    variable5   = id,
    variable6   = id,
    variable7   = id,
    variable8   = id,
    variable9   = id,
    variable10  = id
  ) |>
  compute()
vars <- names(data)[!names(data) %in% c("id")]
nb_var <- length(vars)
# Join the dataset with itself, with an increasing number of variables
lapply(1:nb_var, function(n) {
  print(paste0("Doing the join with ", n+1, " variables"))
  vars_temp <- c("id", vars[1:n])
  print(vars_temp)
  data_out <- data |> 
    left_join(
      data |>
        select(all_of(vars_temp)),
      by = c("id" = "id")
    ) |>
    compute()
  return("Success!")
})
Cause of the problem
I dug into the C++ source code of Acero to understand the problem. Disclaimer: I do not know anything about C++, so my report might be messy from here on.
PR 35087 introduced a logical test in Status RowArrayMerge::PrepareForMerge(). This test computes what I understand to be the total size of the sources to be merged (here).
I think the problem comes from the fact that RowArrayMerge::PrepareForMerge() is called twice in SwissTableForJoinBuild::PreparePrtnMerge(): once for the keys (here) and once for the payload columns (here). My intuition is that, when applied to the payload columns, the logical test actually computes the size of the payload data, which is more or less the size of the tables being joined. EDIT: As noted above, this bug seems to be related to the size of the right table, but unrelated to the size of the left table (see here).
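To make this concrete, here is a minimal, self-contained C++ sketch of the kind of size check I understand PrepareForMerge() to perform. This is not the actual Acero code: the function CheckTotalSize() and the example source sizes are simplified assumptions of mine; my understanding is only that the real check accumulates the sizes of the sources being merged and compares the total against a 32-bit limit.
// Illustrative sketch only: a simplified assumption of the size check, not
// the actual Arrow/Acero implementation.
#include <cstdint>
#include <iostream>
#include <limits>
#include <vector>

// Returns true when the combined size of all sources still fits in the
// 32-bit offsets of the merged row array (roughly the 4 GB limit).
bool CheckTotalSize(const std::vector<int64_t>& source_sizes) {
  int64_t total = 0;
  for (int64_t size : source_sizes) {
    total += size;
  }
  return total <= std::numeric_limits<uint32_t>::max();
}

int main() {
  // First call: key data only -- well below 4 GB, the check passes.
  std::vector<int64_t> key_sizes = {500000000};       // ~0.5 GB
  // Second call: payload data -- roughly the size of the right table,
  // so the same check fails even though the keys are small.
  std::vector<int64_t> payload_sizes = {5000000000};  // ~5 GB
  std::cout << "keys pass: " << CheckTotalSize(key_sizes) << "\n";
  std::cout << "payload passes: " << CheckTotalSize(payload_sizes) << "\n";
  return 0;
}
If this reading is correct, the check itself is fine for the key columns; the problem is only that the same guard is applied a second time to the payload columns.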
Is this really a bug?
Given that I do not know how Acero performs joins, I am not entirely sure whether the 4 GB size limit mentioned by @westonpace in this message applies to the size of the keys or to the size of the tables being joined. My understanding of the discussion in the issue and of the error message is that the limit applies to the keys, in which case the behavior I describe should be considered a bug. But maybe I misunderstood and the limit applies to the table size, in which case this behavior should be considered the expected one.
In other words, what exactly is the limitation in Acero: is it unable to join large tables, or unable to join tables with large key data?
Suggested solution
If the behavior I describe is an actual bug, a potential solution could look like this (a rough sketch follows the list):
- Introduce an additional boolean argument in RowArrayMerge::PrepareForMerge(). If this argument is true, the logical test (here) is performed; if it is false, the test is skipped;
- Set this argument to true in the first call to RowArrayMerge::PrepareForMerge() (here);
- Set this argument to false in the second call to RowArrayMerge::PrepareForMerge() (here).
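For illustration, here is a hedged C++ sketch of that idea. It reuses the simplified size-check logic from the example above rather than the real RowArrayMerge::PrepareForMerge() signature (which works on row arrays and returns a Status); it only shows how a boolean flag could gate the size check.
// Sketch only: the real PrepareForMerge() has a different signature and
// returns arrow::Status; this just illustrates gating the size check.
#include <cstdint>
#include <limits>
#include <vector>

bool PrepareForMergeSketch(const std::vector<int64_t>& source_sizes,
                           bool check_overflow) {  // proposed extra argument
  if (check_overflow) {
    int64_t total = 0;
    for (int64_t size : source_sizes) total += size;
    if (total > std::numeric_limits<uint32_t>::max()) {
      return false;  // would be an error Status in Acero
    }
  }
  // ... the actual merge preparation would follow here ...
  return true;
}

int main() {
  std::vector<int64_t> key_sizes = {500000000};       // keys: small
  std::vector<int64_t> payload_sizes = {5000000000};  // payload: ~5 GB
  // First call (keys): keep the guard.
  bool keys_ok = PrepareForMergeSketch(key_sizes, /*check_overflow=*/true);
  // Second call (payload): skip the guard, as suggested above.
  bool payload_ok = PrepareForMergeSketch(payload_sizes, /*check_overflow=*/false);
  return (keys_ok && payload_ok) ? 0 : 1;
}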
With the help of ChatGPT, I opened a PR suggesting an (untested) solution (here).
Component(s)
C++