Skip to content

Commit ea68863

Browse files
authored
MAPREDUCE-7237. Supports config the shuffle's path cache related parameters (#1397)
1 parent 1d772dc commit ea68863

File tree

2 files changed

+68
-52
lines changed
  • hadoop-mapreduce-project/hadoop-mapreduce-client

2 files changed

+68
-52
lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1266,6 +1266,29 @@
12661266
</description>
12671267
</property>
12681268

1269+
<property>
1270+
<name>mapreduce.shuffle.pathcache.max-weight</name>
1271+
<value>10485760</value>
1272+
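<description>The maximum total weight of entries the shuffle path cache may contain; an entry's weight is the combined length of its job id, user, attempt id, index path and data path strings.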
1273+
</description>
1274+
</property>
1275+
1276+
<property>
1277+
<name>mapreduce.shuffle.pathcache.expire-after-access-minutes</name>
1278+
<value>5</value>
1279+
<description>The length of time, in minutes, after a shuffle path cache entry is last accessed that it
1280+
should be automatically removed.
1281+
</description>
1282+
</property>
1283+
1284+
<property>
1285+
<name>mapreduce.shuffle.pathcache.concurrency-level</name>
1286+
<value>16</value>
1287+
<description>The concurrency level of the shuffle path cache; it is used to create a fixed number of hashtable
1288+
segments, each governed by its own write lock.
1289+
</description>
1290+
</property>
1291+
12691292
<property>
12701293
<name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
12711294
<value>org.apache.hadoop.mapreduce.task.reduce.Shuffle</value>

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java

Lines changed: 45 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,6 @@
139139
import com.google.common.cache.CacheLoader;
140140
import com.google.common.cache.LoadingCache;
141141
import com.google.common.cache.RemovalListener;
142-
import com.google.common.cache.RemovalNotification;
143-
import com.google.common.cache.Weigher;
144142
import com.google.common.util.concurrent.ThreadFactoryBuilder;
145143
import org.apache.hadoop.thirdparty.protobuf.ByteString;
146144

@@ -836,63 +834,58 @@ public ChannelPipeline getPipeline() throws Exception {
836834
// TODO factor out encode/decode to permit binary shuffle
837835
// TODO factor out decode of index to permit alt. models
838836
}
839-
840837
}
841838

842839
class Shuffle extends SimpleChannelUpstreamHandler {
843-
private static final int MAX_WEIGHT = 10 * 1024 * 1024;
844-
private static final int EXPIRE_AFTER_ACCESS_MINUTES = 5;
845-
private static final int ALLOWED_CONCURRENCY = 16;
846-
private final Configuration conf;
840+
private static final String MAX_WEIGHT =
841+
"mapreduce.shuffle.pathcache.max-weight";
842+
private static final int DEFAULT_MAX_WEIGHT = 10 * 1024 * 1024;
843+
844+
private static final String EXPIRE_AFTER_ACCESS_MINUTES =
845+
"mapreduce.shuffle.pathcache.expire-after-access-minutes";
846+
private static final int DEFAULT_EXPIRE_AFTER_ACCESS_MINUTES = 5;
847+
848+
private static final String CONCURRENCY_LEVEL =
849+
"mapreduce.shuffle.pathcache.concurrency-level";
850+
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
851+
847852
private final IndexCache indexCache;
853+
private final
854+
LoadingCache<AttemptPathIdentifier, AttemptPathInfo> pathCache;
855+
848856
private int port;
849-
private final LoadingCache<AttemptPathIdentifier, AttemptPathInfo> pathCache =
850-
CacheBuilder.newBuilder().expireAfterAccess(EXPIRE_AFTER_ACCESS_MINUTES,
851-
TimeUnit.MINUTES).softValues().concurrencyLevel(ALLOWED_CONCURRENCY).
852-
removalListener(
853-
new RemovalListener<AttemptPathIdentifier, AttemptPathInfo>() {
854-
@Override
855-
public void onRemoval(RemovalNotification<AttemptPathIdentifier,
856-
AttemptPathInfo> notification) {
857-
if (LOG.isDebugEnabled()) {
858-
LOG.debug("PathCache Eviction: " + notification.getKey() +
859-
", Reason=" + notification.getCause());
860-
}
861-
}
862-
}
863-
).maximumWeight(MAX_WEIGHT).weigher(
864-
new Weigher<AttemptPathIdentifier, AttemptPathInfo>() {
865-
@Override
866-
public int weigh(AttemptPathIdentifier key,
867-
AttemptPathInfo value) {
868-
return key.jobId.length() + key.user.length() +
869-
key.attemptId.length()+
870-
value.indexPath.toString().length() +
871-
value.dataPath.toString().length();
872-
}
873-
}
874-
).build(new CacheLoader<AttemptPathIdentifier, AttemptPathInfo>() {
875-
@Override
876-
public AttemptPathInfo load(AttemptPathIdentifier key) throws
877-
Exception {
878-
String base = getBaseLocation(key.jobId, key.user);
879-
String attemptBase = base + key.attemptId;
880-
Path indexFileName = getAuxiliaryLocalPathHandler()
881-
.getLocalPathForRead(attemptBase + "/" + INDEX_FILE_NAME);
882-
Path mapOutputFileName = getAuxiliaryLocalPathHandler()
883-
.getLocalPathForRead(attemptBase + "/" + DATA_FILE_NAME);
884-
885-
if (LOG.isDebugEnabled()) {
886-
LOG.debug("Loaded : " + key + " via loader");
887-
}
888-
return new AttemptPathInfo(indexFileName, mapOutputFileName);
889-
}
890-
});
891857

892-
public Shuffle(Configuration conf) {
893-
this.conf = conf;
894-
indexCache = new IndexCache(new JobConf(conf));
858+
Shuffle(Configuration conf) {
895859
this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
860+
this.indexCache = new IndexCache(new JobConf(conf));
861+
this.pathCache = CacheBuilder.newBuilder()
862+
.expireAfterAccess(conf.getInt(EXPIRE_AFTER_ACCESS_MINUTES,
863+
DEFAULT_EXPIRE_AFTER_ACCESS_MINUTES), TimeUnit.MINUTES)
864+
.softValues()
865+
.concurrencyLevel(conf.getInt(CONCURRENCY_LEVEL,
866+
DEFAULT_CONCURRENCY_LEVEL))
867+
.removalListener((RemovalListener<AttemptPathIdentifier,
868+
AttemptPathInfo>) notification ->
869+
LOG.debug("PathCache Eviction: {}, Reason={}",
870+
notification.getKey(), notification.getCause()))
871+
.maximumWeight(conf.getInt(MAX_WEIGHT, DEFAULT_MAX_WEIGHT))
872+
.weigher((key, value) -> key.jobId.length() + key.user.length() +
873+
key.attemptId.length()+ value.indexPath.toString().length() +
874+
value.dataPath.toString().length())
875+
.build(new CacheLoader<AttemptPathIdentifier, AttemptPathInfo>() {
876+
@Override
877+
public AttemptPathInfo load(AttemptPathIdentifier key) throws
878+
Exception {
879+
String base = getBaseLocation(key.jobId, key.user);
880+
String attemptBase = base + key.attemptId;
881+
Path indexFileName = getAuxiliaryLocalPathHandler()
882+
.getLocalPathForRead(attemptBase + "/" + INDEX_FILE_NAME);
883+
Path mapOutputFileName = getAuxiliaryLocalPathHandler()
884+
.getLocalPathForRead(attemptBase + "/" + DATA_FILE_NAME);
885+
LOG.debug("Loaded : {} via loader", key);
886+
return new AttemptPathInfo(indexFileName, mapOutputFileName);
887+
}
888+
});
896889
}
897890

898891
public void setPort(int port) {

0 commit comments

Comments
 (0)