Skip to content

Commit

Permalink
[apache#1428] fix(server): allow specifying negative fallback thresho…
Browse files Browse the repository at this point in the history
…ld to avoid event being discarded
  • Loading branch information
zuston committed Jan 10, 2024
1 parent 5d027c0 commit 84d6812
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,7 @@ public class ShuffleServerConf extends RssBaseConf {
public static final ConfigOption<Long> FALLBACK_MAX_FAIL_TIMES =
ConfigOptions.key("rss.server.hybrid.storage.fallback.max.fail.times")
.longType()
.checkValue(
ConfigUtils.NON_NEGATIVE_LONG_VALIDATOR, " fallback times must be non-negative")
.defaultValue(0L)
.defaultValue(-1L)
.withDescription("For hybrid storage, fail times exceed the number, will switch storage")
.withDeprecatedKeys("rss.server.multistorage.fallback.max.fail.times");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,62 @@
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class HybridStorageManagerTest {

/**
* this tests the fallback strategy when encountering the local storage is invalid.
* 1. When specifying the fallback max fail time = 0, the event will be discarded
* 2. When specifying the fallback max fail time < 0, the event will be taken by Hadoop Storage.
*/
@Test
public void fallbackTestWhenLocalStorageCorrupted() {
ShuffleServerConf conf = new ShuffleServerConf();
conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 2000L);
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test"));
conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
conf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
conf.setString(ShuffleServerConf.HYBRID_STORAGE_MANAGER_SELECTOR_CLASS,
"org.apache.uniffle.server.storage.hybrid.HugePartitionSensitiveStorageManagerSelector");
conf.setString(ShuffleServerConf.HYBRID_STORAGE_FALLBACK_STRATEGY_CLASS,
"org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy");

// case1: fallback to hadoop storage when fallback_max_fail_time = -1
conf.setLong(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES, -1);
HybridStorageManager manager = new HybridStorageManager(conf);

LocalStorageManager localStorageManager = (LocalStorageManager) manager.getWarmStorageManager();
localStorageManager.getStorages().get(0).markCorrupted();

String remoteStorage = "test";
String appId = "selectStorageManagerWithSelectorAndFallbackStrategy_appId";
manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
List<ShufflePartitionedBlock> blocks =
Lists.newArrayList(new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, (byte[]) null));
ShuffleDataFlushEvent event =
new ShuffleDataFlushEvent(1, appId, 1, 1, 1, 1000, blocks, null, null);
assertTrue((manager.selectStorage(event) instanceof HadoopStorage));

// case2: fallback invalid when fallback_max_fail_time = 0
conf.setLong(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES, 0);
manager = new HybridStorageManager(conf);

localStorageManager = (LocalStorageManager) manager.getWarmStorageManager();
localStorageManager.getStorages().get(0).markCorrupted();

event =
new ShuffleDataFlushEvent(1, appId, 1, 1, 1, 1000, blocks, null, null);
manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
assertNull(manager.selectStorage(event));
// and the event failed at the first time, it will fallback to hadoop storage
event.increaseRetryTimes();
manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
assertTrue((manager.selectStorage(event) instanceof HadoopStorage));
}

@Test
public void selectStorageManagerTest() {
ShuffleServerConf conf = new ShuffleServerConf();
Expand Down

0 comments on commit 84d6812

Please sign in to comment.