Commit 16838f3

test

1 parent d4f9eb1 commit 16838f3

File tree

5 files changed: +144 -74 lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Lines changed: 23 additions & 22 deletions
@@ -172,6 +172,7 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
@@ -753,13 +754,13 @@ private String reconfDataXceiverParameters(String property, String newVal)
     try {
       LOG.info("Reconfiguring {} to {}", property, newVal);
       if (property.equals(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY)) {
-        Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
+        checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
         int threads = (newVal == null ? DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT :
             Integer.parseInt(newVal));
         result = Integer.toString(threads);
         getXferServer().setMaxXceiverCount(threads);
       } else if (property.equals(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY)) {
-        Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
+        checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
         long bandwidthPerSec = (newVal == null ?
             DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_DEFAULT : Long.parseLong(newVal));
         DataTransferThrottler transferThrottler = null;
@@ -771,7 +772,7 @@ private String reconfDataXceiverParameters(String property, String newVal)
         result = Long.toString(bandwidthPerSec);
         getXferServer().setTransferThrottler(transferThrottler);
       } else if (property.equals(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY)) {
-        Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
+        checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
         long bandwidthPerSec = (newVal == null ? DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT :
             Long.parseLong(newVal));
         DataTransferThrottler writeThrottler = null;
@@ -783,7 +784,7 @@ private String reconfDataXceiverParameters(String property, String newVal)
         result = Long.toString(bandwidthPerSec);
         getXferServer().setWriteThrottler(writeThrottler);
       } else if (property.equals(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY)) {
-        Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
+        checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
         long bandwidthPerSec = (newVal == null ? DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_DEFAULT :
             Long.parseLong(newVal));
         DataTransferThrottler readThrottler = null;
@@ -807,7 +808,7 @@ private String reconfCacheReportParameters(String property, String newVal)
     String result;
     try {
       LOG.info("Reconfiguring {} to {}", property, newVal);
-      Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
+      checkNotNull(dnConf, "DNConf has not been initialized.");
       long reportInterval = (newVal == null ? DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT :
           Long.parseLong(newVal));
       result = Long.toString(reportInterval);
@@ -825,7 +826,7 @@ private String reconfBlockReportParameters(String property, String newVal)
     try {
       LOG.info("Reconfiguring {} to {}", property, newVal);
       if (property.equals(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY)) {
-        Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
+        checkNotNull(dnConf, "DNConf has not been initialized.");
         long intervalMs = newVal == null ? DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT :
             Long.parseLong(newVal);
         result = Long.toString(intervalMs);
@@ -838,13 +839,13 @@ private String reconfBlockReportParameters(String property, String newVal)
           }
         }
       } else if (property.equals(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY)) {
-        Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
+        checkNotNull(dnConf, "DNConf has not been initialized.");
         long threshold = newVal == null ? DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT :
             Long.parseLong(newVal);
         result = Long.toString(threshold);
         dnConf.setBlockReportSplitThreshold(threshold);
       } else if (property.equals(DFS_BLOCKREPORT_INITIAL_DELAY_KEY)) {
-        Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
+        checkNotNull(dnConf, "DNConf has not been initialized.");
         int initialDelay = newVal == null ? DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT :
             Integer.parseInt(newVal);
         result = Integer.toString(initialDelay);
@@ -863,7 +864,7 @@ private String reconfSlowPeerParameters(String property, String newVal)
     try {
       LOG.info("Reconfiguring {} to {}", property, newVal);
       if (property.equals(DFS_DATANODE_PEER_STATS_ENABLED_KEY)) {
-        Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
+        checkNotNull(dnConf, "DNConf has not been initialized.");
         if (newVal != null && !newVal.equalsIgnoreCase("true")
             && !newVal.equalsIgnoreCase("false")) {
           throw new IllegalArgumentException("Not a valid Boolean value for " + property +
@@ -878,19 +879,19 @@ private String reconfSlowPeerParameters(String property, String newVal)
           peerMetrics = DataNodePeerMetrics.create(getDisplayName(), getConf());
         }
       } else if (property.equals(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY)) {
-        Preconditions.checkNotNull(peerMetrics, "DataNode peer stats may be disabled.");
+        checkNotNull(peerMetrics, "DataNode peer stats may be disabled.");
         long minNodes = (newVal == null ? DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT :
             Long.parseLong(newVal));
         result = Long.toString(minNodes);
         peerMetrics.setMinOutlierDetectionNodes(minNodes);
       } else if (property.equals(DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY)) {
-        Preconditions.checkNotNull(peerMetrics, "DataNode peer stats may be disabled.");
+        checkNotNull(peerMetrics, "DataNode peer stats may be disabled.");
         long threshold = (newVal == null ? DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT :
             Long.parseLong(newVal));
         result = Long.toString(threshold);
         peerMetrics.setLowThresholdMs(threshold);
       } else if (property.equals(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY)) {
-        Preconditions.checkNotNull(peerMetrics, "DataNode peer stats may be disabled.");
+        checkNotNull(peerMetrics, "DataNode peer stats may be disabled.");
         long minSamples = (newVal == null ?
             DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT :
             Long.parseLong(newVal));
@@ -1066,7 +1067,7 @@ private String reconfSlowIoWarningThresholdParameters(String property, String newVal)
     String result;
     try {
       LOG.info("Reconfiguring {} to {}", property, newVal);
-      Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
+      checkNotNull(dnConf, "DNConf has not been initialized.");
       long slowIoWarningThreshold = (newVal == null ?
           DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT :
           Long.parseLong(newVal));
@@ -1484,7 +1485,7 @@ private static String getHostName(Configuration config)
   }

   /**
-   * @see DFSUtil#getHttpPolicy(org.apache.hadoop.conf.Configuration)
+   * @see DFSUtil#getHttpPolicy(Configuration)
    * for information related to the different configuration options and
    * Http Policy is decided.
    */
@@ -1812,7 +1813,7 @@ public void reportCorruptedBlocks(
     Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap =
         corruptedBlocks.getCorruptionMap();
     if (corruptionMap != null) {
-      for (Map.Entry<ExtendedBlock, Set<DatanodeInfo>> entry :
+      for (Entry<ExtendedBlock, Set<DatanodeInfo>> entry :
           corruptionMap.entrySet()) {
         for (DatanodeInfo dnInfo : entry.getValue()) {
           reportRemoteBadBlock(dnInfo, entry.getKey());
@@ -1828,7 +1829,7 @@ public void reportCorruptedBlocks(
    */
   private BPOfferService getBPOSForBlock(ExtendedBlock block)
       throws IOException {
-    Preconditions.checkNotNull(block);
+    checkNotNull(block);
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if (bpos == null) {
       throw new IOException("cannot locate OfferService thread for bp="+
@@ -2438,7 +2439,7 @@ public long getMaxNumberOfBlocksToLog() {
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
       Token<BlockTokenIdentifier> token) throws IOException {
     checkBlockLocalPathAccess();
-    checkBlockToken(block, token, BlockTokenIdentifier.AccessMode.READ);
+    checkBlockToken(block, token, AccessMode.READ);
     checkStorageState("getBlockLocalPathInfo");
     BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
     if (info != null) {
@@ -2886,7 +2887,7 @@ void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,

     final DataTransfer dataTransferTask = new DataTransfer(xferTargets,
         xferTargetStorageTypes, xferTargetStorageIDs, block,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE, "");
+        PIPELINE_SETUP_CREATE, "");

     this.xferService.execute(dataTransferTask);
   }
@@ -3068,7 +3069,7 @@ public void run() {
       // Header info
       //
       Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
-          EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
+          EnumSet.of(AccessMode.WRITE),
           targetStorageTypes, targetStorageIds);

       long writeTimeout = dnConf.socketWriteTimeout +
@@ -3561,7 +3562,7 @@ private void checkReadAccess(final ExtendedBlock block) throws IOException {
       getDNRegistrationForBP(block.getBlockPoolId());
     } catch (IOException e) {
       // if it has not registered with the NN, throw an exception back.
-      throw new org.apache.hadoop.ipc.RetriableException(
+      throw new RetriableException(
           "Datanode not registered. Try again later.");
     }

@@ -3576,7 +3577,7 @@ private void checkReadAccess(final ExtendedBlock block) throws IOException {
         BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
         LOG.debug("BlockTokenIdentifier: {}", id);
         blockPoolTokenSecretManager.checkAccess(id, null, block,
-            BlockTokenIdentifier.AccessMode.READ, null, null);
+            AccessMode.READ, null, null);
       }
     }
   }
@@ -4275,7 +4276,7 @@ public DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException {
   @Override
   public String getDiskBalancerSetting(String key) throws IOException {
     checkSuperuserPrivilege();
-    Preconditions.checkNotNull(key);
+    checkNotNull(key);
     switch (key) {
     case DiskBalancerConstants.DISKBALANCER_VOLUME_NAME:
       return getDiskBalancer().getVolumeNames();
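
Most of the DataNode.java churn above is mechanical: fully qualified calls such as Preconditions.checkNotNull(...) and nested names like BlockTokenIdentifier.AccessMode.READ are shortened to checkNotNull(...) and AccessMode.READ, which presupposes matching static and single-type imports elsewhere in the file (only the RetriableException import is visible in these hunks). A minimal sketch of the pattern, using a hypothetical Example class and assuming Hadoop's own Preconditions shim as the import source:

import static org.apache.hadoop.util.Preconditions.checkNotNull;

public class Example {
  private Object dnConf;

  void reconfigure(String newVal) {
    // Before the cleanup this read:
    //   Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
    checkNotNull(dnConf, "DNConf has not been initialized.");
  }
}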
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetSubLockStrategy.java

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.List;
+
+/**
+ * This interface is used to generate the sub-lock name for a block id.
+ */
+public interface DataSetSubLockStrategy {
+
+  /**
+   * Generate the sub-lock name for the given block id.
+   * @param blockid the block id
+   * @return the sub-lock name for the input block id.
+   */
+  String blockIdToSubLock(long blockid);
+
+  List<String> getAllSubLockName();
+}
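
The interface contract is implicit but worth stating: a strategy should be deterministic, and every name it produces should appear in getAllSubLockName() so a caller can pre-create one lock per name. A hypothetical contract check (not part of the patch):

// Hypothetical helper, same package as the interface: verifies a strategy
// maps a block id to a stable name drawn from its own enumerated set.
public class SubLockContractCheck {
  static void checkContract(DataSetSubLockStrategy s, long blockId) {
    String name = s.blockIdToSubLock(blockId);
    if (!name.equals(s.blockIdToSubLock(blockId))) {
      throw new AssertionError("mapping must be deterministic");
    }
    if (!s.getAllSubLockName().contains(name)) {
      throw new AssertionError("name must come from getAllSubLockName()");
    }
  }
}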

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java

Lines changed: 0 additions & 25 deletions
@@ -129,31 +129,6 @@ public static File idToBlockDir(File root, long blockId) {
     return new File(root, path);
   }

-  /**
-   * Take an example. We have a block with a block id mapping to:
-   * "/data1/hadoop/hdfs/datanode/current/BP-xxxx/current/finalized/subdir0/subdir0"
-   * We return "subdir0/subdir0"
-   * @param blockId blockId
-   * @return The two-level subdir name
-   */
-  public static String idToBlockDirSuffixName(long blockId) {
-    int d1 = (int) ((blockId >> 16) & 0x1F);
-    int d2 = (int) ((blockId >> 8) & 0x1F);
-    return DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP +
-        DataStorage.BLOCK_SUBDIR_PREFIX + d2;
-  }
-
-  public static List<String> getAllSubDirNameForDataSetLock() {
-    List<String> res = new ArrayList<>();
-    for (int d1 = 0; d1 <= 0x1F; d1++) {
-      for (int d2 = 0; d2 <= 0x1F; d2++) {
-        res.add(DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP +
-            DataStorage.BLOCK_SUBDIR_PREFIX + d2);
-      }
-    }
-    return res;
-  }
-
   /**
    * @return the FileInputStream for the meta data of the given block.
    * @throws FileNotFoundException
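
The removed helpers named sub-locks after the on-disk two-level subdir layout, carving five bits each out of the block id (bits 16-20 and 8-12) for 32 x 32 = 1024 possible names. A standalone recreation of the old mapping, assuming the "subdir" prefix and "/" separator used by the DataNode directory layout:

public class OldSubdirMapping {
  public static void main(String[] args) {
    long blockId = 0x12345L;
    int d1 = (int) ((blockId >> 16) & 0x1F); // 1
    int d2 = (int) ((blockId >> 8) & 0x1F);  // 3
    System.out.println("subdir" + d1 + "/" + "subdir" + d2); // subdir1/subdir3
  }
}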
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ModDataSetSubLockStrategy.java

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ModDataSetSubLockStrategy implements DataSetSubLockStrategy {
+  public static final Logger LOG = LoggerFactory.getLogger(ModDataSetSubLockStrategy.class);
+
+  private static final String LOCK_NAME_PREFIX = "SubLock";
+  private final long modFactor;
+
+  public ModDataSetSubLockStrategy(long mod) {
+    if (mod <= 0) {
+      mod = 1L;
+    }
+    this.modFactor = mod;
+  }
+
+  @Override
+  public String blockIdToSubLock(long blockid) {
+    return LOCK_NAME_PREFIX + (blockid % modFactor);
+  }
+
+  @Override
+  public List<String> getAllSubLockName() {
+    List<String> res = new ArrayList<>();
+    for (long i = 0L; i < modFactor; i++) {
+      res.add(LOCK_NAME_PREFIX + i);
+    }
+    return res;
+  }
+}
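
The mod-based strategy decouples lock granularity from the on-disk layout: the constructor takes the number of sub-locks instead of inheriting the fixed 1024-subdir scheme removed from DatanodeUtil. A usage sketch (the demo class is illustrative, not part of the patch): with a mod factor of 4, block ids hash cyclically onto four names, and getAllSubLockName() enumerates exactly that set so a caller can build its lock table up front.

import java.util.List;

public class ModStrategyDemo {
  public static void main(String[] args) {
    DataSetSubLockStrategy strategy = new ModDataSetSubLockStrategy(4);
    for (long id = 0; id < 8; id++) {
      // Prints SubLock0 .. SubLock3, then wraps around.
      System.out.println(id + " -> " + strategy.blockIdToSubLock(id));
    }
    List<String> all = strategy.getAllSubLockName();
    System.out.println(all); // [SubLock0, SubLock1, SubLock2, SubLock3]
  }
}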
