Spark: Spark3 ZOrder Rewrite Strategy #3983
int byteCompare = UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes);

Assert.assertTrue(String.format(
    "Ordering of ints should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
nit: ordering of longs, same issue for all other data type tests
Yep. I have a separate PR for the byte utils; I think we can get this in faster if we target that first: #3966
private static final org.apache.iceberg.SortOrder Z_SORT_ORDER = org.apache.iceberg.SortOrder.builderFor(Z_SCHEMA)
    .sortBy(Z_COLUMN, SortDirection.ASC, NullOrder.NULLS_LAST)
    .build();
private static final int STRING_KEY_LENGTH = 60;
I see, going back to the top question, this is currently static. Can we at least make this configurable somehow, maybe as a part of the interface input?
This was just my thought for the initial implementation. I think we'll eventually have this change automatically (as well as bounding), but I don't think we want to expose this as a user-configurable parameter.
Was there any rationale for 60? Maybe document it? For instance, Parquet seems to use a default of 64 bytes for its truncation length on indexes. But I imagine for a lot of common cases it would be better to keep this much shorter (e.g. 32 bytes) so that there is room to interleave more primitive types and still fit on a cache line.
Just picking a random number. I'm fine setting it to 64 or 128.
IIUC the utility classes will pad to this value? So I think shorter is better, i.e. 32 or even 16. We could run into cases where there are a lot of common prefixes, but my sense is most strings are generally short, so the extra padding seems wasteful. This is somewhat bike-shedding, as I'm sure there will be another, smarter implementation.
 * @param columns Columns to be used to generate Z-Values
 * @return this for method chaining
 */
default RewriteDataFiles zOrder(String... columns) {
I remember in the original design zOrder is considered a part of the sort order, so it makes more sense to me for the input here to be SortOrder instead of String... columns. I guess this will be a shortcut for something like SortOrder.builderFor(schema).zOrder().columns(a,b,c).bitWidth(10) when that interface is out? But how do people define things like the string column width to cut off in this case?
End users would not define any of the inner parameters for z-ordering. Ideally, in a follow-up version we take some basic statistics from the data set and use those to develop our z-ordering. I think providing additional user config here is essentially just giving users a bunch of very difficult knobs.
in a follow up version we take some basic statistics from the data set and use that to develop our z ordering
do you have any current thoughts about how this would be determined by statistics? I always imagined some human decision is still needed to tune the tradeoffs among different configs.
For most types my goal would be to bound and then condense the byte representation domain. So, for example, for a column with many values that translate to
{0, ..., 0, 1, X, Y, Z}
we want to shift to
{1, X, Y, Z, 0, ..., 0}
This involves finding min and max values, binning within that range, and shifting left.
At least those are my current thoughts
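A minimal sketch of that binning idea under stated assumptions (the class and method names here are hypothetical, and this is not code from the PR — it just illustrates mapping a bounded range into a small number of bins so significant bits move to the front of the representation):

```java
public class BinningSketch {
  // Hypothetical illustration: map a value in [min, max] into one of
  // `bins` buckets. Values outside the range are clamped. The resulting
  // bin index carries the column's significant information in its
  // high-order bits instead of a long run of leading zeros.
  static byte binned(long value, long min, long max, int bins) {
    long clamped = Math.max(min, Math.min(max, value));
    double fraction = (double) (clamped - min) / (max - min);
    // cap at bins - 1 so value == max does not overflow into a new bin
    return (byte) Math.min(bins - 1, (int) (fraction * bins));
  }
}
```

With min/max gathered from table statistics, a column whose values cluster in a narrow band would then use the full byte domain rather than a sliver of it.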
@@ -0,0 +1,128 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
How does this relate to #3960?
I see comment below about splitting it out.
Yep, sorry. This PR is built on top of #3960, so hopefully we can get things in faster.
// with less magnitude (reverse the order)
for (int i = 0; i < floatBytes.length; i++) {
  floatBytes[i] = (byte) ~floatBytes[i];
}
Is flipping needed? I thought for floats, all we had to do was flip the sign bit just like integers. Isn't that what the IEEE 754 quote above is saying?
Yep, so the thing about sign magnitude integers is that the ordering of positive numbers is just what you expect. The larger the binary representation the larger the positive number. But for negatives the effect is the opposite. A negative number with a large magnitude is more negative than a negative number with a smaller magnitude. (This is different than the 2's complement representation we deal with above in orderIntLikeBytes)
So basically, if we drew out the ordering from smallest to largest byte representations, it goes

0xxxxxxx range: 0 --------------------> Most Positive
1xxxxxxx range: 0 <-------------------- Most Negative

So we take this and first flip the sign bit, which changes it to

0xxxxxxx range: 0 <-------------------- Most Negative
1xxxxxxx range: 0 --------------------> Most Positive

But we still have the negatives in the wrong direction; twiddling those bits flips their direction:
(0010 -> 1101) (originally the most negative number)
(0001 -> 1110)
(0000 -> 1111)
And our final range looks like this

0xxxxxxx range: Most Negative --------------------> 0
1xxxxxxx range: 0 --------------------> Most Positive
Imagine we have 4-bit sign-magnitude integers
0000 = 0 ==> 1000
0001 = 1 ==> 1001
0010 = 2 ==> 1010
0011 = 3 ==> 1011
0100 = 4 ==> 1100
0101 = 5 ==> 1101
0110 = 6 ==> 1110
0111 = 7 ==> 1111
1000 = -0 ==> 0111
1001 = -1 ==> 0110
1010 = -2 ==> 0101
1011 = -3 ==> 0100
1100 = -4 ==> 0011
1101 = -5 ==> 0010
1110 = -6 ==> 0001
1111 = -7 ==> 0000
Which, if we sort based on the transformed binary, gives us
1111 = -7 ==> 0000
1110 = -6 ==> 0001
1101 = -5 ==> 0010
1100 = -4 ==> 0011
1011 = -3 ==> 0100
1010 = -2 ==> 0101
1001 = -1 ==> 0110
1000 = -0 ==> 0111
0000 = 0 ==> 1000
0001 = 1 ==> 1001
0010 = 2 ==> 1010
0011 = 3 ==> 1011
0100 = 4 ==> 1100
0101 = 5 ==> 1101
0110 = 6 ==> 1110
0111 = 7 ==> 1111
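The transform described above (flip the sign bit for non-negative floats, flip all bits for negatives) can be sketched in a small self-contained example. This is a hypothetical illustration, not the PR's ZOrderByteUtils code; the class and method names are made up:

```java
import java.nio.ByteBuffer;

public class FloatOrdering {
  // IEEE 754 floats are sign-magnitude: positives sort naturally, but
  // negatives sort in reverse. Flipping the sign bit on non-negatives
  // and ALL bits on negatives yields bytes whose unsigned lexicographic
  // order matches numeric order (the "mirroring" discussed above).
  static byte[] floatToOrderedBytes(float value) {
    int bits = Float.floatToIntBits(value);
    if (bits >= 0) {
      bits ^= 0x80000000;   // non-negative: set the sign bit
    } else {
      bits = ~bits;         // negative: invert everything (mirror)
    }
    return ByteBuffer.allocate(4).putInt(bits).array(); // big-endian
  }

  // unsigned lexicographic comparison, like Guava's
  // UnsignedBytes.lexicographicalComparator() used in the tests above
  static int unsignedCompare(byte[] a, byte[] b) {
    for (int i = 0; i < a.length; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return 0;
  }
}
```

For example, -2.0f, -1.0f, 0.0f, 1.0f map to byte arrays that compare in exactly that order under the unsigned comparator.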
Yep, you're right. Floats are mirrored. And to double-check, I validated that HBase also does this; OrderedBytes is a good reference.
partZOrderCols.isEmpty(),
"Cannot ZOrder on an Identity partition column as these values are constant within a partition, " +
"ZOrdering requested on %s",
partZOrderCols);
What do you think about simply removing the column, because the order is identical without it? If I want to use order by zorder(col1, col2) but I know col1 is always a constant (as it is for a file in an identity partition), then it's equivalent to order by col2. I think I'd prefer to produce an equivalent, rather than failing.
Switched this to logging and added an additional check to make sure we haven't removed all of the ZOrder named columns. Personally I don't like changing user parameters if we know they are no-ops but I think this is fine too.
.collect(Collectors.toList());

Column zvalueArray = functions.array(zOrderColumns.stream().map(colStruct ->
    SparkZOrder.sortedLexicographically(functions.col(colStruct.name()), colStruct.dataType())
I'm not sure that sortedLexicographically makes sense here. It looks like this is converting values to byte arrays that have the same sort order, but there's no real sorting going on here, right?
return ZOrderByteUtils.interleaveBits(columnsBinary);
}

private static UserDefinedFunction tinyToOrderedBytesUDF() {
The UDFs here are okay, but I think we'd get better performance overall if we were to use FunctionCatalog to expose these to Spark. That way, Spark can compile in the method calls.
66c77fa - Benchmark: g :iceberg-spark:iceberg-spark-3.2:jmh -PjmhIncludeRegex=IcebergSortCompactionBenchmark -PjmhOutputPath=benchmark/SortResult.txt
Minimal effect on the timing of non-string compactions. Currently String is set at a buffer length of 128 bytes, which probably explains the dramatically more expensive ZOrder for operations containing Strings. I think, additionally, the Spark version can bail on the first misaligned byte, while we read the entire buffer before we make any decision. Memory is managed in the ZOrder method via thread-local byte buffers owned by the ZOrder UDFs created in the sort. I need to check this over, but I believe I did it correctly. The test makes 8 files, 10 million records each, and runs the Iceberg compaction algorithm.
Test rewrites
Summary: From what I see here, the main cost comes when we start writing significantly larger ZOrder values, although even in that case we still only get a cost of 2-3x of a normal sort (which is doing something quite different). I'm guessing we could do better with a custom sort expression that doesn't materialize the entire ZValue. The actual transforms to byte representations seem to be a very small portion of our cost here. In the Int vs Int examples we see the ZOrder code is only slightly slower than the normal sort code. The time is dominated by the shuffle algorithm. CC: @rdblue
I'm a bit suspicious of those benchmarks because the error range is so high. The error rate for sortFourColumns is 6.7x the reported value. I'm not sure we can conclude much from those numbers. The scores look good in favor of plain integers (5-10% lower), but the error range is too high to know.
That's one thing that using a float will do. We can convert the first 8 bytes back into a long and then use a secondary column of the remaining bytes or just discard them if we're okay with the loss of precision. That reduces the overall shuffle size and also makes Spark's internal operations much more efficient:
I'm not very concerned with the cost of producing the zorder representation, just with the accesses Spark is going to do and the overall size. The floating point idea allows us to reduce the size and discard additional bits if we choose to. That seems like a win for keeping the additional data size down as well as avoiding allocations and the like.
The error ranges are generated by some magic I do not understand, the actual variability within the runs was about 10 seconds (except for Sort Int 1) for the ints and within 100 seconds for Strings. So I don't know why JMH spits out the huge ranges. Actual runs
I think if this was actually the case we would see the 4-integer Sort version run much faster than the 4-integer ZOrder version. But they aren't, even though the 4-integer one should only have to compare the first integer (all values for all columns are the same, so only the first column needs to be read at compare time). It also doesn't have the cost of creating an extra column and serializing slightly more information. The complexity of packing things into columns really doesn't seem like it will get us that much. As an example, I will trim our String key down to 8 bytes and rerun that test. I'm betting that the timings will scale just with the raw number of bytes we have to interleave.
I'm pretty sure the dominating factor here is just the additional number of bytes that need to be serialized back and forth and the cost of the interleave column; I don't think the sort comparison is really that significant compared to those costs. One more test: forcing the ZOrder column into exactly 8 bytes (essentially, once 8 bytes have been contributed the row is done, regardless of how many bytes are in the input columns).
@rdblue added in a version where we have the ZOrder function return a Long instead of a byte array. So this is basically the same as the above test which only uses a max of 8 interleaved bytes. Then uses ByteBuffer.pos(0).getLong to place this value into a Spark Column. It ends up being slower than the byte comparison above. I don't know why.
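The long-key variant described here can be sketched roughly like this (a hypothetical illustration, not the PR's code; in particular, the sign-bit flip is my assumption for making signed long comparison agree with unsigned byte order — the actual implementation may differ):

```java
import java.nio.ByteBuffer;

public class LongKeySketch {
  // Take the first 8 interleaved bytes and reinterpret them as a long
  // sort key. A big-endian getLong preserves lexicographic order of the
  // bytes as UNSIGNED 64-bit order; XOR-ing the sign bit shifts that
  // into the signed long range so Spark's signed comparison matches.
  static long toSortKey(byte[] interleaved) {
    byte[] first8 = new byte[8]; // zero-padded if input is shorter
    System.arraycopy(interleaved, 0, first8, 0, Math.min(8, interleaved.length));
    return ByteBuffer.wrap(first8).getLong() ^ Long.MIN_VALUE;
  }
}
```

This keeps the shuffle key a fixed 8 bytes, at the cost of discarding any interleaved bits beyond the first 64.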
Summarizing some of the perf tests I ran with the 4 Column Sort (String, Int, Date, Double):
Sorting just on already existing values, no ZOrder
ZOrder: 128 bytes per String, all bytes interleaved, sort on a binary column
ZOrder: 8 bytes per String, all bytes interleaved, sort on a binary column
ZOrder: 8 bytes per String, interleave up to 8 bytes, sort on a binary column
ZOrder: 8 bytes per String, interleave up to 8 bytes, sort on a long representation of the 8 bytes
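The interleave step these tests exercise can be sketched as follows. This is a simplified, hypothetical version, not the actual ZOrderByteUtils.interleaveBits implementation; it just shows the round-robin bit interleave plus the "stop after N output bytes" cap discussed in the thread:

```java
public class InterleaveSketch {
  // Round-robin one bit at a time from each column's (non-empty array of)
  // byte representations into the output, padding short columns with zero
  // bits, and stop once maxOutputBytes have been produced.
  static byte[] interleaveBits(byte[][] columns, int maxOutputBytes) {
    byte[] result = new byte[maxOutputBytes];
    int totalBits = maxOutputBytes * 8;
    int outBit = 0;
    for (int sourceBit = 0; outBit < totalBits; sourceBit++) {
      for (byte[] column : columns) {
        if (outBit >= totalBits) {
          break;
        }
        int byteIdx = sourceBit / 8;
        int bit = byteIdx < column.length
            ? (column[byteIdx] >> (7 - sourceBit % 8)) & 1
            : 0; // pad columns that ran out of bytes
        result[outBit / 8] |= bit << (7 - outBit % 8);
        outBit++;
      }
    }
    return result;
  }
}
```

For instance, interleaving a 0xFF column with a 0x00 column into one output byte alternates 1 and 0 bits, producing 0xAA.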
As long as we think the time is dominated by serialization, let's move forward with this. We can always update the implementation later.
}

private void appendData() {
  Dataset<Row> df = spark().range(0, NUM_ROWS * NUM_FILES, 1, NUM_FILES)
Not sure the mocked data is good for verifying performance. In the real world, data is random and contains duplicates, and comparison would be slower.
FYI, ordering by a binary column is very slow for duplicate data; apache/spark#34310 fixed it in Spark 3.3.0 by changing the comparison to long-by-long rather than byte-by-byte.
What would you suggest? Here is an example where every single record is the same (probably some good branch prediction helping us out here)
ZOrder, 8 Bytes output as Byte Array
Iteration 1: 237.364 s/op
Iteration 2: 196.935 s/op
Iteration 3: 203.385 s/op
I feel like we have lots of micro benchmarks here, but none that really show that in this context it matters that much. If we limit the output to 8 Bytes per column our max ZOrder column is never going to be larger than 64 bytes (or if a user uses more than this many columns it probably wouldn't be that beneficial to ZOrder anyway.)
Thank you @RussellSpitzer. Yeah, I agree it really depends on the length of the ZOrder value; I just wanted to raise the potential issue.
What would you suggest
I thought about whether we need a unit benchmark for the ZOrderByteUtils.interleaveBits method? It is direct and does not depend on the Spark version or anything else.
I do think we need lots of benchmarking when we want to tune this up. When Spark 3.3 comes out it will be great to rerun this as well and see if we get better performance. I am 100% in favor of any follow-ups to improve things, but I do want to get this out as soon as we can so we have at least a starting point.
Ran some benchmarks on ZOrderByteUtils, see the newly attached commit for the test file
Times 10 Million interleaves, input values come directly from preallocated arrays.
ZOrderByteUtilsBenchmark.interleaveValuesFourColumns ss 5 10.706 ± 1.117 s/op
ZOrderByteUtilsBenchmark.interleaveValuesFourColumns8ByteOutput ss 5 2.854 ± 0.832 s/op
ZOrderByteUtilsBenchmark.interleaveValuesThreeColumns ss 5 7.960 ± 1.532 s/op
ZOrderByteUtilsBenchmark.interleaveValuesTwoColumns ss 5 5.618 ± 1.265 s/op
Sorry for the delay. Thank you @RussellSpitzer, looks good to me.
Generally it makes sense; I left one round of comments.
}

/**
 * For Testing interleave all available bytes
This comment doesn't seem relevant, right?
I use it only in the test class but I don't need to, I'll switch it to the other version below.
Assert.assertEquals("Should have 1 fileGroups", 1, result.rewriteResults().size());
int zOrderedFilesTotal = Iterables.size(table.currentSnapshot().addedFiles());
Assert.assertTrue("Should have written 40+ files", zOrderedFilesTotal >= 40);
Question: is this not deterministic?
Tiny changes in the Parquet compression and its ability to determine file size make the exact file count slightly iffy between versions. I have spent a lot of time trying to fine-tune the parameters here so that we make exactly the number of files we want, but it's difficult unless the files are big enough for this test to take a considerable amount of time. :/
private static final int STRING_KEY_LENGTH = 128;

private final List<String> zOrderColNames;
private transient FileScanTaskSetManager manager = FileScanTaskSetManager.get();
If we extend Spark3Strategy, why do we need to define these again, and at different serialization level?
The transient isn't needed; I added it when I was trying to keep Spark3UDF as a subclass, which was ... not necessary. We could do without the private fields, but then I'd need to change them to protected and add getters because of our style. I didn't do that before because I just dislike making getters :( But I probably should.
@@ -155,4 +155,8 @@ protected SparkSession spark() {
protected LogicalPlan sortPlan(Distribution distribution, SortOrder[] ordering, LogicalPlan plan, SQLConf conf) {
  return DistributionAndOrderingUtils$.MODULE$.prepareQuery(distribution, ordering, plan, conf);
}

protected double sizeEstimateMultiple() {
  return this.sizeEstimateMultiple;
Nit: I think we don't use 'this' in many places outside of assigning to a variable of the same name? I didn't find many examples of returning a private variable this way, for example.
Removed this; yeah, we only use it in assignments.
Took another pass (and some old comments still apply)
  return PRIMITIVE_EMPTY;
}
return ZOrderByteUtils.doubleToOrderedBytes(value, inputBuffer(position, Double.BYTES)).array();
}, DataTypes.BinaryType).withName("FLOAT_ORDERED_BYTES");
Typo?
this.numCols = numCols;
}

private void readObject(java.io.ObjectInputStream in)
Can we import ObjectInputStream?
}

private ByteBuffer outputBuffer(int size) {
  if (outputBuffer == null) {
Still not clear why we don't initialize it in readObject? It just makes an uninitialized ThreadLocal reference, right? (Does that make a big difference?)
And also, if we are expecting concurrent calls, wouldn't we need MT protection on the check of whether the buffer is null, if we go with this approach (hoping we don't have to)?
Yeah no reason we couldn't set it in readObject, I think I just initially put it here before I had implemented the other buffers.
As for thread execution, the way we generate the UDF means that you get a single "outputBuffer" for every thread. The way Spark should be running this is with a single thread per execution context (task) which means that any individual invocation should actually be in serial. That's why we don't have any protections on any of these buffer references.
} else if (type instanceof StringType) {
  return stringToOrderedBytesUDF().apply(column);
} else if (type instanceof BinaryType) {
  return stringToOrderedBytesUDF().apply(column);
Can we add test with binary type to make sure the cast happens right?
ah yeah I think this is actually incorrect now ... we just want to truncate without doing all of the other encoding work
Swapped with a truncate or fill method.
After changing the default size of all types to 8 bytes and randomizing the shuffle input, I get some different perf results. It seems like the pattern of the data is more important to the sort times than the amount of data in the sort field? Commit: 91a7601
What is odd to me here is that the sort time for Strings is now ... basically the same as integers; all of our z-orderings take about the same amount of time, and so do all of our sortings without zorder. What is more interesting to me is that for ZOrdering, this is basically increasing the ZOrder output byte size with no effect on the comparison time. For Strings maybe this made sense ... but for ZSortInt 1,2,3,4 I would have expected things to take different amounts of time. Perhaps with a totally random layout of data, the significant bits to compare on average always appear in the same location for ZOrder, regardless of the number of interleaved columns?
@szehon-ho Made fixes and added some new benchmarks as well, please take a look when you have time.
"Cannot have the interleaved ZOrder value use less than 1 byte, %s was set to %s",
MAX_OUTPUT_SIZE_KEY, maxOutputSize);

return this;
Extra space
Spark3ZOrderUDF(int numCols, int varTypeSize, int maxOutputSize) {
  this.numCols = numCols;
  this.varTypeSize = varTypeSize;
  this.maxOutputSize = maxOutputSize;
This does not even seem used? Should we remove the option for now (unless I missed where it's used)
Whoops, yeah. I changed this now so that on deserialization (readObject) we set totalOutputBytes to maxOutputBytes if it is larger than maxOutputBytes.
} else if (type instanceof BinaryType) {
  return bytesTruncateUDF().apply(column);
} else if (type instanceof BooleanType) {
  return bytesTruncateUDF().apply(column);
Is this right? (We will take the default var size here?)
Yeah I guess technically we should use the primitive size here, although zordering on a column with only 2 values is probably a losing proposition anyway :)
@@ -1181,6 +1263,35 @@ protected Table createTablePartitioned(int partitions, int files) {
  return createTablePartitioned(partitions, files, SCALE, Maps.newHashMap());
}

private Table createTypeTestTable() {
  Schema schema = new Schema(
I'd still feel a bit more comfortable if we can add some of the other types, like boolean and binary.
private ByteBuffer inputBuffer(int position, int size) {
  if (inputBuffers[position] == null) {
    // May over allocate on concurrent calls
    inputBuffers[position] = ThreadLocal.withInitial(() -> ByteBuffer.allocate(size));
So I am still trying to understand why use thread local if we are not expecting multiple threads? I guess my point was, if we are doing it to be mt-safe in case it happens, this particular part does not seem very mt-safe. Unless there is another reason of the thread-local I miss?
We discussed this a bit offline, for those interested our lifecycle basically looks like
Driver --- UDF --> Executor -> Core - Task (Thread)
\_____> Core - Task (Thread)
So every task is run in a single thread on the executor, but the tasks are referring to a UDF which is basically a singleton on the Executor JVM. Each task in isolation runs as if it was single threaded but any UDF members need to be protected against MultiThreaded access.
An alternative implementation we could do is to explicitly handle the multithreading with RDD operations. For example something like
rdd.mapPartitions( it -> {
  inputBuffer = allocate()
  outputBuffer = allocate()
  return it.map( row -> zOrderRow(row, inputBuffer, outputBuffer))
})
We talked about this a bit on slack and we agreed to shift this to
private transient ThreadLocal<ByteBuffer[]> inputBuffers;
So now the array is allocated once per thread, then we change the elements of that array
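That agreed-upon shape might look roughly like the following sketch (hypothetical class and method names; serialization handling and the actual UDF wiring are omitted — this is only the per-thread buffer-array pattern, not the PR's Spark3ZOrderUDF code):

```java
import java.nio.ByteBuffer;

public class BufferPoolSketch {
  private final int numCols;
  // One ThreadLocal holds the whole array of per-column buffers, so the
  // array itself is allocated once per task thread and individual slots
  // are filled lazily. No locking is needed: each Spark task thread only
  // ever sees its own array.
  private transient ThreadLocal<ByteBuffer[]> inputBuffers;

  BufferPoolSketch(int numCols) {
    this.numCols = numCols;
    this.inputBuffers = ThreadLocal.withInitial(() -> new ByteBuffer[numCols]);
  }

  ByteBuffer inputBuffer(int position, int size) {
    ByteBuffer[] buffers = inputBuffers.get();
    if (buffers[position] == null) {
      buffers[position] = ByteBuffer.allocate(size);
    }
    buffers[position].clear(); // reuse the same backing array every call
    return buffers[position];
  }
}
```

Repeated calls from the same thread return the same buffer instance, which is the allocation-avoidance goal of the thread-local design.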
VAR_LENGTH_CONTRIBUTION_KEY, varLengthContribution);

maxOutputSize = PropertyUtil.propertyAsInt(options, MAX_OUTPUT_SIZE_KEY, DEFAULT_MAX_OUTPUT_SIZE);
Remove it for now if not used?
This is used in the zOrderUDF
See
this.zOrderUDF = new Spark3ZOrderUDF(zOrderColNames.size(), varLengthContribution, maxOutputSize);
Yeah, I mean, it's not used inside the UDF :)
Use Spark UDFs to create a Z-Value column and then invoke a Spark Sort on it. The resultant data is then saved without the Z-Value Column.
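That flow can be illustrated outside Spark with a toy two-column Morton (z-value) key; the `morton` helper and row layout below are illustrative only, not the PR's actual UDF. The idea is the same: compute a z-value per row, sort by it, then return the rows without the key.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ZValueSortSketch {
    // Spread the low 16 bits of v so one zero bit sits between each data bit.
    static long part1by1(long v) {
        v &= 0xFFFFL;
        v = (v | (v << 8)) & 0x00FF00FFL;
        v = (v | (v << 4)) & 0x0F0F0F0FL;
        v = (v | (v << 2)) & 0x33333333L;
        v = (v | (v << 1)) & 0x55555555L;
        return v;
    }

    // Toy two-column z-value: interleave the bits of x and y (x takes the higher bit).
    public static long morton(int x, int y) {
        return (part1by1(x) << 1) | part1by1(y);
    }

    // Sort rows by their z-value, then return the rows without the key column.
    public static List<int[]> zOrderSort(List<int[]> rows) {
        List<int[]> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparingLong((int[] r) -> morton(r[0], r[1])));
        return sorted;
    }
}
```

The z-value only exists transiently as a sort key, which mirrors how the Z-Value column is dropped before the rewritten files are saved.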
Change all primitives to use 8 byte buffers; types are now aligned based on magnitude. Perf test still WIP, using new random-generating UDFs
bc5f7af to 46b1a16
Thanks @RussellSpitzer, looks mostly good to me; just some minor questions left.
Also, I'm not very familiar with Parquet UUID, so maybe we can separate that out or have someone more familiar look at the new writer?
@Override
protected void validateOptions() {
  // TODO implement ZOrder Strategy in API Module
Didn't get it: why do we need to implement this in the API module in order to implement this method?
Let me remove that, this was to remind me that we need to decide whether this will be defined in API in the future. One of my thoughts is we can eventually change it to just being a type of SortOrder
Changed to
// Ignore SortStrategy validation
}

byte[] interleaveBits(Seq<byte[]> scalaBinary) {
  byte[][] columnsBinary = scala.collection.JavaConverters.seqAsJavaList(scalaBinary)
Nit: is it possible to import?
@@ -299,6 +301,28 @@ public void write(int repetitionLevel, byte[] bytes) {
  }
}

private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
is this required for this change? (Couldn't figure out where it's used). If not a critical part of this change, maybe we can separate it out for people more familiar to take a look at this part?
No, I meant to remove this too; I actually removed some of the other code that makes this work.
Looks good to me, just some very small nits and a question, feel free to merge when you are ready
public static final int PRIMITIVE_BUFFER_SIZE = 8;

private ZOrderByteUtils() {
Nit: extra space (other Util has 1 space)
.build();

/**
 * Controls the amount of bytes interleaved in the ZOrder Algorithm. Default is all bytes being interleaved.
For learning, what is the effect if we do not use all bytes?
You get a coarser ordering (only the higher-order bytes are considered); the effect varies with the distribution of data in the actual column.
So if, for example, we had 2 integer columns and only used 1 byte for our output, we would only be considering the first 4 bits (highest order) from either integer. So only numbers with extremely high magnitude would be effectively interleaved; all other bits would be ignored. So 3 would look equivalent to 4.
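A rough sketch of that truncation effect, using a simplified MSB-first bit interleave (not the PR's exact implementation): with two 4-byte big-endian ints and a 1-byte output budget, only the top 4 bits of each column survive, so 3 and 4 produce identical z-values.

```java
import java.nio.ByteBuffer;

public class TruncatedInterleave {
    // Interleave bits MSB-first across equal-length columns, stopping at maxOutputBytes.
    public static byte[] interleave(byte[][] columns, int maxOutputBytes) {
        byte[] out = new byte[maxOutputBytes];
        int outBits = maxOutputBytes * 8;
        int outBit = 0;
        int sourceBits = columns[0].length * 8;
        for (int bit = 0; bit < sourceBits && outBit < outBits; bit++) {
            for (byte[] column : columns) {
                if (outBit >= outBits) {
                    break;
                }
                // Take the current bit of this column, highest order first.
                int b = (column[bit / 8] >> (7 - (bit % 8))) & 1;
                out[outBit / 8] |= b << (7 - (outBit % 8));
                outBit++;
            }
        }
        return out;
    }

    // Big-endian byte representation of an int.
    public static byte[] intBytes(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }
}
```

With the full 8-byte output the two values stay distinguishable, since every source bit eventually makes it into the interleaved key.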
if (!partZOrderCols.isEmpty()) {
  LOG.warn("Cannot ZOrder on an Identity partition column as these values are constant within a partition " +
      "they will be removed from the ZOrder expression: {}", partZOrderCols);
nit: "they" is a bit abrupt for the middle of the sentence, how about "and"
buffer.put(0, (byte) (value ? -127 : 0));
return buffer.array();
}, DataTypes.BinaryType)
    .withName("BOOLEAN-LEXICAL-BYTES");
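As an aside on why `-127` works in this hunk: the downstream comparison is on unsigned bytes, where `(byte) -127` reads as `0x81` (129 unsigned) and therefore sorts after `0x00`. A quick stand-alone check (plain Java standing in for the lexicographic byte comparator):

```java
public class BooleanLexical {
    // Encode a boolean the same way the hunk above does.
    public static byte encode(boolean value) {
        return (byte) (value ? -127 : 0);
    }

    // Unsigned single-byte comparison, mirroring a lexicographic byte comparator.
    public static int compareUnsigned(byte a, byte b) {
        return Integer.compare(a & 0xFF, b & 0xFF);
    }
}
```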
Nit: looks a bit weird, maybe put on previous line like others
private void increaseOutputSize(int bytes) {
  totalOutputBytes += bytes;
  if (totalOutputBytes > maxOutputSize) {
Optional: totalOutputBytes = Math.max(totalOutputBytes, maxOutputSize) can fit on one line
Sounds good to me, but I think you mean "min"
private void increaseOutputSize(int bytes) {
totalOutputBytes = Math.min(totalOutputBytes + bytes, maxOutputSize);
}
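For completeness, the clamped accumulator agreed on above can be exercised in isolation (the wrapper class and its wiring here are illustrative only):

```java
public class OutputSizeClamp {
    private final int maxOutputSize;
    private int totalOutputBytes = 0;

    public OutputSizeClamp(int maxOutputSize) {
        this.maxOutputSize = maxOutputSize;
    }

    // Accumulate bytes, but never let the running total exceed the configured cap.
    public void increaseOutputSize(int bytes) {
        totalOutputBytes = Math.min(totalOutputBytes + bytes, maxOutputSize);
    }

    public int totalOutputBytes() {
        return totalOutputBytes;
    }
}
```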
OK, I think after tests pass we are good to go. After this we can start working on more tests and further optimizations to the algorithm, but I believe this is enough for folks to start getting their hands dirty.
Thanks @szehon-ho, @rdblue, @ulysses-you, @jackye1995 and @emkornfield for all your insightful comments and review. Let's be sure to continue optimizing and finding ways to make Iceberg's optimizations more performant!
Adds an initial implementation of a ZOrder file rewrite strategy. Allows users to use a multidimensional sort algorithm for organizing data. (cherry picked from commit f213541)
Closes #3961
Built on #3960