[Sharing] Use bytes length for file size in DeltaSharingLogFileStatus (#3432)

## Description
Use the byte length for the file size in DeltaSharingLogFileStatus, to match
the actual size of the bytes in SeekableByteArrayInputStream. This avoids the
length mismatch caused by non-ASCII characters, whose UTF-8 encoding spans more
than one byte while `String.length` counts UTF-16 code units.
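
A minimal sketch (illustrative, not part of this patch) of the mismatch, using the CJK column name from the tests below:

```scala
import java.nio.charset.StandardCharsets

object ByteLengthDemo extends App {
  // Three CJK characters plus a trailing newline, as written to the delta log.
  val s = "第一列\n"
  println(s.length)                                  // 4: UTF-16 code units
  println(s.getBytes(StandardCharsets.UTF_8).length) // 10: 3 bytes per CJK char + 1
}
```

Sizing DeltaSharingLogFileStatus with the byte count keeps it consistent with the array actually served by SeekableByteArrayInputStream.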

## How was this patch tested?
Unit tests.
linzhou-db authored Jul 29, 2024
1 parent 6495cba commit e1dd541
Showing 3 changed files with 228 additions and 9 deletions.
@@ -18,6 +18,7 @@ package io.delta.sharing.spark

import java.io.{ByteArrayInputStream, FileNotFoundException}
import java.net.{URI, URLDecoder, URLEncoder}
+ import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, Builder}
@@ -65,14 +66,16 @@ private[sharing] class DeltaSharingLogFileSystem extends FileSystem with Logging
val arrayBuilder = Array.newBuilder[Byte]
while (iterator.hasNext) {
val actionJsonStr = iterator.next()
- arrayBuilder ++= actionJsonStr.getBytes()
+ arrayBuilder ++= actionJsonStr.getBytes(StandardCharsets.UTF_8)
}
// We still have to load the full content of a delta log file in memory to serve them.
// This still exposes the risk of OOM.
new FSDataInputStream(new SeekableByteArrayInputStream(arrayBuilder.result()))
} else {
val content = getBlockAndReleaseLockHelper[String](f, None)
- new FSDataInputStream(new SeekableByteArrayInputStream(content.getBytes()))
+ new FSDataInputStream(new SeekableByteArrayInputStream(
+   content.getBytes(StandardCharsets.UTF_8)
+ ))
}
}

@@ -622,7 +625,9 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging {
minVersion,
ArrayBuffer[String]()
) += protocolAndMetadataStr
- versionToJsonLogSize(minVersion) += protocolAndMetadataStr.length
+ versionToJsonLogSize(minVersion) += protocolAndMetadataStr.getBytes(
+   StandardCharsets.UTF_8
+ ).length
numFileActionsInMinVersion = versionToDeltaSharingFileActions
.getOrElseUpdate(minVersion, ArrayBuffer[model.DeltaSharingFileAction]())
.size
@@ -636,7 +641,7 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging {
version,
ArrayBuffer[String]()
) += metadataStr
- versionToJsonLogSize(version) += metadataStr.length
+ versionToJsonLogSize(version) += metadataStr.getBytes(StandardCharsets.UTF_8).length
}
}
// Write file actions to the delta log json file.
@@ -669,7 +674,7 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging {
version,
ArrayBuffer[String]()
) += actionJsonStr
- versionToJsonLogSize(version) += actionJsonStr.length
+ versionToJsonLogSize(version) += actionJsonStr.getBytes(StandardCharsets.UTF_8).length

// 3. process expiration timestamp
if (fileAction.expirationTimestamp != null) {
@@ -766,11 +771,11 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging {
)
case protocol: model.DeltaSharingProtocol =>
val protocolJsonStr = protocol.deltaProtocol.json + "\n"
- jsonLogSize += protocolJsonStr.length
+ jsonLogSize += protocolJsonStr.getBytes(StandardCharsets.UTF_8).length
jsonLogSeq += protocolJsonStr
case metadata: model.DeltaSharingMetadata =>
val metadataJsonStr = metadata.deltaMetadata.json + "\n"
- jsonLogSize += metadataJsonStr.length
+ jsonLogSize += metadataJsonStr.getBytes(StandardCharsets.UTF_8).length
jsonLogSeq += metadataJsonStr
case _ =>
throw new IllegalStateException(
@@ -800,7 +805,7 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging {

// 2. prepare json log content.
val actionJsonStr = getActionWithDeltaSharingPath(fileAction, customTablePath) + "\n"
- jsonLogSize += actionJsonStr.length
+ jsonLogSize += actionJsonStr.getBytes(StandardCharsets.UTF_8).length
jsonLogSeq += actionJsonStr

// 3. process expiration timestamp
@@ -869,7 +874,7 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging {
val fileStatusSeq = Seq(
DeltaSharingLogFileStatus(
path = jsonFilePath,
- size = jsonLogStr.length,
+ size = jsonLogStr.getBytes(StandardCharsets.UTF_8).length,
modificationTime = 0L
)
)
@@ -350,6 +350,114 @@ class DeltaFormatSharingSourceSuite
}
}

test("restart works sharing with special chars") {
withTempDirs { (inputDir, outputDir, checkpointDir) =>
val deltaTableName = "delta_table_restart_special"
withTable(deltaTableName) {
// scalastyle:off nonascii
sql(s"""CREATE TABLE $deltaTableName (`第一列` STRING) USING DELTA""".stripMargin)
val sharedTableName = "shared_streaming_table_special"
prepareMockedClientMetadata(deltaTableName, sharedTableName)
val profileFile = prepareProfileFile(inputDir)
val tablePath = profileFile.getCanonicalPath + s"#share1.default.$sharedTableName"

withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
def InsertToDeltaTable(values: String): Unit = {
sql(s"INSERT INTO $deltaTableName VALUES $values")
}

// TODO: check testStream() function helper
def processAllAvailableInStream(): Unit = {
val q = spark.readStream
.format("deltaSharing")
.option("responseFormat", "delta")
.load(tablePath)
.filter($"第一列" contains "keep")
.writeStream
.format("delta")
.option("checkpointLocation", checkpointDir.toString)
.start(outputDir.toString)
// scalastyle:on nonascii

try {
q.processAllAvailable()
} finally {
q.stop()
}
}

// Able to stream snapshot at version 1.
InsertToDeltaTable("""("keep1"), ("keep2"), ("drop1")""")
prepareMockedClientAndFileSystemResult(
deltaTable = deltaTableName,
sharedTable = sharedTableName,
versionAsOf = Some(1L)
)
prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)
processAllAvailableInStream()
checkAnswer(
spark.read.format("delta").load(outputDir.getCanonicalPath),
Seq("keep1", "keep2").toDF()
)

// No new data, so restart will not process any new data.
processAllAvailableInStream()
checkAnswer(
spark.read.format("delta").load(outputDir.getCanonicalPath),
Seq("keep1", "keep2").toDF()
)

// Able to stream new data at version 2.
InsertToDeltaTable("""("keep3"), ("keep4"), ("drop2")""")
prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)
prepareMockedClientAndFileSystemResultForStreaming(
deltaTableName,
sharedTableName,
2,
2
)
processAllAvailableInStream()
checkAnswer(
spark.read.format("delta").load(outputDir.getCanonicalPath),
Seq("keep1", "keep2", "keep3", "keep4").toDF()
)

sql(s"""OPTIMIZE $deltaTableName""")
prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)
prepareMockedClientAndFileSystemResultForStreaming(
deltaTableName,
sharedTableName,
2,
3
)
// Optimize doesn't produce new data, so restart will not process any new data.
processAllAvailableInStream()
checkAnswer(
spark.read.format("delta").load(outputDir.getCanonicalPath),
Seq("keep1", "keep2", "keep3", "keep4").toDF()
)

// Able to stream new data at version 3.
InsertToDeltaTable("""("keep5"), ("keep6"), ("drop3")""")
prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)
prepareMockedClientAndFileSystemResultForStreaming(
deltaTableName,
sharedTableName,
3,
4
)

processAllAvailableInStream()
checkAnswer(
spark.read.format("delta").load(outputDir.getCanonicalPath),
Seq("keep1", "keep2", "keep3", "keep4", "keep5", "keep6").toDF()
)
assertBlocksAreCleanedUp()
}
}
}
}

test("streaming works with deletes on basic table") {
withTempDir { inputDir =>
val deltaTableName = "delta_table_deletes"
@@ -1002,6 +1002,112 @@ trait DeltaSharingDataSourceDeltaSuiteBase
}
}

test("DeltaSharingDataSource able to read data with special chars") {
withTempDir { tempDir =>
val deltaTableName = "delta_table_special"
withTable(deltaTableName) {
// scalastyle:off nonascii
sql(s"""CREATE TABLE $deltaTableName (`第一列` INT, c2 STRING)
|USING DELTA PARTITIONED BY (c2)
|""".stripMargin)
// The table operations take about 6~10 seconds.
for (i <- 0 to 99) {
val iteration = s"iteration $i"
val valuesBuilder = Seq.newBuilder[String]
for (j <- 0 to 99) {
valuesBuilder += s"""(${i * 10 + j}, "$iteration")"""
}
sql(s"INSERT INTO $deltaTableName VALUES ${valuesBuilder.result().mkString(",")}")
}

val sharedTableName = "shared_table_more"
prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName)
prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)

val expectedSchema: StructType = new StructType()
.add("第一列", IntegerType)
.add("c2", StringType)
// scalastyle:on nonascii
val expected = spark.read.format("delta").table(deltaTableName)

def test(tablePath: String): Unit = {
assert(
expectedSchema == spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.load(tablePath)
.schema
)
val df =
spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath)
checkAnswer(df, expected)
}

withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
val profileFile = prepareProfileFile(tempDir)
test(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName")
}
}
}
}

test("DeltaSharingDataSource able to read cdf with special chars") {
withTempDir { tempDir =>
val deltaTableName = "delta_table_cdf_special"
withTable(deltaTableName) {
// scalastyle:off nonascii
sql(s"""CREATE TABLE $deltaTableName (`第一列` INT, c2 STRING)
|USING DELTA PARTITIONED BY (c2)
|TBLPROPERTIES (delta.enableChangeDataFeed = true)
|""".stripMargin)
// The table operations take about 20~30 seconds.
for (i <- 0 to 9) {
val iteration = s"iteration $i"
val valuesBuilder = Seq.newBuilder[String]
for (j <- 0 to 49) {
valuesBuilder += s"""(${i * 10 + j}, "$iteration")"""
}
sql(s"INSERT INTO $deltaTableName VALUES ${valuesBuilder.result().mkString(",")}")
sql(s"""UPDATE $deltaTableName SET `第一列` = `第一列` + 100 where c2 = "${iteration}"""")
// scalastyle:on nonascii
sql(s"""DELETE FROM $deltaTableName where c2 = "${iteration}"""")
}

val sharedTableName = "shard_table_cdf_special"
prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)
Seq(0, 10, 20, 30).foreach { startingVersion =>
prepareMockedClientAndFileSystemResultForCdf(
deltaTableName,
sharedTableName,
startingVersion
)

val expected = spark.read
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", startingVersion)
.table(deltaTableName)

def test(tablePath: String): Unit = {
val df = spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.option("readChangeFeed", "true")
.option("startingVersion", startingVersion)
.load(tablePath)
checkAnswer(df, expected)
assert(df.count() > 0)
}

withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
val profileFile = prepareProfileFile(tempDir)
test(profileFile.getCanonicalPath + s"#share1.default.$sharedTableName")
}
}
}
}
}

/**
* deletion vector tests
*/