OperatorStateMetadata with HDFSMetadataLog #10


Closed
@@ -31,8 +31,8 @@ import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceErrors
import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.PATH
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, OperatorStateMetadataReader, OperatorStateMetadataV1}
import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, OperatorStateMetadataLog}
import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, OperatorStateMetadataV1, OperatorStateMetadataV2}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -46,6 +46,7 @@ case class StateMetadataTableEntry(
numPartitions: Int,
minBatchId: Long,
maxBatchId: Long,
operatorPropertiesJson: String,
numColsPrefixKey: Int) {
def toRow(): InternalRow = {
new GenericInternalRow(
@@ -55,7 +56,9 @@
numPartitions,
minBatchId,
maxBatchId,
numColsPrefixKey))
UTF8String.fromString(operatorPropertiesJson),
numColsPrefixKey
))
}
}

@@ -68,6 +71,7 @@ object StateMetadataTableEntry {
.add("numPartitions", IntegerType)
.add("minBatchId", LongType)
.add("maxBatchId", LongType)
.add("operatorProperties", StringType)
}
}

@@ -193,21 +197,28 @@ class StateMetadataPartitionReader(
val opIds = fileManager
.list(stateDir, pathNameCanBeParsedAsLongFilter).map(f => pathToLong(f.getPath)).sorted
opIds.map { opId =>
new OperatorStateMetadataReader(new Path(stateDir, opId.toString), hadoopConf).read()
}
val dirLocation = new Path(stateDir, opId.toString)
val metadataFilePath = OperatorStateMetadata.metadataFilePath(dirLocation)
val log = new OperatorStateMetadataLog(hadoopConf, metadataFilePath.toString)
log.getLatest()
}.filter(_.isDefined).map(_.get._2)
}

private[state] lazy val stateMetadata: Iterator[StateMetadataTableEntry] = {
allOperatorStateMetadata.flatMap { operatorStateMetadata =>
require(operatorStateMetadata.version == 1)
val operatorStateMetadataV1 = operatorStateMetadata.asInstanceOf[OperatorStateMetadataV1]
operatorStateMetadataV1.stateStoreInfo.map { stateStoreMetadata =>
StateMetadataTableEntry(operatorStateMetadataV1.operatorInfo.operatorId,
operatorStateMetadataV1.operatorInfo.operatorName,
require(operatorStateMetadata.version == 1 || operatorStateMetadata.version == 2)
val operatorProperties = operatorStateMetadata match {
case _: OperatorStateMetadataV1 => ""
case v2: OperatorStateMetadataV2 => v2.operatorPropertiesJson
}
operatorStateMetadata.stateStoreInfo.map { stateStoreMetadata =>
StateMetadataTableEntry(operatorStateMetadata.operatorInfo.operatorId,
operatorStateMetadata.operatorInfo.operatorName,
stateStoreMetadata.storeName,
stateStoreMetadata.numPartitions,
if (batchIds.nonEmpty) batchIds.head else -1,
if (batchIds.nonEmpty) batchIds.last else -1,
operatorProperties,
stateStoreMetadata.numColsPrefixKey
)
}
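With the reader now backed by OperatorStateMetadataLog, each operator's newest metadata entry is what gets exposed, and the table gains the operatorProperties column (empty for V1 metadata, the operator's JSON properties for V2). A minimal query sketch, assuming the source's usual state-metadata short name and a hypothetical checkpoint path:

// Sketch: read the state metadata table and inspect the new column.
// "/tmp/checkpoint" is illustrative; column names follow the schema above.
spark.read
  .format("state-metadata")
  .load("/tmp/checkpoint")
  .select("operatorId", "operatorName", "operatorProperties", "minBatchId", "maxBatchId")
  .show(truncate = false)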
@@ -42,13 +42,16 @@ trait AsyncLogPurge extends Logging {

protected def purge(threshold: Long): Unit

protected def purgeOldest(): Unit

protected lazy val useAsyncPurge: Boolean = sparkSession.conf.get(SQLConf.ASYNC_LOG_PURGE)

protected def purgeAsync(batchId: Long): Unit = {
if (purgeRunning.compareAndSet(false, true)) {
asyncPurgeExecutorService.execute(() => {
try {
purge(batchId - minLogEntriesToMaintain)
purgeOldest()
} catch {
case throwable: Throwable =>
logError("Encountered error while performing async log purge", throwable)
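The new purgeOldest() hook gives implementations a second purge path for logs that are kept per operator rather than trimmed by a single batch-id threshold. A sketch of one possible implementation, assuming a hypothetical allOperatorStateMetadataLogs collection of the per-operator logs:

// Sketch (assumption): trim every per-operator metadata log down to the newest
// minLogEntriesToMaintain entries; allOperatorStateMetadataLogs is hypothetical.
override protected def purgeOldest(): Unit = {
  allOperatorStateMetadataLogs.foreach { log =>
    log.purgeOldest(minLogEntriesToMaintain)
  }
}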
@@ -25,6 +25,7 @@ import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization
@@ -48,9 +49,21 @@ import org.apache.spark.util.ArrayImplicits._
* Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing
* files in a directory always shows the latest files.
*/
class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: String)
class HDFSMetadataLog[T <: AnyRef : ClassTag](
hadoopConf: Configuration,
path: String,
val metadataCacheEnabled: Boolean = false)
extends MetadataLog[T] with Logging {

def this(sparkSession: SparkSession, path: String) = {
this(
sparkSession.sessionState.newHadoopConf(),
path,
metadataCacheEnabled = sparkSession.sessionState.conf.getConf(
SQLConf.STREAMING_METADATA_CACHE_ENABLED)
)
}

private implicit val formats: Formats = Serialization.formats(NoTypeHints)

/** Needed to serialize type T into JSON when using Jackson */
@@ -64,15 +77,12 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
val metadataPath = new Path(path)

protected val fileManager =
CheckpointFileManager.create(metadataPath, sparkSession.sessionState.newHadoopConf())
CheckpointFileManager.create(metadataPath, hadoopConf)

if (!fileManager.exists(metadataPath)) {
fileManager.mkdirs(metadataPath)
}

protected val metadataCacheEnabled: Boolean
= sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_METADATA_CACHE_ENABLED)

/**
* Cache the latest two batches. [[StreamExecution]] usually just accesses the latest two batches
* when committing offsets, this cache will save some file system operations.
@@ -325,6 +335,19 @@
}
}

def purgeOldest(minEntriesToMaintain: Int): Unit = {
val batchIds = listBatches.sorted
if (batchIds.length > minEntriesToMaintain) {
val filesToDelete = batchIds.take(batchIds.length - minEntriesToMaintain)
filesToDelete.foreach { batchId =>
val path = batchIdToPath(batchId)
fileManager.delete(path)
if (metadataCacheEnabled) batchCache.remove(batchId)
logTrace(s"Removed metadata log file: $path")
}
}
}

/** List the available batches on file system. */
protected def listBatches: Array[Long] = {
val batchIds = fileManager.list(metadataPath, batchFilesFilter)
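Because the primary constructor now takes a Hadoop Configuration instead of a SparkSession, the log can be created where no session is available, and purgeOldest trims by entry count rather than by batch-id threshold. A minimal usage sketch with an illustrative path:

// Sketch: build the log from a bare Configuration and keep only the two newest entries.
import org.apache.hadoop.conf.Configuration

val log = new HDFSMetadataLog[String](
  new Configuration(), "/tmp/checkpoint/some-log", metadataCacheEnabled = true)
log.add(0, "batch0")
log.add(1, "batch1")
log.add(2, "batch2")
log.purgeOldest(minEntriesToMaintain = 2)   // deletes the file for batch 0, keeps 1 and 2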
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadat
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
import org.apache.spark.sql.execution.python.FlatMapGroupsInPandasWithStateExec
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSourceV1
import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadataV1, OperatorStateMetadataWriter}
import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadataV1, OperatorStateMetadataV2}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -187,17 +187,6 @@ class IncrementalExecution(
}
}

object WriteStatefulOperatorMetadataRule extends SparkPlanPartialRule {
override val rule: PartialFunction[SparkPlan, SparkPlan] = {
case stateStoreWriter: StateStoreWriter if isFirstBatch =>
val metadata = stateStoreWriter.operatorStateMetadata()
val metadataWriter = new OperatorStateMetadataWriter(new Path(
checkpointLocation, stateStoreWriter.getStateInfo.operatorId.toString), hadoopConf)
metadataWriter.write(metadata)
stateStoreWriter
}
}

object StateOpIdRule extends SparkPlanPartialRule {
override val rule: PartialFunction[SparkPlan, SparkPlan] = {
case StateStoreSaveExec(keys, None, None, None, None, stateFormatVersion,
@@ -433,11 +422,11 @@ class IncrementalExecution(
new Path(checkpointLocation).getParent.toString,
new SerializableConfiguration(hadoopConf))
val opMetadataList = reader.allOperatorStateMetadata
ret = opMetadataList.map { operatorMetadata =>
val metadataInfoV1 = operatorMetadata
.asInstanceOf[OperatorStateMetadataV1]
.operatorInfo
metadataInfoV1.operatorId -> metadataInfoV1.operatorName
ret = opMetadataList.map {
case OperatorStateMetadataV1(operatorInfo, _) =>
operatorInfo.operatorId -> operatorInfo.operatorName
case OperatorStateMetadataV2(operatorInfo, _, _) =>
operatorInfo.operatorId -> operatorInfo.operatorName
}.toMap
} catch {
case e: Exception =>
Expand Down Expand Up @@ -473,7 +462,6 @@ class IncrementalExecution(
}
// The rule doesn't change the plan but cause the side effect that metadata is written
// in the checkpoint directory of stateful operator.
planWithStateOpId transform WriteStatefulOperatorMetadataRule.rule
simulateWatermarkPropagation(planWithStateOpId)
planWithStateOpId transform WatermarkPropagationRule.rule
}
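Both metadata versions carry the same OperatorInfo, so the two match arms above differ only in arity; since the StateMetadataPartitionReader change earlier in this diff already reads operatorInfo off the base type, the mapping could equivalently be written against the trait. A sketch under that assumption:

// Sketch (assumes operatorInfo is exposed on the OperatorStateMetadata trait,
// as the reader change above relies on):
ret = opMetadataList.map { metadata =>
  metadata.operatorInfo.operatorId -> metadata.operatorInfo.operatorName
}.toMap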
@@ -663,6 +663,7 @@ class MicroBatchExecution(
purgeAsync(execCtx.batchId)
} else {
purge(execCtx.batchId - minLogEntriesToMaintain)
purgeOldest()
}
}
}
@@ -902,6 +903,23 @@ class MicroBatchExecution(
if (!commitLog.add(execCtx.batchId, CommitMetadata(watermarkTracker.currentWatermark))) {
throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
}
val shouldWriteMetadatas = execCtx.previousContext match {
case Some(prevCtx)
if prevCtx.executionPlan.runId == execCtx.executionPlan.runId =>
false
case _ => true
}
if (shouldWriteMetadatas) {
execCtx.executionPlan.executedPlan.collect {
case s: StateStoreWriter =>
val metadata = s.operatorStateMetadata()
val id = metadata.operatorInfo.operatorId
val metadataFile = operatorStateMetadataLogs(id)
if (!metadataFile.add(execCtx.batchId, metadata)) {
throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
}
}
}
}
committedOffsets ++= execCtx.endOffsets
}
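shouldWriteMetadatas is true on the first batch of a run and whenever the execution plan is rebuilt, so the per-operator metadata is written once per plan instead of through the removed WriteStatefulOperatorMetadataRule. The operatorStateMetadataLogs lookup used above is not part of this diff; a hypothetical sketch of how it could be backed, reusing the metadata path layout the reader resolves in the first file (all names below are assumptions):

// Hypothetical sketch: lazily create one OperatorStateMetadataLog per stateful
// operator id under <checkpoint>/state/<operatorId>.
private val operatorStateMetadataLogCache =
  scala.collection.mutable.Map.empty[Long, OperatorStateMetadataLog]

private def operatorStateMetadataLogs(operatorId: Long): OperatorStateMetadataLog =
  operatorStateMetadataLogCache.getOrElseUpdate(operatorId, {
    val metadataPath = OperatorStateMetadata.metadataFilePath(
      new Path(new Path(resolvedCheckpointRoot, "state"), operatorId.toString))
    new OperatorStateMetadataLog(sparkSession, metadataPath.toString)
  })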
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import java.io.{BufferedReader, InputStream, InputStreamReader, OutputStream}
import java.nio.charset.StandardCharsets
import java.nio.charset.StandardCharsets._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FSDataOutputStream

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, OperatorStateMetadataV1, OperatorStateMetadataV2}
import org.apache.spark.sql.internal.SQLConf


class OperatorStateMetadataLog(
hadoopConf: Configuration,
path: String,
metadataCacheEnabled: Boolean = false)
extends HDFSMetadataLog[OperatorStateMetadata](hadoopConf, path, metadataCacheEnabled) {

def this(sparkSession: SparkSession, path: String) = {
this(
sparkSession.sessionState.newHadoopConf(),
path,
metadataCacheEnabled = sparkSession.sessionState.conf.getConf(
SQLConf.STREAMING_METADATA_CACHE_ENABLED)
)
}

override protected def serialize(metadata: OperatorStateMetadata, out: OutputStream): Unit = {
val fsDataOutputStream = out.asInstanceOf[FSDataOutputStream]
fsDataOutputStream.write(s"v${metadata.version}\n".getBytes(StandardCharsets.UTF_8))
metadata.version match {
case 1 =>
OperatorStateMetadataV1.serialize(fsDataOutputStream, metadata)
case 2 =>
OperatorStateMetadataV2.serialize(fsDataOutputStream, metadata)
}
}

override protected def deserialize(in: InputStream): OperatorStateMetadata = {
// called inside a try-finally where the underlying stream is closed in the caller
// create buffered reader from input stream
val bufferedReader = new BufferedReader(new InputStreamReader(in, UTF_8))
// read first line for version number, in the format "v{version}"
val version = bufferedReader.readLine()
version match {
case "v1" => OperatorStateMetadataV1.deserialize(bufferedReader)
case "v2" => OperatorStateMetadataV2.deserialize(bufferedReader)
}
}
}
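A minimal usage sketch of the new log, mirroring how the reader and MicroBatchExecution changes use it. The path is illustrative, spark is an assumed active SparkSession, and the OperatorInfoV1/StateStoreMetadataV1 shapes are assumptions based on their use elsewhere in the state package:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.state.{OperatorInfoV1, OperatorStateMetadata, OperatorStateMetadataV1, StateStoreMetadataV1}

// Sketch: persist V1 metadata for batch 0, read the newest entry back, and trim old files.
val metadata = OperatorStateMetadataV1(
  OperatorInfoV1(0L, "stateStoreSave"),
  Array(StateStoreMetadataV1("default", 0, 5)))
val metadataPath = OperatorStateMetadata.metadataFilePath(
  new Path("/tmp/checkpoint/state/0"))            // hypothetical checkpoint layout
val log = new OperatorStateMetadataLog(spark, metadataPath.toString)
log.add(0, metadata)                              // writes the "v1" header plus the payload
log.getLatest()                                   // returns Some((0L, metadata))
log.purgeOldest(minEntriesToMaintain = 1)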
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming

import org.json4s.{DefaultFormats, Formats, JValue}
import org.json4s.JsonAST.{JBool, JString}
import org.json4s.JsonDSL._

// Enum to store the types of state variables we support
sealed trait StateVariableType

case object ValueState extends StateVariableType
case object ListState extends StateVariableType
case object MapState extends StateVariableType

// This object is used to convert the state type from string to the corresponding enum
object StateVariableType {
def withName(name: String): StateVariableType = name match {
case "ValueState" => ValueState
case "ListState" => ListState
case "MapState" => MapState
case _ => throw new IllegalArgumentException(s"Unknown state type: $name")
}
}

// This class stores the information about a state variable. It is recorded in
// operatorProperties for the TransformWithStateExec operator so that the state
// variables can be validated to be the same across restarts.
class StateVariableInfo(
val stateName: String,
val stateType: StateVariableType,
val isTtlEnabled: Boolean
) {
def jsonValue: JValue = {
("stateName" -> JString(stateName)) ~
("stateType" -> JString(stateType.toString)) ~
("isTtlEnabled" -> JBool(isTtlEnabled))
}
}

// This object is used to convert the state variable information
// from JSON to a list of StateVariableInfo
object StateVariableInfo {
implicit val formats: Formats = DefaultFormats
def fromJson(json: Any): List[StateVariableInfo] = {
assert(json.isInstanceOf[List[_]], s"Expected List but got ${json.getClass}")
val stateVariables = json.asInstanceOf[List[Map[String, Any]]]
// Extract each JValue to StateVariableInfo
stateVariables.map { stateVariable =>
new StateVariableInfo(
stateVariable("stateName").asInstanceOf[String],
StateVariableType.withName(stateVariable("stateType").asInstanceOf[String]),
stateVariable("isTtlEnabled").asInstanceOf[Boolean]
)
}
}
}
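A small round-trip sketch of the helpers above: jsonValue serializes one state variable to a JSON object and fromJson recovers it from the parsed representation (json4s is already imported by this file; the values below are illustrative):

import org.json4s.jackson.JsonMethods.{compact, parse, render}

// Serialize a single state variable description to a JSON object string.
val info = new StateVariableInfo("countState", ValueState, isTtlEnabled = false)
val json = compact(render(info.jsonValue))
// json == {"stateName":"countState","stateType":"ValueState","isTtlEnabled":false}

// fromJson expects the already-parsed form (a list of maps), which is exactly
// what JValue.values yields for a JSON array of objects.
val restored = StateVariableInfo.fromJson(parse(s"[$json]").values)
assert(restored.head.stateType == ValueState && !restored.head.isTtlEnabled)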