
Commit dd07c52

renaming to ColumnFamilySchema
1 parent 6f53117 commit dd07c52

File tree: 13 files changed, +107 -33 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala

Lines changed: 2 additions & 2 deletions
@@ -20,7 +20,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA}
-import org.apache.spark.sql.execution.streaming.state.{ColumnFamilyMetadataV1, NoPrefixKeyStateEncoderSpec, StateStore, StateStoreErrors}
+import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, StateStore, StateStoreErrors}
 import org.apache.spark.sql.streaming.ListState

 /**
@@ -44,7 +44,7 @@ class ListStateImpl[S](

   private val stateTypesEncoder = StateTypesEncoder(keySerializer, valEncoder, stateName)

-  val columnFamilyMetadata = new ColumnFamilyMetadataV1(
+  val columnFamilyMetadata = new ColumnFamilySchemaV1(
     stateName, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA), false)
   store.createColFamilyIfAbsent(columnFamilyMetadata)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImplWithTTL.scala

Lines changed: 2 additions & 2 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming
 import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL}
-import org.apache.spark.sql.execution.streaming.state.{ColumnFamilyMetadataV1, NoPrefixKeyStateEncoderSpec, StateStore, StateStoreErrors}
+import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, StateStore, StateStoreErrors}
 import org.apache.spark.sql.streaming.{ListState, TTLConfig}
 import org.apache.spark.util.NextIterator

@@ -52,7 +52,7 @@ class ListStateImplWithTTL[S](
   private lazy val ttlExpirationMs =
     StateTTL.calculateExpirationTimeForDuration(ttlConfig.ttlDuration, batchTimestampMs)

-  val columnFamilyMetadata = new ColumnFamilyMetadataV1(
+  val columnFamilyMetadata = new ColumnFamilySchemaV1(
     stateName, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL,
     NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA), true)
   initialize()

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala

Lines changed: 2 additions & 2 deletions
@@ -20,7 +20,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA}
-import org.apache.spark.sql.execution.streaming.state.{ColumnFamilyMetadataV1, PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors, UnsafeRowPair}
+import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors, UnsafeRowPair}
 import org.apache.spark.sql.streaming.MapState

 class MapStateImpl[K, V](
@@ -34,7 +34,7 @@ class MapStateImpl[K, V](
   private val stateTypesEncoder = new CompositeKeyStateEncoder(
     keySerializer, userKeyEnc, valEncoder, COMPOSITE_KEY_ROW_SCHEMA, stateName)

-  val columnFamilyMetadata = new ColumnFamilyMetadataV1(
+  val columnFamilyMetadata = new ColumnFamilySchemaV1(
     stateName, COMPOSITE_KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA,
     PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, 1), false)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala

Lines changed: 2 additions & 2 deletions
@@ -20,7 +20,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL}
-import org.apache.spark.sql.execution.streaming.state.{ColumnFamilyMetadataV1, PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors}
+import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors}
 import org.apache.spark.sql.streaming.{MapState, TTLConfig}
 import org.apache.spark.util.NextIterator

@@ -55,7 +55,7 @@ class MapStateImplWithTTL[K, V](
   private val ttlExpirationMs =
     StateTTL.calculateExpirationTimeForDuration(ttlConfig.ttlDuration, batchTimestampMs)

-  val columnFamilyMetadata = new ColumnFamilyMetadataV1(
+  val columnFamilyMetadata = new ColumnFamilySchemaV1(
     stateName, COMPOSITE_KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL,
     PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, 1), false)
   initialize()

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala

Lines changed: 5 additions & 3 deletions
@@ -84,7 +84,8 @@ class StatefulProcessorHandleImpl(
     timeMode: TimeMode,
     isStreaming: Boolean = true,
     batchTimestampMs: Option[Long] = None,
-    metrics: Map[String, SQLMetric] = Map.empty)
+    metrics: Map[String, SQLMetric] = Map.empty,
+    existingColFamilies: Map[String, ColumnFamilyAccumulator] = Map.empty)
   extends StatefulProcessorHandle with Logging {
   import StatefulProcessorHandleState._

@@ -97,8 +98,8 @@ class StatefulProcessorHandleImpl(
   private[sql] val stateVariables: util.List[StateVariableInfo] =
     new util.ArrayList[StateVariableInfo]()

-  private[sql] val columnFamilyMetadatas: util.List[ColumnFamilyMetadata] =
-    new util.ArrayList[ColumnFamilyMetadata]()
+  private[sql] val columnFamilyMetadatas: util.List[ColumnFamilySchema] =
+    new util.ArrayList[ColumnFamilySchema]()

   private val BATCH_QUERY_ID = "00000000-0000-0000-0000-000000000000"

@@ -168,6 +169,7 @@ class StatefulProcessorHandleImpl(
       throw StateStoreErrors.cannotPerformOperationWithInvalidHandleState(operationType,
         currState.toString)
     }
+
   }

   private def verifyTimerOperations(operationType: String): Unit = {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala

Lines changed: 19 additions & 5 deletions
@@ -93,6 +93,22 @@ case class TransformWithStateExec(
     "operatorPropsFromExecutor"
   )

+  private lazy val colFamilyAccumulators: Map[String, ColumnFamilyAccumulator] =
+    initializeColFamilyAccumulators()
+
+  private def initializeColFamilyAccumulators(): Map[String, ColumnFamilyAccumulator] = {
+    val stateCheckpointPath = new Path(stateInfo.get.checkpointLocation,
+      getStateInfo.operatorId.toString)
+    val hadoopConf = session.sqlContext.sessionState.newHadoopConf()
+
+    val reader = new SchemaV3Reader(stateCheckpointPath, hadoopConf)
+
+    reader.read.map { colFamilyMetadata =>
+      val acc = ColumnFamilyAccumulator.create(colFamilyMetadata, sparkContext)
+      colFamilyMetadata.asInstanceOf[ColumnFamilySchemaV1].columnFamilyName -> acc
+    }.toMap
+  }
+
   /** Metadata of this stateful operator and its states stores. */
   override def operatorStateMetadata(): OperatorStateMetadata = {
     val info = getStateInfo
@@ -414,6 +430,7 @@

   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
+    colFamilyAccumulators

     validateTimeMode()

@@ -453,10 +470,7 @@
       }
     } else {
       if (isStreaming) {
-        val stateCheckpointPath = new Path(stateInfo.get.checkpointLocation,
-          getStateInfo.operatorId.toString)
-        val hadoopConf = session.sqlContext.sessionState.newHadoopConf()
-        val reader = new SchemaV3Reader(stateCheckpointPath, hadoopConf)
+
         child.execute().mapPartitionsWithStateStore[InternalRow](
           getStateInfo,
           KEY_ROW_SCHEMA,
@@ -535,7 +549,7 @@
       CompletionIterator[InternalRow, Iterator[InternalRow]] = {
     val processorHandle = new StatefulProcessorHandleImpl(
       store, getStateInfo.queryRunId, keyEncoder, timeMode,
-      isStreaming, batchTimestampMs, metrics)
+      isStreaming, batchTimestampMs, metrics, colFamilyAccumulators)
     assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
     statefulProcessor.setHandle(processorHandle)
     statefulProcessor.init(outputMode, timeMode)
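
To make the new driver-side wiring concrete, here is a minimal sketch of the map that initializeColFamilyAccumulators() builds, assuming the column family schemas have already been read from the state checkpoint. The helper name buildAccumulators, the schemas value, and the sc SparkContext are illustrative stand-ins, not part of this commit.

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.streaming.state.{
  ColumnFamilyAccumulator, ColumnFamilySchema, ColumnFamilySchemaV1}

// Hypothetical helper: mirrors the body of initializeColFamilyAccumulators(), but takes
// the already-read schemas instead of going through SchemaV3Reader and the checkpoint path.
def buildAccumulators(
    schemas: List[ColumnFamilySchema],
    sc: SparkContext): Map[String, ColumnFamilyAccumulator] = {
  schemas.map { schema =>
    // create() registers the accumulator with the driver's SparkContext (see SchemaHelper.scala).
    val acc = ColumnFamilyAccumulator.create(schema, sc)
    // Keyed by column family name, matching what the handle receives as existingColFamilies.
    schema.asInstanceOf[ColumnFamilySchemaV1].columnFamilyName -> acc
  }.toMap
}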

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala

Lines changed: 2 additions & 2 deletions
@@ -20,7 +20,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA}
-import org.apache.spark.sql.execution.streaming.state.{ColumnFamilyMetadataV1, NoPrefixKeyStateEncoderSpec, StateStore}
+import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, StateStore}
 import org.apache.spark.sql.streaming.ValueState

 /**
@@ -42,7 +42,7 @@ class ValueStateImpl[S](
   private val keySerializer = keyExprEnc.createSerializer()
   private val stateTypesEncoder = StateTypesEncoder(keySerializer, valEncoder, stateName)

-  val columnFamilyMetadata = new ColumnFamilyMetadataV1(
+  val columnFamilyMetadata = new ColumnFamilySchemaV1(
     stateName, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA), false)
   initialize()

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImplWithTTL.scala

Lines changed: 2 additions & 2 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming
 import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL}
-import org.apache.spark.sql.execution.streaming.state.{ColumnFamilyMetadataV1, NoPrefixKeyStateEncoderSpec, StateStore}
+import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, StateStore}
 import org.apache.spark.sql.streaming.{TTLConfig, ValueState}

 /**
@@ -49,7 +49,7 @@ class ValueStateImplWithTTL[S](
   private val ttlExpirationMs =
     StateTTL.calculateExpirationTimeForDuration(ttlConfig.ttlDuration, batchTimestampMs)

-  val columnFamilyMetadata = new ColumnFamilyMetadataV1(
+  val columnFamilyMetadata = new ColumnFamilySchemaV1(
     stateName, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL,
     NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA), false)
   initialize()

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 1 addition & 1 deletion
@@ -246,7 +246,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     throw StateStoreErrors.unsupportedOperationException("merge", providerName)
   }

-  override def createColFamilyIfAbsent(colFamilyMetadata: ColumnFamilyMetadataV1): Unit = {
+  override def createColFamilyIfAbsent(colFamilyMetadata: ColumnFamilySchemaV1): Unit = {
     throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName)
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala

Lines changed: 1 addition & 1 deletion
@@ -265,7 +265,7 @@ private[sql] class RocksDBStateStoreProvider
     result
   }

-  override def createColFamilyIfAbsent(colFamilyMetadata: ColumnFamilyMetadataV1): Unit = {
+  override def createColFamilyIfAbsent(colFamilyMetadata: ColumnFamilySchemaV1): Unit = {
     createColFamilyIfAbsent(
       colFamilyMetadata.columnFamilyName,
       colFamilyMetadata.keySchema,

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala

Lines changed: 67 additions & 9 deletions
@@ -26,22 +26,23 @@ import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods
 import org.json4s.jackson.JsonMethods.{compact, render}

+import org.apache.spark.SparkContext
 import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, MetadataVersionUtil}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AccumulatorV2, Utils}

-sealed trait ColumnFamilyMetadata extends Serializable {
+sealed trait ColumnFamilySchema extends Serializable {
   def jsonValue: JsonAST.JObject

   def json: String
 }

-case class ColumnFamilyMetadataV1(
+case class ColumnFamilySchemaV1(
     val columnFamilyName: String,
     val keySchema: StructType,
     val valueSchema: StructType,
     val keyStateEncoderSpec: KeyStateEncoderSpec,
-    val multipleValuesPerKey: Boolean) extends ColumnFamilyMetadata {
+    val multipleValuesPerKey: Boolean) extends ColumnFamilySchema {
   def jsonValue: JsonAST.JObject = {
     ("columnFamilyName" -> JString(columnFamilyName)) ~
       ("keySchema" -> keySchema.json) ~
@@ -55,12 +56,69 @@ case class ColumnFamilyMetadataV1(
   }
 }

-object ColumnFamilyMetadataV1 {
-  def fromJson(json: List[Map[String, Any]]): List[ColumnFamilyMetadata] = {
+class ColumnFamilyAccumulator(
+    columnFamilyMetadata: ColumnFamilySchema) extends
+  AccumulatorV2[ColumnFamilySchema, ColumnFamilySchema] {
+
+  private var _value: ColumnFamilySchema = columnFamilyMetadata
+  /**
+   * Returns if this accumulator is zero value or not. e.g. for a counter accumulator, 0 is zero
+   * value; for a list accumulator, Nil is zero value.
+   */
+  override def isZero: Boolean = _value == null
+
+  /**
+   * Creates a new copy of this accumulator.
+   */
+  override def copy(): AccumulatorV2[ColumnFamilySchema, ColumnFamilySchema] = {
+    new ColumnFamilyAccumulator(_value)
+  }
+
+  /**
+   * Resets this accumulator, which is zero value. i.e. call `isZero` must
+   * return true.
+   */
+  override def reset(): Unit = {
+    _value = null
+  }
+
+  /**
+   * Takes the inputs and accumulates.
+   */
+  override def add(v: ColumnFamilySchema): Unit = {
+    _value = v
+  }
+
+  /**
+   * Merges another same-type accumulator into this one and update its state, i.e. this should be
+   * merge-in-place.
+   */
+  override def merge(other: AccumulatorV2[ColumnFamilySchema, ColumnFamilySchema]): Unit = {
+    _value = other.value
+  }
+
+  /**
+   * Defines the current value of this accumulator
+   */
+  override def value: ColumnFamilySchema = _value
+}
+
+object ColumnFamilyAccumulator {
+  def create(
+      columnFamilyMetadata: ColumnFamilySchema,
+      sparkContext: SparkContext): ColumnFamilyAccumulator = {
+    val acc = new ColumnFamilyAccumulator(columnFamilyMetadata)
+    acc.register(sparkContext)
+    acc
+  }
+}
+
+object ColumnFamilySchemaV1 {
+  def fromJson(json: List[Map[String, Any]]): List[ColumnFamilySchema] = {
     assert(json.isInstanceOf[List[_]])

     json.map { colFamilyMap =>
-      new ColumnFamilyMetadataV1(
+      new ColumnFamilySchemaV1(
         colFamilyMap("columnFamilyName").asInstanceOf[String],
         StructType.fromString(colFamilyMap("keySchema").asInstanceOf[String]),
         StructType.fromString(colFamilyMap("valueSchema").asInstanceOf[String]),
@@ -122,7 +180,7 @@ object SchemaHelper {
     private val schemaFilePath = SchemaV3Writer.getSchemaFilePath(stateCheckpointPath)

     private lazy val fm = CheckpointFileManager.create(stateCheckpointPath, hadoopConf)
-    def read: List[ColumnFamilyMetadata] = {
+    def read: List[ColumnFamilySchema] = {
      if (!fm.exists(schemaFilePath)) {
        return List.empty
      }
@@ -139,7 +197,7 @@ object SchemaHelper {
        s"Expected List but got ${deserializedList.getClass}")
      val columnFamilyMetadatas = deserializedList.asInstanceOf[List[Map[String, Any]]]
      // Extract each JValue to StateVariableInfo
-      ColumnFamilyMetadataV1.fromJson(columnFamilyMetadatas)
+      ColumnFamilySchemaV1.fromJson(columnFamilyMetadatas)
     }
   }
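
The accumulator introduced above is, in effect, a last-write-wins holder for a single ColumnFamilySchema: add() and merge() both overwrite the current value, and reset() clears it so that isZero returns true. Below is a minimal driver-side sketch of that behavior, assuming a local SparkSession; the names spark, keySchema, valueSchema, initial, and updated are illustrative and not part of this change.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.state.{
  ColumnFamilyAccumulator, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val spark = SparkSession.builder().master("local[1]").appName("cf-schema-acc").getOrCreate()

// A toy schema for a hypothetical "countState" column family.
val keySchema = new StructType().add("key", StringType)
val valueSchema = new StructType().add("value", LongType)
val initial = ColumnFamilySchemaV1(
  "countState", keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema), false)
val updated = initial.copy(multipleValuesPerKey = true)

// create() registers the accumulator with the driver's SparkContext.
val acc = ColumnFamilyAccumulator.create(initial, spark.sparkContext)
assert(acc.value == initial)

// add() (like merge()) simply replaces the held schema: the latest write wins.
acc.add(updated)
assert(acc.value == updated)

spark.stop()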

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 1 addition & 1 deletion
@@ -138,7 +138,7 @@ trait StateStore extends ReadStateStore {
       isInternal: Boolean = false): Unit

   def createColFamilyIfAbsent(
-      colFamilyMetadata: ColumnFamilyMetadataV1
+      colFamilyMetadata: ColumnFamilySchemaV1
   ): Unit

   /**

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala

Lines changed: 1 addition & 1 deletion
@@ -75,7 +75,7 @@ class MemoryStateStore extends StateStore() {
     throw new UnsupportedOperationException("Doesn't support multiple values per key")
   }

-  override def createColFamilyIfAbsent(colFamilyMetadata: ColumnFamilyMetadataV1): Unit = {
+  override def createColFamilyIfAbsent(colFamilyMetadata: ColumnFamilySchemaV1): Unit = {
     throw StateStoreErrors.removingColumnFamiliesNotSupported("MemoryStateStoreProvider")
   }
 }
