Skip to content

Commit

Permalink
Limit total size of mini-cluster logs in C++ and Java; fix remote_bui…
Browse files Browse the repository at this point in the history
…ld.py

Summary:
Limit total size of mini-cluster logs in C++ and Java mini-cluster tests.
Fix home directory handling in remote_build.py.

Test Plan:
Jenkins
Run remote_build.py

Reviewers: timur, sergei

Reviewed By: sergei

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D5854
  • Loading branch information
mbautin committed Dec 15, 2018
1 parent 0638992 commit efab68b
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 36 deletions.
22 changes: 14 additions & 8 deletions bin/remote_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ def remote_communicate(args, remote_command, error_ok=False):
return True


def check_remote_files(args, files):
remote_command = "cd {0} && git diff --name-status".format(args.remote_path)
def check_remote_files(escaped_remote_path, args, files):
remote_command = "cd {0} && git diff --name-status".format(escaped_remote_path)
remote_changed, remote_deleted = \
parse_name_status(check_output_lines(['ssh', args.host, remote_command]))
unexpected = []
Expand All @@ -76,9 +76,9 @@ def check_remote_files(args, files):
if unexpected:
command = 'cd {0}'.format(args.remote_path)
message = 'Reverting:\n'
for file in unexpected:
for file_path in unexpected:
message += ' {0}\n'.format(file)
command += ' && git checkout -- {0}'.format(shlex.quote(file))
command += ' && git checkout -- {0}'.format(shlex.quote(file_path))
print(message)
remote_communicate(args, command)

Expand Down Expand Up @@ -161,9 +161,15 @@ def main():
time.sleep(1)

remote_commit = fetch_remote_commit(args)

if args.remote_path.startswith('~/'):
escaped_remote_path = '$HOME/' + shlex.quote(args.remote_path[2:])
else:
escaped_remote_path = shlex.quote(args.remote_path)

if remote_commit != commit:
print("Remote commit mismatch, syncing")
remote_command = 'cd {0} && '.format(shlex.quote(args.remote_path))
remote_command = 'cd {0} && '.format(escaped_remote_path)
remote_command += 'git checkout -- . && '
remote_command += 'git clean -f . && '
remote_command += 'git checkout master && '
Expand Down Expand Up @@ -199,13 +205,13 @@ def main():
sys.exit(proc.returncode)

if del_files:
remote_command = 'cd {0} && rm -f '.format(args.remote_path)
remote_command = 'cd {0} && rm -f '.format(escaped_remote_path)
for file in del_files:
remote_command += shlex.quote(file)
remote_command += ' '
remote_communicate(args, remote_command)

check_remote_files(args, files)
check_remote_files(escaped_remote_path, args, files)

if args.skip_build:
sys.exit(0)
Expand All @@ -223,7 +229,7 @@ def main():
ybd_args = add_extra_ybd_args(ybd_args,
['--host-for-tests', os.environ['YB_HOST_FOR_RUNNING_TESTS']])

remote_command = "cd {0} && ./yb_build.sh".format(args.remote_path)
remote_command = "cd {0} && ./yb_build.sh".format(escaped_remote_path)
for arg in ybd_args:
remote_command += " {0}".format(shlex.quote(arg))
print("Remote command: {0}".format(remote_command))
Expand Down
36 changes: 34 additions & 2 deletions build-support/common-build-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,10 @@ detect_linuxbrew() {
break
fi
done

if [[ -z ${YB_LINUXBREW_DIR:-} ]]; then
log "Could not find Linuxbrew in any of these directories:" "${candidates[@]}"
fi
}

# -------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -1852,9 +1856,9 @@ activate_virtualenv() {
remove_path_entry "$old_virtual_env/bin"
fi
# We need to be using system python to install the virtualenv module or create a new virtualenv.
pip2 install virtualenv --user
(
set -x
pip2 install virtualenv --user
mkdir -p "$virtualenv_parent_dir"
cd "$virtualenv_parent_dir"
python2 -m virtualenv "$YB_VIRTUALENV_BASENAME"
Expand All @@ -1868,7 +1872,8 @@ activate_virtualenv() {
pip_no_cache="--no-cache-dir"
fi

pip2 install -r "$YB_SRC_ROOT/requirements.txt" $pip_no_cache
run_with_retries 10 0.5 pip2 install -r "$YB_SRC_ROOT/python_requirements_frozen.txt" \
$pip_no_cache
}

check_python_interpreter_version() {
Expand Down Expand Up @@ -1999,6 +2004,33 @@ lint_java_code() {
return 1
fi
}

run_with_retries() {
if [[ $# -lt 2 ]]; then
fatal "run_with_retries requires at least three arguments: max_attempts, delay_sec, and " \
"the command to run (at least one additional argument)."
fi
declare -i -r max_attempts=$1
declare -r delay_sec=$2
shift 2

declare -i attempt_index=1
while [[ $attempt_index -le $max_attempts ]]; do
set +e
"$@"
declare exit_code=$?
set -e
if [[ $exit_code -eq 0 ]]; then
return
fi
log "Warning: command failed with exit code $exit_code at attempt $attempt_index: $*." \
"Waiting for $delay_sec sec, will then re-try for up to $max_attempts attempts."
let attempt_index+=1
sleep "$delay_sec"
done
fatal "Failed to execute command after $max_attempts attempts: $*"
}

# -------------------------------------------------------------------------------------------------
# Initialization
# -------------------------------------------------------------------------------------------------
Expand Down
56 changes: 39 additions & 17 deletions java/yb-client/src/test/java/org/yb/minicluster/LogPrinter.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.yb.util.EnvAndSysPropertyUtil;

/**
* Helper runnable that can log what the processes are sending on their stdout and stderr that
Expand All @@ -36,10 +39,18 @@ public class LogPrinter {
private final AtomicBoolean stopRequested = new AtomicBoolean(false);
private final Object stopper = new Object();

private static final AtomicLong totalLoggedSize = new AtomicLong();
private static final AtomicBoolean logSizeExceededThrown = new AtomicBoolean(false);

private boolean stopped = false;

private static final boolean LOG_PRINTER_DEBUG = false;

private static final long MAX_ALLOWED_LOGGED_BYTES =
EnvAndSysPropertyUtil.getLongEnvVarOrSystemProperty(
"YB_JAVA_TEST_MAX_ALLOWED_LOG_BYTES",
50 * 1024 * 1024);

// A mechanism to wait for a line in the log that says that the server is starting.
private LogErrorListener errorListener;

Expand All @@ -66,28 +77,39 @@ private void runThread() {
try {
String line;
BufferedReader in = new BufferedReader(new InputStreamReader(stream));
if (LOG_PRINTER_DEBUG) {
LOG.info("Starting log printer with prefix " + logPrefix);
}
try {
while (!stopRequested.get()) {
while ((line = in.readLine()) != null) {
if (errorListener != null) {
errorListener.handleLine(line);
if (LOG_PRINTER_DEBUG) {
LOG.info("Starting log printer with prefix " + logPrefix);
}
try {
while (!stopRequested.get()) {
while ((line = in.readLine()) != null) {
if (errorListener != null) {
errorListener.handleLine(line);
}
System.out.println(logPrefix + line);
if (totalLoggedSize.addAndGet(line.length() + 1) > MAX_ALLOWED_LOGGED_BYTES) {
if (logSizeExceededThrown.compareAndSet(false, true)) {
String errMsg = "Max total log size exceeded: " + MAX_ALLOWED_LOGGED_BYTES;
LOG.warn(errMsg);
throw new AssertionError(errMsg);
}
return;
}
System.out.flush();
}
System.out.println(logPrefix + line);
System.out.flush();
// Sleep for a short time and give the child process a chance to generate more output.
Thread.sleep(10);
}
// Sleep for a short time and give the child process a chance to generate more output.
Thread.sleep(10);
} catch (InterruptedException iex) {
// This probably means we're stopping, OK to ignore.
}
} catch (InterruptedException iex) {
// This probably means we're stopping, OK to ignore.
}
if (LOG_PRINTER_DEBUG) {
LOG.info("Finished log printer with prefix " + logPrefix);
if (LOG_PRINTER_DEBUG) {
LOG.info("Finished log printer with prefix " + logPrefix);
}
} finally {
in.close();
}
in.close();
} catch (Exception e) {
String msg = e.getMessage();
if (msg == null || !msg.contains("Stream closed")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,12 @@ public static String getEnvVarOrSystemProperty(String envVarName) {
return getEnvVarOrSystemProperty(envVarName, null);
}

public static long getLongEnvVarOrSystemProperty(String envVarName, long defaultValue) {
String strValue = getEnvVarOrSystemProperty(envVarName);
if (strValue != null) {
return Long.valueOf(strValue);
}
return defaultValue;
}

}
4 changes: 4 additions & 0 deletions python_requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
PyYAML
six
yugabyte_pycommon
compiledb
8 changes: 8 additions & 0 deletions python_requirements_frozen.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
bashlex==0.12
Click==7.0
compiledb==0.9.8
enum34==1.1.6
PyYAML==3.13
six==1.12.0
yugabyte-pycommon==1.9.3

4 changes: 0 additions & 4 deletions requirements.txt

This file was deleted.

19 changes: 14 additions & 5 deletions src/yb/integration-tests/external_mini_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ DEFINE_bool(external_daemon_safe_shutdown, false,
DECLARE_int64(outbound_rpc_block_size);
DECLARE_int64(outbound_rpc_memory_limit);

DEFINE_int64(external_mini_cluster_max_log_bytes, 50_MB,
"Max total size of log bytes produced by all external mini-cluster daemons. "
"The test is shut down if this limit is exceeded.");

namespace yb {

static const char* const kMasterBinaryName = "yb-master";
Expand Down Expand Up @@ -1240,16 +1244,16 @@ namespace {
// and is never deallocated.
struct GlobalLogTailerState {
mutex logging_mutex;
atomic<int> next_log_tailer_id;
atomic<int> next_log_tailer_id{0};

// We need some references to these heap-allocated atomic booleans so that ASAN would not consider
// them memory leaks.
mutex id_to_stopped_flag_mutex;
map<int, atomic<bool>*> id_to_stopped_flag;

GlobalLogTailerState() {
next_log_tailer_id.store(0);
}
// This is used to limit the total amount of logs produced by external daemons over the lifetime
// of a test program. Guarded by logging_mutex.
size_t total_bytes_logged = 0;
};

} // anonymous namespace
Expand Down Expand Up @@ -1282,20 +1286,25 @@ class ExternalDaemon::LogTailerThread {
// The "stopped" flag itself is never deallocated.
bool is_eof = false;
bool is_fgets_null = false;
auto& logging_mutex = global_state()->logging_mutex;
auto& total_bytes_logged = global_state()->total_bytes_logged;
while (!(is_eof = feof(fp)) &&
!(is_fgets_null = (fgets(buf, sizeof(buf), fp) == nullptr)) &&
!stopped->load()) {
size_t l = strlen(buf);
const char* maybe_end_of_line = l > 0 && buf[l - 1] == '\n' ? "" : "\n";
// Synchronize tailing output from all external daemons for simplicity.
lock_guard<mutex> lock(global_state()->logging_mutex);
lock_guard<mutex> lock(logging_mutex);
if (stopped->load()) break;
// Make sure we always output an end-of-line character.
*out << line_prefix << " " << buf << maybe_end_of_line;
auto listener = listener_.load();
if (listener) {
listener->Handle(GStringPiece(buf, maybe_end_of_line ? l : l - 1));
}
total_bytes_logged += strlen(buf) + strlen(maybe_end_of_line);
// Abort the test if it produces too much log spew.
CHECK_LE(total_bytes_logged, FLAGS_external_mini_cluster_max_log_bytes);
}
fclose(fp);
if (!stopped->load()) {
Expand Down

0 comments on commit efab68b

Please sign in to comment.