Skip to content

Commit

Permalink
Merge branch 'devel_nodb_2' of github.com:radical-cybertools/radical.…
Browse files Browse the repository at this point in the history
…pilot into devel_nodb_2
  • Loading branch information
andre-merzky committed Sep 18, 2023
2 parents a5e6532 + 2af1ed9 commit 12eb5e5
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 40 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ name: CI

on:
push:
branches: [ devel ]
branches:
- 'devel*'
pull_request:
branches: [ devel ]
branches:
- 'devel*'

jobs:

Expand Down
6 changes: 4 additions & 2 deletions .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ name: 'Test Jupyter notebooks'

on:
push:
branches: [ devel ]
branches:
- 'devel*'
pull_request:
branches: [ devel ]
branches:
- 'devel*'

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
Expand Down
24 changes: 23 additions & 1 deletion .github/workflows/run-rp-notebook.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,30 @@ jobs:
- name: Run Jupyter Notebook
env:
TARGET_PATH: ${{ format('{0}/{1}/{2}', inputs.documentation-path, inputs.notebook-path, inputs.notebook-name) }}
timeout-minutes: 5
# continue-on-error: true
run: |
. testenv/bin/activate
jupyter nbconvert --clear-output --inplace $TARGET_PATH
jupyter nbconvert --to notebook --execute --inplace $TARGET_PATH
- name: Collect session
if: always()
run: |
SIDCLIENT=$(ls -rt | grep rp.session)
SIDAGENT="$HOME/radical.pilot.sandbox/$SIDCLIENT"
CLIENTNAME="${{inputs.notebook-name}}_client_$SIDCLIENT"
AGENTNAME="${{inputs.notebook-name}}_agent_$SIDCLIENT"
mkdir session
tar cvfj $CLIENTNAME.tar.bz2 $SIDCLIENT
cp -R $CLIENTNAME.tar.bz2 session
if [ -d "$SIDAGENT" ]; then
tar cvfj $AGENTNAME.tar.bz2 $SIDAGENT
cp -R $AGENTNAME.tar.bz2 session
fi
- name: upload session
if: always()
uses: actions/upload-artifact@v3
with:
name: session
path: session
retention-days: 5
1 change: 0 additions & 1 deletion .readthedocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ version: 2
formats: [htmlzip]

python:
system_packages: true
version: 3.7
install:
- requirements: requirements-docs.txt
Expand Down
73 changes: 73 additions & 0 deletions bin/radical-pilot-test-slurm
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#!/bin/sh

# Submit a small SLURM probe job which checks whether two `srun` task
# launches can execute concurrently within one allocation.  The batch
# job exits 0 when the two sleeps overlap (concurrent execution) and 1
# when they ran back-to-back (no concurrency).
#
# Usage:  radical-pilot-test-slurm -a <account> -p <partition> [-e]
#   -a  slurm account to charge
#   -p  slurm partition to submit to
#   -e  request an exclusive node allocation (adds `#SBATCH --exclusive`
#       and runs the tasks with `srun --exact`)

ACCOUNT=
PARTITION=
EXCLUSIVE=''

NNODES=1

while getopts "a:p:e" OPTION; do
    case $OPTION in
        a) ACCOUNT="$OPTARG" ;;
        p) PARTITION="$OPTARG" ;;
        e) EXCLUSIVE=1 ;;
        *) echo "Unknown option: '$OPTION'='$OPTARG'"
           # `return` is only valid inside a shell function - at script
           # top level POSIX sh requires `exit`
           exit 1;;
    esac
done

echo "cmd -a $ACCOUNT -p $PARTITION -e $EXCLUSIVE"

if test "$EXCLUSIVE" = "1"
then
    SBATCH_EXCLUSIVE='#SBATCH --exclusive'
    SRUN_EXCLUSIVE='--exact'
fi


# write the batch script to a pid-unique temp file; `\$` escapes defer
# evaluation to job runtime on the compute node
cat > /tmp/$$.slurm <<EOT
#!/bin/sh
$SBATCH_EXCLUSIVE
#SBATCH --nodes "$NNODES"
#SBATCH --account "$ACCOUNT"
#SBATCH --partition "$PARTITION"
#SBATCH --time "00:05:00"
runtime=30
noise=5
nodelist=\$SLURM_NODELIST
test -z "\$nodelist" && nodelist=\$SLURM_JOB_NODELIST
# exit status must be in 0..255 - `-1` is not portable (dash rejects it)
test -z "\$nodelist" && exit 1
node_1=\$(echo \$nodelist | cut -f 1 -d ,)
options="--nodes=1 --ntasks=1 --cpus-per-task=1 --mem=0 --nodelist='\$node_1'"
start=\$(date "+%s")
srun \$options $SRUN_EXCLUSIVE /bin/sleep \$runtime &
srun \$options $SRUN_EXCLUSIVE /bin/sleep \$runtime &
wait
stop=\$(date "+%s")
duration=\$((stop - start))
if test \$duration -gt \$((2 * runtime - noise))
then
    echo "duration too long, no concurrency"
    exit 1
else
    echo "duration consistent with concurrent execution"
    exit 0
fi
EOT

echo "submitting /tmp/$$.slurm"
sbatch /tmp/$$.slurm
# sbatch copies the script at submission time, so the temp file can be
# removed immediately
rm -f /tmp/$$.slurm


44 changes: 36 additions & 8 deletions docs/source/tutorials/staging_data.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@
"\n",
"<div class=\"alert alert-info\">\n",
"\n",
"__Note:__ In our examples, we will not show a progression bar while waiting for some operation to complete, e.g., while waiting for a pilot to stop. That is because the progression bar offered by RP's reporter does not work well within a notebook. You could use the reporter's progression bar when executing your RP application as a standalone Python script.\n",
"__Note:__ In these examples, we will not show a progression bar while waiting for some operation to complete, e.g., while waiting for a pilot to stop. That is because the progression bar offered by RP's reporter does not work well within a notebook. You could use the reporter's progression bar when executing your RP application as a standalone Python script.\n",
"\n",
"</div>"
]
Expand Down Expand Up @@ -180,7 +180,13 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"For this example, we create a directory `input_dir` within the current working directory, and place a file into this directory. That file will be the input data for every task (this input file is referred in the [radical.pilot.TaskDescription.arguments](../apidoc.rst) attribute). The newly created directory `input_dir` is staged into the `pilot://` location with all its files."
"For this example, create a new directory `input_dir` within the current working directory, and place a file into this directory. That file will be the input data for every task (this input file is referred to in the [radical.pilot.TaskDescription.arguments](../apidoc.rst) attribute).\n",
"\n",
"<div class=\"alert alert-warning\">\n",
"\n",
"__Warning:__ You need to ensure that the directory where your script will create the staging data is writable. Also, you are responsible for cleaning up that data after it is staged.\n",
"\n",
"</div>"
]
},
{
Expand All @@ -189,19 +195,41 @@
"metadata": {},
"outputs": [],
"source": [
"# Staging directives for the pilot.\n",
"\n",
"import os\n",
"os.makedirs('./input_dir', exist_ok=True)\n",
"\n",
"with open('./input_dir/input.txt', 'w') as f:\n",
" f.write('Staged data (task_id=$RP_TASK_ID | pilot_id=$RP_PILOT_ID | session_id=$RP_SESSION_ID)')\n",
"input_dir = os.path.join(os.getcwd(), 'input_dir')\n",
"os.makedirs(input_dir, exist_ok=True)\n",
"\n",
"with open(input_dir + '/input.txt', 'w') as f:\n",
" f.write('Staged data (task_id=$RP_TASK_ID | pilot_id=$RP_PILOT_ID | session_id=$RP_SESSION_ID)')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You will stage the newly created directory `input_dir` with all its files into the `pilot://` location.\n",
"\n",
"<div class=\"alert alert-info\">\n",
"\n",
"__Note:__ If the provided path for `input_staging` is not an absolute path, then RP will look for it within the current working directory. Using absolute paths guarantees that the staged data will be located correctly.\n",
"\n",
"</div>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Staging directives for the pilot.\n",
"\n",
"pd = rp.PilotDescription({\n",
" 'resource' : 'local.localhost',\n",
" 'cores' : 2,\n",
" 'runtime' : 15,\n",
" 'input_staging': ['input_dir'],\n",
" 'input_staging': [input_dir],\n",
" 'exit_on_error': False\n",
"})\n",
"\n",
Expand Down
2 changes: 1 addition & 1 deletion src/radical/pilot/configs/resource_csc.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"agent_config" : "default",
"agent_scheduler" : "CONTINUOUS",
"agent_spawner" : "POPEN",
"default_remote_workdir" : "/scratch/project_%(pd.project)s",
"default_remote_workdir" : "/scratch/%(pd.project)s",

"pre_bootstrap_0" : [
"module load tykky"
Expand Down
6 changes: 4 additions & 2 deletions tests/unit_tests/test_agent_0/test_agent_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ def test_start_sub_agents(self, mocked_run_sh_callout, mocked_ru_env_prep,
agent_0._pwd = tempfile.gettempdir()
agent_0._log = mock.Mock()
agent_0._sid = 'rp.session.0'
agent_0._cfg = ru.Config(from_dict={

agent_0._session = mock.Mock()
agent_0._session.cfg = ru.Config(from_dict={
'agents': {
'agent_1': {'target' : 'node',
'components': {'agent_executing': {'count': 1}}}
Expand Down Expand Up @@ -199,7 +201,7 @@ def check_agent_task(agent_task, *args, **kwargs):
os.unlink(agent_file)

# incorrect config setup for agent ('target' is in ['local', 'node'])
agent_0._cfg['agents']['agent_1']['target'] = 'incorrect_target'
agent_0._session.cfg['agents']['agent_1']['target'] = 'incorrect_target'
with self.assertRaises(ValueError):
agent_0._start_sub_agents()

Expand Down
25 changes: 11 additions & 14 deletions tests/unit_tests/test_executing/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def work(self, tasks):

for spawner in spawners:
session = ru.Config(cfg={
'_rcfg': { 'agent_spawner' : spawner}})
'rcfg': {'agent_spawner' : spawner}})
try:
AgentExecutingComponent.create(cfg=spawner, session=session)
except:
Expand All @@ -62,22 +62,19 @@ def work(self, tasks):
def test_initialize(self, mocked_rm, mocked_init):

ec = AgentExecutingComponent(cfg=None, session=None)
ec._cfg = ru.TypedDict(from_dict={
'sid' : 'sid.0000',

ec._session = mock.Mock()
ec._session.uid = 'sid.0000'
ec._session.cfg = ru.TypedDict(from_dict={
'resource' : 'resource_config_label',
'resource_sandbox': '',
'session_sandbox' : '',
'pilot_sandbox' : '',
'resource' : 'resource_config_label',
'resource_cfg' : {'order': [],
'launch_methods': {'SRUN': {}}}
'pilot_sandbox' : ''
})
ec._reg = ru.Config(cfg={
'cfg' : {'resource' : 'localhost',
'pilot_sandbox' : '',
'session_sandbox' : '',
'resource_sandbox': ''},
'rcfg': {'resource_manager': 'FORK',
'agent_spawner' : 'POPEN'}})
ec._session.rcfg = ru.TypedDict(from_dict={
'resource_manager': 'FORK',
'agent_spawner' : 'POPEN'})

ec._log = ec._prof = mock.Mock()
ec.work = ec.control_cb = mock.Mock()
ec.register_input = ec.register_output = mock.Mock()
Expand Down
13 changes: 7 additions & 6 deletions tests/unit_tests/test_executing/test_popen.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ def test_handle_task(self, mocked_sp_popen, mocked_lm_init,
pex._log = pex._prof = pex._watch_queue = mock.Mock()
pex._log._debug_level = 1

pex._reg = ru.Config(from_dict={'rcfg.new_session_per_task': False})
pex._cfg = dict()
pex._pwd = ''
pex._pid = 'pilot.0000'
pex.sid = 'session.0000'
Expand All @@ -92,7 +90,9 @@ def test_handle_task(self, mocked_sp_popen, mocked_lm_init,
pex.psbox = ''
pex.gtod = ''
pex.prof = ''
pex._session = ru.Config(cfg={'cfg': {'resource_cfg': {}}})

pex._session = mock.Mock()
pex._session.rcfg = ru.Config(from_dict={'new_session_per_task': False})

pex._rm = mock.Mock()
pex._rm.find_launcher = mocked_find_launcher
Expand Down Expand Up @@ -135,8 +135,9 @@ def test_handle_task(self, mocked_sp_popen, mocked_lm_init,
def test_extend_pre_exec(self, mocked_init):

pex = Popen(cfg=None, session=None)
pex._session = mock.Mock()
pex._session.cfg.get.return_value = None

pex._session = mock.Mock()
pex._session.rcfg = {}

td = {'cores_per_rank': 2,
'threading_type': '',
Expand All @@ -154,7 +155,7 @@ def test_extend_pre_exec(self, mocked_init):
'gpu_type' : rpc.CUDA})

# we target attribute "task_pre_exec"
pex._session.cfg.get.return_value = ['export TEST_ENV=test']
pex._session.rcfg = {'task_pre_exec': ['export TEST_ENV=test']}

pex._extend_pre_exec(td, ranks)
self.assertIn('export OMP_NUM_THREADS=2', td['pre_exec'])
Expand Down
9 changes: 6 additions & 3 deletions tests/unit_tests/test_scheduler/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def test_initialize(self, mocked_env_eval, mocked_hostname, mocked_mp,
sched.nodes = []
sched._partitions = {}

sched._session = mock.Mock()

for c in self._test_cases['initialize']:

def _mock_get(_c, name):
Expand All @@ -62,9 +64,10 @@ def _mock_get(_c, name):
from functools import partial

mock_get = partial(_mock_get, c)
sched._cfg = ru.Config(from_dict={'reg_addr': 'addr'})
sched._reg = ru.Config(from_dict={'cfg': c['config'],
'rcfg': c['config']['resource_cfg']})
sched._session.cfg = ru.Config(
from_dict=c['config'])
sched._session.rcfg = ru.Config(
from_dict=c['config']['resource_cfg'])

with mock.patch.object(ru.zmq.RegistryClient, 'get', mock_get):
if 'RuntimeError' in c['result']:
Expand Down

0 comments on commit 12eb5e5

Please sign in to comment.