-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Hive: Fix join issues when CBO is enabled #2052
Conversation
if (configuration.get(InputFormatConfig.TABLE_SCHEMA) != null) { | ||
tableSchema = SchemaParser.fromJson(configuration.get(InputFormatConfig.TABLE_SCHEMA)); | ||
} else if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) { | ||
if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed?
The original intent of the change was that we have the table schema at hand on the mappers/reducers. If we remove this then every mapper/reducer has to read the table once to get the schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As show below:
- Mapper handles input split belongs to one table:
https://github.com/apache/hive/blob/113f6af7528f016bf821f7a746bad496cc93f834/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java#L406 - Function copyTableJobPropertiesToConf copies 'iceberg.mr.table.schema' property to jobConf:
https://github.com/apache/hive/blob/113f6af7528f016bf821f7a746bad496cc93f834/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java#L2427-L2443
if we join two tables, only one iceberg schema exists in jobConf which leads wrong inspector:
- empty inspector(non-overlap in two table selected columns, e.g.
SELECT o.order_id, o.customer_id, o.total, p.name FROM default.orders o JOIN default.products p ON o.product_id = p.id ORDER BY o.order_id
selected columns in table default.orders: [order_id, total, customer_id, product_id]
selected columns in table default.products: [id, name]) - incomplete inspector(overlap in two table selected columns, e.g.
SELECT c.first_name, o.order_id FROM default.orders o JOIN default.customers c ON o.customer_id = c.customer_id ORDER BY o.order_id DESC
selected columns in table default.customers: [first_name, customer_id]
selected columns in table default.orders: [order_id, customer_id]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation! This is something @marton-bod already tried his hands on. We might have to use the table identifier to prefix/postfix the schema....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @marton-bod , is there any thread work on the proposal @pvary mentioned above? I can not find any related issue on github.
Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @qphien . Indeed, we have had a similar problem before in #1708.
My initial solution to fix it was to add the schema into the config object with a prefix based on the table identifier. For example, when joining together default.orders and default.customers, you'd have two properties in the config: default.orders.iceberg.mr.table.schema
and default.customers.iceberg.mr.table.schema
. This should allow you to add the schema for multiple tables without collisions/overwrites.
Eventually we ended up not adding in this fix, but instead just reverted the commit that caused the problem for us. However, this prefix solution can be one way to make things work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think getting table schema from serDeProperties
has the same effect as prefix solution. iceberg.mr.table.schema
in serDeProperties
is overlayed from corresponding tableDesc, so we can get correct table schema when HiveIcebergSerde
is initialized.
// When same table is joined multiple times, it is possible some selected columns are duplicated, | ||
// in this case wrong recordStructField position leads wrong value or ArrayIndexOutOfBoundException | ||
String[] distinctSelectedColumns = Arrays.stream(selectedColumns).distinct().toArray(String[]::new); | ||
Schema projectedSchema = distinctSelectedColumns.length > 0 ? | ||
tableSchema.select(distinctSelectedColumns) : tableSchema; | ||
// the input split mapper handles does not belong to this table | ||
// it is necessary to ensure projectedSchema equals to tableSchema, | ||
// or we cannot find selectOperator's column from inspector | ||
if (projectedSchema.columns().size() != distinctSelectedColumns.length) { | ||
projectedSchema = tableSchema; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@marton-bod: Could you please take a look? You know more about the schema projection.
Thanks,
Peter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good generally, but I wanted to clarify this comment:
// the input split mapper handles does not belong to this table // it is necessary to ensure projectedSchema equals to tableSchema, // or we cannot find selectOperator's column from inspector
Just for my understanding, can you give an example in what scenario we could face this issue where the Schema.select() gives back a different number of columns?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With test case testSelectedColumnsOverlapJoin
, assuming that mapper is handling split belongs to table default.orders
, columns set in hive.io.file.readcolumn.names
are [order_id, customer_id], the inspector created for table default.customers
just contains column [customer_id], when table default.customers
selectOperator is initialized, field first_name
cannot be found from inspector we just created, so exception below is thrown
cannot find field first_name from [org.apache.iceberg.mr.hive.serde.objectinspector.IcebergRecordObjectInspector$IcebergRecordStructField@a6807525]
at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.getStandardStructFieldRef
The cause of this exception is that the schema get from Schema.select()
is not what we want, returning an inspector contains all table columns is an easy workaround to fix this issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation!
|
||
return testTableType.instance(shell.metastore().hiveConf(), temp); | ||
} | ||
|
||
static void init(TestHiveShell shell, TestTables testTables, TemporaryFolder temp, String engine) { | ||
init(shell, testTables, temp, engine, "false"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use a boolean instead of the string?
import static org.apache.iceberg.types.Types.NestedField.required; | ||
import static org.junit.runners.Parameterized.Parameter; | ||
import static org.junit.runners.Parameterized.Parameters; | ||
|
||
@RunWith(Parameterized.class) | ||
public class TestHiveIcebergStorageHandlerWithEngine { | ||
|
||
private static final String[] EXECUTION_ENGINES = new String[] {"tez", "mr"}; | ||
private static final String[] EXECUTION_ENGINES = new String[]{"tez", "mr"}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I usually try to avoid formatting only changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, it's my fault, my IDEA automatically reformat code. I will pay more attention next time.
private static final String[] EXECUTION_ENGINES = new String[] {"tez", "mr"}; | ||
private static final String[] EXECUTION_ENGINES = new String[]{"tez", "mr"}; | ||
|
||
private static final String[] CBO_ENABLES = new String[]{"true", "false"}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might not need this list as the values can not be extended even on the long run. Could we just add this by hand at the parameters()
method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, i will move CBO_ENABLES list to parameters
method
@@ -107,6 +126,9 @@ | |||
@Parameter(2) | |||
public TestTables.TestTableType testTableType; | |||
|
|||
@Parameter(3) | |||
public String cboEnable; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Boolean please
shell.executeStatement("SELECT first_name, customer_id FROM default.customers ORDER BY customer_id DESC"); | ||
|
||
Assert.assertEquals(3, descRows.size()); | ||
Assert.assertArrayEquals(new Object[] {"Trudy", 2L}, descRows.get(0)); | ||
Assert.assertArrayEquals(new Object[] {"Bob", 1L}, descRows.get(1)); | ||
Assert.assertArrayEquals(new Object[] {"Alice", 0L}, descRows.get(2)); | ||
Assert.assertArrayEquals(new Object[]{"Trudy", 2L}, descRows.get(0)); | ||
Assert.assertArrayEquals(new Object[]{"Bob", 1L}, descRows.get(1)); | ||
Assert.assertArrayEquals(new Object[]{"Alice", 0L}, descRows.get(2)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Am I right that these are formatting only changes?
It is much easier if we do not have them, so I usually try to avoid them in my PRs
"SELECT o1.order_id, o1.customer_id, o1.total " + | ||
"FROM default.orders o1 JOIN default.orders o2 ON o1.order_id = o2.order_id ORDER BY o1.order_id" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why was this change needed?
Could you please help?
Thanks,
Peter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As show in https://github.com/apache/hive/blob/113f6af7528f016bf821f7a746bad496cc93f834/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java#L992-L998,
self join can lead duplicated columns in jobConf hive.io.file.readcolumn.names
.
In this case, it is necessary to test whether self join can work correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh.. I have missed that the FROM part is from the orders table.
Sorry
Thanks for the patch @qphien! Asked a few questions in the review comments. The general observations:
Thanks, |
I'm not sure whether these failure tests are related to this PR. I tested locally and all test cases were passed.
|
public static Collection<Object[]> parameters() { | ||
Collection<Object[]> testParams = new ArrayList<>(); | ||
String javaVersion = System.getProperty("java.specification.version"); | ||
List<Boolean> cboEnables = ImmutableList.of(true, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of running all tests for both CBO on/off, can we turn it on just in those one or two unit test cases where we want to test it, by setting it via shell.setHiveSessionValue()
at the beginning of the test? We're trying to prevent the number of test runs from exploding via the combination of ever-increasing test parameters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@marton-bod OK, I have moved CBO test to some join test cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for picking this up @qphien
Thanks for reviewing, @marton-bod and @pvary. Have all your concerns been addressed? |
@rdblue yes, it should be good to go from my side. Thanks! |
+1 from my side too |
Thanks for fixing this, @qphien! And thanks for reviewing, @marton-bod and @pvary! |
Co-authored-by: 罗冲 <luochong@corp.netease.com>
After enabling CBO in Hive, there are some issues on MR when two iceberg table are joined. For example:
These issues also happen with iceberg - non-iceberg table joins.