
Commit d2142a8

BukrosSzabolcs authored and meszibalu committed
HBASE-23085 Network and Data related Actions
Add monkey actions:
- manipulate network packages with tc (reorder, loose,...)
- add CPU load
- fill the disk
- corrupt or delete regionserver data files

Extend HBaseClusterManager to allow sudo calls.

Signed-off-by: Josh Elser <elserj@apache.org>
Signed-off-by: Balazs Meszaros <meszibalu@apache.org>
1 parent f0f7fae commit d2142a8

20 files changed (+1085, -26 lines)

hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java

Lines changed: 84 additions & 6 deletions
@@ -62,6 +62,15 @@ public class HBaseClusterManager extends Configured implements ClusterManager {
       "timeout 30 /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo -u %6$s %5$s\"";
   private String tunnelCmd;

+  /**
+   * The command format that is used to execute the remote command with sudo. Arguments:
+   * 1 SSH options, 2 user name , 3 "@" if username is set, 4 host,
+   * 5 original command, 6 timeout.
+   */
+  private static final String DEFAULT_TUNNEL_SUDO_CMD =
+      "timeout %6$s /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo %5$s\"";
+  private String tunnelSudoCmd;
+
   private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
   private static final int DEFAULT_RETRY_ATTEMPTS = 5;

@@ -86,6 +95,7 @@ public void setConf(Configuration conf) {
     sshOptions = (sshOptions == null) ? "" : sshOptions;
     sshUserName = (sshUserName == null) ? "" : sshUserName;
     tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
+    tunnelSudoCmd = conf.get("hbase.it.clustermanager.ssh.sudo.cmd", DEFAULT_TUNNEL_SUDO_CMD);
     // Print out ssh special config if any.
     if ((sshUserName != null && sshUserName.length() > 0) ||
         (sshOptions != null && sshOptions.length() > 0)) {
@@ -152,10 +162,32 @@ public String[] getExecString() {
       LOG.info("Executing full command [" + cmd + "]");
       return new String[] { "/usr/bin/env", "bash", "-c", cmd };
     }
+  }
+
+  /**
+   * Executes commands over SSH
+   */
+  protected class RemoteSudoShell extends Shell.ShellCommandExecutor {
+    private String hostname;
+
+    public RemoteSudoShell(String hostname, String[] execString, long timeout) {
+      this(hostname, execString, null, null, timeout);
+    }
+
+    public RemoteSudoShell(String hostname, String[] execString, File dir, Map<String, String> env,
+        long timeout) {
+      super(execString, dir, env, timeout);
+      this.hostname = hostname;
+    }

     @Override
-    public void execute() throws IOException {
-      super.execute();
+    public String[] getExecString() {
+      String at = sshUserName.isEmpty() ? "" : "@";
+      String remoteCmd = StringUtils.join(super.getExecString(), " ");
+      String cmd = String.format(tunnelSudoCmd, sshOptions, sshUserName, at, hostname, remoteCmd,
+        timeOutInterval/1000f);
+      LOG.info("Executing full command [" + cmd + "]");
+      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
     }
   }

@@ -299,7 +331,8 @@ protected CommandProvider getCommandProvider(ServiceType service) throws IOExcep
    */
   private Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
       throws IOException {
-    LOG.info("Executing remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname);
+    LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "),
+        hostname);

     RemoteShell shell = new RemoteShell(hostname, getServiceUser(service), cmd);
     try {
@@ -312,8 +345,8 @@ private Pair<Integer, String> exec(String hostname, ServiceType service, String.
           + ", stdout: " + output);
     }

-    LOG.info("Executed remote command, exit code:" + shell.getExitCode()
-        + " , output:" + shell.getOutput());
+    LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
+        shell.getOutput());

     return new Pair<>(shell.getExitCode(), shell.getOutput());
   }
@@ -331,7 +364,52 @@ private Pair<Integer, String> execWithRetries(String hostname, ServiceType servi
         retryCounter.sleepUntilNextRetry();
       } catch (InterruptedException ex) {
         // ignore
-        LOG.warn("Sleep Interrupted:" + ex);
+        LOG.warn("Sleep Interrupted:", ex);
+      }
+    }
+  }
+
+  /**
+   * Execute the given command on the host using SSH
+   * @return pair of exit code and command output
+   * @throws IOException if something goes wrong.
+   */
+  public Pair<Integer, String> execSudo(String hostname, long timeout, String... cmd)
+      throws IOException {
+    LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "),
+        hostname);
+
+    RemoteSudoShell shell = new RemoteSudoShell(hostname, cmd, timeout);
+    try {
+      shell.execute();
+    } catch (Shell.ExitCodeException ex) {
+      // capture the stdout of the process as well.
+      String output = shell.getOutput();
+      // add output for the ExitCodeException.
+      throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage()
+          + ", stdout: " + output);
+    }
+
+    LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
+        shell.getOutput());
+
+    return new Pair<>(shell.getExitCode(), shell.getOutput());
+  }
+
+  public Pair<Integer, String> execSudoWithRetries(String hostname, long timeout, String... cmd)
+      throws IOException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return execSudo(hostname, timeout, cmd);
+      } catch (IOException e) {
+        retryOrThrow(retryCounter, e, hostname, cmd);
+      }
+      try {
+        retryCounter.sleepUntilNextRetry();
+      } catch (InterruptedException ex) {
+        // ignore
+        LOG.warn("Sleep Interrupted:", ex);
       }
     }
   }
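The new execSudo/execSudoWithRetries entry points are what the chaos actions in this commit call into. A minimal sketch of a call site follows; it is not part of the diff, the hostname, timeout, and command values are illustrative, and a fully configured HBaseClusterManager instance is assumed.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseClusterManager;
import org.apache.hadoop.hbase.util.Pair;

public class SudoExecExample {
  // Sketch only: "rs-1.example.com" and the 30 second timeout are illustrative values,
  // and clusterManager is assumed to already carry ssh options/user from its Configuration.
  static void pingHost(HBaseClusterManager clusterManager) throws IOException {
    // Runs "uptime" via "sudo" on the remote host, retrying on IOException.
    Pair<Integer, String> result =
        clusterManager.execSudoWithRetries("rs-1.example.com", 30000, "uptime");
    System.out.println("exit=" + result.getFirst() + " output=" + result.getSecond());
  }
}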
hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/AddCPULoadAction.java

Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.chaos.actions;

import java.io.IOException;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Action that adds high cpu load to a random regionserver for a given duration
 */
public class AddCPULoadAction extends SudoCommandAction {
  protected static final Logger LOG = LoggerFactory.getLogger(AddCPULoadAction.class);
  private static final String CPU_LOAD_COMMAND =
    "seq 1 %s | xargs -I{} -n 1 -P %s timeout %s dd if=/dev/urandom of=/dev/null bs=1M " +
      "iflag=fullblock";

  private final long duration;
  private long processes;

  /**
   * Add high load to cpu
   *
   * @param duration Duration that this thread should generate the load for in milliseconds
   * @param processes The number of parallel processes, should be equal to cpu threads for max load
   */
  public AddCPULoadAction(long duration, long processes, long timeout) {
    super(timeout);
    this.duration = duration;
    this.processes = processes;
  }

  protected void localPerform() throws IOException {
    LOG.info("Starting to execute AddCPULoadAction");
    ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers());
    String hostname = server.getHostname();

    try {
      clusterManager.execSudo(hostname, timeout, getCommand());
    } catch (IOException ex){
      //This will always happen. We use timeout to kill a continously running process
      //after the duration expires
    }
    LOG.info("Finished to execute AddCPULoadAction");
  }

  private String getCommand(){
    return String.format(CPU_LOAD_COMMAND, processes, processes, duration/1000f);
  }
}
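For reference, with illustrative values of processes = 8 and duration = 60000 ms (not taken from the commit), getCommand() above expands to a pipeline that keeps 8 dd processes reading /dev/urandom for 60 seconds:

String cmd = String.format(
    "seq 1 %s | xargs -I{} -n 1 -P %s timeout %s dd if=/dev/urandom of=/dev/null bs=1M " +
        "iflag=fullblock",
    8L, 8L, 60000 / 1000f);
// cmd ->
// seq 1 8 | xargs -I{} -n 1 -P 8 timeout 60.0 dd if=/dev/urandom of=/dev/null bs=1M iflag=fullblock

Because timeout kills each dd after the duration, the remote command exits non-zero, which is why localPerform() expects and swallows the resulting IOException.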
hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/CorruptDataFilesAction.java

Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.chaos.actions;

import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Action corrupts HFiles with a certain chance.
 */
public class CorruptDataFilesAction extends Action {
  private static final Logger LOG = LoggerFactory.getLogger(CorruptDataFilesAction.class);
  private float chance;

  /**
   * Corrupts HFiles with a certain chance
   * @param chance chance to corrupt any give data file (0.5 => 50%)
   */
  public CorruptDataFilesAction(float chance) {
    this.chance = chance * 100;
  }

  @Override
  public void perform() throws Exception {
    LOG.info("Start corrupting data files");

    FileSystem fs = CommonFSUtils.getRootDirFileSystem(getConf());
    Path rootDir = CommonFSUtils.getRootDir(getConf());
    Path defaultDir = rootDir.suffix("/data/default");
    RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(defaultDir, true);
    while (iterator.hasNext()){
      LocatedFileStatus status = iterator.next();
      if(!HFile.isHFileFormat(fs, status.getPath())){
        continue;
      }
      if(RandomUtils.nextFloat(0, 100) > chance){
        continue;
      }

      FSDataOutputStream out = fs.create(status.getPath(), true);
      try {
        out.write(0);
      } finally {
        out.close();
      }
      LOG.info("Corrupting {}", status.getPath());
    }
    LOG.info("Done corrupting data files");
  }

}
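As a usage note (the value below is illustrative, not from this commit): the constructor rescales chance to a percentage and perform() compares it against RandomUtils.nextFloat(0, 100), so each HFile under /data/default is overwritten with a single zero byte with exactly that probability. A chaos monkey factory might wire it up roughly like this:

// Hypothetical wiring; the chaos monkey framework normally initializes and runs the action.
// 0.05f means roughly a 5% chance per HFile of being truncated to one zero byte.
CorruptDataFilesAction corruptSomeFiles = new CorruptDataFilesAction(0.05f);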
hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/CorruptPackagesCommandAction.java

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.chaos.actions;

import java.io.IOException;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * Corrupt network packages on a random regionserver.
 */
public class CorruptPackagesCommandAction extends TCCommandAction {
  private static final Logger LOG = LoggerFactory.getLogger(CorruptPackagesCommandAction.class);
  private float ratio;
  private long duration;

  /**
   * Corrupt network packages on a random regionserver.
   *
   * @param ratio the ratio of packages corrupted
   * @param duration the time this issue persists in milliseconds
   * @param timeout the timeout for executing required commands on the region server in milliseconds
   * @param network network interface the regionserver uses for communication
   */
  public CorruptPackagesCommandAction(float ratio, long duration, long timeout, String network) {
    super(timeout, network);
    this.ratio = ratio;
    this.duration = duration;
  }

  protected void localPerform() throws IOException {
    LOG.info("Starting to execute CorruptPackagesCommandAction");
    ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers());
    String hostname = server.getHostname();

    try {
      clusterManager.execSudoWithRetries(hostname, timeout, getCommand(ADD));
      Thread.sleep(duration);
    } catch (InterruptedException e) {
      LOG.debug("Failed to run the command for the full duration", e);
    } finally {
      clusterManager.execSudoWithRetries(hostname, timeout, getCommand(DELETE));
    }

    LOG.info("Finished to execute CorruptPackagesCommandAction");
  }

  private String getCommand(String operation){
    return String.format("tc qdisc %s dev %s root netem corrupt %s%%", operation, network,
        ratio * 100);
  }
}
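For context (not part of the diff): with illustrative values ratio = 0.05f and network = "eth0", and assuming the ADD/DELETE constants inherited from TCCommandAction stand for tc's "add" and "del" qdisc operations, getCommand() produces a netem rule that corrupts about 5% of packets and a matching rule removal:

// Literal "add"/"del" stand in here for the ADD/DELETE constants from TCCommandAction.
String add = String.format("tc qdisc %s dev %s root netem corrupt %s%%", "add", "eth0", 0.05f * 100);
// -> tc qdisc add dev eth0 root netem corrupt 5.0%
String del = String.format("tc qdisc %s dev %s root netem corrupt %s%%", "del", "eth0", 0.05f * 100);
// -> tc qdisc del dev eth0 root netem corrupt 5.0%

The finally block issues the DELETE variant so the qdisc is removed even if the sleep for the fault duration is interrupted.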
