Skip to content

HDFS-15202 Boost short circuit cache (rebase PR-1884) #2016

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class ClientContext {
/**
* Caches short-circuit file descriptors, mmap regions.
*/
private final ShortCircuitCache shortCircuitCache;
private final ShortCircuitCache[] shortCircuitCache;

/**
* Caches TCP and UNIX domain sockets for reuse.
Expand Down Expand Up @@ -132,13 +132,23 @@ public class ClientContext {
*/
private DeadNodeDetector deadNodeDetector = null;

/**
* ShortCircuitCache array size.
*/
private final int clientShortCircuitNum;

private ClientContext(String name, DfsClientConf conf,
Configuration config) {
final ShortCircuitConf scConf = conf.getShortCircuitConf();

this.name = name;
this.confString = scConf.confAsString();
this.shortCircuitCache = ShortCircuitCache.fromConf(scConf);
this.clientShortCircuitNum = conf.getClientShortCircuitNum();
this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
for (int i = 0; i < this.clientShortCircuitNum; i++) {
this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf);
}

this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
scConf.getSocketCacheExpiry());
this.keyProviderCache = new KeyProviderCache(
Expand Down Expand Up @@ -228,7 +238,11 @@ public String getConfString() {
}

public ShortCircuitCache getShortCircuitCache() {
return shortCircuitCache;
return shortCircuitCache[0];
}

public ShortCircuitCache getShortCircuitCache(long idx) {
return shortCircuitCache[(int) (idx % clientShortCircuitNum)];
}

public PeerCache getPeerCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ public interface HdfsClientConfigKeys {
"dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =
60000;
String DFS_CLIENT_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num";
int DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT = 1;
String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
"dfs.client.slow.io.warning.threshold.ms";
long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,8 @@ private BlockReader getBlockReaderLocal() throws IOException {
"giving up on BlockReaderLocal.", this, pathInfo);
return null;
}
ShortCircuitCache cache = clientContext.getShortCircuitCache();
ShortCircuitCache cache =
clientContext.getShortCircuitCache(block.getBlockId());
ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
block.getBlockPoolId());
ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
Expand Down Expand Up @@ -527,7 +528,8 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
if (curPeer.fromCache) remainingCacheTries--;
DomainPeer peer = (DomainPeer)curPeer.peer;
Slot slot = null;
ShortCircuitCache cache = clientContext.getShortCircuitCache();
ShortCircuitCache cache =
clientContext.getShortCircuitCache(block.getBlockId());
try {
MutableBoolean usedPeer = new MutableBoolean(false);
slot = cache.allocShmSlot(datanode, peer, usedPeer,
Expand Down Expand Up @@ -582,7 +584,8 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
*/
private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
Slot slot) throws IOException {
ShortCircuitCache cache = clientContext.getShortCircuitCache();
ShortCircuitCache cache =
clientContext.getShortCircuitCache(block.getBlockId());
final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE));
SlotId slotId = slot == null ? null : slot.getSlotId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public class DfsClientConf {
private final long refreshReadBlockLocationsMS;

private final ShortCircuitConf shortCircuitConf;
private final int clientShortCircuitNum;

private final long hedgedReadThresholdMillis;
private final int hedgedReadThreadpoolSize;
Expand Down Expand Up @@ -272,8 +273,6 @@ public DfsClientConf(Configuration conf) {
HdfsClientConfigKeys.
DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT);

shortCircuitConf = new ShortCircuitConf(conf);

hedgedReadThresholdMillis = conf.getLong(
HedgedRead.THRESHOLD_MILLIS_KEY,
HedgedRead.THRESHOLD_MILLIS_DEFAULT);
Expand All @@ -296,6 +295,17 @@ public DfsClientConf(Configuration conf) {
leaseHardLimitPeriod =
conf.getLong(HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_KEY,
HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT) * 1000;

shortCircuitConf = new ShortCircuitConf(conf);
clientShortCircuitNum = conf.getInt(
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM,
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT);
Preconditions.checkArgument(clientShortCircuitNum >= 1,
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
"can't be less then 1.");
Preconditions.checkArgument(clientShortCircuitNum <= 5,
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
"can't be more then 5.");
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -601,6 +611,13 @@ public long getSlowIoWarningThresholdMs() {
return slowIoWarningThresholdMs;
}

/*
* @return the clientShortCircuitNum
*/
public int getClientShortCircuitNum() {
return clientShortCircuitNum;
}

/**
* @return the hedgedReadThresholdMillis
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4178,6 +4178,16 @@
</description>
</property>

<property>
<name>dfs.client.short.circuit.num</name>
<value>1</value>
<description>
Number of short-circuit caches. This setting should
be in the range 1 - 5. Lower values will result in lower CPU consumption; higher
values may speed up massive parallel reading files.
</description>
</property>

<property>
<name>dfs.client.read.striped.threadpool.size</name>
<value>18</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ public void testZeroCopyMmapCache() throws Exception {
fsIn.close();
fsIn = fs.open(TEST_PATH);
final ShortCircuitCache cache = ClientContext.get(
CONTEXT, conf).getShortCircuitCache();
CONTEXT, conf).getShortCircuitCache(0);
cache.accept(new CountingVisitor(0, 5, 5, 0));
results[0] = fsIn.read(null, BLOCK_SIZE,
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
Expand Down Expand Up @@ -654,12 +654,12 @@ public void testZeroCopyReadOfCachedData() throws Exception {
BLOCK_SIZE), byteBufferToArray(result2));
fsIn2.releaseBuffer(result2);
fsIn2.close();

// check that the replica is anchored
final ExtendedBlock firstBlock =
DFSTestUtil.getFirstBlock(fs, TEST_PATH);
final ShortCircuitCache cache = ClientContext.get(
CONTEXT, conf).getShortCircuitCache();
CONTEXT, conf).getShortCircuitCache(0);
waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
// Uncache the replica
fs.removeCacheDirective(directiveId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ private void testShortCircuitCacheUnbufferWithDisableInterval(

try (FSDataInputStream in = dfs.open(testFile)) {
Assert.assertEquals(0,
dfs.getClient().getClientContext().getShortCircuitCache()
dfs.getClient().getClientContext().getShortCircuitCache(0)
.getReplicaInfoMapSize());

final byte[] buf = new byte[testFileLen];
Expand All @@ -398,12 +398,12 @@ private void testShortCircuitCacheUnbufferWithDisableInterval(

// Set cache size to 0 so the replica marked evictable by unbuffer
// will be purged immediately.
dfs.getClient().getClientContext().getShortCircuitCache()
dfs.getClient().getClientContext().getShortCircuitCache(0)
.setMaxTotalSize(0);
LOG.info("Unbuffering");
in.unbuffer();
Assert.assertEquals(0,
dfs.getClient().getClientContext().getShortCircuitCache()
dfs.getClient().getClientContext().getShortCircuitCache(0)
.getReplicaInfoMapSize());

DFSTestUtil.appendFile(dfs, testFile, "append more data");
Expand Down Expand Up @@ -432,7 +432,7 @@ private void validateReadResult(final DistributedFileSystem dfs,
final int expectedScrRepMapSize) {
Assert.assertThat(expected, CoreMatchers.is(actual));
Assert.assertEquals(expectedScrRepMapSize,
dfs.getClient().getClientContext().getShortCircuitCache()
dfs.getClient().getClientContext().getShortCircuitCache(0)
.getReplicaInfoMapSize());
}

Expand Down Expand Up @@ -467,7 +467,7 @@ public void testShortCircuitReadFromServerWithoutShm() throws Exception {
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected));
final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache();
fs.getClient().getClientContext().getShortCircuitCache(0);
final DatanodeInfo datanode = new DatanodeInfoBuilder()
.setNodeID(cluster.getDataNodes().get(0).getDatanodeId())
.build();
Expand Down Expand Up @@ -516,7 +516,7 @@ public void testShortCircuitReadFromClientWithoutShm() throws Exception {
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected));
final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache();
fs.getClient().getClientContext().getShortCircuitCache(0);
Assert.assertEquals(null, cache.getDfsClientShmManager());
cluster.shutdown();
sockDir.close();
Expand Down Expand Up @@ -548,7 +548,7 @@ public void testShortCircuitCacheShutdown() throws Exception {
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected));
final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache();
fs.getClient().getClientContext().getShortCircuitCache(0);
cache.close();
Assert.assertTrue(cache.getDfsClientShmManager().
getDomainSocketWatcher().isClosed());
Expand Down
Loading