Skip to content

Commit 371daeb

Browse files
committed
HBASE-28955 Improve lease renew for FanOutOneBlockAsyncDFSOutput (#6440)
Signed-off-by: Istvan Toth <stoty@apache.org> (cherry picked from commit db3ba44)
1 parent e5b558c commit 371daeb

File tree

7 files changed

+297
-48
lines changed

7 files changed

+297
-48
lines changed

hbase-asyncfs/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@
9999
<artifactId>mockito-core</artifactId>
100100
<scope>test</scope>
101101
</dependency>
102+
<dependency>
103+
<groupId>org.mockito</groupId>
104+
<artifactId>mockito-inline</artifactId>
105+
<scope>test</scope>
106+
</dependency>
102107
<dependency>
103108
<groupId>org.slf4j</groupId>
104109
<artifactId>jcl-over-slf4j</artifactId>

hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.ArrayList;
3737
import java.util.Collection;
3838
import java.util.Collections;
39+
import java.util.EnumSet;
3940
import java.util.Iterator;
4041
import java.util.List;
4142
import java.util.Map;
@@ -47,6 +48,7 @@
4748
import java.util.function.Supplier;
4849
import org.apache.hadoop.conf.Configuration;
4950
import org.apache.hadoop.crypto.Encryptor;
51+
import org.apache.hadoop.fs.CreateFlag;
5052
import org.apache.hadoop.fs.Path;
5153
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
5254
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
@@ -57,6 +59,7 @@
5759
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
5860
import org.apache.hadoop.hdfs.DFSClient;
5961
import org.apache.hadoop.hdfs.DistributedFileSystem;
62+
import org.apache.hadoop.hdfs.DummyDFSOutputStream;
6063
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
6164
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
6265
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -122,7 +125,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
122125

123126
private final String src;
124127

125-
private HdfsFileStatus stat;
128+
private final HdfsFileStatus stat;
126129

127130
private final ExtendedBlock block;
128131

@@ -138,6 +141,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
138141

139142
private final ByteBufAllocator alloc;
140143

144+
// a dummy DFSOutputStream used for lease renewal
145+
private final DummyDFSOutputStream dummyStream;
146+
141147
private static final class Callback {
142148

143149
private final CompletableFuture<Long> future;
@@ -356,8 +362,9 @@ private void setupReceiver(int timeoutMs) {
356362

357363
FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client,
358364
ClientProtocol namenode, String clientName, String src, HdfsFileStatus stat,
359-
LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap,
360-
DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
365+
EnumSet<CreateFlag> createFlags, LocatedBlock locatedBlock, Encryptor encryptor,
366+
Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer, ByteBufAllocator alloc,
367+
StreamSlowMonitor streamSlowMonitor) {
361368
this.conf = conf;
362369
this.dfs = dfs;
363370
this.client = client;
@@ -376,6 +383,7 @@ private void setupReceiver(int timeoutMs) {
376383
this.state = State.STREAMING;
377384
setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
378385
this.streamSlowMonitor = streamSlowMonitor;
386+
this.dummyStream = new DummyDFSOutputStream(this, client, src, stat, createFlags, summer);
379387
}
380388

381389
@Override
@@ -593,7 +601,7 @@ public void recoverAndClose(CancelableProgressable reporter) throws IOException
593601
buf = null;
594602
}
595603
closeDataNodeChannelsAndAwait();
596-
endFileLease(client, stat);
604+
endFileLease(this);
597605
RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
598606
reporter == null ? new CancelOnClose(client) : reporter);
599607
}
@@ -608,7 +616,7 @@ public void close() throws IOException {
608616
state = State.CLOSED;
609617
closeDataNodeChannelsAndAwait();
610618
block.setNumBytes(ackedBlockLength);
611-
completeFile(client, namenode, src, clientName, block, stat);
619+
completeFile(this, client, namenode, src, clientName, block, stat);
612620
}
613621

614622
@Override
@@ -626,4 +634,20 @@ public long getSyncedLength() {
626634
Map<Channel, DatanodeInfo> getDatanodeInfoMap() {
627635
return this.datanodeInfoMap;
628636
}
637+
638+
DFSClient getClient() {
639+
return client;
640+
}
641+
642+
DummyDFSOutputStream getDummyStream() {
643+
return dummyStream;
644+
}
645+
646+
boolean isClosed() {
647+
return state == State.CLOSED;
648+
}
649+
650+
HdfsFileStatus getStat() {
651+
return stat;
652+
}
629653
}

hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java

Lines changed: 28 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,9 @@ private FanOutOneBlockAsyncDFSOutputHelper() {
141141

142142
private interface LeaseManager {
143143

144-
void begin(DFSClient client, HdfsFileStatus stat);
144+
void begin(FanOutOneBlockAsyncDFSOutput output);
145145

146-
void end(DFSClient client, HdfsFileStatus stat);
146+
void end(FanOutOneBlockAsyncDFSOutput output);
147147
}
148148

149149
private static final LeaseManager LEASE_MANAGER;
@@ -178,44 +178,28 @@ private static LeaseManager createLeaseManager3_4() throws NoSuchMethodException
178178
beginFileLeaseMethod.setAccessible(true);
179179
Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", String.class);
180180
endFileLeaseMethod.setAccessible(true);
181-
Method getConfigurationMethod = DFSClient.class.getDeclaredMethod("getConfiguration");
182-
getConfigurationMethod.setAccessible(true);
183-
Method getNamespaceMehtod = HdfsFileStatus.class.getDeclaredMethod("getNamespace");
184-
181+
Method getUniqKeyMethod = DFSOutputStream.class.getMethod("getUniqKey");
185182
return new LeaseManager() {
186183

187-
private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
188-
"dfs.client.output.stream.uniq.default.key";
189-
private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";
190-
191-
private String getUniqId(DFSClient client, HdfsFileStatus stat)
192-
throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
193-
// Copied from DFSClient in Hadoop 3.4.0
194-
long fileId = stat.getFileId();
195-
String namespace = (String) getNamespaceMehtod.invoke(stat);
196-
if (namespace == null) {
197-
Configuration conf = (Configuration) getConfigurationMethod.invoke(client);
198-
String defaultKey = conf.get(DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
199-
DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT);
200-
return defaultKey + "_" + fileId;
201-
} else {
202-
return namespace + "_" + fileId;
203-
}
184+
private String getUniqKey(FanOutOneBlockAsyncDFSOutput output)
185+
throws IllegalAccessException, InvocationTargetException {
186+
return (String) getUniqKeyMethod.invoke(output.getDummyStream());
204187
}
205188

206189
@Override
207-
public void begin(DFSClient client, HdfsFileStatus stat) {
190+
public void begin(FanOutOneBlockAsyncDFSOutput output) {
208191
try {
209-
beginFileLeaseMethod.invoke(client, getUniqId(client, stat), null);
192+
beginFileLeaseMethod.invoke(output.getClient(), getUniqKey(output),
193+
output.getDummyStream());
210194
} catch (IllegalAccessException | InvocationTargetException e) {
211195
throw new RuntimeException(e);
212196
}
213197
}
214198

215199
@Override
216-
public void end(DFSClient client, HdfsFileStatus stat) {
200+
public void end(FanOutOneBlockAsyncDFSOutput output) {
217201
try {
218-
endFileLeaseMethod.invoke(client, getUniqId(client, stat));
202+
endFileLeaseMethod.invoke(output.getClient(), getUniqKey(output));
219203
} catch (IllegalAccessException | InvocationTargetException e) {
220204
throw new RuntimeException(e);
221205
}
@@ -232,18 +216,19 @@ private static LeaseManager createLeaseManager3() throws NoSuchMethodException {
232216
return new LeaseManager() {
233217

234218
@Override
235-
public void begin(DFSClient client, HdfsFileStatus stat) {
219+
public void begin(FanOutOneBlockAsyncDFSOutput output) {
236220
try {
237-
beginFileLeaseMethod.invoke(client, stat.getFileId(), null);
221+
beginFileLeaseMethod.invoke(output.getClient(), output.getDummyStream().getFileId(),
222+
output.getDummyStream());
238223
} catch (IllegalAccessException | InvocationTargetException e) {
239224
throw new RuntimeException(e);
240225
}
241226
}
242227

243228
@Override
244-
public void end(DFSClient client, HdfsFileStatus stat) {
229+
public void end(FanOutOneBlockAsyncDFSOutput output) {
245230
try {
246-
endFileLeaseMethod.invoke(client, stat.getFileId());
231+
endFileLeaseMethod.invoke(output.getClient(), output.getDummyStream().getFileId());
247232
} catch (IllegalAccessException | InvocationTargetException e) {
248233
throw new RuntimeException(e);
249234
}
@@ -323,12 +308,12 @@ public boolean progress() {
323308
}
324309
}
325310

326-
static void beginFileLease(DFSClient client, HdfsFileStatus stat) {
327-
LEASE_MANAGER.begin(client, stat);
311+
private static void beginFileLease(FanOutOneBlockAsyncDFSOutput output) {
312+
LEASE_MANAGER.begin(output);
328313
}
329314

330-
static void endFileLease(DFSClient client, HdfsFileStatus stat) {
331-
LEASE_MANAGER.end(client, stat);
315+
static void endFileLease(FanOutOneBlockAsyncDFSOutput output) {
316+
LEASE_MANAGER.end(output);
332317
}
333318

334319
static DataChecksum createChecksum(DFSClient client) {
@@ -540,20 +525,19 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
540525
LOG.debug("When create output stream for {}, exclude list is {}, retry={}", src,
541526
getDataNodeInfo(toExcludeNodes), retry);
542527
}
528+
EnumSetWritable<CreateFlag> createFlags = getCreateFlags(overwrite, noLocalWrite);
543529
HdfsFileStatus stat;
544530
try {
545531
stat = FILE_CREATOR.create(namenode, src,
546532
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
547-
getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize,
548-
CryptoProtocolVersion.supported());
533+
createFlags, createParent, replication, blockSize, CryptoProtocolVersion.supported());
549534
} catch (Exception e) {
550535
if (e instanceof RemoteException) {
551536
throw (RemoteException) e;
552537
} else {
553538
throw new NameNodeException(e);
554539
}
555540
}
556-
beginFileLease(client, stat);
557541
boolean succ = false;
558542
LocatedBlock locatedBlock = null;
559543
List<Future<Channel>> futureList = null;
@@ -578,7 +562,8 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
578562
Encryptor encryptor = createEncryptor(conf, stat, client);
579563
FanOutOneBlockAsyncDFSOutput output =
580564
new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, stat,
581-
locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
565+
createFlags.get(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
566+
beginFileLease(output);
582567
succ = true;
583568
return output;
584569
} catch (RemoteException e) {
@@ -617,7 +602,6 @@ public void operationComplete(Future<Channel> future) throws Exception {
617602
});
618603
}
619604
}
620-
endFileLease(client, stat);
621605
}
622606
}
623607
}
@@ -654,12 +638,13 @@ public static boolean shouldRetryCreate(RemoteException e) {
654638
return e.getClassName().endsWith("RetryStartFileException");
655639
}
656640

657-
static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
658-
ExtendedBlock block, HdfsFileStatus stat) {
641+
static void completeFile(FanOutOneBlockAsyncDFSOutput output, DFSClient client,
642+
ClientProtocol namenode, String src, String clientName, ExtendedBlock block,
643+
HdfsFileStatus stat) {
659644
for (int retry = 0;; retry++) {
660645
try {
661646
if (namenode.complete(src, clientName, block, stat.getFileId())) {
662-
endFileLease(client, stat);
647+
endFileLease(output);
663648
return;
664649
} else {
665650
LOG.warn("complete file " + src + " not finished, retry = " + retry);
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs;
19+
20+
import java.io.IOException;
21+
import java.util.EnumSet;
22+
import org.apache.hadoop.fs.CreateFlag;
23+
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
24+
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
25+
import org.apache.hadoop.util.DataChecksum;
26+
import org.apache.yetus.audience.InterfaceAudience;
27+
28+
/**
29+
* A dummy DFSOutputStream which is mainly used for lease renewal.
30+
* <p>
31+
* We have to put it under this package as we want to override a package private method.
32+
*/
33+
@InterfaceAudience.Private
34+
public final class DummyDFSOutputStream extends DFSOutputStream {
35+
36+
private final AsyncFSOutput delegate;
37+
38+
public DummyDFSOutputStream(AsyncFSOutput output, DFSClient dfsClient, String src,
39+
HdfsFileStatus stat, EnumSet<CreateFlag> flag, DataChecksum checksum) {
40+
super(dfsClient, src, stat, flag, null, checksum, null, false);
41+
this.delegate = output;
42+
}
43+
44+
// public for testing
45+
@Override
46+
public void abort() throws IOException {
47+
delegate.close();
48+
}
49+
50+
@Override
51+
public void close() throws IOException {
52+
delegate.close();
53+
}
54+
}

0 commit comments

Comments
 (0)