2 files changed: +10 -14 lines

core/src/main/scala/org/apache/spark/storage

@@ -24,9 +24,9 @@ import java.nio.channels.Channels
 import java.util.Collections
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, TimeUnit}

+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.HashMap
-import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.reflect.ClassTag
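
This hunk only moves the JavaConverters import so that the scala.collection imports stay alphabetized within the Scala import group. A tiny illustrative file (names hypothetical, not part of the patch) showing the grouping the reorder restores:

// Illustrative only: java imports first, then the scala group in
// alphabetical order, matching the ordering the moved import restores.
import java.util.Collections

import scala.collection.JavaConverters._
import scala.collection.mutable

object ImportOrderSketch {
  // A java.util.List converted to a Scala Buffer via JavaConverters.
  def emptyBuffer: mutable.Buffer[String] = Collections.emptyList[String]().asScala
}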
@@ -1819,6 +1819,12 @@ private[spark] class BlockManager(
     @volatile var running = true
     override def run(): Unit = {
       var migrating: Option[(Int, Long)] = None
+      val storageLevel = StorageLevel(
+        useDisk = true,
+        useMemory = false,
+        useOffHeap = false,
+        deserialized = false,
+        replication = 1)
       // Once a block fails to transfer to an executor stop trying to transfer more blocks
       try {
         while (running) {
@@ -1839,25 +1845,15 @@ private[spark] class BlockManager(
             peer.executorId,
             indexBlockId,
             indexBuffer,
-            StorageLevel(
-              useDisk = true,
-              useMemory = false,
-              useOffHeap = false,
-              deserialized = false,
-              replication = 1),
+            storageLevel,
             null) // class tag, we don't need for shuffle
           blockTransferService.uploadBlockSync(
             peer.host,
             peer.port,
             peer.executorId,
             dataBlockId,
             dataBuffer,
-            StorageLevel(
-              useDisk = true,
-              useMemory = false,
-              useOffHeap = false,
-              deserialized = false,
-              replication = 1),
+            storageLevel,
             null) // class tag, we don't need for shuffle
         }
       }
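
The two hunks above replace the duplicated inline StorageLevel(...) construction with a single storageLevel val that the migration loop reuses for both the index-block and the data-block upload. Below is a minimal, self-contained sketch of that pattern; the object name and main wrapper are illustrative only and assume spark-core is on the classpath:

// A hedged sketch of the pattern in this patch: build the disk-only
// StorageLevel once and pass the same instance to every shuffle-block
// upload, rather than constructing an identical level at each call site.
import org.apache.spark.storage.StorageLevel

object StorageLevelReuseSketch {
  def main(args: Array[String]): Unit = {
    // Shuffle files are already serialized bytes on disk, so the migration
    // target only needs disk storage, no deserialization, and one replica.
    val storageLevel = StorageLevel(
      useDisk = true,
      useMemory = false,
      useOffHeap = false,
      deserialized = false,
      replication = 1)

    // This is the same level as the predefined DISK_ONLY constant, which
    // would be an equally valid way to express it.
    assert(storageLevel == StorageLevel.DISK_ONLY)
    println(storageLevel)
  }
}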

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest

@@ -31,7 +31,7 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
       .set("spark.storage.decommission.enabled", "true")
       .set("spark.storage.decommission.shuffle_blocks", "true")
       .set("spark.storage.decommission.shuffle_blocks", "true")
-    // Ensure we have somewhere to migrate our data too
+      // Ensure we have somewhere to migrate our data too
       .set("spark.executor.instances", "3")
       // The default of 30 seconds is fine, but for testing we just want to get this done fast.
       .set("spark.storage.decommission.replicationReattemptInterval", "1s")