@@ -1035,10 +1035,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
1035
1035
}
1036
1036
}
1037
1037
1038
- override def getData (store : StateStore ): Set [((String , Int ), Int )] = {
1039
- store.iterator().map(rowPairToDataPair).toSet
1040
- }
1041
-
1042
1038
override def getDefaultSQLConf (
1043
1039
minDeltasForSnapshot : Int ,
1044
1040
numOfVersToRetainInMemory : Int ): SQLConf = {
@@ -1118,10 +1114,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
1118
1114
filePath.delete()
1119
1115
filePath.createNewFile()
1120
1116
}
1121
-
1122
- override def getLatestVersion (storeProvider : HDFSBackedStateStoreProvider ): Long = {
1123
- if (storeProvider.loadedMaps.isEmpty) 0 else storeProvider.loadedMaps.firstKey()
1124
- }
1125
1117
}
1126
1118
1127
1119
abstract class StateStoreSuiteBase [ProviderClass <: StateStoreProvider ]
@@ -1136,46 +1128,65 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
1136
1128
testWithAllCodec(" get, put, remove, commit, and all data iterator" ) { colFamiliesEnabled =>
1137
1129
tryWithProviderResource(newStoreProvider(colFamiliesEnabled)) { provider =>
1138
1130
// Verify state before starting a new set of updates
1131
+ assert(getLatestData(provider, useColumnFamilies = colFamiliesEnabled).isEmpty)
1139
1132
1140
1133
val store = provider.getStore(0 )
1141
- assert(getData(store).isEmpty)
1142
1134
assert(! store.hasCommitted)
1143
1135
assert(get(store, " a" , 0 ) === None )
1144
1136
assert(store.iterator().isEmpty)
1137
+
1145
1138
// Verify state after updating
1146
1139
put(store, " a" , 0 , 1 )
1147
1140
assert(get(store, " a" , 0 ) === Some (1 ))
1141
+
1148
1142
assert(store.iterator().nonEmpty)
1149
- assert(getLatestVersion(provider) === 0 )
1143
+ assert(getLatestData(provider, useColumnFamilies = colFamiliesEnabled).isEmpty)
1144
+
1150
1145
// Make updates, commit and then verify state
1151
1146
put(store, " b" , 0 , 2 )
1152
1147
put(store, " aa" , 0 , 3 )
1153
1148
remove(store, _._1.startsWith(" a" ))
1154
1149
assert(store.commit() === 1 )
1155
1150
assert(store.metrics.numKeys === 1 )
1151
+
1156
1152
assert(store.hasCommitted)
1157
- val store1 = provider.getStore(1 )
1158
- assert(rowPairsToDataSet(store1.iterator()) === Set (( " b " , 0 ) -> 2 ))
1159
- assert(getData(store1 ) === Set ((" b" , 0 ) -> 2 ))
1160
- store1.commit()
1153
+ assert(rowPairsToDataSet( provider.getStore(1 ).iterator()) === Set (( " b " , 0 ) -> 2 ) )
1154
+ assert(getLatestData(provider,
1155
+ useColumnFamilies = colFamiliesEnabled ) === Set ((" b" , 0 ) -> 2 ))
1156
+
1161
1157
// Trying to get newer versions should fail
1162
1158
var e = intercept[SparkException ] {
1163
- provider.getStore(3 )
1159
+ provider.getStore(2 )
1164
1160
}
1165
1161
assert(e.getMessage.contains(" does not exist" ))
1162
+
1166
1163
e = intercept[SparkException ] {
1167
- getData(provider, 3 , useColumnFamilies = colFamiliesEnabled)
1164
+ getData(provider, 2 , useColumnFamilies = colFamiliesEnabled)
1168
1165
}
1169
1166
assert(e.getMessage.contains(" does not exist" ))
1167
+
1168
+ // New updates to the reloaded store with new version, and does not change old version
1169
+ tryWithProviderResource(newStoreProvider(store.id, colFamiliesEnabled)) { reloadedProvider =>
1170
+ val reloadedStore = reloadedProvider.getStore(1 )
1171
+ put(reloadedStore, " c" , 0 , 4 )
1172
+ assert(reloadedStore.commit() === 2 )
1173
+ assert(rowPairsToDataSet(reloadedProvider.getStore(2 ).iterator()) ===
1174
+ Set ((" b" , 0 ) -> 2 , (" c" , 0 ) -> 4 ))
1175
+ assert(getLatestData(provider, useColumnFamilies = colFamiliesEnabled)
1176
+ === Set ((" b" , 0 ) -> 2 , (" c" , 0 ) -> 4 ))
1177
+ assert(getData(provider, version = 1 , useColumnFamilies = colFamiliesEnabled)
1178
+ === Set ((" b" , 0 ) -> 2 ))
1179
+ }
1170
1180
}
1171
1181
}
1172
1182
1173
1183
testWithAllCodec(" prefix scan" ) { colFamiliesEnabled =>
1174
1184
tryWithProviderResource(newStoreProvider(keySchema, PrefixKeyScanStateEncoderSpec (keySchema, 1 ),
1175
1185
colFamiliesEnabled)) { provider =>
1176
1186
// Verify state before starting a new set of updates
1187
+ assert(getLatestData(provider, useColumnFamilies = false ).isEmpty)
1188
+
1177
1189
var store = provider.getStore(0 )
1178
- assert(getData(store).isEmpty)
1179
1190
1180
1191
def putCompositeKeys (keys : Seq [(String , Int )]): Unit = {
1181
1192
val randomizedKeys = scala.util.Random .shuffle(keys.toList)
@@ -1202,21 +1213,27 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
1202
1213
verifyScan(key1AtVersion0, key2AtVersion0)
1203
1214
1204
1215
assert(store.prefixScan(dataToPrefixKeyRow(" non-exist" )).isEmpty)
1216
+
1205
1217
// committing and loading the version 1 (the version being committed)
1206
1218
store.commit()
1207
1219
store = provider.getStore(1 )
1220
+
1208
1221
// before putting the new key-value pairs, verify prefix scan works for existing keys
1209
1222
verifyScan(key1AtVersion0, key2AtVersion0)
1223
+
1210
1224
val key1AtVersion1 = Seq (" c" , " d" )
1211
1225
val key2AtVersion1 = Seq (4 , 5 , 6 )
1212
1226
val keysAtVersion1 = for (k1 <- key1AtVersion1; k2 <- key2AtVersion1) yield (k1, k2)
1227
+
1213
1228
// put a new key-value pairs, and verify that prefix scan reflects the changes
1214
1229
putCompositeKeys(keysAtVersion1)
1215
1230
verifyScan(Seq (" c" ), Seq (1 , 2 , 3 , 4 , 5 , 6 ))
1216
1231
verifyScan(Seq (" d" ), Seq (4 , 5 , 6 ))
1232
+
1217
1233
// aborting and loading the version 1 again (keysAtVersion1 should be rolled back)
1218
1234
store.abort()
1219
1235
store = provider.getStore(1 )
1236
+
1220
1237
// prefix scan should not reflect the uncommitted changes
1221
1238
verifyScan(key1AtVersion0, key2AtVersion0)
1222
1239
verifyScan(Seq (" d" ), Seq .empty)
@@ -1226,7 +1243,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
1226
1243
testWithAllCodec(s " numKeys metrics " ) { colFamiliesEnabled =>
1227
1244
tryWithProviderResource(newStoreProvider(colFamiliesEnabled)) { provider =>
1228
1245
// Verify state before starting a new set of updates
1229
- // assert(getLatestData(provider, useColumnFamilies = colFamiliesEnabled).isEmpty)
1246
+ assert(getLatestData(provider, useColumnFamilies = colFamiliesEnabled).isEmpty)
1230
1247
1231
1248
val store = provider.getStore(0 )
1232
1249
put(store, " a" , 0 , 1 )
@@ -1254,6 +1271,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
1254
1271
testWithAllCodec(s " removing while iterating " ) { colFamiliesEnabled =>
1255
1272
tryWithProviderResource(newStoreProvider(colFamiliesEnabled)) { provider =>
1256
1273
// Verify state before starting a new set of updates
1274
+ assert(getLatestData(provider, useColumnFamilies = colFamiliesEnabled).isEmpty)
1257
1275
val store = provider.getStore(0 )
1258
1276
put(store, " a" , 0 , 1 )
1259
1277
put(store, " b" , 0 , 2 )
@@ -1271,7 +1289,6 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
1271
1289
tuple => keyRowToData(tuple.key) == (" b" , 0 ) }
1272
1290
filtered2.foreach { tuple => store.remove(tuple.key) }
1273
1291
assert(get(store, " b" , 0 ) === None )
1274
- store.commit()
1275
1292
}
1276
1293
}
1277
1294
@@ -1280,10 +1297,10 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
1280
1297
val store = provider.getStore(0 )
1281
1298
put(store, " a" , 0 , 1 )
1282
1299
store.commit()
1300
+ assert(rowPairsToDataSet(provider.getStore(1 ).iterator()) === Set ((" a" , 0 ) -> 1 ))
1283
1301
1284
1302
// cancelUpdates should not change the data in the files
1285
1303
val store1 = provider.getStore(1 )
1286
- assert(rowPairsToDataSet(store1.iterator()) === Set ((" a" , 0 ) -> 1 ))
1287
1304
put(store1, " b" , 0 , 1 )
1288
1305
store1.abort()
1289
1306
}
@@ -1826,18 +1843,13 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
1826
1843
def getLatestData (storeProvider : ProviderClass ,
1827
1844
useColumnFamilies : Boolean ): Set [((String , Int ), Int )]
1828
1845
1829
- /** Get the latest data referred to by the given provider but not using this provider */
1830
- def getLatestVersion (storeProvider : ProviderClass ): Long
1831
-
1832
1846
/**
1833
1847
* Get a specific version of data referred to by the given provider but not using
1834
1848
* this provider
1835
1849
*/
1836
1850
def getData (storeProvider : ProviderClass , version : Int ,
1837
1851
useColumnFamilies : Boolean ): Set [((String , Int ), Int )]
1838
1852
1839
- def getData (store : StateStore ): Set [((String , Int ), Int )]
1840
-
1841
1853
protected def testQuietly (name : String )(f : => Unit ): Unit = {
1842
1854
test(name) {
1843
1855
quietly {
0 commit comments