Skip to content

Commit

Permalink
[Spark] Use Spark conf to control special char injection in paths (#3616)
Browse files Browse the repository at this point in the history

#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Use a static Spark config to select the prefix that will be used for
Parquet and DV files. This is a follow-up from commit 82f7971.

## How was this patch tested?

Test-only changes.

## Does this PR introduce _any_ user-facing changes?

No.
  • Loading branch information
cstavr authored Aug 28, 2024
1 parent 103af9d commit a0beb10
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.net.URI
import java.util.{Base64, UUID}

import org.apache.spark.sql.delta.DeltaErrors
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.{Codec, DeltaEncoder, JsonUtils}
import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
Expand All @@ -29,6 +30,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.{Column, Encoder}
import org.apache.spark.sql.functions.{concat, lit, when}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -202,7 +204,8 @@ case class DeletionVectorDescriptor(
object DeletionVectorDescriptor {

/** Prefix that is used in all file names generated by deletion vector store. */
val DELETION_VECTOR_FILE_NAME_PREFIX = if (Utils.isTesting) "test%dv%prefix-" else ""
val DELETION_VECTOR_FILE_NAME_PREFIX = SQLConf.get.getConf(DeltaSQLConf.TEST_DV_NAME_PREFIX)

/** String that is used in all file names generated by deletion vector store */
val DELETION_VECTOR_FILE_NAME_CORE = DELETION_VECTOR_FILE_NAME_PREFIX + "deletion_vector"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.delta.files.DeltaFileFormatWriter.PartitionedTaskAttemptContextImpl
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StringType, TimestampType}
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -110,7 +111,7 @@ class DelayedCommitProtocol(
}

/** Prefix added in testing mode to all filenames to test special chars that need URL-encoding. */
val FILE_NAME_PREFIX = if (Utils.isTesting) "test%file%prefix-" else ""
val FILE_NAME_PREFIX = SQLConf.get.getConf(DeltaSQLConf.TEST_FILE_NAME_PREFIX)

protected def getFileName(
taskContext: TaskAttemptContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2026,6 +2026,20 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(Utils.isTesting)

/**
 * Test-only static conf: prefix prepended to the names of all Parquet data files so that
 * path URL-encoding of special characters (here `%`) is exercised in tests.
 * Defaults to a non-empty value only when `Utils.isTesting`; empty (no prefix) otherwise.
 * NOTE(review): declared via `buildStaticConf`, so presumably fixed for the lifetime of
 * the session — confirm it is read only at initialization by its consumers.
 */
val TEST_FILE_NAME_PREFIX =
buildStaticConf("testOnly.dataFileNamePrefix")
.internal()
.doc("[TEST_ONLY]: The prefix to use for the names of all Parquet data files.")
.stringConf
.createWithDefault(if (Utils.isTesting) "test%file%prefix-" else "")

/**
 * Test-only static conf: prefix prepended to the names of all Deletion Vector files so
 * that path URL-encoding of special characters (here `%`) is exercised in tests.
 * Defaults to a non-empty value only when `Utils.isTesting`; empty (no prefix) otherwise.
 * NOTE(review): declared via `buildStaticConf`, so presumably fixed for the lifetime of
 * the session — confirm it is read only at initialization by its consumers.
 */
val TEST_DV_NAME_PREFIX =
buildStaticConf("testOnly.dvFileNamePrefix")
.internal()
.doc("[TEST_ONLY]: The prefix to use for the names of all Deletion Vector files.")
.stringConf
.createWithDefault(if (Utils.isTesting) "test%dv%prefix-" else "")

///////////
// UTC TIMESTAMP PARTITION VALUES
///////////////////
Expand Down

0 comments on commit a0beb10

Please sign in to comment.