[SPARK-23874][SQL][PYTHON] Upgrade Apache Arrow to 0.10.0
## What changes were proposed in this pull request?

Upgrade Apache Arrow to 0.10.0

Version 0.10.0 has a number of bug fixes and improvements with the following pertaining directly to usage in Spark:
 * Allow for adding BinaryType support ARROW-2141 (see the sketch after this list)
 * Bug fix related to array serialization ARROW-1973
 * Python 2 str will be made into an Arrow string instead of bytes ARROW-2101
 * Python bytearrays are supported as input to pyarrow ARROW-2141
 * Java now has a common reset interface for cleaning up complex vectors in Spark's ArrowWriter ARROW-1962
 * Cleanup of pyarrow type equality checks ARROW-2423
 * ArrowStreamWriter no longer holds references to ArrowBlocks ARROW-2632, ARROW-2645
 * Improved low-level handling of messages for RecordBatch ARROW-2704
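
As a rough illustration of the BinaryType item above (not part of this change set; assumes an active SparkSession bound to `spark` and pyarrow >= 0.10.0 installed on the Python workers), a scalar pandas UDF can now produce binary columns:

```python
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import BinaryType

# Scalar pandas UDF returning BinaryType; the resulting bytes are shipped
# between the JVM and the Python worker through Arrow's binary type.
@pandas_udf(BinaryType())
def utf8_bytes(s):
    return s.apply(lambda v: v.encode("utf-8"))

df = spark.createDataFrame([("a",), ("b",)], ["value"])
df.select(utf8_bytes(col("value")).alias("bytes")).show()
```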

## How was this patch tested?

Existing tests.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes apache#21939 from BryanCutler/arrow-upgrade-010.
BryanCutler committed Aug 15, 2018
1 parent 92fd7f3 commit ed075e1
Showing 8 changed files with 23 additions and 35 deletions.
6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-2.6
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
api-asn1-api-1.0.0-M20.jar
api-util-1.0.0-M20.jar
arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
automaton-1.11-8.jar
avro-1.8.2.jar
avro-ipc-1.8.2.jar
6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-2.7
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
api-asn1-api-1.0.0-M20.jar
api-util-1.0.0-M20.jar
arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
automaton-1.11-8.jar
avro-1.8.2.jar
avro-ipc-1.8.2.jar
6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-3.1
@@ -12,9 +12,9 @@ aopalliance-1.0.jar
aopalliance-repackaged-2.4.0-b34.jar
apache-log4j-extras-1.2.17.jar
arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
automaton-1.11-8.jar
avro-1.8.2.jar
avro-ipc-1.8.2.jar
2 changes: 1 addition & 1 deletion pom.xml
@@ -190,7 +190,7 @@
If you are changing Arrow version specification, please check ./python/pyspark/sql/utils.py,
./python/run-tests.py and ./python/setup.py too.
-->
-<arrow.version>0.8.0</arrow.version>
+<arrow.version>0.10.0</arrow.version>

<test.java.home>${java.home}</test.java.home>
<test.exclude.tags></test.exclude.tags>
2 changes: 2 additions & 0 deletions python/pyspark/serializers.py
@@ -229,12 +229,14 @@ def _create_batch(series, timezone):
    def create_array(s, t):
        mask = s.isnull()
        # Ensure timestamp series are in expected form for Spark internal representation
+        # TODO: maybe don't need None check anymore as of Arrow 0.9.1
        if t is not None and pa.types.is_timestamp(t):
            s = _check_series_convert_timestamps_internal(s.fillna(0), timezone)
            # TODO: need cast after Arrow conversion, ns values cause error with pandas 0.19.2
            return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False)
        elif t is not None and pa.types.is_string(t) and sys.version < '3':
            # TODO: need decode before converting to Arrow in Python 2
+            # TODO: don't need as of Arrow 0.9.1
            return pa.Array.from_pandas(s.apply(
                lambda v: v.decode("utf-8") if isinstance(v, str) else v), mask=mask, type=t)
        elif t is not None and pa.types.is_decimal(t) and \
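
For readers unfamiliar with the pyarrow calls in this hunk, here is a standalone sketch of the `from_pandas` + mask + `cast(..., safe=False)` pattern that `create_array` relies on (illustrative values only, not Spark code):

```python
import pandas as pd
import pyarrow as pa

s = pd.Series([1.0, None, 3.0])
mask = s.isnull()

# Build an Arrow array with an explicit validity mask, then cast it to the
# desired Arrow type without safety checks, as the timestamp branch above does.
arr = pa.Array.from_pandas(s.fillna(0), mask=mask).cast(pa.float32(), safe=False)
print(arr.to_pylist())  # [1.0, None, 3.0]
```
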
ArrowColumnVector.java
@@ -162,13 +162,13 @@ public ArrowColumnVector(ValueVector vector) {
    } else if (vector instanceof ListVector) {
      ListVector listVector = (ListVector) vector;
      accessor = new ArrayAccessor(listVector);
-    } else if (vector instanceof NullableMapVector) {
-      NullableMapVector mapVector = (NullableMapVector) vector;
-      accessor = new StructAccessor(mapVector);
+    } else if (vector instanceof StructVector) {
+      StructVector structVector = (StructVector) vector;
+      accessor = new StructAccessor(structVector);

-      childColumns = new ArrowColumnVector[mapVector.size()];
+      childColumns = new ArrowColumnVector[structVector.size()];
      for (int i = 0; i < childColumns.length; ++i) {
-        childColumns[i] = new ArrowColumnVector(mapVector.getVectorById(i));
+        childColumns[i] = new ArrowColumnVector(structVector.getVectorById(i));
      }
    } else {
      throw new UnsupportedOperationException();
@@ -472,7 +472,7 @@ final ColumnarArray getArray(int rowId) {
   */
  private static class StructAccessor extends ArrowVectorAccessor {

-    StructAccessor(NullableMapVector vector) {
+    StructAccessor(StructVector vector) {
      super(vector);
    }
  }
ArrowWriter.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._

import org.apache.arrow.vector._
import org.apache.arrow.vector.complex._
-import org.apache.arrow.vector.types.pojo.ArrowType

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
@@ -62,7 +61,7 @@ object ArrowWriter {
      case (ArrayType(_, _), vector: ListVector) =>
        val elementVector = createFieldWriter(vector.getDataVector())
        new ArrayWriter(vector, elementVector)
-      case (StructType(_), vector: NullableMapVector) =>
+      case (StructType(_), vector: StructVector) =>
        val children = (0 until vector.size()).map { ordinal =>
          createFieldWriter(vector.getChildByOrdinal(ordinal))
        }
@@ -129,20 +128,7 @@ private[arrow] abstract class ArrowFieldWriter {
  }

  def reset(): Unit = {
-    // TODO: reset() should be in a common interface
-    valueVector match {
-      case fixedWidthVector: BaseFixedWidthVector => fixedWidthVector.reset()
-      case variableWidthVector: BaseVariableWidthVector => variableWidthVector.reset()
-      case listVector: ListVector =>
-        // Manual "reset" the underlying buffer.
-        // TODO: When we upgrade to Arrow 0.10.0, we can simply remove this and call
-        // `listVector.reset()`.
-        val buffers = listVector.getBuffers(false)
-        buffers.foreach(buf => buf.setZero(0, buf.capacity()))
-        listVector.setValueCount(0)
-        listVector.setLastSet(0)
-      case _ =>
-    }
+    valueVector.reset()
    count = 0
  }
}
@@ -323,7 +309,7 @@ private[arrow] class ArrayWriter(
}

private[arrow] class StructWriter(
-    val valueVector: NullableMapVector,
+    val valueVector: StructVector,
    children: Array[ArrowFieldWriter]) extends ArrowFieldWriter {

  override def setNull(): Unit = {
ArrowColumnVectorSuite.scala
@@ -336,7 +336,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite {
    val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
    val schema = new StructType().add("int", IntegerType).add("long", LongType)
    val vector = ArrowUtils.toArrowField("struct", schema, nullable = false, null)
-      .createVector(allocator).asInstanceOf[NullableMapVector]
+      .createVector(allocator).asInstanceOf[StructVector]

    vector.allocateNew()
    val intVector = vector.getChildByOrdinal(0).asInstanceOf[IntVector]
@@ -373,7 +373,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite {
    val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
    val schema = new StructType().add("int", IntegerType).add("long", LongType)
    val vector = ArrowUtils.toArrowField("struct", schema, nullable = true, null)
-      .createVector(allocator).asInstanceOf[NullableMapVector]
+      .createVector(allocator).asInstanceOf[StructVector]
    vector.allocateNew()
    val intVector = vector.getChildByOrdinal(0).asInstanceOf[IntVector]
    val longVector = vector.getChildByOrdinal(1).asInstanceOf[BigIntVector]
