@@ -86,12 +86,12 @@ public void testDefaultPolicy() throws Exception {
86
86
final int half = replication /2 ;
87
87
final boolean enoughReplica = replication <= nExistings ;
88
88
final boolean noReplica = nExistings == 0 ;
89
- final boolean replicationL3 = replication < 3 ;
89
+ final boolean replicationL2 = replication < 2 ;
90
90
final boolean existingsLEhalf = nExistings <= half ;
91
91
final boolean isAH = isAppend [i ] || isHflushed [j ];
92
92
93
93
final boolean expected ;
94
- if (enoughReplica || noReplica || replicationL3 ) {
94
+ if (enoughReplica || noReplica || replicationL2 ) {
95
95
expected = false ;
96
96
} else {
97
97
expected = isAH || existingsLEhalf ;
@@ -114,6 +114,50 @@ public void testDefaultPolicy() throws Exception {
114
114
}
115
115
}
116
116
117
+ /** Test replace datanode on failure with 2-replication file. */
118
+ @ Test
119
+ public void testReplaceDatanodeOnFailureWith2Replications () throws Exception {
120
+ final Configuration conf = new HdfsConfiguration ();
121
+ // do not consider load factor when selecting a data node
122
+ conf .setBoolean (DFSConfigKeys .DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY ,
123
+ false );
124
+ //set policy to DEFAULT
125
+ ReplaceDatanodeOnFailure .write (Policy .DEFAULT , false , conf );
126
+
127
+ final int repNum = 2 ;
128
+ final String [] racks = new String [repNum ];
129
+ Arrays .fill (racks , RACK0 );
130
+ final MiniDFSCluster cluster = new MiniDFSCluster .Builder (conf
131
+ ).racks (racks ).numDataNodes (repNum ).build ();
132
+
133
+ try {
134
+ cluster .waitActive ();
135
+ final DistributedFileSystem fs = cluster .getFileSystem ();
136
+ final Path dir = new Path (DIR );
137
+ final SlowWriter [] slowwriter = new SlowWriter [1 ];
138
+ slowwriter [0 ] = new SlowWriter (fs , new Path (dir , "file-rep2" ), 200L , (short ) 2 );
139
+ slowwriter [0 ].start ();
140
+
141
+ //start new datanodes
142
+ cluster .startDataNodes (conf , 1 , true , null , new String []{RACK1 });
143
+ cluster .waitActive ();
144
+ // wait for first block reports for up to 10 seconds
145
+ cluster .waitFirstBRCompleted (0 , 10000 );
146
+
147
+ //stop an old datanode
148
+ MiniDFSCluster .DataNodeProperties dnprop = cluster .stopDataNode (
149
+ AppendTestUtil .nextInt (repNum ));
150
+
151
+ sleepSeconds (3 );
152
+ Assert .assertEquals (repNum , slowwriter [0 ].out .getCurrentBlockReplication ());
153
+
154
+ slowwriter [0 ].interruptRunning ();
155
+ slowwriter [0 ].joinAndClose ();
156
+ } finally {
157
+ if (cluster != null ) {cluster .shutdown ();}
158
+ }
159
+ }
160
+
117
161
/** Test replace datanode on failure. */
118
162
@ Test
119
163
public void testReplaceDatanodeOnFailure () throws Exception {
@@ -236,6 +280,14 @@ static class SlowWriter extends Thread {
236
280
this .sleepms = sleepms ;
237
281
}
238
282
283
+ SlowWriter (DistributedFileSystem fs , Path filepath , final long sleepms ,
284
+ short replication ) throws IOException {
285
+ super (SlowWriter .class .getSimpleName () + ":" + filepath );
286
+ this .filepath = filepath ;
287
+ this .out = (HdfsDataOutputStream )fs .create (filepath , replication );
288
+ this .sleepms = sleepms ;
289
+ }
290
+
239
291
@ Override
240
292
public void run () {
241
293
int i = 0 ;
0 commit comments