Upgrade Arrow to 0.15 #683

Merged · 4 commits · Jul 15, 2020
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -23,7 +23,7 @@ Suggests:
testthat,
e1071,
survival,
- arrow
+ arrow (>= 0.15.1)
Collate:
'schema.R'
'generics.R'
4 changes: 2 additions & 2 deletions R/pkg/R/SQLContext.R
@@ -166,9 +166,9 @@ writeToFileInArrow <- function(fileName, rdf, numPartitions) {
for (rdf_slice in rdf_slices) {
batch <- arrow::record_batch(rdf_slice)
if (is.null(stream_writer)) {
- stream <- arrow::FileOutputStream(fileName)
+ stream <- arrow::FileOutputStream$create(fileName)
schema <- batch$schema
- stream_writer <- arrow::RecordBatchStreamWriter(stream, schema)
+ stream_writer <- arrow::RecordBatchStreamWriter$create(stream, schema)
}

stream_writer$write_batch(batch)
2 changes: 1 addition & 1 deletion R/pkg/R/deserialize.R
@@ -242,7 +242,7 @@ readDeserializeInArrow <- function(inputCon) {
# for now.
dataLen <- readInt(inputCon)
arrowData <- readBin(inputCon, raw(), as.integer(dataLen), endian = "big")
- batches <- arrow::RecordBatchStreamReader(arrowData)$batches()
+ batches <- arrow::RecordBatchStreamReader$create(arrowData)$batches()

if (useAsTibble) {
as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
7 changes: 3 additions & 4 deletions dev/deps/spark-deps-hadoop-palantir
@@ -10,9 +10,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
api-asn1-api-1.0.0-M20.jar
api-util-1.0.0-M20.jar
arpack_combined_all-0.1.jar
- arrow-format-0.12.0.jar
- arrow-memory-0.12.0.jar
- arrow-vector-0.12.0.jar
+ arrow-format-0.15.1.jar
+ arrow-memory-0.15.1.jar
+ arrow-vector-0.15.1.jar
audience-annotations-0.7.0.jar
automaton-1.11-8.jar
avro-1.8.2.jar
@@ -87,7 +87,6 @@ hibernate-validator-5.2.4.Final.jar
hk2-api-2.5.0-b32.jar
hk2-locator-2.5.0-b32.jar
hk2-utils-2.5.0-b32.jar
- hppc-0.7.2.jar
htrace-core4-4.1.0-incubating.jar
httpclient-4.5.6.jar
httpcore-4.4.10.jar
13 changes: 4 additions & 9 deletions docs/sparkr.md
@@ -648,20 +648,15 @@ Apache Arrow is an in-memory columnar data format that is used in Spark to effic

## Ensure Arrow Installed

- Arrow R library is available on CRAN as of [ARROW-3204](https://issues.apache.org/jira/browse/ARROW-3204). It can be installed as below.
+ Arrow R library is available on CRAN and can be installed as below.

```bash
Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'
```
+ Please refer to [the official documentation of Apache Arrow](https://arrow.apache.org/docs/r/) for more details.

- If you need to install old versions, it should be installed directly from Github. You can use `remotes::install_github` as below.
-
- ```bash
- Rscript -e 'remotes::install_github("apache/arrow@apache-arrow-0.12.1", subdir = "r")'
- ```
-
- `apache-arrow-0.12.1` is a version tag that can be checked in [Arrow at Github](https://github.com/apache/arrow/releases). You must ensure that Arrow R package is installed and available on all cluster nodes.
- The current supported minimum version is 0.12.1; however, this might change between the minor releases since Arrow optimization in SparkR is experimental.
+ Note that you must ensure that the Arrow R package is installed and available on all cluster nodes.
+ The current supported minimum version is 0.15.1; however, this might change between minor releases since Arrow optimization in SparkR is experimental.

## Enabling for Conversion to/from R DataFrame, `dapply` and `gapply`

17 changes: 17 additions & 0 deletions docs/sql-pyspark-pandas-with-arrow.md
@@ -164,3 +164,20 @@ Note that a standard UDF (non-Pandas) will load timestamp data as Python datetim
different than a Pandas timestamp. It is recommended to use Pandas time series functionality when
working with timestamps in `pandas_udf`s to get the best performance, see
[here](https://pandas.pydata.org/pandas-docs/stable/timeseries.html) for details.
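As a minimal sketch of that recommendation (not part of this diff; assumes a live `SparkSession` named `spark` with Arrow enabled, and `shift_hour` is an illustrative name):

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("timestamp")  # scalar Pandas UDF: pd.Series in, pd.Series out
def shift_hour(ts):
    # Vectorized Pandas time-series arithmetic instead of per-row Python datetime objects
    return ts + pd.Timedelta(hours=1)

pdf = pd.DataFrame({"ts": pd.date_range("2020-01-01", periods=3, freq="H")})
df = spark.createDataFrame(pdf)
df.select(shift_hour("ts").alias("shifted")).show()
```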

+ ### Compatibility Setting for PyArrow >= 0.15.0 and Spark 2.3.x, 2.4.x
+
+ Since Arrow 0.15.0, a change in the binary IPC format requires an environment variable for
+ compatibility with previous versions of Arrow <= 0.14.1. This is only necessary for PySpark
+ users on Spark 2.3.x and 2.4.x who have manually upgraded PyArrow to 0.15.0. The following
+ can be added to `conf/spark-env.sh` to use the legacy Arrow IPC format:
+
+ ```
+ ARROW_PRE_0_15_IPC_FORMAT=1
+ ```
+
+ This will instruct PyArrow >= 0.15.0 to use the legacy IPC format with the older Arrow Java that
+ is in Spark 2.3.x and 2.4.x. Not setting this environment variable will lead to an error similar to
+ the one described in [SPARK-29367](https://issues.apache.org/jira/browse/SPARK-29367) when running
+ `pandas_udf`s or `toPandas()` with Arrow enabled. More information about the Arrow IPC change can
+ be found in the Arrow 0.15.0 release [blog](http://arrow.apache.org/blog/2019/10/06/0.15.0-release/#columnar-streaming-protocol-change-since-0140).
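A hypothetical repro sketch of the failure mode this section describes: on Spark 2.3.x/2.4.x with PyArrow >= 0.15.0 and the variable unset, the `toPandas()` call below hits the SPARK-29367 error. The app name is illustrative; `spark.sql.execution.arrow.enabled` is the Spark 2.x flag for the Arrow path.

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("arrow-ipc-compat-check")
         # Spark 2.3.x/2.4.x flag that turns on the Arrow-based transfer path
         .config("spark.sql.execution.arrow.enabled", "true")
         .getOrCreate())

# toPandas() serializes results through the Arrow IPC stream format,
# which is where the pre/post-0.15 framing mismatch surfaces
pdf = spark.range(10).toPandas()
print(pdf.head())
```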
4 changes: 2 additions & 2 deletions pom.xml
@@ -219,9 +219,9 @@
<commons-crypto.version>1.0.0</commons-crypto.version>
<!--
If you are changing Arrow version specification, please check ./python/pyspark/sql/utils.py,
- ./python/run-tests.py and ./python/setup.py too.
+ and ./python/setup.py too.
-->
- <arrow.version>0.12.0</arrow.version>
+ <arrow.version>0.15.1</arrow.version>

<!-- Async shuffle upload plugin dependency versions -->
<safe-logging.version>1.13.0</safe-logging.version>
6 changes: 5 additions & 1 deletion python/pyspark/sql/utils.py
@@ -136,9 +136,10 @@ def require_minimum_pyarrow_version():
""" Raise ImportError if minimum version of pyarrow is not installed
"""
# TODO(HyukjinKwon): Relocate and deduplicate the version specification.
minimum_pyarrow_version = "0.12.1"
minimum_pyarrow_version = "0.15.1"

from distutils.version import LooseVersion
+ import os
try:
import pyarrow
have_arrow = True
@@ -150,6 +151,9 @@ def require_minimum_pyarrow_version():
if LooseVersion(pyarrow.__version__) < LooseVersion(minimum_pyarrow_version):
raise ImportError("PyArrow >= %s must be installed; however, "
"your version was %s." % (minimum_pyarrow_version, pyarrow.__version__))
if os.environ.get("ARROW_PRE_0_15_IPC_FORMAT", "0") == "1":
raise RuntimeError("Arrow legacy IPC format is not supported in PySpark, "
"please unset ARROW_PRE_0_15_IPC_FORMAT")


def require_test_compiled():
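A sketch of how the new guard surfaces to callers (illustrative, not part of the diff; assumes this patch is applied and PyArrow >= 0.15.1 is installed):

```python
import os

# Simulate a user who set the legacy-format variable for an old Spark/Arrow combo
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

from pyspark.sql.utils import require_minimum_pyarrow_version

try:
    require_minimum_pyarrow_version()
except RuntimeError as e:
    print(e)  # "Arrow legacy IPC format is not supported in PySpark, ..."
```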
2 changes: 1 addition & 1 deletion python/setup.py
@@ -104,7 +104,7 @@ def _supports_symlinks():
# For Arrow, you should also check ./pom.xml and ensure there are no breaking changes in the
# binary format protocol with the Java version, see ARROW_HOME/format/* for specifications.
_minimum_pandas_version = "0.23.2"
_minimum_pyarrow_version = "0.12.1"
_minimum_pyarrow_version = "0.15.1"

try:
# We copy the shell script to be under pyspark/python/pyspark so that the launcher scripts
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -26,7 +26,7 @@ import org.apache.arrow.flatbuf.MessageHeader
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.vector._
import org.apache.arrow.vector.ipc.{ArrowStreamWriter, ReadChannel, WriteChannel}
- import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, MessageSerializer}
+ import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, IpcOption, MessageSerializer}

import org.apache.spark.TaskContext
import org.apache.spark.api.java.JavaRDD
@@ -63,7 +63,7 @@ private[sql] class ArrowBatchStreamWriter(
* End the Arrow stream, does not close output stream.
*/
def end(): Unit = {
- ArrowStreamWriter.writeEndOfStream(writeChannel)
+ ArrowStreamWriter.writeEndOfStream(writeChannel, new IpcOption)
}
}

@@ -250,8 +250,8 @@ private[sql] object ArrowConverters {
// Only care about RecordBatch messages, skip Schema and unsupported Dictionary messages
if (msgMetadata.getMessage.headerType() == MessageHeader.RecordBatch) {

- // Buffer backed output large enough to hold the complete serialized message
- val bbout = new ByteBufferOutputStream(4 + msgMetadata.getMessageLength + bodyLength)
+ // Buffer backed output large enough to hold 8-byte length + complete serialized message
+ val bbout = new ByteBufferOutputStream(8 + msgMetadata.getMessageLength + bodyLength)

// Write message metadata to ByteBuffer output stream
MessageSerializer.writeMessageBuffer(
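For context on the 4 → 8 byte change: since Arrow 0.15.0 each IPC message is prefixed by a 4-byte continuation marker plus a 4-byte little-endian metadata length, per the release blog linked in the docs above. A rough sketch of that prefix, in Python for brevity (illustrative only, not part of this diff):

```python
import struct

CONTINUATION_MARKER = 0xFFFFFFFF  # distinguishes new-style messages from the old bare length

def message_prefix(metadata_len):
    # 4-byte marker + 4-byte little-endian metadata length = the 8 bytes
    # the enlarged ByteBufferOutputStream above accounts for
    return struct.pack("<II", CONTINUATION_MARKER, metadata_len)

assert len(message_prefix(128)) == 8
```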