Skip to content

[dask] sync from custom-images ; merge tests from rapids #1254

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 76 additions & 32 deletions dask/dask.sh
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,8 @@ function main() {
echo "Dask for ${DASK_RUNTIME} successfully initialized."
}

function exit_handler() (
set +e
function exit_handler() {
set +ex
echo "Exit handler invoked"

# Free conda cache
Expand All @@ -528,16 +528,29 @@ function exit_handler() (
# Clear pip cache
pip cache purge || echo "unable to purge pip cache"

# remove the tmpfs conda pkgs_dirs
if [[ -d /mnt/shm ]] ; then /opt/conda/miniconda3/bin/conda config --remove pkgs_dirs /mnt/shm ; fi
# If system memory was sufficient to mount memory-backed filesystems
if [[ "${tmpdir}" == "/mnt/shm" ]] ; then
# Stop hadoop services
systemctl list-units | perl -n -e 'qx(systemctl stop $1) if /^.*? ((hadoop|knox|hive|mapred|yarn|hdfs)\S*).service/'

# Clean up shared memory mounts
for shmdir in /var/cache/apt/archives /var/cache/dnf /mnt/shm ; do
if grep -q "^tmpfs ${shmdir}" /proc/mounts ; then
rm -rf ${shmdir}/*
umount -f ${shmdir}
fi
done
# remove the tmpfs conda pkgs_dirs
/opt/conda/miniconda3/bin/conda config --remove pkgs_dirs /mnt/shm || echo "unable to remove pkgs_dirs conda config"

# remove the tmpfs pip cache-dir
pip config unset global.cache-dir || echo "unable to unset global pip cache"

# Clean up shared memory mounts
for shmdir in /var/cache/apt/archives /var/cache/dnf /mnt/shm ; do
if grep -q "^tmpfs ${shmdir}" /proc/mounts ; then
sync
sleep 3s
execute_with_retries umount -f ${shmdir}
fi
done

umount -f /tmp
systemctl list-units | perl -n -e 'qx(systemctl start $1) if /^.*? ((hadoop|knox|hive|mapred|yarn|hdfs)\S*).service/'
fi

# Clean up OS package cache ; re-hold systemd package
if is_debuntu ; then
Expand All @@ -547,34 +560,65 @@ function exit_handler() (
dnf clean all
fi

# print disk usage statistics
if is_debuntu ; then
# Rocky doesn't have sort -h and fails when the argument is passed
du --max-depth 3 -hx / | sort -h | tail -10
# print disk usage statistics for large components
if is_ubuntu ; then
du -hs \
/usr/lib/{pig,hive,hadoop,jvm,spark,google-cloud-sdk,x86_64-linux-gnu} \
/usr/lib \
/opt/nvidia/* \
/usr/local/cuda-1?.? \
/opt/conda/miniconda3 \
"${DASK_CONDA_ENV}"
elif is_debian ; then
du -hs \
/usr/lib/{pig,hive,hadoop,jvm,spark,google-cloud-sdk,x86_64-linux-gnu} \
/usr/lib \
/usr/local/cuda-1?.? \
/opt/conda/miniconda3 \
"${DASK_CONDA_ENV}"
else
du -hs \
/var/lib/docker \
/usr/lib/{pig,hive,hadoop,firmware,jvm,spark,atlas} \
/usr/lib64/google-cloud-sdk \
/usr/lib \
/opt/nvidia/* \
/usr/local/cuda-1?.? \
/opt/conda/miniconda3 \
"${DASK_CONDA_ENV}"
fi

# Process disk usage logs from installation period
rm -f "${tmpdir}/keep-running-df"
sleep 6s
rm -f /run/keep-running-df
sync
sleep 5.01s
# compute maximum size of disk during installation
# Log file contains logs like the following (minus the preceeding #):
#Filesystem Size Used Avail Use% Mounted on
#/dev/vda2 6.8G 2.5G 4.0G 39% /
df -h / | tee -a "${tmpdir}/disk-usage.log"
perl -e '$max=( sort
map { (split)[2] =~ /^(\d+)/ }
grep { m:^/: } <STDIN> )[-1];
print( "maximum-disk-used: $max", $/ );' < "${tmpdir}/disk-usage.log"
#Filesystem 1K-blocks Used Available Use% Mounted on
#/dev/vda2 7096908 2611344 4182932 39% /
df / | tee -a "/run/disk-usage.log"

perl -e '@siz=( sort { $a => $b }
map { (split)[2] =~ /^(\d+)/ }
grep { m:^/: } <STDIN> );
$max=$siz[0]; $min=$siz[-1]; $inc=$max-$min;
print( " samples-taken: ", scalar @siz, $/,
"maximum-disk-used: $max", $/,
"minimum-disk-used: $min", $/,
" increased-by: $inc", $/ )' < "/run/disk-usage.log"

echo "exit_handler has completed"

# zero free disk space
if [[ -n "$(get_metadata_attribute creating-image)" ]]; then
dd if=/dev/zero of=/zero ; sync ; rm -f /zero
dd if=/dev/zero of=/zero
sync
sleep 3s
rm -f /zero
fi

return 0
)
}

function prepare_to_install() {
readonly DEFAULT_CUDA_VERSION="12.4"
Expand All @@ -600,7 +644,7 @@ function prepare_to_install() {

free_mem="$(awk '/^MemFree/ {print $2}' /proc/meminfo)"
# Write to a ramdisk instead of churning the persistent disk
if [[ ${free_mem} -ge 5250000 ]]; then
if [[ ${free_mem} -ge 10500000 ]]; then
tmpdir=/mnt/shm
mkdir -p /mnt/shm
mount -t tmpfs tmpfs /mnt/shm
Expand All @@ -622,19 +666,19 @@ function prepare_to_install() {
else
tmpdir=/tmp
fi
install_log="${tmpdir}/install.log"
install_log="/run/install.log"
trap exit_handler EXIT

# Monitor disk usage in a screen session
if is_debuntu ; then
apt-get install -y -qq screen
else
dnf -y -q install screen
fi
rm -f "${tmpdir}/disk-usage.log"
touch "${tmpdir}/keep-running-df"
df / | tee "/run/disk-usage.log"
touch "/run/keep-running-df"
screen -d -m -US keep-running-df \
bash -c "while [[ -f ${tmpdir}/keep-running-df ]] ; do df -h / | tee -a ${tmpdir}/disk-usage.log ; sleep 5s ; done"
bash -c "while [[ -f /run/keep-running-df ]] ; do df / | tee -a /run/disk-usage.log ; sleep 5s ; done"
}

prepare_to_install
Expand Down
162 changes: 94 additions & 68 deletions dask/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,78 +9,104 @@


class DaskTestCase(DataprocTestCase):
COMPONENT = 'dask'
INIT_ACTIONS = ['dask/dask.sh']

DASK_YARN_TEST_SCRIPT = 'verify_dask_yarn.py'
DASK_STANDALONE_TEST_SCRIPT = 'verify_dask_standalone.py'

def verify_dask_yarn(self, name):
self._run_dask_test_script(name, self.DASK_YARN_TEST_SCRIPT)

def verify_dask_standalone(self, name, master_hostname):
script=self.DASK_STANDALONE_TEST_SCRIPT
verify_cmd = "/opt/conda/miniconda3/envs/dask/bin/python {} {}".format(
script,
master_hostname
)
abspath=os.path.join(os.path.dirname(os.path.abspath(__file__)),script)
self.upload_test_file(abspath, name)
COMPONENT = 'dask'
INIT_ACTIONS = [
'dask/dask.sh'
]
INTERPRETER = '/opt/conda/miniconda3/envs/dask/bin/python'

DASK_YARN_TEST_SCRIPT = 'verify_dask_yarn.py'
DASK_STANDALONE_TEST_SCRIPT = 'verify_dask_standalone.py'

def verify_dask_standalone(self, name, master_hostname):
script=self.DASK_STANDALONE_TEST_SCRIPT
verify_cmd = "{} {} {}".format(
INTERPRETER,
script,
master_hostname
)
abspath=os.path.join(os.path.dirname(os.path.abspath(__file__)),script)
self.upload_test_file(abspath, name)
self.assert_instance_command(name, verify_cmd)
self.remove_test_script(script, name)

def _run_dask_test_script(self, name, script):
test_filename=os.path.join(os.path.dirname(os.path.abspath(__file__)),
script), name)
self.upload_test_file(test_filename, name)
verify_cmd = "{} {}".format(
INTERPRETER,
script)
command_asserted=0
for try_number in range(0, 3):
try:
self.assert_instance_command(name, verify_cmd)
command_asserted=1
break
except:
time.sleep(2**try_number)
if command_asserted == 0:
raise Exception("Unable to assert instance command [{}]".format(verify_cmd))

self.remove_test_script(script, name)

def verify_dask_worker_service(self, name):
verify_cmd = "[[ X$(systemctl show dask-worker -p SubState --value)X == XrunningX ]]"
# Retry the first ssh to ensure it has enough time to propagate SSH keys
command_asserted=0
for try_number in range(0, 3):
try:
self.assert_instance_command(name, verify_cmd)
self.remove_test_script(script, name)

def _run_dask_test_script(self, name, script):
verify_cmd = "/opt/conda/miniconda3/envs/dask/bin/python {}".format(
script)
self.upload_test_file(
os.path.join(os.path.dirname(os.path.abspath(__file__)),
script), name)
command_asserted=0
for try_number in range(0, 7):
try:
self.assert_instance_command(name, verify_cmd)
command_asserted=1
break
except:
time.sleep(2**try_number)
if command_asserted == 0:
raise Exception("Unable to assert instance command [{}]".format(verify_cmd))

self.remove_test_script(script, name)


@parameterized.parameters(
("STANDARD", ["m", "w-0"], "yarn"),
("STANDARD", ["m"], "standalone"),
("KERBEROS", ["m"], "standalone"),
command_asserted=1
break
except:
time.sleep(2**try_number)
if command_asserted == 0:
raise Exception("Unable to assert instance command [{}]".format(verify_cmd))

def verify_dask_config(self, name):
self.assert_instance_command(
name, "[[ $(wc -l /etc/dask/config.yaml) == 11 ]]")

@parameterized.parameters(
("STANDARD", ["m", "w-0"], "yarn"),
("STANDARD", ["m", "w-0", "w-1"], "standalone"),
("KERBEROS", ["m"], None),
("HA", ["m-0"], None),
("SINGLE", ["m"], None),
)
def test_dask(self, configuration, machine_suffixes, dask_runtime):

if self.getImageVersion() < pkg_resources.parse_version("2.0"):
self.skipTest("Not supported in pre-2.0 images")

metadata = None
if dask_runtime:
metadata = "dask-runtime={}".format(dask_runtime)

self.createCluster(
configuration,
self.INIT_ACTIONS,
metadata=metadata,
machine_type='n1-standard-8',
timeout_in_minutes=20
)
def test_dask(self, configuration, instances, runtime):

if self.getImageVersion() < pkg_resources.parse_version("2.0"):
self.skipTest("Not supported in pre-2.0 images")
c_name=self.getClusterName()
if configuration == 'HA':
master_hostname = c_name + '-m-0'
else:
master_hostname = c_name + '-m'

metadata = None
if runtime:
metadata = "dask-runtime={}".format(runtime)
for machine_suffix in machine_suffixes:
machine_name = "{}-{}".format(c_name, machine_suffix)

self.createCluster(configuration,
self.INIT_ACTIONS,
machine_type='n1-standard-16',
metadata=metadata,
timeout_in_minutes=20)

if configuration == 'HA':
master_hostname = self.getClusterName() + '-m-0'
else:
master_hostname = self.getClusterName() + '-m'

for instance in instances:
name = "{}-{}".format(self.getClusterName(), instance)

if runtime == "standalone":
self.verify_dask_standalone(name, master_hostname)
else:
self.verify_dask_yarn(name)
if dask_runtime == 'standalone' or dask_runtime == None:
self.verify_dask_worker_service(machine_name)
self.verify_dask_standalone(machine_name, master_hostname)
elif dask_runtime == 'yarn':
self.verify_dask_config(machine_name)
self._run_dask_test_script(name, self.DASK_YARN_TEST_SCRIPT)

if __name__ == '__main__':
absltest.main()
absltest.main()