Skip to content

HDFS-15202 Boost short circuit cache #1884

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 43 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
ba32adb
Update BlockReaderFactory.java
pustota2009 Mar 7, 2020
9c00b7d
Update HdfsClientConfigKeys.java
pustota2009 Mar 7, 2020
d067bfd
Update DfsClientConf.java
pustota2009 Mar 7, 2020
44748f6
Update TestEnhancedByteBufferAccess.java
pustota2009 Mar 7, 2020
04745b6
Update TestBlockReaderFactory.java
pustota2009 Mar 7, 2020
b30fa2c
Update TestShortCircuitCache.java
pustota2009 Mar 7, 2020
6229013
Update HdfsClientConfigKeys.java
pustota2009 Mar 7, 2020
7ea005a
Merge branch 'trunk' into trunk
pustota2009 Mar 7, 2020
fcb5763
Update ClientContext.java
pustota2009 Mar 7, 2020
ccfff2a
Update ClientContext.java
pustota2009 Mar 7, 2020
ed4c95f
Update ClientContext.java
pustota2009 Mar 8, 2020
ed696e1
Update BlockReaderFactory.java
pustota2009 Mar 8, 2020
81ffc9a
Update DfsClientConf.java
pustota2009 Mar 8, 2020
99ea034
Update ClientContext.java
pustota2009 Mar 8, 2020
b8d8d50
Update DfsClientConf.java
pustota2009 Mar 8, 2020
fc8fbcb
Update ClientContext.java
pustota2009 Mar 8, 2020
2f24c67
Update DfsClientConf.java
pustota2009 Mar 8, 2020
4ec972b
Update DfsClientConf.java
pustota2009 Mar 8, 2020
122ede2
Update BlockReaderFactory.java
pustota2009 Mar 8, 2020
e2fefa8
Update DfsClientConf.java
pustota2009 Mar 8, 2020
7153ae0
Update TestBlockReaderLocal.java
pustota2009 Mar 8, 2020
1ed0c13
Update TestShortCircuitCache.java
pustota2009 Mar 8, 2020
2174a29
Update TestEnhancedByteBufferAccess.java
pustota2009 Mar 8, 2020
6295b16
Update DfsClientConf.java
pustota2009 Mar 9, 2020
2876127
Update hdfs-default.xml
pustota2009 Mar 9, 2020
31692a2
Update ClientContext.java
pustota2009 Mar 9, 2020
3c79f1b
Update BlockReaderFactory.java
pustota2009 Mar 9, 2020
1308ea7
Update DfsClientConf.java
pustota2009 Mar 9, 2020
96b34d6
Update TestEnhancedByteBufferAccess.java
pustota2009 Mar 9, 2020
058b5c7
Update TestEnhancedByteBufferAccess.java
pustota2009 Mar 9, 2020
030ae58
Update TestShortCircuitCache.java
pustota2009 Mar 9, 2020
abd0919
Update TestEnhancedByteBufferAccess.java
pustota2009 Mar 9, 2020
42b9b7c
Update DfsClientConf.java
pustota2009 Mar 9, 2020
ed8bdf6
Update hdfs-default.xml
pustota2009 Mar 28, 2020
5199f67
Update TestShortCircuitCache.java
pustota2009 Apr 4, 2020
ad27cd3
Unit tests for few ShortCircuitCaches added
pustota2009 Apr 5, 2020
7092504
fixed code style
pustota2009 Apr 6, 2020
a6a9d3c
Unit tests added
pustota2009 Apr 6, 2020
8bdea34
Update HdfsClientConfigKeys.java
pustota2009 May 12, 2020
44520e1
Update hdfs-default.xml
pustota2009 May 12, 2020
bbfbf1e
Update ClientContext.java
pustota2009 May 12, 2020
ff3f977
Update ClientContext.java
pustota2009 May 12, 2020
9aa369f
Update ClientContext.java
pustota2009 May 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class ClientContext {
/**
* Caches short-circuit file descriptors, mmap regions.
*/
private final ShortCircuitCache shortCircuitCache;
private final ShortCircuitCache[] shortCircuitCache;

/**
* Caches TCP and UNIX domain sockets for reuse.
Expand Down Expand Up @@ -132,13 +132,23 @@ public class ClientContext {
*/
private DeadNodeDetector deadNodeDetector = null;

/**
* ShortCircuitCache array size.
*/
private final int clientShortCircuitNum;

private ClientContext(String name, DfsClientConf conf,
Configuration config) {
final ShortCircuitConf scConf = conf.getShortCircuitConf();

this.name = name;
this.confString = scConf.confAsString();
this.shortCircuitCache = ShortCircuitCache.fromConf(scConf);
this.clientShortCircuitNum = conf.getClientShortCircuitNum();
this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
for (int i = 0; i < this.clientShortCircuitNum; i++) {
this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf);
}

this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
scConf.getSocketCacheExpiry());
this.keyProviderCache = new KeyProviderCache(
Expand Down Expand Up @@ -227,8 +237,13 @@ public String getConfString() {
return confString;
}

// Keeping this deprecated method to avoid runtime issues
public ShortCircuitCache getShortCircuitCache() {
return shortCircuitCache;
return getShortCircuitCache(0);
}

public ShortCircuitCache getShortCircuitCache(long idx) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine. But what I realized is Hadoop applications don't really respect the annotation @InterfaceAudience.Private. You really want to gracefully deprecate a public method to avoid runtime issues. That is, keep the original getShortCircuitCache() method and let it call getShortCircuitCache(0);

Copy link
Contributor Author

@pustota2009 pustota2009 May 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is, keep the original getShortCircuitCache() method and let it call

Sure, done

return shortCircuitCache[(int) (idx % clientShortCircuitNum)];
}

public PeerCache getPeerCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ public interface HdfsClientConfigKeys {
"dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =
60000;
String DFS_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num";
int DFS_SHORT_CIRCUIT_NUM_DEFAULT = 1;
String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
"dfs.client.slow.io.warning.threshold.ms";
long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,8 @@ private BlockReader getBlockReaderLocal() throws IOException {
"giving up on BlockReaderLocal.", this, pathInfo);
return null;
}
ShortCircuitCache cache = clientContext.getShortCircuitCache();
ShortCircuitCache cache =
clientContext.getShortCircuitCache(block.getBlockId());
ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
block.getBlockPoolId());
ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
Expand Down Expand Up @@ -526,7 +527,8 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
if (curPeer.fromCache) remainingCacheTries--;
DomainPeer peer = (DomainPeer)curPeer.peer;
Slot slot = null;
ShortCircuitCache cache = clientContext.getShortCircuitCache();
ShortCircuitCache cache =
clientContext.getShortCircuitCache(block.getBlockId());
try {
MutableBoolean usedPeer = new MutableBoolean(false);
slot = cache.allocShmSlot(datanode, peer, usedPeer,
Expand Down Expand Up @@ -581,7 +583,8 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
*/
private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
Slot slot) throws IOException {
ShortCircuitCache cache = clientContext.getShortCircuitCache();
ShortCircuitCache cache =
clientContext.getShortCircuitCache(block.getBlockId());
final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE));
SlotId slotId = slot == null ? null : slot.getSlotId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,15 @@ public class DfsClientConf {
private final long refreshReadBlockLocationsMS;

private final ShortCircuitConf shortCircuitConf;

private final long hedgedReadThresholdMillis;
private final int hedgedReadThreadpoolSize;
private final List<Class<? extends ReplicaAccessorBuilder>>
replicaAccessorBuilderClasses;

private final int stripedReadThreadpoolSize;

private final int clientShortCircuitNum;

private final boolean dataTransferTcpNoDelay;

private final boolean deadNodeDetectionEnabled;
Expand Down Expand Up @@ -290,6 +291,16 @@ public DfsClientConf(Configuration conf) {
leaseHardLimitPeriod =
conf.getLong(HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_KEY,
HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT) * 1000;

clientShortCircuitNum = conf.getInt(
HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM,
HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM_DEFAULT);
Preconditions.checkArgument(clientShortCircuitNum >= 1,
HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM +
"can't be less then 1.");
Preconditions.checkArgument(clientShortCircuitNum <= 5,
HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM +
"can't be more then 5.");
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -595,6 +606,13 @@ public long getSlowIoWarningThresholdMs() {
return slowIoWarningThresholdMs;
}

/**
* @return the clientShortCircuitNum
*/
public int getClientShortCircuitNum() {
return clientShortCircuitNum;
}

/**
* @return the hedgedReadThresholdMillis
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4137,6 +4137,16 @@
</description>
</property>

<property>
<name>dfs.client.short.circuit.num</name>
<value>1</value>
<description>
Number of short-circuit caches. This setting should
be in the range 1 - 5. Lower values will result in lower CPU consumption; higher
values may speed up massive parallel reading files.
</description>
</property>

<property>
<name>dfs.client.read.striped.threadpool.size</name>
<value>18</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ public void testZeroCopyMmapCache() throws Exception {
fsIn.close();
fsIn = fs.open(TEST_PATH);
final ShortCircuitCache cache = ClientContext.get(
CONTEXT, conf).getShortCircuitCache();
CONTEXT, conf).getShortCircuitCache(0);
cache.accept(new CountingVisitor(0, 5, 5, 0));
results[0] = fsIn.read(null, BLOCK_SIZE,
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
Expand Down Expand Up @@ -654,12 +654,12 @@ public void testZeroCopyReadOfCachedData() throws Exception {
BLOCK_SIZE), byteBufferToArray(result2));
fsIn2.releaseBuffer(result2);
fsIn2.close();

// check that the replica is anchored
final ExtendedBlock firstBlock =
DFSTestUtil.getFirstBlock(fs, TEST_PATH);
final ShortCircuitCache cache = ClientContext.get(
CONTEXT, conf).getShortCircuitCache();
CONTEXT, conf).getShortCircuitCache(0);
waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
// Uncache the replica
fs.removeCacheDirective(directiveId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ private void testShortCircuitCacheUnbufferWithDisableInterval(

try (FSDataInputStream in = dfs.open(testFile)) {
Assert.assertEquals(0,
dfs.getClient().getClientContext().getShortCircuitCache()
dfs.getClient().getClientContext().getShortCircuitCache(0)
.getReplicaInfoMapSize());

final byte[] buf = new byte[testFileLen];
Expand All @@ -398,12 +398,12 @@ private void testShortCircuitCacheUnbufferWithDisableInterval(

// Set cache size to 0 so the replica marked evictable by unbuffer
// will be purged immediately.
dfs.getClient().getClientContext().getShortCircuitCache()
dfs.getClient().getClientContext().getShortCircuitCache(0)
.setMaxTotalSize(0);
LOG.info("Unbuffering");
in.unbuffer();
Assert.assertEquals(0,
dfs.getClient().getClientContext().getShortCircuitCache()
dfs.getClient().getClientContext().getShortCircuitCache(0)
.getReplicaInfoMapSize());

DFSTestUtil.appendFile(dfs, testFile, "append more data");
Expand Down Expand Up @@ -432,7 +432,7 @@ private void validateReadResult(final DistributedFileSystem dfs,
final int expectedScrRepMapSize) {
Assert.assertThat(expected, CoreMatchers.is(actual));
Assert.assertEquals(expectedScrRepMapSize,
dfs.getClient().getClientContext().getShortCircuitCache()
dfs.getClient().getClientContext().getShortCircuitCache(0)
.getReplicaInfoMapSize());
}

Expand Down Expand Up @@ -467,7 +467,7 @@ public void testShortCircuitReadFromServerWithoutShm() throws Exception {
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected));
final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache();
fs.getClient().getClientContext().getShortCircuitCache(0);
final DatanodeInfo datanode = new DatanodeInfoBuilder()
.setNodeID(cluster.getDataNodes().get(0).getDatanodeId())
.build();
Expand Down Expand Up @@ -516,7 +516,7 @@ public void testShortCircuitReadFromClientWithoutShm() throws Exception {
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected));
final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache();
fs.getClient().getClientContext().getShortCircuitCache(0);
Assert.assertEquals(null, cache.getDfsClientShmManager());
cluster.shutdown();
sockDir.close();
Expand Down Expand Up @@ -548,7 +548,7 @@ public void testShortCircuitCacheShutdown() throws Exception {
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected));
final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache();
fs.getClient().getClientContext().getShortCircuitCache(0);
cache.close();
Assert.assertTrue(cache.getDfsClientShmManager().
getDomainSocketWatcher().isClosed());
Expand Down
Loading