Skip to content

Commit eb51e33

Browse files
committed
Merge branch 'trunk' of github.com:apache/hadoop into HADOOP-15984-trunk-2
2 parents 92a1a87 + 1c15987 commit eb51e33

File tree

4 files changed

+151
-46
lines changed

4 files changed

+151
-46
lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java

Lines changed: 58 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class FadvisedFileRegion extends DefaultFileRegion {
4141
private static final Logger LOG =
4242
LoggerFactory.getLogger(FadvisedFileRegion.class);
4343

44+
private final Object closeLock = new Object();
4445
private final boolean manageOsCache;
4546
private final int readaheadLength;
4647
private final ReadaheadPool readaheadPool;
@@ -51,12 +52,12 @@ public class FadvisedFileRegion extends DefaultFileRegion {
5152
private final int shuffleBufferSize;
5253
private final boolean shuffleTransferToAllowed;
5354
private final FileChannel fileChannel;
54-
55-
private ReadaheadRequest readaheadRequest;
55+
56+
private volatile ReadaheadRequest readaheadRequest;
5657

5758
public FadvisedFileRegion(RandomAccessFile file, long position, long count,
5859
boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
59-
String identifier, int shuffleBufferSize,
60+
String identifier, int shuffleBufferSize,
6061
boolean shuffleTransferToAllowed) throws IOException {
6162
super(file.getChannel(), position, count);
6263
this.manageOsCache = manageOsCache;
@@ -73,97 +74,110 @@ public FadvisedFileRegion(RandomAccessFile file, long position, long count,
7374

7475
@Override
7576
public long transferTo(WritableByteChannel target, long position)
76-
throws IOException {
77-
if (readaheadPool != null && readaheadLength > 0) {
78-
readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
79-
position() + position, readaheadLength,
80-
position() + count(), readaheadRequest);
77+
throws IOException {
78+
synchronized (closeLock) {
79+
if (fd.valid()) {
80+
if (readaheadPool != null && readaheadLength > 0) {
81+
readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
82+
position() + position, readaheadLength,
83+
position() + count(), readaheadRequest);
84+
}
85+
86+
if(this.shuffleTransferToAllowed) {
87+
return super.transferTo(target, position);
88+
} else {
89+
return customShuffleTransfer(target, position);
90+
}
91+
} else {
92+
return 0L;
93+
}
8194
}
82-
83-
if(this.shuffleTransferToAllowed) {
84-
return super.transferTo(target, position);
85-
} else {
86-
return customShuffleTransfer(target, position);
87-
}
95+
8896
}
8997

9098
/**
91-
* This method transfers data using local buffer. It transfers data from
92-
* a disk to a local buffer in memory, and then it transfers data from the
99+
* This method transfers data using local buffer. It transfers data from
100+
* a disk to a local buffer in memory, and then it transfers data from the
93101
* buffer to the target. This is used only if transferTo is disallowed in
94-
* the configuration file. super.TransferTo does not perform well on Windows
95-
* due to a small IO request generated. customShuffleTransfer can control
96-
* the size of the IO requests by changing the size of the intermediate
102+
* the configuration file. super.TransferTo does not perform well on Windows
103+
* due to a small IO request generated. customShuffleTransfer can control
104+
* the size of the IO requests by changing the size of the intermediate
97105
* buffer.
98106
*/
99107
@VisibleForTesting
100108
long customShuffleTransfer(WritableByteChannel target, long position)
101-
throws IOException {
109+
throws IOException {
102110
long actualCount = this.count - position;
103111
if (actualCount < 0 || position < 0) {
104112
throw new IllegalArgumentException(
105-
"position out of range: " + position +
106-
" (expected: 0 - " + (this.count - 1) + ')');
113+
"position out of range: " + position +
114+
" (expected: 0 - " + (this.count - 1) + ')');
107115
}
108116
if (actualCount == 0) {
109117
return 0L;
110118
}
111-
119+
112120
long trans = actualCount;
113121
int readSize;
114122
ByteBuffer byteBuffer = ByteBuffer.allocate(
115-
Math.min(
116-
this.shuffleBufferSize,
117-
trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));
118-
123+
Math.min(
124+
this.shuffleBufferSize,
125+
trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));
126+
119127
while(trans > 0L &&
120-
(readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
128+
(readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
121129
//adjust counters and buffer limit
122130
if(readSize < trans) {
123131
trans -= readSize;
124132
position += readSize;
125133
byteBuffer.flip();
126134
} else {
127-
//We can read more than we need if the actualCount is not multiple
135+
//We can read more than we need if the actualCount is not multiple
128136
//of the byteBuffer size and file is big enough. In that case we cannot
129137
//use flip method but we need to set buffer limit manually to trans.
130138
byteBuffer.limit((int)trans);
131139
byteBuffer.position(0);
132-
position += trans;
140+
position += trans;
133141
trans = 0;
134142
}
135-
143+
136144
//write data to the target
137145
while(byteBuffer.hasRemaining()) {
138146
target.write(byteBuffer);
139147
}
140-
148+
141149
byteBuffer.clear();
142150
}
143-
151+
144152
return actualCount - trans;
145153
}
146154

147-
155+
148156
@Override
149157
protected void deallocate() {
150-
if (readaheadRequest != null) {
151-
readaheadRequest.cancel();
158+
synchronized (closeLock) {
159+
if (readaheadRequest != null) {
160+
readaheadRequest.cancel();
161+
readaheadRequest = null;
162+
}
163+
super.deallocate();
152164
}
153-
super.deallocate();
154165
}
155-
166+
156167
/**
157168
* Call when the transfer completes successfully so we can advise the OS that
158169
* we don't need the region to be cached anymore.
159170
*/
160171
public void transferSuccessful() {
161-
if (manageOsCache && count() > 0) {
162-
try {
163-
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
164-
fd, position(), count(), POSIX_FADV_DONTNEED);
165-
} catch (Throwable t) {
166-
LOG.warn("Failed to manage OS cache for " + identifier, t);
172+
synchronized (closeLock) {
173+
if (fd.valid() && manageOsCache && count() > 0) {
174+
try {
175+
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
176+
fd, position(), count(), POSIX_FADV_DONTNEED);
177+
} catch (Throwable t) {
178+
LOG.warn("Failed to manage OS cache for " + identifier +
179+
" fd " + fd, t);
180+
}
167181
}
168182
}
169183
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1056,7 +1056,7 @@ private void copyPlacementQueueToSubmissionContext(
10561056
LOG.info("Placed application with ID " + context.getApplicationId() +
10571057
" in queue: " + placementContext.getQueue() +
10581058
", original submission queue was: " + context.getQueue());
1059-
context.setQueue(placementContext.getQueue());
1059+
context.setQueue(placementContext.getFullQueuePath());
10601060
}
10611061
}
10621062

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ public void setUp() throws IOException {
283283
setupDispatcher(rmContext, conf);
284284
}
285285

286-
private static PlacementManager createMockPlacementManager(
286+
public static PlacementManager createMockPlacementManager(
287287
String userRegex, String placementQueue, String placementParentQueue
288288
) throws YarnException {
289289
PlacementManager placementMgr = mock(PlacementManager.class);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1651,6 +1651,97 @@ public void testUAMRecoveryOnRMWorkPreservingRestart() throws Exception {
16511651
assertUnmanagedAMQueueMetrics(qm2, 1, 0, 0, 1);
16521652
}
16531653

1654+
// Test behavior of an app when two leaf queues have the same name but different queue paths
1655+
// during a work-preserving RM restart with the %specified mapping placement rule.
1656+
// Test case does following:
1657+
//1. Submit an app to queue root.joe.test.
1658+
//2. While the application is running, restart the RM and
1659+
// check whether the app is recovered into the queue it was initially submitted to.
1660+
//3. Verify that the application is running successfully.
1661+
@Test(timeout = 60000)
1662+
public void testQueueRecoveryOnRMWorkPreservingRestart() throws Exception {
1663+
if (getSchedulerType() != SchedulerType.CAPACITY) {
1664+
return;
1665+
}
1666+
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
1667+
1668+
csConf.setQueues(
1669+
CapacitySchedulerConfiguration.ROOT, new String[] {"default", "joe", "john"});
1670+
csConf.setCapacity(
1671+
CapacitySchedulerConfiguration.ROOT + "." + "joe", 25);
1672+
csConf.setCapacity(
1673+
CapacitySchedulerConfiguration.ROOT + "." + "john", 25);
1674+
csConf.setCapacity(
1675+
CapacitySchedulerConfiguration.ROOT + "." + "default", 50);
1676+
1677+
final String q1 = CapacitySchedulerConfiguration.ROOT + "." + "joe";
1678+
final String q2 = CapacitySchedulerConfiguration.ROOT + "." + "john";
1679+
csConf.setQueues(q1, new String[] {"test"});
1680+
csConf.setQueues(q2, new String[] {"test"});
1681+
csConf.setCapacity(
1682+
CapacitySchedulerConfiguration.ROOT + "." + "joe.test", 100);
1683+
csConf.setCapacity(
1684+
CapacitySchedulerConfiguration.ROOT + "." + "john.test", 100);
1685+
1686+
csConf.set(CapacitySchedulerConfiguration.MAPPING_RULE_JSON,
1687+
"{\"rules\" : [{\"type\": \"user\", \"policy\" : \"specified\", " +
1688+
"\"fallbackResult\" : \"skip\", \"matches\" : \"*\"}]}");
1689+
1690+
// start RM
1691+
rm1 = new MockRM(csConf);
1692+
rm1.start();
1693+
MockMemoryRMStateStore memStore =
1694+
(MockMemoryRMStateStore) rm1.getRMStateStore();
1695+
MockNM nm1 =
1696+
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
1697+
nm1.registerNode();
1698+
1699+
RMContext newMockRMContext = rm1.getRMContext();
1700+
newMockRMContext.setQueuePlacementManager(TestAppManager.createMockPlacementManager(
1701+
"user1|user2", "test", "root.joe"));
1702+
1703+
MockRMAppSubmissionData data =
1704+
MockRMAppSubmissionData.Builder.createWithMemory(1024, rm1)
1705+
.withAppName("app")
1706+
.withQueue("root.joe.test")
1707+
.withUser("user1")
1708+
.withAcls(null)
1709+
.build();
1710+
1711+
RMApp app = MockRMAppSubmitter.submit(rm1, data);
1712+
MockAM am = MockRM.launchAndRegisterAM(app, rm1, nm1);
1713+
rm1.waitForState(app.getApplicationId(), RMAppState.RUNNING);
1714+
1715+
MockRM rm2 = new MockRM(csConf, memStore) {
1716+
@Override
1717+
protected RMAppManager createRMAppManager() {
1718+
return new RMAppManager(this.rmContext, this.scheduler,
1719+
this.masterService, this.applicationACLsManager, conf) {
1720+
@Override
1721+
ApplicationPlacementContext placeApplication(
1722+
PlacementManager placementManager,
1723+
ApplicationSubmissionContext context, String user,
1724+
boolean isRecovery) throws YarnException {
1725+
return super.placeApplication(
1726+
newMockRMContext.getQueuePlacementManager(), context, user, isRecovery);
1727+
}
1728+
};
1729+
}
1730+
};
1731+
1732+
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
1733+
rm2.start();
1734+
RMApp recoveredApp0 =
1735+
rm2.getRMContext().getRMApps().get(app.getApplicationId());
1736+
1737+
rm2.waitForState(recoveredApp0.getApplicationId(), RMAppState.ACCEPTED);
1738+
am.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
1739+
am.registerAppAttempt(true);
1740+
rm2.waitForState(recoveredApp0.getApplicationId(), RMAppState.RUNNING);
1741+
1742+
Assert.assertEquals("root.joe.test", recoveredApp0.getQueue());
1743+
}
1744+
16541745
private void assertUnmanagedAMQueueMetrics(QueueMetrics qm, int appsSubmitted,
16551746
int appsPending, int appsRunning, int appsCompleted) {
16561747
Assert.assertEquals(appsSubmitted, qm.getUnmanagedAppsSubmitted());

0 commit comments

Comments
 (0)