Skip to content

MAPREDUCE-7120. Make hadoop consider wildcard host as datalocal #399

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public abstract class TaskAttemptImpl implements
private static final Logger LOG =
LoggerFactory.getLogger(TaskAttemptImpl.class);
private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
private static final String ANY = "*";
private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

protected final JobConf conf;
Expand Down Expand Up @@ -683,7 +684,9 @@ public TaskAttemptImpl(TaskId taskId, int i,
RackResolver.init(conf);
this.dataLocalRacks = new HashSet<String>();
for (String host : this.dataLocalHosts) {
this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation());
if (!ANY.equals(host)) {
this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation());
}
}

locality = Locality.OFF_SWITCH;
Expand Down Expand Up @@ -1586,7 +1589,7 @@ private void computeRackAndLocality() {
locality = Locality.OFF_SWITCH;
if (dataLocalHosts.size() > 0) {
String cHost = resolveHost(containerNodeId.getHost());
if (dataLocalHosts.contains(cHost)) {
if (dataLocalHosts.contains(ANY) || dataLocalHosts.contains(cHost)) {
locality = Locality.NODE_LOCAL;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,11 @@ public class RMContainerAllocator extends RMContainerRequestor
implements ContainerAllocator {

static final Logger LOG = LoggerFactory.getLogger(RMContainerAllocator.class);


static final String ANY = "*";
public static final
float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;

static final Priority PRIORITY_FAST_FAIL_MAP;
static final Priority PRIORITY_REDUCE;
static final Priority PRIORITY_MAP;
Expand Down Expand Up @@ -160,6 +161,8 @@ added to the pending and are ramped up (added to scheduled) based
private int hostLocalAssigned = 0;
private int rackLocalAssigned = 0;
private int lastCompletedTasks = 0;

private boolean isLocationIrrelevant = false;

private boolean recalculateReduceSchedule = false;
private Resource mapResourceRequest = Resources.none();
Expand Down Expand Up @@ -1121,6 +1124,10 @@ void addMap(ContainerRequestEvent event) {
new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression);
for (String host : event.getHosts()) {
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
if (ANY.equals(host)) {
LOG.info("Location is irrelevant in map hosts");
isLocationIrrelevant = true;
}
if (list == null) {
list = new LinkedList<TaskAttemptId>();
mapsHostMapping.put(host, list);
Expand Down Expand Up @@ -1339,8 +1346,13 @@ else if (PRIORITY_MAP.equals(priority)
|| PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) {
LOG.info("Replacing MAP container " + allocated.getId());
// allocated container was for a map
String host = allocated.getNodeId().getHost();
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
LinkedList<TaskAttemptId> list;
if (isLocationIrrelevant) {
list = mapsHostMapping.get(ANY);
} else {
String host = allocated.getNodeId().getHost();
list = mapsHostMapping.get(host);
}
if (list != null && list.size() > 0) {
TaskAttemptId tId = list.removeLast();
if (maps.containsKey(tId)) {
Expand Down Expand Up @@ -1405,7 +1417,12 @@ private void assignMapsWithLocality(List<Container> allocatedContainers) {
// "if (maps.containsKey(tId))" below should be almost always true.
// hence this while loop would almost always have O(1) complexity
String host = allocated.getNodeId().getHost();
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
LinkedList<TaskAttemptId> list;
if (isLocationIrrelevant){
list = mapsHostMapping.get(ANY);
} else {
list = mapsHostMapping.get(host);
}
while (list != null && list.size() > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Host matched to the request list " + host);
Expand Down