Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hive: Fix join issues when CBO is enabled #2052

Merged
merged 3 commits into from
Jan 15, 2021
Merged

Conversation

qphien
Copy link
Contributor

@qphien qphien commented Jan 8, 2021

After enabling CBO in Hive, there are some issues on MR when two iceberg table are joined. For example:

  • Cannot find field from inspector
  • ArrayIndexOutOfBoundsException when getting values from GenericRecord

These issues also happen with iceberg - non-iceberg table joins.

@github-actions github-actions bot added the MR label Jan 8, 2021
if (configuration.get(InputFormatConfig.TABLE_SCHEMA) != null) {
tableSchema = SchemaParser.fromJson(configuration.get(InputFormatConfig.TABLE_SCHEMA));
} else if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed?
The original intent of the change was that we have the table schema at hand on the mappers/reducers. If we remove this then every mapper/reducer has to read the table once to get the schema.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As show below:

if we join two tables, only one iceberg schema exists in jobConf which leads wrong inspector:

  • empty inspector(non-overlap in two table selected columns, e.g.
    SELECT o.order_id, o.customer_id, o.total, p.name FROM default.orders o JOIN default.products p ON o.product_id = p.id ORDER BY o.order_id
    selected columns in table default.orders: [order_id, total, customer_id, product_id]
    selected columns in table default.products: [id, name])
  • incomplete inspector(overlap in two table selected columns, e.g.
    SELECT c.first_name, o.order_id FROM default.orders o JOIN default.customers c ON o.customer_id = c.customer_id ORDER BY o.order_id DESC
    selected columns in table default.customers: [first_name, customer_id]
    selected columns in table default.orders: [order_id, customer_id]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation! This is something @marton-bod already tried his hands on. We might have to use the table identifier to prefix/postfix the schema....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @marton-bod , is there any thread work on the proposal @pvary mentioned above? I can not find any related issue on github.
Thanks.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @qphien . Indeed, we have had a similar problem before in #1708.
My initial solution to fix it was to add the schema into the config object with a prefix based on the table identifier. For example, when joining together default.orders and default.customers, you'd have two properties in the config: default.orders.iceberg.mr.table.schema and default.customers.iceberg.mr.table.schema. This should allow you to add the schema for multiple tables without collisions/overwrites.

Eventually we ended up not adding in this fix, but instead just reverted the commit that caused the problem for us. However, this prefix solution can be one way to make things work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think getting table schema from serDeProperties has the same effect as prefix solution. iceberg.mr.table.schema in serDeProperties is overlayed from corresponding tableDesc, so we can get correct table schema when HiveIcebergSerde is initialized.

Comment on lines +84 to +94
// When same table is joined multiple times, it is possible some selected columns are duplicated,
// in this case wrong recordStructField position leads wrong value or ArrayIndexOutOfBoundException
String[] distinctSelectedColumns = Arrays.stream(selectedColumns).distinct().toArray(String[]::new);
Schema projectedSchema = distinctSelectedColumns.length > 0 ?
tableSchema.select(distinctSelectedColumns) : tableSchema;
// the input split mapper handles does not belong to this table
// it is necessary to ensure projectedSchema equals to tableSchema,
// or we cannot find selectOperator's column from inspector
if (projectedSchema.columns().size() != distinctSelectedColumns.length) {
projectedSchema = tableSchema;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marton-bod: Could you please take a look? You know more about the schema projection.
Thanks,
Peter

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good generally, but I wanted to clarify this comment:
// the input split mapper handles does not belong to this table // it is necessary to ensure projectedSchema equals to tableSchema, // or we cannot find selectOperator's column from inspector
Just for my understanding, can you give an example in what scenario we could face this issue where the Schema.select() gives back a different number of columns?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With test case testSelectedColumnsOverlapJoin, assuming that mapper is handling split belongs to table default.orders, columns set in hive.io.file.readcolumn.names are [order_id, customer_id], the inspector created for table default.customers just contains column [customer_id], when table default.customers selectOperator is initialized, field first_name cannot be found from inspector we just created, so exception below is thrown

cannot find field first_name from [org.apache.iceberg.mr.hive.serde.objectinspector.IcebergRecordObjectInspector$IcebergRecordStructField@a6807525]
	at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.getStandardStructFieldRef

The cause of this exception is that the schema get from Schema.select() is not what we want, returning an inspector contains all table columns is an easy workaround to fix this issue.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation!


return testTableType.instance(shell.metastore().hiveConf(), temp);
}

static void init(TestHiveShell shell, TestTables testTables, TemporaryFolder temp, String engine) {
init(shell, testTables, temp, engine, "false");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use a boolean instead of the string?

import static org.apache.iceberg.types.Types.NestedField.required;
import static org.junit.runners.Parameterized.Parameter;
import static org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class TestHiveIcebergStorageHandlerWithEngine {

private static final String[] EXECUTION_ENGINES = new String[] {"tez", "mr"};
private static final String[] EXECUTION_ENGINES = new String[]{"tez", "mr"};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I usually try to avoid formatting only changes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, it's my fault, my IDEA automatically reformat code. I will pay more attention next time.

private static final String[] EXECUTION_ENGINES = new String[] {"tez", "mr"};
private static final String[] EXECUTION_ENGINES = new String[]{"tez", "mr"};

private static final String[] CBO_ENABLES = new String[]{"true", "false"};
Copy link
Contributor

@pvary pvary Jan 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might not need this list as the values can not be extended even on the long run. Could we just add this by hand at the parameters() method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, i will move CBO_ENABLES list to parameters method

@@ -107,6 +126,9 @@
@Parameter(2)
public TestTables.TestTableType testTableType;

@Parameter(3)
public String cboEnable;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boolean please

Comment on lines 167 to 173
shell.executeStatement("SELECT first_name, customer_id FROM default.customers ORDER BY customer_id DESC");

Assert.assertEquals(3, descRows.size());
Assert.assertArrayEquals(new Object[] {"Trudy", 2L}, descRows.get(0));
Assert.assertArrayEquals(new Object[] {"Bob", 1L}, descRows.get(1));
Assert.assertArrayEquals(new Object[] {"Alice", 0L}, descRows.get(2));
Assert.assertArrayEquals(new Object[]{"Trudy", 2L}, descRows.get(0));
Assert.assertArrayEquals(new Object[]{"Bob", 1L}, descRows.get(1));
Assert.assertArrayEquals(new Object[]{"Alice", 0L}, descRows.get(2));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I right that these are formatting only changes?
It is much easier if we do not have them, so I usually try to avoid them in my PRs

Comment on lines +214 to +215
"SELECT o1.order_id, o1.customer_id, o1.total " +
"FROM default.orders o1 JOIN default.orders o2 ON o1.order_id = o2.order_id ORDER BY o1.order_id"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this change needed?
Could you please help?
Thanks,
Peter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As show in https://github.com/apache/hive/blob/113f6af7528f016bf821f7a746bad496cc93f834/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java#L992-L998,
self join can lead duplicated columns in jobConf hive.io.file.readcolumn.names.
In this case, it is necessary to test whether self join can work correctly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh.. I have missed that the FROM part is from the orders table.
Sorry

@pvary
Copy link
Contributor

pvary commented Jan 8, 2021

Thanks for the patch @qphien!
Really appreciate that you have taken the time to track this down!

Asked a few questions in the review comments. The general observations:

  • Please do not do formatting only changes - These are making the review (and backport) harder
  • I would recommend to use a boolean or Boolean instead of "false"/"true" strings
  • There is one particular line I personally removed once accidentally and later realized that it is needed for performant queries (we might want to add a test case for it 😄). I think the line is still needed.
  • I asked @marton-bod to review the projection related part since he was the one working on that.

Thanks,
Peter

@qphien
Copy link
Contributor Author

qphien commented Jan 9, 2021

I'm not sure whether these failure tests are related to this PR. I tested locally and all test cases were passed.

org.apache.iceberg.spark.extensions.TestCopyOnWriteDelete > testDeleteWithSerializableIsolation[catalogName = spark_catalog, implementation = org.apache.iceberg.spark.SparkSessionCatalog, config = {type=hive, default-namespace=default, clients=1, parquet-enabled=false, cache-enabled=false}, format = avro, vectorized = false] FAILED
    java.lang.RuntimeException: Failed to get table info from metastore default.table

        Caused by:
        org.apache.thrift.transport.TTransportException: java.net.SocketException: Broken pipe (Write failed)

            Caused by:
            java.net.SocketException: Broken pipe (Write failed)

public static Collection<Object[]> parameters() {
Collection<Object[]> testParams = new ArrayList<>();
String javaVersion = System.getProperty("java.specification.version");
List<Boolean> cboEnables = ImmutableList.of(true, false);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of running all tests for both CBO on/off, can we turn it on just in those one or two unit test cases where we want to test it, by setting it via shell.setHiveSessionValue() at the beginning of the test? We're trying to prevent the number of test runs from exploding via the combination of ever-increasing test parameters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marton-bod OK, I have moved CBO test to some join test cases.

Copy link
Collaborator

@marton-bod marton-bod left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for picking this up @qphien

@rdblue
Copy link
Contributor

rdblue commented Jan 13, 2021

Thanks for reviewing, @marton-bod and @pvary. Have all your concerns been addressed?

@marton-bod
Copy link
Collaborator

@rdblue yes, it should be good to go from my side. Thanks!

@pvary
Copy link
Contributor

pvary commented Jan 14, 2021

+1 from my side too

@rdblue rdblue merged commit d50f540 into apache:master Jan 15, 2021
@rdblue
Copy link
Contributor

rdblue commented Jan 15, 2021

Thanks for fixing this, @qphien! And thanks for reviewing, @marton-bod and @pvary!

XuQianJin-Stars pushed a commit to XuQianJin-Stars/iceberg that referenced this pull request Mar 22, 2021
Co-authored-by: 罗冲 <luochong@corp.netease.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants