@@ -2123,80 +2123,82 @@ public void testBlockReportAfterDataNodeRestart() throws Exception {
2123
2123
}
2124
2124
}
2125
2125
2126
+ /**
2127
+ * Test when the numBytes of the block in the block report is set to NO_ACK,
2128
+ * the DataNode processing will not report incremental blocks.
2129
+ */
2126
2130
@ Test (timeout = 360000 )
2127
- public void testSetNoAckBlockInInvalidateBlocks () throws Exception {
2128
- {
2129
- Configuration conf = new HdfsConfiguration ();
2130
- conf .setInt (DFSConfigKeys .DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY , 500 );
2131
- conf .setInt (DFSConfigKeys .DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY , 3 );
2132
- conf .setLong (DFSConfigKeys .DFS_HEARTBEAT_INTERVAL_KEY , 1L );
2133
- MiniDFSCluster cluster = new MiniDFSCluster .Builder (conf ).numDataNodes (1 ).build ();
2134
- FSNamesystem fsn = cluster .getNamesystem ();
2135
- BlockManager bm = fsn .getBlockManager ();
2136
- DistributedFileSystem fs = cluster .getFileSystem ();
2137
- try {
2138
- // Write file.
2139
- Path file = new Path ("/test" );
2140
- DFSTestUtil .createFile (fs , file , 10240L , (short )1 , 0L );
2141
- DFSTestUtil .waitReplication (fs , file , (short ) 1 );
2142
- LocatedBlock lb = DFSTestUtil .getAllBlocks (fs , file ).get (0 );
2143
- DatanodeInfo [] loc = lb .getLocations ();
2144
- assertEquals (1 , loc .length );
2145
- List <DataNode > datanodes = cluster .getDataNodes ();
2146
- assertEquals (1 , datanodes .size ());
2147
- DataNode datanode = datanodes .get (0 );
2148
- assertEquals (datanode .getDatanodeUuid (), loc [0 ].getDatanodeUuid ());
2149
-
2150
- MetricsRecordBuilder rb = getMetrics (datanode .getMetrics ().name ());
2151
- // Check the IncrementalBlockReportsNumOps of DataNode, it will be 0.
2152
- assertEquals (1 , getLongCounter ("IncrementalBlockReportsNumOps" , rb ));
2153
-
2154
- // Delete file and remove block.
2155
- fs .delete (file , false );
2156
-
2157
- // Wait for the processing of the marked deleted block to complete.
2158
- BlockManagerTestUtil .waitForMarkedDeleteQueueIsEmpty (bm );
2159
- assertNull (bm .getStoredBlock (lb .getBlock ().getLocalBlock ()));
2160
-
2161
- // Expire heartbeat on the NameNode,and datanode to be marked dead.
2162
- datanode .setHeartbeatsDisabledForTests (true );
2163
- cluster .setDataNodeDead (datanode .getDatanodeId ());
2164
- assertFalse (bm .containsInvalidateBlock (loc [0 ], lb .getBlock ().getLocalBlock ()));
2165
-
2166
- // Wait for re-registration and heartbeat.
2167
- datanode .setHeartbeatsDisabledForTests (false );
2168
- final DatanodeDescriptor dn1Desc = cluster .getNamesystem (0 )
2169
- .getBlockManager ().getDatanodeManager ()
2170
- .getDatanode (datanode .getDatanodeId ());
2171
- GenericTestUtils .waitFor (
2172
- () -> dn1Desc .isAlive () && dn1Desc .isHeartbeatedSinceRegistration (),
2173
- 100 , 5000 );
2174
-
2175
- // Trigger BlockReports and block is not exists,
2176
- // it will add invalidateBlocks and set block size be NO_ACK.
2177
- cluster .triggerBlockReports ();
2178
- assertTrue (bm .containsInvalidateBlock (loc [0 ], lb .getBlock ().getLocalBlock ()));
2179
-
2180
- // Trigger schedule blocks for deletion at datanode.
2181
- int workCount = bm .computeInvalidateWork (1 );
2182
- assertEquals (1 , workCount );
2183
- assertFalse (bm .containsInvalidateBlock (loc [0 ], lb .getBlock ().getLocalBlock ()));
2184
-
2185
- // Wait for the blocksRemoved value in DataNode to be 1.
2186
- GenericTestUtils .waitFor (
2187
- () -> datanode .getMetrics ().getBlocksRemoved () == 1 ,
2188
- 100 , 5000 );
2189
-
2190
- // Trigger immediate deletion report at datanode.
2191
- cluster .triggerDeletionReports ();
2192
-
2193
- // Delete block size be NO_ACK and will not deletion block report,
2194
- // so check the IncrementalBlockReportsNumOps of DataNode still 1.
2195
- assertEquals (1 , getLongCounter ("IncrementalBlockReportsNumOps" , rb ));
2196
- } finally {
2197
- if (cluster != null ) {
2198
- cluster .shutdown ();
2199
- }
2131
+ public void testSetNoAckBlockInBlockReport () throws Exception {
2132
+ Configuration conf = new HdfsConfiguration ();
2133
+ conf .setInt (DFSConfigKeys .DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY , 500 );
2134
+ conf .setInt (DFSConfigKeys .DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY , 3 );
2135
+ conf .setLong (DFSConfigKeys .DFS_HEARTBEAT_INTERVAL_KEY , 1L );
2136
+ MiniDFSCluster cluster = new MiniDFSCluster .Builder (conf ).numDataNodes (1 ).build ();
2137
+ FSNamesystem fsn = cluster .getNamesystem ();
2138
+ BlockManager bm = fsn .getBlockManager ();
2139
+ DistributedFileSystem fs = cluster .getFileSystem ();
2140
+ try {
2141
+ // Write file.
2142
+ Path file = new Path ("/test" );
2143
+ DFSTestUtil .createFile (fs , file , 10240L , (short )1 , 0L );
2144
+ DFSTestUtil .waitReplication (fs , file , (short ) 1 );
2145
+ LocatedBlock lb = DFSTestUtil .getAllBlocks (fs , file ).get (0 );
2146
+ DatanodeInfo [] loc = lb .getLocations ();
2147
+ assertEquals (1 , loc .length );
2148
+ List <DataNode > datanodes = cluster .getDataNodes ();
2149
+ assertEquals (1 , datanodes .size ());
2150
+ DataNode datanode = datanodes .get (0 );
2151
+ assertEquals (datanode .getDatanodeUuid (), loc [0 ].getDatanodeUuid ());
2152
+
2153
+ MetricsRecordBuilder rb = getMetrics (datanode .getMetrics ().name ());
2154
+ // Check the IncrementalBlockReportsNumOps of DataNode, it will be 0.
2155
+ assertEquals (1 , getLongCounter ("IncrementalBlockReportsNumOps" , rb ));
2156
+
2157
+ // Delete file and remove block.
2158
+ fs .delete (file , false );
2159
+
2160
+ // Wait for the processing of the marked deleted block to complete.
2161
+ BlockManagerTestUtil .waitForMarkedDeleteQueueIsEmpty (bm );
2162
+ assertNull (bm .getStoredBlock (lb .getBlock ().getLocalBlock ()));
2163
+
2164
+ // Expire heartbeat on the NameNode,and datanode to be marked dead.
2165
+ datanode .setHeartbeatsDisabledForTests (true );
2166
+ cluster .setDataNodeDead (datanode .getDatanodeId ());
2167
+ assertFalse (bm .containsInvalidateBlock (loc [0 ], lb .getBlock ().getLocalBlock ()));
2168
+
2169
+ // Wait for re-registration and heartbeat.
2170
+ datanode .setHeartbeatsDisabledForTests (false );
2171
+ final DatanodeDescriptor dn1Desc = cluster .getNamesystem (0 )
2172
+ .getBlockManager ().getDatanodeManager ()
2173
+ .getDatanode (datanode .getDatanodeId ());
2174
+ GenericTestUtils .waitFor (
2175
+ () -> dn1Desc .isAlive () && dn1Desc .isHeartbeatedSinceRegistration (),
2176
+ 100 , 5000 );
2177
+
2178
+ // Trigger BlockReports and block is not exists,
2179
+ // it will add invalidateBlocks and set block numBytes be NO_ACK.
2180
+ cluster .triggerBlockReports ();
2181
+ assertTrue (bm .containsInvalidateBlock (loc [0 ], lb .getBlock ().getLocalBlock ()));
2182
+
2183
+ // Trigger schedule blocks for deletion at datanode.
2184
+ int workCount = bm .computeInvalidateWork (1 );
2185
+ assertEquals (1 , workCount );
2186
+ assertFalse (bm .containsInvalidateBlock (loc [0 ], lb .getBlock ().getLocalBlock ()));
2187
+
2188
+ // Wait for the blocksRemoved value in DataNode to be 1.
2189
+ GenericTestUtils .waitFor (
2190
+ () -> datanode .getMetrics ().getBlocksRemoved () == 1 ,
2191
+ 100 , 5000 );
2192
+
2193
+ // Trigger immediate deletion report at datanode.
2194
+ cluster .triggerDeletionReports ();
2195
+
2196
+ // Delete block numBytes be NO_ACK and will not deletion block report,
2197
+ // so check the IncrementalBlockReportsNumOps of DataNode still 1.
2198
+ assertEquals (1 , getLongCounter ("IncrementalBlockReportsNumOps" , rb ));
2199
+ } finally {
2200
+ if (cluster != null ) {
2201
+ cluster .shutdown ();
2200
2202
}
2201
2203
}
2202
2204
}
0 commit comments