From 65a189c7a1ddceb8ab482ccc60af5350b8da5ea5 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Fri, 15 Nov 2019 13:27:30 +0900 Subject: [PATCH] [SPARK-29376][SQL][PYTHON] Upgrade Apache Arrow to version 0.15.1 ### What changes were proposed in this pull request? Upgrade Apache Arrow to version 0.15.1. This includes Java artifacts and increases the minimum required version of PyArrow also. Version 0.12.0 to 0.15.1 includes the following selected fixes/improvements relevant to Spark users: * ARROW-6898 - [Java] Fix potential memory leak in ArrowWriter and several test classes * ARROW-6874 - [Python] Memory leak in Table.to_pandas() when conversion to object dtype * ARROW-5579 - [Java] shade flatbuffer dependency * ARROW-5843 - [Java] Improve the readability and performance of BitVectorHelper#getNullCount * ARROW-5881 - [Java] Provide functionalities to efficiently determine if a validity buffer has completely 1 bits/0 bits * ARROW-5893 - [C++] Remove arrow::Column class from C++ library * ARROW-5970 - [Java] Provide pointer to Arrow buffer * ARROW-6070 - [Java] Avoid creating new schema before IPC sending * ARROW-6279 - [Python] Add Table.slice method or allow slices in \_\_getitem\_\_ * ARROW-6313 - [Format] Tracking for ensuring flatbuffer serialized values are aligned in stream/files. * ARROW-6557 - [Python] Always return pandas.Series from Array/ChunkedArray.to_pandas, propagate field names to Series from RecordBatch, Table * ARROW-2015 - [Java] Use Java Time and Date APIs instead of JodaTime * ARROW-1261 - [Java] Add container type for Map logical type * ARROW-1207 - [C++] Implement Map logical type Changelog can be seen at https://arrow.apache.org/release/0.15.0.html ### Why are the changes needed? Upgrade to get bug fixes, improvements, and maintain compatibility with future versions of PyArrow. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Existing tests, manually tested with Python 3.7, 3.8 Closes #26133 from BryanCutler/arrow-upgrade-015-SPARK-29376. Authored-by: Bryan Cutler Signed-off-by: HyukjinKwon --- dev/deps/spark-deps-hadoop-2.7 | 7 +++---- dev/deps/spark-deps-hadoop-3.2 | 7 +++---- pom.xml | 4 ++-- python/pyspark/sql/utils.py | 6 +++++- python/setup.py | 2 +- .../spark/sql/execution/arrow/ArrowConverters.scala | 8 ++++---- 6 files changed, 18 insertions(+), 16 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index e6d29d04acbf3..54608d203133c 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -17,9 +17,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar api-asn1-api-1.0.0-M20.jar api-util-1.0.0-M20.jar arpack_combined_all-0.1.jar -arrow-format-0.12.0.jar -arrow-memory-0.12.0.jar -arrow-vector-0.12.0.jar +arrow-format-0.15.1.jar +arrow-memory-0.15.1.jar +arrow-vector-0.15.1.jar audience-annotations-0.5.0.jar automaton-1.11-8.jar avro-1.8.2.jar @@ -83,7 +83,6 @@ hadoop-yarn-server-web-proxy-2.7.4.jar hk2-api-2.5.0.jar hk2-locator-2.5.0.jar hk2-utils-2.5.0.jar -hppc-0.7.2.jar htrace-core-3.1.0-incubating.jar httpclient-4.5.6.jar httpcore-4.4.10.jar diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2 index 8f1e7fe125b9f..917fde61fad1a 100644 --- a/dev/deps/spark-deps-hadoop-3.2 +++ b/dev/deps/spark-deps-hadoop-3.2 @@ -12,9 +12,9 @@ antlr4-runtime-4.7.1.jar aopalliance-1.0.jar aopalliance-repackaged-2.5.0.jar arpack_combined_all-0.1.jar -arrow-format-0.12.0.jar -arrow-memory-0.12.0.jar -arrow-vector-0.12.0.jar +arrow-format-0.15.1.jar +arrow-memory-0.15.1.jar +arrow-vector-0.15.1.jar audience-annotations-0.5.0.jar automaton-1.11-8.jar avro-1.8.2.jar @@ -96,7 +96,6 @@ hive-vector-code-gen-2.3.6.jar hk2-api-2.5.0.jar hk2-locator-2.5.0.jar hk2-utils-2.5.0.jar -hppc-0.7.2.jar htrace-core4-4.1.0-incubating.jar httpclient-4.5.6.jar httpcore-4.4.10.jar diff --git a/pom.xml b/pom.xml index 5110285547ab3..a6a82b3339d08 100644 --- a/pom.xml +++ b/pom.xml @@ -200,9 +200,9 @@ 1.0.0 - 0.12.0 + 0.15.1 ${java.home} diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py index 83afafdd8b138..4260c06f06060 100644 --- a/python/pyspark/sql/utils.py +++ b/python/pyspark/sql/utils.py @@ -160,9 +160,10 @@ def require_minimum_pyarrow_version(): """ Raise ImportError if minimum version of pyarrow is not installed """ # TODO(HyukjinKwon): Relocate and deduplicate the version specification. - minimum_pyarrow_version = "0.12.1" + minimum_pyarrow_version = "0.15.1" from distutils.version import LooseVersion + import os try: import pyarrow have_arrow = True @@ -174,6 +175,9 @@ def require_minimum_pyarrow_version(): if LooseVersion(pyarrow.__version__) < LooseVersion(minimum_pyarrow_version): raise ImportError("PyArrow >= %s must be installed; however, " "your version was %s." % (minimum_pyarrow_version, pyarrow.__version__)) + if os.environ.get("ARROW_PRE_0_15_IPC_FORMAT", "0") == "1": + raise RuntimeError("Arrow legacy IPC format is not supported in PySpark, " + "please unset ARROW_PRE_0_15_IPC_FORMAT") def require_test_compiled(): diff --git a/python/setup.py b/python/setup.py index 092bdd3f90117..138161ff13b41 100755 --- a/python/setup.py +++ b/python/setup.py @@ -105,7 +105,7 @@ def _supports_symlinks(): # For Arrow, you should also check ./pom.xml and ensure there are no breaking changes in the # binary format protocol with the Java version, see ARROW_HOME/format/* for specifications. _minimum_pandas_version = "0.23.2" -_minimum_pyarrow_version = "0.12.1" +_minimum_pyarrow_version = "0.15.1" try: # We copy the shell script to be under pyspark/python/pyspark so that the launcher scripts diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala index 1a6f4acb63521..d1076d9d0156c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala @@ -26,7 +26,7 @@ import org.apache.arrow.flatbuf.MessageHeader import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.vector._ import org.apache.arrow.vector.ipc.{ArrowStreamWriter, ReadChannel, WriteChannel} -import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, MessageSerializer} +import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, IpcOption, MessageSerializer} import org.apache.spark.TaskContext import org.apache.spark.api.java.JavaRDD @@ -64,7 +64,7 @@ private[sql] class ArrowBatchStreamWriter( * End the Arrow stream, does not close output stream. */ def end(): Unit = { - ArrowStreamWriter.writeEndOfStream(writeChannel) + ArrowStreamWriter.writeEndOfStream(writeChannel, new IpcOption) } } @@ -251,8 +251,8 @@ private[sql] object ArrowConverters { // Only care about RecordBatch messages, skip Schema and unsupported Dictionary messages if (msgMetadata.getMessage.headerType() == MessageHeader.RecordBatch) { - // Buffer backed output large enough to hold the complete serialized message - val bbout = new ByteBufferOutputStream(4 + msgMetadata.getMessageLength + bodyLength) + // Buffer backed output large enough to hold 8-byte length + complete serialized message + val bbout = new ByteBufferOutputStream(8 + msgMetadata.getMessageLength + bodyLength) // Write message metadata to ByteBuffer output stream MessageSerializer.writeMessageBuffer(