Skip to content

Commit 627f89b

Browse files
committed
HDFS-167. Fix a bug in DFSClient that caused infinite retries on write. Contributed by Bill Zeller
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@799769 13f79535-47bb-0310-9956-ffa450edef68
1 parent fc86756 commit 627f89b

File tree

3 files changed

+194
-27
lines changed

3 files changed

+194
-27
lines changed

CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,9 @@ Trunk (unreleased changes)
123123
HDFS-119. Fix a bug in logSync(), which causes NameNode to block forever.
124124
(Suresh Srinivas via shv)
125125

126+
HDFS-167. Fix a bug in DFSClient that caused infinite retries on write.
127+
(Bill Zeller via szetszwo)
128+
126129
Release 0.20.1 - Unreleased
127130

128131
IMPROVEMENTS

src/java/org/apache/hadoop/hdfs/DFSClient.java

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
127127
public static final Log LOG = LogFactory.getLog(DFSClient.class);
128128
public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
129129
private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
130-
final private ClientProtocol namenode;
131-
final private ClientProtocol rpcNamenode;
130+
private ClientProtocol namenode;
131+
private ClientProtocol rpcNamenode;
132132
final UnixUserGroupInformation ugi;
133133
volatile boolean clientRunning = true;
134134
Random r = new Random();
@@ -219,6 +219,29 @@ public DFSClient(Configuration conf) throws IOException {
219219
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
220220
FileSystem.Statistics stats)
221221
throws IOException {
222+
this(conf, stats);
223+
this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
224+
this.namenode = createNamenode(this.rpcNamenode);
225+
}
226+
227+
/**
228+
* Create a new DFSClient connected to the given namenode
229+
* and rpcNamenode objects.
230+
*
231+
* This constructor was written to allow easy testing of the DFSClient class.
232+
* End users will most likely want to use one of the other constructors.
233+
*/
234+
public DFSClient(ClientProtocol namenode, ClientProtocol rpcNamenode,
235+
Configuration conf, FileSystem.Statistics stats)
236+
throws IOException {
237+
this(conf, stats);
238+
this.namenode = namenode;
239+
this.rpcNamenode = rpcNamenode;
240+
}
241+
242+
243+
private DFSClient(Configuration conf, FileSystem.Statistics stats)
244+
throws IOException {
222245
this.conf = conf;
223246
this.stats = stats;
224247
this.socketTimeout = conf.getInt("dfs.socket.timeout",
@@ -240,9 +263,6 @@ public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
240263
throw (IOException)(new IOException().initCause(e));
241264
}
242265

243-
this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
244-
this.namenode = createNamenode(rpcNamenode);
245-
246266
String taskId = conf.get("mapred.task.id");
247267
if (taskId != null) {
248268
this.clientName = "DFSClient_" + taskId;
@@ -2856,7 +2876,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
28562876
}
28572877

28582878
private LocatedBlock locateFollowingBlock(long start) throws IOException {
2859-
int retries = 5;
2879+
int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
28602880
long sleeptime = 400;
28612881
while (true) {
28622882
long localstart = System.currentTimeMillis();
@@ -2872,26 +2892,32 @@ private LocatedBlock locateFollowingBlock(long start) throws IOException {
28722892
if (ue != e) {
28732893
throw ue; // no need to retry these exceptions
28742894
}
2875-
2876-
if (--retries == 0 &&
2877-
!NotReplicatedYetException.class.getName().
2895+
2896+
2897+
if (NotReplicatedYetException.class.getName().
28782898
equals(e.getClassName())) {
2879-
throw e;
2880-
} else {
2881-
LOG.info(StringUtils.stringifyException(e));
2882-
if (System.currentTimeMillis() - localstart > 5000) {
2883-
LOG.info("Waiting for replication for "
2884-
+ (System.currentTimeMillis() - localstart) / 1000
2885-
+ " seconds");
2886-
}
2887-
try {
2888-
LOG.warn("NotReplicatedYetException sleeping " + src
2889-
+ " retries left " + retries);
2890-
Thread.sleep(sleeptime);
2891-
sleeptime *= 2;
2892-
} catch (InterruptedException ie) {
2899+
if (retries == 0) {
2900+
throw e;
2901+
} else {
2902+
--retries;
2903+
LOG.info(StringUtils.stringifyException(e));
2904+
if (System.currentTimeMillis() - localstart > 5000) {
2905+
LOG.info("Waiting for replication for "
2906+
+ (System.currentTimeMillis() - localstart) / 1000
2907+
+ " seconds");
2908+
}
2909+
try {
2910+
LOG.warn("NotReplicatedYetException sleeping " + src
2911+
+ " retries left " + retries);
2912+
Thread.sleep(sleeptime);
2913+
sleeptime *= 2;
2914+
} catch (InterruptedException ie) {
2915+
}
28932916
}
2894-
}
2917+
} else {
2918+
throw e;
2919+
}
2920+
28952921
}
28962922
}
28972923
}

src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java

Lines changed: 141 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,18 @@
2121
import java.io.InputStream;
2222
import java.io.OutputStream;
2323

24+
import org.apache.commons.logging.Log;
25+
import org.apache.commons.logging.LogFactory;
2426
import org.apache.hadoop.conf.Configuration;
25-
import org.apache.hadoop.fs.FileSystem;
26-
import org.apache.hadoop.fs.Path;
27-
import org.apache.hadoop.io.IOUtils;
27+
import org.apache.hadoop.fs.*;
28+
import org.apache.hadoop.fs.permission.FsPermission;
29+
import org.apache.hadoop.hdfs.protocol.*;
30+
import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
31+
import org.apache.hadoop.hdfs.server.common.*;
32+
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
33+
import org.apache.hadoop.io.*;
34+
import org.apache.hadoop.ipc.RemoteException;
35+
import org.apache.hadoop.security.AccessControlException;
2836

2937
import junit.framework.TestCase;
3038

@@ -34,6 +42,8 @@
3442
* properly in case of errors.
3543
*/
3644
public class TestDFSClientRetries extends TestCase {
45+
public static final Log LOG =
46+
LogFactory.getLog(TestDFSClientRetries.class.getName());
3747

3848
// writes 'len' bytes of data to out.
3949
private static void writeData(OutputStream out, int len) throws IOException {
@@ -97,4 +107,132 @@ public void testWriteTimeoutAtDataNode() throws IOException,
97107
}
98108

99109
// more tests related to different failure cases can be added here.
110+
111+
class TestNameNode implements ClientProtocol
112+
{
113+
int num_calls = 0;
114+
115+
// The total number of calls that can be made to addBlock
116+
// before an exception is thrown
117+
int num_calls_allowed;
118+
public final String ADD_BLOCK_EXCEPTION = "Testing exception thrown from"
119+
+ "TestDFSClientRetries::"
120+
+ "TestNameNode::addBlock";
121+
public final String RETRY_CONFIG
122+
= "dfs.client.block.write.locateFollowingBlock.retries";
123+
124+
public TestNameNode(Configuration conf) throws IOException
125+
{
126+
// +1 because the configuration value is the number of retries and
127+
// the first call is not a retry (e.g., 2 retries == 3 total
128+
// calls allowed)
129+
this.num_calls_allowed = conf.getInt(RETRY_CONFIG, 5) + 1;
130+
}
131+
132+
public long getProtocolVersion(String protocol,
133+
long clientVersion)
134+
throws IOException
135+
{
136+
return versionID;
137+
}
138+
139+
public LocatedBlock addBlock(String src, String clientName)
140+
throws IOException
141+
{
142+
num_calls++;
143+
if (num_calls > num_calls_allowed) {
144+
throw new IOException("addBlock called more times than "
145+
+ RETRY_CONFIG
146+
+ " allows.");
147+
} else {
148+
throw new RemoteException(NotReplicatedYetException.class.getName(),
149+
ADD_BLOCK_EXCEPTION);
150+
}
151+
}
152+
153+
154+
// The following methods are stub methods that are not needed by this mock class
155+
156+
public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException { return null; }
157+
158+
public void create(String src, FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, short replication, long blockSize) throws IOException {}
159+
160+
public LocatedBlock append(String src, String clientName) throws IOException { return null; }
161+
162+
public boolean setReplication(String src, short replication) throws IOException { return false; }
163+
164+
public void setPermission(String src, FsPermission permission) throws IOException {}
165+
166+
public void setOwner(String src, String username, String groupname) throws IOException {}
167+
168+
public void abandonBlock(Block b, String src, String holder) throws IOException {}
169+
170+
public boolean complete(String src, String clientName) throws IOException { return false; }
171+
172+
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {}
173+
174+
public boolean rename(String src, String dst) throws IOException { return false; }
175+
176+
public boolean delete(String src) throws IOException { return false; }
177+
178+
public boolean delete(String src, boolean recursive) throws IOException { return false; }
179+
180+
public boolean mkdirs(String src, FsPermission masked) throws IOException { return false; }
181+
182+
public FileStatus[] getListing(String src) throws IOException { return null; }
183+
184+
public void renewLease(String clientName) throws IOException {}
185+
186+
public long[] getStats() throws IOException { return null; }
187+
188+
public DatanodeInfo[] getDatanodeReport(FSConstants.DatanodeReportType type) throws IOException { return null; }
189+
190+
public long getPreferredBlockSize(String filename) throws IOException { return 0; }
191+
192+
public boolean setSafeMode(FSConstants.SafeModeAction action) throws IOException { return false; }
193+
194+
public void saveNamespace() throws IOException {}
195+
196+
public boolean restoreFailedStorage(String arg) throws AccessControlException { return false; }
197+
198+
public void refreshNodes() throws IOException {}
199+
200+
public void finalizeUpgrade() throws IOException {}
201+
202+
public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) throws IOException { return null; }
203+
204+
public void metaSave(String filename) throws IOException {}
205+
206+
public FileStatus getFileInfo(String src) throws IOException { return null; }
207+
208+
public ContentSummary getContentSummary(String path) throws IOException { return null; }
209+
210+
public void setQuota(String path, long namespaceQuota, long diskspaceQuota) throws IOException {}
211+
212+
public void fsync(String src, String client) throws IOException {}
213+
214+
public void setTimes(String src, long mtime, long atime) throws IOException {}
215+
216+
}
217+
218+
public void testNotYetReplicatedErrors() throws IOException
219+
{
220+
Configuration conf = new Configuration();
221+
222+
// allow 1 retry (2 total calls)
223+
conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", 1);
224+
225+
TestNameNode tnn = new TestNameNode(conf);
226+
DFSClient client = new DFSClient(tnn, tnn, conf, null);
227+
OutputStream os = client.create("testfile", true);
228+
os.write(20); // write one random byte
229+
230+
try {
231+
os.close();
232+
} catch (Exception e) {
233+
assertTrue("Retries are not being stopped correctly",
234+
e.getMessage().equals(tnn.ADD_BLOCK_EXCEPTION));
235+
}
236+
}
237+
100238
}

0 commit comments

Comments
 (0)