
Spark: Spark3 ZOrder Rewrite Strategy #3983

Merged: 30 commits, Apr 21, 2022

Conversation

RussellSpitzer (Member) commented Jan 25, 2022:

Closes #3961

Built on #3960

int byteCompare = UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes);

Assert.assertTrue(String.format(
"Ordering of ints should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
Contributor:

nit: ordering of longs, same issue for all other data type tests

Member Author:

Yep. I have a separate PR for the byte utils; if possible, I think we can get this in faster if we target that first: #3966
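For context, the two's-complement trick the int tests rely on can be sketched as follows (hypothetical code, not the byte utils PR itself):

```java
// Hypothetical sketch, not ZOrderByteUtils: for a two's-complement int,
// flipping only the sign bit makes unsigned (lexicographic byte) order
// match signed numeric order.
public class OrderedInts {
  public static int orderedIntBits(int v) {
    return v ^ Integer.MIN_VALUE; // flip the sign bit
  }

  public static void main(String[] args) {
    // After the transform, -1 < 0 < 1 must hold under unsigned comparison
    if (Integer.compareUnsigned(orderedIntBits(-1), orderedIntBits(0)) >= 0
        || Integer.compareUnsigned(orderedIntBits(0), orderedIntBits(1)) >= 0) {
      throw new AssertionError("ordering broken");
    }
    System.out.println("ok");
  }
}
```

The test above then only needs to check that the resulting bytes, compared lexicographically, sort the same way as the original signed values.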

private static final org.apache.iceberg.SortOrder Z_SORT_ORDER = org.apache.iceberg.SortOrder.builderFor(Z_SCHEMA)
.sortBy(Z_COLUMN, SortDirection.ASC, NullOrder.NULLS_LAST)
.build();
private static final int STRING_KEY_LENGTH = 60;
Contributor:

I see, going back to the top question, this is currently static. Can we at least make this configurable somehow, maybe as a part of the interface input?

Member Author:

This was just my thought for the initial implementation. I think we'll eventually have this change automatically (as well as bounding), but I don't think we want to expose this as a user-configurable parameter.

Contributor:

Was there any rationale for 60? Maybe document it? For instance, Parquet seems to use a default of 64 bytes for its truncation length on indexes. But I imagine for a lot of common cases it would be better to keep this much shorter (e.g. 32 bytes) so that there is room to interleave more primitive types and still fit on a cache line.

Member Author (@RussellSpitzer, Jan 27, 2022):

Just picking a random number. I'm fine setting it to 64 or 128.

Contributor:

IIUC the utility classes will pad to this value? So I think shorter is better, i.e. 32 or even 16. We could run into cases where there are a lot of common prefixes, but my sense is most strings are generally short, so the extra padding seems wasteful. This is somewhat bike-shedding as I'm sure there will be another, smarter implementation.

* @param columns Columns to be used to generate Z-Values
* @return this for method chaining
*/
default RewriteDataFiles zOrder(String... columns) {
Contributor:

I remember in the original design zOrder is considered a part of the sort order, so it makes more sense to me for the input here to be SortOrder instead of String... columns. I guess this will be a shortcut for something like SortOrder.builderFor(schema).zOrder().columns(a,b,c).bitWidth(10) when that interface is out? But how do people define things like string column width to cutoff in this case?

Member Author:

End users would not define any of the inner parameters for z-ordering. Ideally, in a follow-up version we take some basic statistics from the data set and use those to develop our z-ordering. I think providing any additional user config here is essentially just giving users a bunch of very difficult knobs.

Contributor:

in a follow up version we take some basic statistics from the data set and use that to develop our z ordering

do you have any current thoughts about how this would be determined by statistics? I always imagined some human decision is still needed to tune the tradeoffs among different configs.

Member Author:

For most types my goal would be to bound and then condense the byte representation domain. So, for example, a column which has many values that translate to

{0, ..., 0, 1, x, y, z}

We want to shift to

{1, x, y, z, 0, ..., 0}

This involves finding min and max values, binning within that range and shifting left

At least those are my current thoughts
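As a rough illustration of the binning idea described above (hypothetical code; the actual follow-up design was not settled in this thread):

```java
// Hypothetical sketch of stats-based condensing: given an observed
// min/max for a column, map a long into one of numBins buckets so the
// significant bits move to the front of the byte representation.
public class RangeBinning {
  public static int bin(long value, long min, long max, int numBins) {
    double fraction = (double) (value - min) / (max - min);
    // clamp so value == max lands in the last bin
    return (int) Math.min(numBins - 1, (long) (fraction * numBins));
  }

  public static void main(String[] args) {
    if (bin(0, 0, 1000, 256) != 0) throw new AssertionError();
    if (bin(500, 0, 1000, 256) != 128) throw new AssertionError();
    if (bin(1000, 0, 1000, 256) != 255) throw new AssertionError();
    System.out.println("ok");
  }
}
```

With 256 bins the whole observed range compresses into a single byte, which is the "shift left and condense" effect the comment describes.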

@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
Contributor:

How does this relate to #3960?

Contributor:

I see comment below about splitting it out.

Member Author:

Yep, sorry. This PR is built on top of #3960, so hopefully we can get things in faster.

// with less magnitude (reverse the order)
for (int i = 0; i < floatBytes.length; i++) {
floatBytes[i] = (byte) ~floatBytes[i];
}
Contributor:

Is flipping needed? I thought for floats, all we had to do was flip the sign bit just like integers. Isn't that what the IEEE 754 quote above is saying?

Member Author (@RussellSpitzer, Jan 28, 2022):

Yep, so the thing about sign magnitude integers is that the ordering of positive numbers is just what you expect. The larger the binary representation the larger the positive number. But for negatives the effect is the opposite. A negative number with a large magnitude is more negative than a negative number with a smaller magnitude. (This is different than the 2's complement representation we deal with above in orderIntLikeBytes)

So basically if we drew out the ordering from smallest to largest byte representations it goes

0000000                                                                11111111
0--------------------> Most Positive 0 <--------------------Most Negative

So we take this and first flip the sign bit which changes it to

0000000                                                                11111111
0<---------------------Most Negative 0------------------------> Most Positive

But we still have the negatives in the wrong direction, twiddling those bits flips their direction
(0010 -> 1101) (originally the most negative number)
(0001 -> 1110)
(0000 -> 1111)
And our final range looks like this

0000000                                                                11111111
Most negative ---------------------> 0 0------------------------> Most Positive

Imagine we have 4-bit sign-magnitude integers

0000 =  0 ==> 1000
0001 =  1 ==> 1001
0010 =  2 ==> 1010
0011 =  3 ==> 1011
0100 =  4 ==> 1100
0101 =  5 ==> 1101
0111 =  7 ==> 1111
1000 = -0 ==> 0111
1001 = -1 ==> 0110
1010 = -2 ==> 0101
1011 = -3 ==> 0100
1100 = -4 ==> 0011
1101 = -5 ==> 0010
1111 = -7 ==> 0000

Which, if we sort based on the transformed binary, gives us

1111 = -7 ==> 0000
1101 = -5 ==> 0010
1100 = -4 ==> 0011
1011 = -3 ==> 0100
1010 = -2 ==> 0101
1001 = -1 ==> 0110
1000 = -0 ==> 0111
0000 =  0 ==> 1000
0001 =  1 ==> 1001
0010 =  2 ==> 1010
0011 =  3 ==> 1011
0100 =  4 ==> 1100
0101 =  5 ==> 1101
0111 =  7 ==> 1111
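The transform described above can be written as a small function (a sketch, not the Iceberg implementation; shown for 32-bit IEEE 754 floats, with the results compared as unsigned ints to mimic lexicographic byte order):

```java
// Hypothetical sketch of the sign-magnitude fix-up: positive floats get
// only their sign bit flipped; negative floats get every bit flipped so
// their order is reversed, exactly as the diagrams above describe.
public class OrderedFloats {
  public static int orderedFloatBits(float f) {
    int bits = Float.floatToIntBits(f);
    // sign bit clear (bits >= 0 as a signed int): flip just the sign bit
    // sign bit set: flip all bits
    return bits >= 0 ? bits ^ Integer.MIN_VALUE : ~bits;
  }

  public static void main(String[] args) {
    float[] sorted = {-2.5f, -1.0f, -0.0f, 0.0f, 1.0f, 2.5f};
    for (int i = 1; i < sorted.length; i++) {
      if (Integer.compareUnsigned(orderedFloatBits(sorted[i - 1]),
          orderedFloatBits(sorted[i])) > 0) {
        throw new AssertionError("out of order at index " + i);
      }
    }
    System.out.println("ok");
  }
}
```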

Contributor:

Yep, you're right. Floats are mirrored. And to double-check, I validated that HBase also does this. OrderedBytes is a good reference.

partZOrderCols.isEmpty(),
"Cannot ZOrder on an Identity partition column as these values are constant within a partition, " +
"ZOrdering requested on %s",
partZOrderCols);
Contributor:

What do you think about simply removing the column because the order is identical without it? If I want to use order by zorder(col1, col2) but I know col1 is always a constant (as it is for a file in an identity partition) then it's equivalent to order by col2.

I think I'd prefer to produce an equivalent, rather than failing.

Member Author:

Switched this to logging and added an additional check to make sure we haven't removed all of the ZOrder named columns. Personally I don't like changing user parameters if we know they are no-ops but I think this is fine too.

.collect(Collectors.toList());

Column zvalueArray = functions.array(zOrderColumns.stream().map(colStruct ->
SparkZOrder.sortedLexicographically(functions.col(colStruct.name()), colStruct.dataType())
Contributor:

I'm not sure that sortedLexicographically makes sense here. It looks like this is converting values to byte arrays that have the same sort, but there's no real sorting going on here, right?

return ZOrderByteUtils.interleaveBits(columnsBinary);
}

private static UserDefinedFunction tinyToOrderedBytesUDF() {
Contributor:

The UDFs here are okay, but I think we'd get better performance overall if we were to use FunctionCatalog to expose these to Spark. That way, Spark can compile in the method calls.

RussellSpitzer (Member Author) commented Feb 18, 2022:

66c77fa - Benchmark
IcebergSortCompactionBenchmark.java
https://github.com/apache/iceberg/blob/66c77fad0c9c14b479909f0c40e8a222d35c00b2/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java

g :iceberg-spark:iceberg-spark-3.2:jmh -PjmhIncludeRegex=IcebergSortCompactionBenchmark -PjmhOutputPath=benchmark/SortResult.txt

Minimal effect on the timing of non-string compactions. Currently String is set at a buffer length of 128 bytes, which probably explains the dramatically more expensive ZOrder for operations containing Strings. I think additionally the Spark version here can bail on the first mismatched byte, while we read the entire buffer before we make any decision.

Memory is managed in the ZOrder method via thread-local ByteBuffers owned by the ZOrder UDFs created in the sort. I need to check this over, but I believe I did it correctly.

The test makes 8 files, 10 million records each, and runs the Iceberg compaction algorithm.
The schema of the table is

    .withColumnRenamed("id", "longCol")
    .withColumn("intCol", expr("CAST(longCol AS INT)"))
    .withColumn("intCol2", expr("CAST(longCol AS INT)"))
    .withColumn("intCol3", expr("CAST(longCol AS INT)"))
    .withColumn("intCol4", expr("CAST(longCol AS INT)"))
    .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
    .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
    .withColumn("dateCol", date_add(current_date(), col("intCol").mod(NUM_FILES)))
    .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
    .withColumn("stringCol", expr("CAST(dateCol AS STRING)"));

Test rewrites

Benchmark                                        Mode  Cnt     Score      Error  Units
// Use 1 Int Column for sorting
IcebergSortCompactionBenchmark.sortInt             ss    3   344.850 ±  146.488   s/op
IcebergSortCompactionBenchmark.zSortInt            ss    3   370.162 ±   23.263   s/op

// Use 2 Int Columns for sorting
IcebergSortCompactionBenchmark.sortInt2            ss    3   331.688 ±   64.011   s/op
IcebergSortCompactionBenchmark.zSortInt2           ss    3   384.922 ±  141.313   s/op

// Use 3 Int Columns for sorting
IcebergSortCompactionBenchmark.sortInt3            ss    3   331.971 ±   91.621   s/op
IcebergSortCompactionBenchmark.zSortInt3           ss    3   398.508 ±   40.745   s/op

// Use 4 Int Columns for sorting
IcebergSortCompactionBenchmark.sortInt4            ss    3   345.414 ±   54.801   s/op
IcebergSortCompactionBenchmark.zSortInt4           ss    3   403.732 ±   43.947   s/op

// Use just a string column for sorting
IcebergSortCompactionBenchmark.sortString          ss    3   449.647 ±  874.281   s/op 
IcebergSortCompactionBenchmark.zSortString         ss    3   823.717 ± 1306.732   s/op

// Contains String Column String, Int, Date, Double
IcebergSortCompactionBenchmark.sortFourColumns     ss    3   292.972 ± 1978.359   s/op
IcebergSortCompactionBenchmark.zSortFourColumns    ss    3   913.779 ±  717.318   s/op

// Contains "stringCol", "intCol", "dateCol", "timestampCol", "doubleCol", "longCol"
IcebergSortCompactionBenchmark.sortSixColumns      ss    3   419.047 ±  113.040   s/op
IcebergSortCompactionBenchmark.zSortSixColumns     ss    3  1024.332 ±  568.328   s/op 

Summary

From what I see here the main cost comes when we start writing significantly larger ZOrder values, although even in that case we still only get a cost of 2-3x of a normal sort (which is doing something quite different). I'm guessing we could do better with a custom sort expression which doesn't materialize the entire ZValue.

The actual transforms to byte representations seem to be a very small portion of our cost here. In the Int vs Int examples we see the Zorder code is only slightly slower than the normal Sort code. The time is dominated by the shuffle algorithm.

CC: @rdblue

rdblue (Contributor) commented Feb 18, 2022:

I'm a bit suspicious of those benchmarks because the error range is so high. The error rate for sortFourColumns is 6.7x the reported value. I'm not sure we can conclude much from those numbers. The scores look good in favor of plain integers (5-10% lower), but the error range is too high to know.

I'm guessing we could do better with a custom sort expression which doesn't materialize the entire ZValue.

That's one thing that using a float will do. We can convert the first 8 bytes back into a long and then use a secondary column of the remaining bytes or just discard them if we're okay with the loss of precision. That reduces the overall shuffle size and also makes Spark's internal operations much more efficient:

  • UnsafeRow returns primitive longs, rather than allocating arrays and copying bytes out of unsafe memory
  • The value is stored in the fixed-width portion of UnsafeRow, rather than using both length and the variable portion
  • Bytes are compared 8 at a time rather than 1-by-1

I'm not very concerned with the cost of producing the zorder representation, just with the accesses Spark is going to do and the overall size. The floating point idea allows us to reduce the size and discard additional bits if we choose to. That seems like a win for keeping the additional data size down as well as avoiding allocations and the like.

RussellSpitzer (Member Author):

I'm a bit suspicious of those benchmarks because the error range is so high. The error rate for sortFourColumns is 6.7x the reported value. I'm not sure we can conclude much from those numbers. The scores look good in favor of plain integers (5-10% lower), but the error range is too high to know.

The error ranges are generated by some magic I do not understand; the actual variability within the runs was about 10 seconds for the ints (except for Sort Int 1) and within 100 seconds for Strings. So I don't know why JMH spits out the huge ranges.

Actual runs

Sort Int1
Iteration   1: 234.163 s/op
Iteration   2: 226.640 s/op
Iteration   3: 418.113 s/op

ZSortInt1
Iteration   1: 371.631 s/op
Iteration   2: 369.345 s/op
Iteration   3: 369.511 s/op
----------------

Sort Int2 
Iteration   1: 335.418 s/op
Iteration   2: 331.193 s/op
Iteration   3: 328.453 s/op

ZSortInt2
Iteration   1: 392.783 s/op
Iteration   2: 384.685 s/op
Iteration   3: 377.297 s/op
------

Sort Int3
Iteration   1: 332.431 s/op
Iteration   2: 326.734 s/op
Iteration   3: 336.747 s/op

ZSortInt3
Iteration   1: 396.057 s/op
Iteration   2: 400.429 s/op
Iteration   3: 399.036 s/op
---------------

SortInt4
Iteration   1: 344.941 s/op
Iteration   2: 342.674 s/op
Iteration   3: 348.626 s/op

ZSortInt4
Iteration   1: 405.152 s/op
Iteration   2: 400.950 s/op
Iteration   3: 405.093 s/op

---------------
Sort String

Iteration   1: 475.893 s/op
Iteration   2: 394.335 s/op
Iteration   3: 478.714 s/op

ZSort String
Iteration   1: 869.749 s/op
Iteration   2: 860.209 s/op
Iteration   3: 741.194 s/op

-------------------

Sort4 Columns
Iteration   1: 234.163 s/op
Iteration   2: 226.640 s/op
Iteration   3: 418.113 s/op

ZSort4 Columns
Iteration   1: 952.127 s/op
Iteration   2: 873.557 s/op
Iteration   3: 915.652 s/op

------------------
Sort6 Columns
Iteration   1: 425.914 s/op
Iteration   2: 417.350 s/op
Iteration   3: 413.875 s/op

ZSort 6 Columns
Iteration   1: 1010.229 s/op
Iteration   2: 1002.726 s/op
Iteration   3: 1060.042 s/op

RussellSpitzer (Member Author):

I'm guessing we could do better with a custom sort expression which doesn't materialize the entire ZValue.

That's one thing that using a float will do. We can convert the first 8 bytes back into a long and then use a secondary column of the remaining bytes or just discard them if we're okay with the loss of precision. That reduces the overall shuffle size and also makes Spark's internal operations much more efficient:

  • UnsafeRow returns primitive longs, rather than allocating arrays and copying bytes out of unsafe memory
  • The value is stored in the fixed-width portion of UnsafeRow, rather than using both length and the variable portion
  • Bytes are compared 8 at a time rather than 1-by-1

I think if this was actually the case we would see the 4-integer Sort version run much faster than the 4-integer ZOrder version. But they aren't, even though the 4-integer one should only have to compare the first integer (all values for all columns are the same, so only the first column needs to be read at compare time). It also doesn't have the cost of creating an extra column and serializing slightly more information.

The complexity of packing things into columns really doesn't seem like it will get us that much. As an example, I will trim our String key down to 8 bytes and rerun that test. I'm betting that the timings will then scale just with the raw number of bytes we have to interleave.

RussellSpitzer (Member Author) commented Feb 18, 2022:

ZOrder Sort4Columns with only 8 Bytes of the String Considered when making the interleave column
Iteration   1: 298.384 s/op
Iteration   2: 316.174 s/op
Iteration   3: 312.037 s/op

For Reference sort with 4 Columns from the previous run
Sort4 Columns
Iteration   1: 234.163 s/op
Iteration   2: 226.640 s/op
Iteration   3: 418.113 s/op

The dominating factor here, I'm pretty sure, is just the additional number of bytes that need to be serialized back and forth, plus the cost of the interleave column. I don't think the sort comparison is really that significant compared to those costs.

One more test, forcing the ZOrder column into exactly 8 bytes (essentially, once 8 bytes have been contributed the row is done, regardless of how many bytes are in the input columns).

ZOrder Sort4 Columns, but the ZValue is only allowed to be 8 bytes long; all other information is discarded
Iteration   1: 284.234 s/op
Iteration   2: 274.458 s/op
Iteration   3: 278.901 s/op

@rdblue added in a version where we have the ZOrder function return a Long instead of a byte array. So this is basically the same as the above test, which only uses a max of 8 interleaved bytes, then uses ByteBuffer.position(0).getLong() to place this value into a Spark Column. It ends up being slower than the byte comparison above. I don't know why.

Iteration   1: 317.637 s/op
Iteration   2: 311.544 s/op
Iteration   3: 303.514 s/op

RussellSpitzer (Member Author) commented Mar 4, 2022:

Summarizing some of the perf tests I ran with the 4 Column Sort (String, Int, Date, Double)

Sorting just on already existing values, no Zorder
Iteration 1: 234.163 s/op
Iteration 2: 226.640 s/op
Iteration 3: 418.113 s/op

ZOrder: 128 bytes per String, All bytes interleaved, Sort on a binary column
Iteration 1: 952.127 s/op
Iteration 2: 873.557 s/op
Iteration 3: 915.652 s/op

ZOrder: 8 Bytes per String, All bytes interleaved, Sort on Binary Column
Iteration 1: 298.384 s/op
Iteration 2: 316.174 s/op
Iteration 3: 312.037 s/op

ZOrder: 8 Bytes per string, Interleave up to 8 bytes, Sort on Binary Column
Iteration 1: 284.234 s/op
Iteration 2: 274.458 s/op
Iteration 3: 278.901 s/op

ZOrder: 8 Bytes per string, Interleave up to 8 Bytes, Sort on Long representation of 8 Bytes
RussellSpitzer#3
Iteration 1: 317.637 s/op
Iteration 2: 311.544 s/op
Iteration 3: 303.514 s/op

rdblue (Contributor) commented Mar 13, 2022:

As long as we think the time is dominated by serialization, then let's move forward with this. We can always update the implementation later.

}

private void appendData() {
Dataset<Row> df = spark().range(0, NUM_ROWS * NUM_FILES, 1, NUM_FILES)


I'm not sure the mocked data is good for verifying performance. In the real world, data is random and contains duplicates, and comparisons on such data are slower.

FYI, ordering by a binary column is very slow for duplicate-heavy data; apache/spark#34310 fixed it in Spark 3.3.0 by changing the comparison to long-by-long rather than byte-by-byte.
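The long-at-a-time comparison idea referenced above can be sketched as a standalone function (hypothetical code, not the Spark implementation, which works over unsafe memory):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Sketch of a long-wise lexicographic comparator: pack big-endian 8-byte
// chunks into longs, compare those unsigned, and fall back to
// byte-by-byte only for the tail.
public class LongwiseCompare {
  public static int compare(byte[] a, byte[] b) {
    int minLen = Math.min(a.length, b.length);
    int i = 0;
    for (; i + 8 <= minLen; i += 8) {
      long wa = ByteBuffer.wrap(a, i, 8).order(ByteOrder.BIG_ENDIAN).getLong();
      long wb = ByteBuffer.wrap(b, i, 8).order(ByteOrder.BIG_ENDIAN).getLong();
      if (wa != wb) {
        return Long.compareUnsigned(wa, wb); // 8 bytes decided in one step
      }
    }
    for (; i < minLen; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF); // unsigned bytes
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length); // shorter prefix sorts first
  }

  public static void main(String[] args) {
    byte[] x = {0, 0, 0, 0, 0, 0, 0, 0, 1};
    byte[] y = {0, 0, 0, 0, 0, 0, 0, 0, 2};
    if (compare(x, y) >= 0 || compare(y, x) <= 0 || compare(x, x) != 0) {
      throw new AssertionError("comparator broken");
    }
    System.out.println("ok");
  }
}
```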

Member Author (@RussellSpitzer, Mar 15, 2022):

What would you suggest? Here is an example where every single record is the same (probably some good branch prediction helping us out here)

ZOrder, 8 Bytes output as Byte Array
Iteration 1: 237.364 s/op
Iteration 2: 196.935 s/op
Iteration 3: 203.385 s/op

I feel like we have lots of micro benchmarks here, but none that really show that in this context it matters that much. If we limit the output to 8 Bytes per column our max ZOrder column is never going to be larger than 64 bytes (or if a user uses more than this many columns it probably wouldn't be that beneficial to ZOrder anyway.)


Thank you @RussellSpitzer. Yeah, I agree it really depends on the length of the ZOrder value. I just wanted to post the thought about a potential issue.

What would you suggest

I was wondering: do we need a unit benchmark for the ZOrderByteUtils.interleaveBits method? It is straightforward and does not depend on the Spark version or anything else.

Member Author:

I do think we need lots of benchmarking when we want to tune this up. When Spark 3.3 comes out it will be great to rerun this as well and see if we get better performance. I am 100% in favor of any follow-ups to improve things, but I do want to get this out as soon as we can so we have at least a starting point.

Member Author (@RussellSpitzer, Mar 18, 2022):

Ran some benchmarks on ZOrderByteUtils; see the newly attached commit for the test file.
The benchmark times 10 million interleaves; input values come directly from preallocated arrays.

ZOrderByteUtilsBenchmark.interleaveValuesFourColumns               ss    5  10.706 ± 1.117   s/op
ZOrderByteUtilsBenchmark.interleaveValuesFourColumns8ByteOutput    ss    5   2.854 ± 0.832   s/op
ZOrderByteUtilsBenchmark.interleaveValuesThreeColumns              ss    5   7.960 ± 1.532   s/op
ZOrderByteUtilsBenchmark.interleaveValuesTwoColumns                ss    5   5.618 ± 1.265   s/op
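For reference, the interleave operation being benchmarked can be illustrated with a minimal two-column sketch (hypothetical code, not the ZOrderByteUtils.interleaveBits implementation, which works over byte arrays for any column count):

```java
// Minimal two-column bit interleave: alternate the bits of two ints into
// one 64-bit z-value. Bit i of x lands at position 2*i + 1 and bit i of
// y at position 2*i, so magnitude order is preserved per column.
public class Interleave {
  public static long interleave(int x, int y) {
    long z = 0;
    for (int i = 0; i < 32; i++) {
      z |= ((long) ((x >>> i) & 1)) << (2 * i + 1); // x takes odd bits
      z |= ((long) ((y >>> i) & 1)) << (2 * i);     // y takes even bits
    }
    return z;
  }

  public static void main(String[] args) {
    if (interleave(1, 1) != 3L) throw new AssertionError();
    if (interleave(1, 0) != 2L) throw new AssertionError();
    if (interleave(0b11, 0b11) != 0b1111L) throw new AssertionError();
    System.out.println("ok");
  }
}
```

Fixing the output at 8 bytes, as in the interleaveValuesFourColumns8ByteOutput case above, simply truncates the interleave once 64 bits have been produced.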


Sorry for the late reply, and thank you @RussellSpitzer. Looks good to me.

@szehon-ho (Collaborator) left a comment:

Generally it makes sense, I left one round of comments

}

/**
* For Testing interleave all available bytes
Collaborator:

This comment doesn't seem relevant, right?

Member Author:

I use it only in the test class but I don't need to, I'll switch it to the other version below.


Assert.assertEquals("Should have 1 fileGroups", 1, result.rewriteResults().size());
int zOrderedFilesTotal = Iterables.size(table.currentSnapshot().addedFiles());
Assert.assertTrue("Should have written 40+ files", zOrderedFilesTotal >= 40);
Collaborator:

Question: is this not deterministic?

Member Author:

Tiny changes in the parquet compression and its ability to determine file size make the exact file count slightly iffy between versions. I have spent a lot of time trying to fine-tune the parameters here so that we make exactly the number of files we want, but it's difficult unless the files are big enough for this test to take a considerable amount of time. :/

private static final int STRING_KEY_LENGTH = 128;

private final List<String> zOrderColNames;
private transient FileScanTaskSetManager manager = FileScanTaskSetManager.get();
Collaborator:

If we extend Spark3Strategy, why do we need to define these again, and at different serialization level?

Member Author:

The transient isn't needed; I added that in when I was trying to keep Spark3UDF as a subclass, which was ... not necessary. We could do without the private modifier, but then I need to change these to protected and add getters because of our style. I didn't do that before because I just dislike making getters :( But I probably should.

@@ -155,4 +155,8 @@ protected SparkSession spark() {
protected LogicalPlan sortPlan(Distribution distribution, SortOrder[] ordering, LogicalPlan plan, SQLConf conf) {
return DistributionAndOrderingUtils$.MODULE$.prepareQuery(distribution, ordering, plan, conf);
}

protected double sizeEstimateMultiple() {
return this.sizeEstimateMultiple;
Collaborator:

Nit: I think we don't use 'this' in many places outside of assigning to a variable of the same name? I didn't find many examples of returning a private variable this way, for example.

Member Author:

Removed this; yeah, we only use it in assignments.

@szehon-ho (Collaborator) left a comment:

Took another pass (and some old comments still apply)

return PRIMITIVE_EMPTY;
}
return ZOrderByteUtils.doubleToOrderedBytes(value, inputBuffer(position, Double.BYTES)).array();
}, DataTypes.BinaryType).withName("FLOAT_ORDERED_BYTES");
Collaborator:

Typo?

this.numCols = numCols;
}

private void readObject(java.io.ObjectInputStream in)
Collaborator (@szehon-ho, Mar 18, 2022):

Can we import ObjectInputStream?

}

private ByteBuffer outputBuffer(int size) {
if (outputBuffer == null) {
Collaborator:

Still not clear why we don't initialize it in readObject? It just makes an un-initialized ThreadLocal reference, right? (Does that make a big difference?)

Collaborator:

And also, I assume we are expecting concurrent calls; wouldn't we need multi-threading protection on the check of whether the buffer is null or not, if we go with this approach (hoping we don't have to)?

Member Author:

Yeah no reason we couldn't set it in readObject, I think I just initially put it here before I had implemented the other buffers.

As for thread execution, the way we generate the UDF means that you get a single "outputBuffer" for every thread. The way Spark should be running this is with a single thread per execution context (task) which means that any individual invocation should actually be in serial. That's why we don't have any protections on any of these buffer references.
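A minimal sketch of that lazy, per-instance buffer pattern (hypothetical class, not the PR's actual UDF code):

```java
import java.io.Serializable;
import java.nio.ByteBuffer;

// Sketch of the buffer reuse discussed above: the buffer is transient, so
// it is null after the UDF is deserialized on an executor and is created
// lazily on first use. Because Spark runs each UDF instance on a single
// task thread, the null check needs no synchronization.
public class ReusableBuffer implements Serializable {
  private transient ByteBuffer outputBuffer;

  public ByteBuffer outputBuffer(int size) {
    if (outputBuffer == null) {
      outputBuffer = ByteBuffer.allocate(size); // one allocation per instance
    }
    outputBuffer.clear(); // reuse the same backing array on every call
    return outputBuffer;
  }

  public static void main(String[] args) {
    ReusableBuffer holder = new ReusableBuffer();
    ByteBuffer first = holder.outputBuffer(16);
    ByteBuffer second = holder.outputBuffer(16);
    if (first != second) {
      throw new AssertionError("buffer not reused");
    }
    System.out.println("ok");
  }
}
```

Initializing in readObject instead, as suggested above, would produce the same per-instance buffer without the null check on the hot path.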

} else if (type instanceof StringType) {
return stringToOrderedBytesUDF().apply(column);
} else if (type instanceof BinaryType) {
return stringToOrderedBytesUDF().apply(column);
Collaborator:

Can we add test with binary type to make sure the cast happens right?

Member Author:

ah yeah I think this is actually incorrect now ... we just want to truncate without doing all of the other encoding work

Member Author:

Swapped with a truncate or fill method.

@RussellSpitzer
Member Author

After changing the default size of all types to 8 bytes and randomizing the shuffle input I get some different perf results. It seems like the pattern of the data is more important to the sort times than the amount of data in the sort field?

Commit: 91a7601

Benchmark                                        Mode  Cnt    Score    Error  Units
IcebergSortCompactionBenchmark.sortFourColumns     ss   10   86.675 ±  8.177   s/op
IcebergSortCompactionBenchmark.sortInt             ss   10   74.262 ±  6.732   s/op
IcebergSortCompactionBenchmark.sortInt2            ss   10   72.515 ±  8.014   s/op
IcebergSortCompactionBenchmark.sortInt3            ss   10   76.228 ±  4.590   s/op
IcebergSortCompactionBenchmark.sortInt4            ss   10   74.730 ±  4.267   s/op
IcebergSortCompactionBenchmark.sortSixColumns      ss   10   75.933 ±  5.042   s/op
IcebergSortCompactionBenchmark.sortString          ss   10   77.488 ±  6.804   s/op

IcebergSortCompactionBenchmark.zSortFourColumns    ss   10  277.954 ± 73.324   s/op
IcebergSortCompactionBenchmark.zSortInt            ss   10  327.105 ± 17.098   s/op
IcebergSortCompactionBenchmark.zSortInt2           ss   10  328.217 ± 13.099   s/op
IcebergSortCompactionBenchmark.zSortInt3           ss   10  342.404 ± 15.660   s/op
IcebergSortCompactionBenchmark.zSortInt4           ss   10  344.997 ± 16.342   s/op
IcebergSortCompactionBenchmark.zSortSixColumns     ss   10  295.686 ± 62.866   s/op
IcebergSortCompactionBenchmark.zSortString         ss   10  333.303 ± 15.966   s/op

What is odd to me here is that the sort time for strings is now basically the same as for integers; all of our Z-orderings take about the same amount of time, and so do all of our sorts without Z-order. What is more interesting to me is that for Z-ordering this basically increases the ZOrder output byte size yet has no effect on the comparison time. For strings maybe this made sense, but for zSortInt 1, 2, 3, 4 I would have expected things to take different amounts of time. Perhaps with a totally random layout of data the significant bits to compare on average always appear in the same location for ZOrder, regardless of the number of interleaved columns?

@RussellSpitzer
Member Author

@szehon-ho Made fixes and added some new benchmarks as well, please take a look when you have time.

"Cannot have the interleaved ZOrder value use less than 1 byte, %s was set to %s",
MAX_OUTPUT_SIZE_KEY, maxOutputSize);

return this;
Collaborator

Extra space

Spark3ZOrderUDF(int numCols, int varTypeSize, int maxOutputSize) {
this.numCols = numCols;
this.varTypeSize = varTypeSize;
this.maxOutputSize = maxOutputSize;
Collaborator

This does not even seem used? Should we remove the option for now (unless I missed where it's used)

Member Author

woops, yeah I changed this now so that on deserialization (readObject) we set totalOutputBytes to maxOutputBytes if it is larger than maxOutputBytes.

} else if (type instanceof BinaryType) {
return bytesTruncateUDF().apply(column);
} else if (type instanceof BooleanType) {
return bytesTruncateUDF().apply(column);
Collaborator

Is this right? (We will take the default var size here?)

Member Author

Yeah I guess technically we should use the primitive size here, although zordering on a column with only 2 values is probably a losing proposition anyway :)

@@ -1181,6 +1263,35 @@ protected Table createTablePartitioned(int partitions, int files) {
return createTablePartitioned(partitions, files, SCALE, Maps.newHashMap());
}

private Table createTypeTestTable() {
Schema schema = new Schema(
Collaborator

Still feel a bit more comfortable if we can do add some of the other types, like boolean, binary

private ByteBuffer inputBuffer(int position, int size) {
if (inputBuffers[position] == null) {
// May over allocate on concurrent calls
inputBuffers[position] = ThreadLocal.withInitial(() -> ByteBuffer.allocate(size));
Collaborator

@szehon-ho Mar 21, 2022

So I am still trying to understand: why use a ThreadLocal if we are not expecting multiple threads? My point was, if we are doing it to be thread-safe in case that happens, this particular part does not seem very thread-safe. Unless there is another reason for the thread-local that I'm missing?

Member Author

We discussed this a bit offline, for those interested our lifecycle basically looks like

Driver --- UDF --> Executor -> Core - Task (Thread)
                       \_____> Core - Task (Thread)

So every task is run in a single thread on the executor, but the tasks are referring to a UDF which is basically a singleton on the Executor JVM. Each task in isolation runs as if it was single threaded but any UDF members need to be protected against MultiThreaded access.

An alternative implementation we could do is to explicitly handle the multithreading with RDD operations. For example something like

rdd.mapPartitions { it =>
  val inputBuffer = allocate()
  val outputBuffer = allocate()
  it.map(row => ZOrderRow(row, inputBuffer, outputBuffer))
}

Member Author

We talked about this a bit on slack and we agreed to shift this to
private transient ThreadLocal<ByteBuffer[]> inputBuffers;

So now the array is allocated once per thread, then we change the elements of that array
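A sketch of that layout with hypothetical names (the real field lives in Spark3ZOrderUDF): the ThreadLocal holds the whole buffer array, and because the field is transient it is rebuilt lazily on the executor after deserialization.

```java
import java.io.Serializable;
import java.nio.ByteBuffer;

// Hypothetical sketch of the per-thread buffer array described above.
class ZOrderBuffers implements Serializable {
  private final int numCols;
  // transient: rebuilt lazily on each executor after deserialization
  private transient ThreadLocal<ByteBuffer[]> inputBuffers;

  ZOrderBuffers(int numCols) {
    this.numCols = numCols;
  }

  ByteBuffer inputBuffer(int position, int size) {
    if (inputBuffers == null) {
      // allocate the array once per thread; elements are filled on demand
      inputBuffers = ThreadLocal.withInitial(() -> new ByteBuffer[numCols]);
    }
    ByteBuffer[] buffers = inputBuffers.get();
    if (buffers[position] == null) {
      buffers[position] = ByteBuffer.allocate(size);
    }
    return buffers[position];
  }
}
```

Each task thread then mutates only its own array elements, which is why the null check needs no synchronization.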

VAR_LENGTH_CONTRIBUTION_KEY, varLengthContribution);


maxOutputSize = PropertyUtil.propertyAsInt(options, MAX_OUTPUT_SIZE_KEY, DEFAULT_MAX_OUTPUT_SIZE);
Collaborator

Remove it for now if not used?

Member Author

This is used in the zOrderUDF

See

    this.zOrderUDF = new Spark3ZOrderUDF(zOrderColNames.size(), varLengthContribution, maxOutputSize);

Collaborator

Yea I mean, it's not used inside the UDF :)

Use Spark UDFs to create a Z-Value column and then invoke a Spark Sort on it. The resultant data is then saved without the Z-Value Column.
Change all primitives to use 8 byte buffers, types now aligned based on magnitude
Perf test still WIP, using new Random generating udfs
Collaborator

@szehon-ho left a comment

Thanks @RussellSpitzer looks mostly good to me just some minor questions left.

Also not very familiar with Parquet UUID, so maybe we can separate out or have someone more familiar look at the new writer?


@Override
protected void validateOptions() {
// TODO implement ZOrder Strategy in API Module
Collaborator

@szehon-ho Apr 18, 2022

Didn't get it: why do we need to implement this in the API module in order to implement this method?

Member Author

Let me remove that, this was to remind me that we need to decide whether this will be defined in API in the future. One of my thoughts is we can eventually change it to just being a type of SortOrder

Member Author

Changed to

    // Ignore SortStrategy validation

}

byte[] interleaveBits(Seq<byte[]> scalaBinary) {
byte[][] columnsBinary = scala.collection.JavaConverters.seqAsJavaList(scalaBinary)
Collaborator

Nit: is it possible to import?

@@ -299,6 +301,28 @@ public void write(int repetitionLevel, byte[] bytes) {
}
}

private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
Collaborator

@szehon-ho Apr 18, 2022

is this required for this change? (Couldn't figure out where it's used.) If it's not a critical part of this change, maybe we can separate it out for people more familiar with this part to take a look?

Member Author

No, I meant to remove this too; I actually removed some of the other code which makes this work.

Collaborator

@szehon-ho left a comment

Looks good to me, just some very small nits and a question, feel free to merge when you are ready

public static final int PRIMITIVE_BUFFER_SIZE = 8;

private ZOrderByteUtils() {

Collaborator

Nit: extra space (other Util has 1 space)

.build();

/**
* Controls the amount of bytes interleaved in the ZOrder Algorithm. Default is all bytes being interleaved.
Collaborator

For learning, what is the effect if we do not use all bytes?

Member Author

You get a coarser ordering (only higher-order bytes are considered); the effect differs based on the distribution of data in the actual column.

So if, for example, we had 2 integer columns and we only used 1 byte for our output here, we would only be considering the first 4 bits (highest order) from either integer. Only numbers with extremely high magnitude would be effectively interleaved; all other bits would be ignored. So 3 would look equivalent to 4.
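To make that concrete, here is a hedged sketch (the helper name is hypothetical, not Iceberg's API) that interleaves only the top 4 bits of each of two ints into a single output byte. With the usual sign-flip encoding applied first, 3 and 4 produce identical output bytes because they differ only in low-order bits.

```java
// Hypothetical helper: interleave the top 4 bits of two sign-flipped ints into one byte.
class NibbleInterleave {
  static byte interleaveHighNibbles(int a, int b) {
    int result = 0;
    for (int i = 0; i < 4; i++) {
      int bitA = (a >>> (31 - i)) & 1; // i-th highest bit of column A
      int bitB = (b >>> (31 - i)) & 1; // i-th highest bit of column B
      result |= bitA << (7 - 2 * i);   // A's bit lands first in each pair
      result |= bitB << (6 - 2 * i);   // then B's bit
    }
    return (byte) result;
  }
}
```

For example, after sign-flipping (`x ^ 0x80000000`), both 3 and 4 have the top nibble `1000`, so the one-byte interleaved value is the same for both: the truncated Z-value cannot distinguish them.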


if (!partZOrderCols.isEmpty()) {
LOG.warn("Cannot ZOrder on an Identity partition column as these values are constant within a partition " +
"they will be removed from the ZOrder expression: {}", partZOrderCols);
Collaborator

nit: "they" is a bit abrupt for the middle of the sentence, how about "and"

buffer.put(0, (byte) (value ? -127 : 0));
return buffer.array();
}, DataTypes.BinaryType)
.withName("BOOLEAN-LEXICAL-BYTES");
Collaborator

Nit: looks a bit weird, maybe put on previous line like others


private void increaseOutputSize(int bytes) {
totalOutputBytes += bytes;
if (totalOutputBytes > maxOutputSize) {
Collaborator

Optional: totalOutputBytes = Math.max(totalOutputBytes, maxOutputSize) can fit on one line

Member Author

Sounds good to me, but I think you mean "min"

Member Author

@RussellSpitzer Apr 20, 2022

  private void increaseOutputSize(int bytes) {
    totalOutputBytes = Math.min(totalOutputBytes + bytes, maxOutputSize);
  }

@RussellSpitzer
Member Author

OK I think after tests pass we are good to go. After this we can start working on more tests and further optimizations to the algorithm but I believe this is enough for folks to start getting their hands dirty.

@RussellSpitzer RussellSpitzer merged commit f213541 into apache:master Apr 21, 2022
@RussellSpitzer
Member Author

Thanks @szehon-ho, @rdblue, @ulysses-you, @jackye1995 and @emkornfield for all your insightful comments and review. Let's be sure to continue optimizing and finding ways to make Iceberg's optimizations more performant!

xloya pushed a commit to xloya/iceberg that referenced this pull request Sep 24, 2022
sunchao pushed a commit to sunchao/iceberg that referenced this pull request May 9, 2023
Adds an initial implementation of a ZOrder File Rewrite Strategy. Allows users to use a multidimensional sort algorithm for organizing data.

(cherry picked from commit f213541)

Successfully merging this pull request may close these issues.

Spark Implementation of ZOrder Rewrite Strategy
7 participants