Skip to content

Commit 6ccd8be

Browse files
authored
Merge pull request #345 from pyiron/toplevel
Extend QueueAdapter to support dynamic configuration
2 parents 4bc7b9b + b8bf0fa commit 6ccd8be

File tree

3 files changed

+279
-55
lines changed

3 files changed

+279
-55
lines changed

pysqa/queueadapter.py

Lines changed: 93 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
from jinja2 import Template
66

77
from pysqa.base.abstract import QueueAdapterAbstractClass
8-
from pysqa.base.config import QueueAdapterWithConfig, read_config
9-
from pysqa.base.core import execute_command
8+
from pysqa.base.config import QueueAdapterWithConfig, Queues, read_config
9+
from pysqa.base.core import QueueAdapterCore, execute_command
1010
from pysqa.base.modular import ModularQueueAdapter
1111

1212

@@ -39,7 +39,10 @@ class QueueAdapter(QueueAdapterAbstractClass):
3939
"""
4040

4141
def __init__(
42-
self, directory: str = "~/.queues", execute_command: callable = execute_command
42+
self,
43+
directory: Optional[str] = None,
44+
queue_type: Optional[str] = None,
45+
execute_command: callable = execute_command,
4346
):
4447
"""
4548
Initialize the QueueAdapter.
@@ -48,35 +51,41 @@ def __init__(
4851
directory (str): Directory containing the queue.yaml files and corresponding templates.
4952
execute_command (callable): Function to execute commands.
5053
"""
51-
queue_yaml = os.path.join(directory, "queue.yaml")
52-
clusters_yaml = os.path.join(directory, "clusters.yaml")
53-
self._adapter = None
54-
if os.path.exists(queue_yaml):
55-
self._queue_dict = {
56-
"default": set_queue_adapter(
57-
config=read_config(file_name=queue_yaml),
58-
directory=directory,
59-
execute_command=execute_command,
60-
)
61-
}
62-
primary_queue = "default"
63-
elif os.path.exists(clusters_yaml):
64-
config = read_config(file_name=clusters_yaml)
65-
self._queue_dict = {
66-
k: set_queue_adapter(
67-
config=read_config(file_name=os.path.join(directory, v)),
68-
directory=directory,
69-
execute_command=execute_command,
54+
if directory is not None:
55+
queue_yaml = os.path.join(directory, "queue.yaml")
56+
clusters_yaml = os.path.join(directory, "clusters.yaml")
57+
self._adapter = None
58+
if os.path.exists(queue_yaml):
59+
self._queue_dict = {
60+
"default": set_queue_adapter(
61+
config=read_config(file_name=queue_yaml),
62+
directory=directory,
63+
execute_command=execute_command,
64+
)
65+
}
66+
primary_queue = "default"
67+
elif os.path.exists(clusters_yaml):
68+
config = read_config(file_name=clusters_yaml)
69+
self._queue_dict = {
70+
k: set_queue_adapter(
71+
config=read_config(file_name=os.path.join(directory, v)),
72+
directory=directory,
73+
execute_command=execute_command,
74+
)
75+
for k, v in config["cluster"].items()
76+
}
77+
primary_queue = config["cluster_primary"]
78+
else:
79+
raise ValueError(
80+
"Neither a queue.yaml file nor a clusters.yaml file were found in "
81+
+ directory
7082
)
71-
for k, v in config["cluster"].items()
72-
}
73-
primary_queue = config["cluster_primary"]
83+
self._adapter = self._queue_dict[primary_queue]
84+
elif queue_type is not None:
85+
self._queue_dict = {}
86+
self._adapter = QueueAdapterCore(queue_type=queue_type.upper())
7487
else:
75-
raise ValueError(
76-
"Neither a queue.yaml file nor a clusters.yaml file were found in "
77-
+ directory
78-
)
79-
self._adapter = self._queue_dict[primary_queue]
88+
raise ValueError()
8089

8190
def list_clusters(self) -> List[str]:
8291
"""
@@ -97,14 +106,17 @@ def switch_cluster(self, cluster_name: str):
97106
self._adapter = self._queue_dict[cluster_name]
98107

99108
@property
100-
def config(self) -> dict:
109+
def config(self) -> Union[dict, None]:
101110
"""
102111
Get the QueueAdapter configuration.
103112
104113
Returns:
105114
dict: The QueueAdapter configuration.
106115
"""
107-
return self._adapter.config
116+
if isinstance(self._adapter, QueueAdapterWithConfig):
117+
return self._adapter.config
118+
else:
119+
return None
108120

109121
@property
110122
def ssh_delete_file_on_remote(self) -> bool:
@@ -114,7 +126,10 @@ def ssh_delete_file_on_remote(self) -> bool:
114126
Returns:
115127
bool: The value of ssh_delete_file_on_remote property.
116128
"""
117-
return self._adapter.ssh_delete_file_on_remote
129+
if isinstance(self._adapter, QueueAdapterWithConfig):
130+
return self._adapter.ssh_delete_file_on_remote
131+
else:
132+
return False
118133

119134
@property
120135
def remote_flag(self) -> bool:
@@ -124,37 +139,49 @@ def remote_flag(self) -> bool:
124139
Returns:
125140
bool: The value of remote_flag property.
126141
"""
127-
return self._adapter.remote_flag
142+
if isinstance(self._adapter, QueueAdapterWithConfig):
143+
return self._adapter.remote_flag
144+
else:
145+
return False
128146

129147
@property
130-
def queue_list(self) -> List[str]:
148+
def queue_list(self) -> Union[List[str], None]:
131149
"""
132150
Get the list of available queues.
133151
134152
Returns:
135153
List[str]: The list of available queues.
136154
"""
137-
return self._adapter.queue_list
155+
if isinstance(self._adapter, QueueAdapterWithConfig):
156+
return self._adapter.queue_list
157+
else:
158+
return None
138159

139160
@property
140-
def queue_view(self) -> pandas.DataFrame:
161+
def queue_view(self) -> Union[pandas.DataFrame, None]:
141162
"""
142163
Get the Pandas DataFrame representation of the available queues.
143164
144165
Returns:
145166
pandas.DataFrame: The Pandas DataFrame representation of the available queues.
146167
"""
147-
return self._adapter.queue_view
168+
if isinstance(self._adapter, QueueAdapterWithConfig):
169+
return self._adapter.queue_view
170+
else:
171+
return None
148172

149173
@property
150-
def queues(self) -> List[str]:
174+
def queues(self) -> Union[Queues, None]:
151175
"""
152176
Get the list of available queues.
153177
154178
Returns:
155179
List[str]: The list of available queues.
156180
"""
157-
return self._adapter.queues
181+
if isinstance(self._adapter, QueueAdapterWithConfig):
182+
return self._adapter.queues
183+
else:
184+
return None
158185

159186
def submit_job(
160187
self,
@@ -220,7 +247,10 @@ def get_job_from_remote(self, working_directory: str):
220247
Args:
221248
working_directory (str): The working directory.
222249
"""
223-
self._adapter.get_job_from_remote(working_directory=working_directory)
250+
if isinstance(self._adapter, QueueAdapterWithConfig):
251+
self._adapter.get_job_from_remote(working_directory=working_directory)
252+
else:
253+
raise TypeError()
224254

225255
def transfer_file_to_remote(
226256
self,
@@ -236,11 +266,14 @@ def transfer_file_to_remote(
236266
transfer_back (bool): Whether to transfer the file back.
237267
delete_file_on_remote (bool): Whether to delete the file on the remote host.
238268
"""
239-
self._adapter.transfer_file(
240-
file=file,
241-
transfer_back=transfer_back,
242-
delete_file_on_remote=delete_file_on_remote,
243-
)
269+
if isinstance(self._adapter, QueueAdapterWithConfig):
270+
self._adapter.transfer_file(
271+
file=file,
272+
transfer_back=transfer_back,
273+
delete_file_on_remote=delete_file_on_remote,
274+
)
275+
else:
276+
raise TypeError()
244277

245278
def convert_path_to_remote(self, path: str) -> str:
246279
"""
@@ -252,7 +285,10 @@ def convert_path_to_remote(self, path: str) -> str:
252285
Returns:
253286
str: The remote path.
254287
"""
255-
return self._adapter.convert_path_to_remote(path=path)
288+
if isinstance(self._adapter, QueueAdapterWithConfig):
289+
return self._adapter.convert_path_to_remote(path=path)
290+
else:
291+
raise TypeError()
256292

257293
def delete_job(self, process_id: int) -> str:
258294
"""
@@ -334,13 +370,16 @@ def check_queue_parameters(
334370
Returns:
335371
List: A list containing the checked parameters [cores, run_time_max, memory_max].
336372
"""
337-
return self._adapter.check_queue_parameters(
338-
queue=queue,
339-
cores=cores,
340-
run_time_max=run_time_max,
341-
memory_max=memory_max,
342-
active_queue=active_queue,
343-
)
373+
if isinstance(self._adapter, QueueAdapterWithConfig):
374+
return self._adapter.check_queue_parameters(
375+
queue=queue,
376+
cores=cores,
377+
run_time_max=run_time_max,
378+
memory_max=memory_max,
379+
active_queue=active_queue,
380+
)
381+
else:
382+
return cores, run_time_max, memory_max
344383

345384

346385
def set_queue_adapter(

tests/test_basic.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ def test_missing_config(self):
1515
with self.assertRaises(ValueError):
1616
QueueAdapter(directory=os.path.join(self.path, "config/error"))
1717

18+
def test_no_config(self):
19+
with self.assertRaises(ValueError):
20+
QueueAdapter()
21+
1822
def test_bad_queue_template(self):
1923
with self.assertRaises(TemplateSyntaxError):
2024
QueueAdapter(directory=os.path.join(self.path, "config/bad_template"))

0 commit comments

Comments
 (0)