[DELTA-OSS-EXTERNAL] Adding support for GCS
Adding support for Google Cloud Storage (GCS) as Delta storage by introducing `GCSLogStore`.

This PR addresses [issue delta-io#294]. File creation is all-or-nothing to achieve atomicity, and GCS [preconditions] are used to avoid race conditions among multiple writers/drivers. This implementation relies on gcs-connector to provide the necessary `FileSystem` implementations. It has been tested on a Google Dataproc cluster.
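For context on what those preconditions buy us: creating an object with a generation-match of 0 succeeds only if no live object already exists at that key, so exactly one of several racing writers can create a given commit file. The sketch below illustrates the idea using the `google-cloud-storage` Java client directly; that client, the bucket name, and the payload are illustrative assumptions, not the gcs-connector code path this PR actually uses.

```
import java.nio.charset.StandardCharsets.UTF_8
import com.google.cloud.storage.{BlobId, BlobInfo, Storage, StorageException, StorageOptions}

// Conceptual sketch: create the commit file only if no live object already
// exists at this key (ifGenerationMatch = 0), so one racing writer wins.
val storage: Storage = StorageOptions.getDefaultInstance.getService
val blob = BlobInfo.newBuilder(
  BlobId.of("my-bucket", "my-table/_delta_log/00000000000000000001.json")).build()
val bytes = """{"commitInfo":{}}""".getBytes(UTF_8)
try {
  storage.create(blob, bytes, Storage.BlobTargetOption.doesNotExist())
} catch {
  case e: StorageException if e.getCode == 412 =>
    // 412 Precondition Failed: another writer created this version first.
}
```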

#### GCSLogStore requirements

1. `spark.delta.logStore.class=org.apache.spark.sql.delta.storage.GCSLogStore` (a programmatic sketch follows this list).
2. Include the gcs-connector JAR in the classpath. The Cloud Storage connector is automatically installed on Dataproc clusters.
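
As a minimal sketch of requirement 1, the class can also be set programmatically when the session is built; the app name and builder flow here are illustrative:

```
import org.apache.spark.sql.SparkSession

// Point Delta at the GCS LogStore before touching any Delta table.
val spark = SparkSession.builder()
  .appName("delta-gcs-example") // illustrative
  .config("spark.delta.logStore.class",
    "org.apache.spark.sql.delta.storage.GCSLogStore")
  .getOrCreate()
```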

#### Usage

```
TABLE_LOCATION = 'gs://ranuvikram-test/test/delta-table'

# Write data to table.
data = spark.range(5, 10)
data.write.format("delta").mode("append").save(TABLE_LOCATION)

# Read data from table.
df = spark.read.format("delta").load(TABLE_LOCATION)
df.show()
```
[issue delta-io#294]: https://github.com/delta-io/delta/issues/294
[preconditions]: https://cloud.google.com/storage/docs/generations-preconditions#_Preconditions

Closes delta-io#560

Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Ranu Vikram <ranu010101@users.noreply.github.com>

#22070 is resolved by tdas/o9ixtoaw.

GitOrigin-RevId: 0a1ce1d4407637d7697b93a25d8fd6be3efe2f6d
ranu010101 authored and tdas committed May 19, 2021
1 parent 9a7338c commit 90606ff
Showing 3 changed files with 130 additions and 2 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -88,7 +88,7 @@ Delta Lake ACID guarantees are predicated on the atomicity and durability guaran
2. **Mutual exclusion**: Only one writer must be able to create (or rename) a file at the final destination.
3. **Consistent listing**: Once a file has been written in a directory, all future listings for that directory must return that file.

- Given that storage systems do not necessarily provide all of these guarantees out-of-the-box, Delta Lake transactional operations typically go through the [LogStore API](https://github.com/delta-io/delta/blob/master/src/main/scala/org/apache/spark/sql/delta/storage/LogStore.scala) instead of accessing the storage system directly. We can plug in custom `LogStore` implementations in order to provide the above guarantees for different storage systems. Delta Lake has built-in `LogStore` implementations for HDFS, Amazon S3, Azure and OCI (Oracle Cloud Infrastructure) storage services. Please see [Delta Lake Storage Configuration](https://docs.delta.io/latest/delta-storage.html) for more details. If you are interested in adding a custom `LogStore` implementation for your storage system, you can start discussions in the community mailing group.
+ Given that storage systems do not necessarily provide all of these guarantees out-of-the-box, Delta Lake transactional operations typically go through the [LogStore API](https://github.com/delta-io/delta/blob/master/src/main/scala/org/apache/spark/sql/delta/storage/LogStore.scala) instead of accessing the storage system directly. We can plug in custom `LogStore` implementations in order to provide the above guarantees for different storage systems. Delta Lake has built-in `LogStore` implementations for HDFS, Amazon S3, Azure, Google Cloud Storage, Oracle Cloud Infrastructure, and IBM Cloud Object Storage. Please see [Delta Lake Storage Configuration](https://docs.delta.io/latest/delta-storage.html) for more details. If you are interested in adding a custom `LogStore` implementation for your storage system, you can start discussions in the community mailing group.

As an optimization, storage systems can also allow _partial listing of a directory, given a start marker_. Delta Lake can use this ability to efficiently discover the latest version of a table, without listing all of the files in the transaction log.

@@ -103,7 +103,7 @@ We use [GitHub Issues](https://github.com/delta-io/delta/issues) to track commun
# Contributing
We welcome contributions to Delta Lake. See our [CONTRIBUTING.md](https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md) for more details.

- We also adhere to the [Delta Lake Code of Conduct](https://github.com/delta-io/delta/blob/master/CODE_OF_CONDUCT.md)
+ We also adhere to the [Delta Lake Code of Conduct](https://github.com/delta-io/delta/blob/master/CODE_OF_CONDUCT.md).

# License
Apache License 2.0, see [LICENSE](https://github.com/delta-io/delta/blob/master/LICENSE.txt).
97 changes: 97 additions & 0 deletions src/main/scala/org/apache/spark/sql/delta/storage/GCSLogStore.scala
@@ -0,0 +1,97 @@
/*
* Copyright (2020) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.storage

import java.io.IOException
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.FileAlreadyExistsException

import com.google.common.base.Throwables
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging

/**
* The [[LogStore]] implementation for GCS, which uses gcs-connector to
* provide the necessary atomic and durability guarantees:
*
* 1. Atomic Visibility: Read/read-after-metadata-update/delete are strongly
* consistent for GCS.
*
* 2. Consistent Listing: GCS guarantees strong consistency for both object and
* bucket listing operations.
* https://cloud.google.com/storage/docs/consistency
*
* 3. Mutual Exclusion: Preconditions are used to handle race conditions.
*
 * Regarding file creation, this implementation:
 * - Throws [[FileAlreadyExistsException]] if the file exists and overwrite is false.
 * - Opens a stream to write to GCS otherwise.
 * - Assumes file writing to be all-or-nothing, irrespective of the overwrite option.
*/
class GCSLogStore(sparkConf: SparkConf, defaultHadoopConf: Configuration)
  extends HadoopFileSystemLogStore(sparkConf, defaultHadoopConf) with Logging {

  val preconditionFailedExceptionMessage = "412 Precondition Failed"

  def write(path: Path, actions: Iterator[String], overwrite: Boolean = false): Unit = {
    val fs = path.getFileSystem(getHadoopConfiguration)

    // This is needed for the tests to throw an error with the local file system.
    if (fs.isInstanceOf[LocalFileSystem] && !overwrite && fs.exists(path)) {
      throw new FileAlreadyExistsException(path.toString)
    }

    try {
      // If overwrite = false and the path already exists, gcs-connector throws
      // org.apache.hadoop.fs.FileAlreadyExistsException after fs.create is invoked.
      // This should be mapped to java.nio.file.FileAlreadyExistsException.
      val stream = fs.create(path, overwrite)
      try {
        actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(stream.write)
      } finally {
        stream.close()
      }
    } catch {
      case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
        throw new FileAlreadyExistsException(path.toString).initCause(e)
      // GCS uses preconditions to handle race conditions among multiple writers.
      // If the path gets created between fs.create and stream.close by an external
      // agent or a concurrent writer, this block will execute.
      // Reference: https://cloud.google.com/storage/docs/generations-preconditions
      case e: IOException if isPreconditionFailure(e) =>
        if (!overwrite) {
          throw new FileAlreadyExistsException(path.toString).initCause(e)
        }
    }
  }

  private def isPreconditionFailure(x: Throwable): Boolean = {
    Throwables.getCausalChain(x)
      .stream()
      .filter(p => p != null)
      .filter(p => p.getMessage != null)
      .filter(p => p.getMessage.contains(preconditionFailedExceptionMessage))
      .findFirst
      .isPresent
  }

  override def invalidateCache(): Unit = {}

  override def isPartialWriteVisible(path: Path): Boolean = false
}
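
To make the contract above concrete, here is a minimal sketch of how a caller might drive `write` and react to losing a concurrent-commit race; the table path, payload, and retry comment are illustrative, not Delta's actual commit loop:

```
import java.nio.file.FileAlreadyExistsException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf

val store = new GCSLogStore(new SparkConf(), new Configuration())
val commitFile =
  new Path("gs://my-bucket/my-table/_delta_log/00000000000000000001.json")
try {
  // overwrite defaults to false, so the GCS precondition guards this create.
  store.write(commitFile, Iterator("""{"commitInfo":{}}"""))
} catch {
  case _: FileAlreadyExistsException =>
    // Another writer committed this version first: reread the log and retry
    // at the next version number.
}
```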
31 changes: 31 additions & 0 deletions src/test/scala/org/apache/spark/sql/delta/GCSLogStoreSuite.scala
@@ -0,0 +1,31 @@
/*
* Copyright (2020) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.storage._

class GCSLogStoreSuite extends LogStoreSuiteBase {

  override val logStoreClassName: String = classOf[GCSLogStore].getName

  testHadoopConf(
    expectedErrMsg = ".*No FileSystem for scheme.*fake.*",
    "fs.fake.impl" -> classOf[FakeFileSystem].getName,
    "fs.fake.impl.disable.cache" -> "true")

  protected def shouldUseRenameToWriteCheckpoint: Boolean = false
}
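
The `shouldUseRenameToWriteCheckpoint = false` override reflects that `GCSLogStore.isPartialWriteVisible` returns false: when partial writes can never be observed, checkpoints can be streamed directly to their final path instead of going through a temp-file-plus-rename step. A hedged sketch of that relationship (illustrative, not the exact `LogStoreSuiteBase` internals):

```
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.storage.LogStore

// Rename-based checkpoint writes are only needed when a reader could observe
// a partially written file at the destination.
def useRenameForCheckpoint(store: LogStore, path: Path): Boolean =
  store.isPartialWriteVisible(path) // false for GCSLogStore => direct write
```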
