"""
Module containing code to create locks enabling dask workers to co-operate.

- This matter is complicated by needing different solutions for different dask scheduler
- types, i.e. local 'threads' scheduler, local 'processes' or distributed.
+ This matter is complicated by needing different solutions for different dask
+ scheduler types, i.e. local 'threads' scheduler, local 'processes' or
+ distributed.

- In any case, an "iris.fileformats.netcdf.saver.Saver" object contains a netCDF4.Dataset
- targeting an output file, and creates a Saver.file_write_lock object to serialise
- write-accesses to the file from dask tasks : All dask-task file writes go via a
- "iris.fileformats.netcdf.saver.NetCDFWriteProxy" object, which also contains a link
- to the Saver.file_write_lock, and uses it to prevent workers from fouling each other.
+ In any case, an "iris.fileformats.netcdf.saver.Saver" object contains a
+ netCDF4.Dataset targeting an output file, and creates a Saver.file_write_lock
+ object to serialise write-accesses to the file from dask tasks : All dask-task
+ file writes go via a "iris.fileformats.netcdf.saver.NetCDFWriteProxy" object,
+ which also contains a link to the Saver.file_write_lock, and uses it to prevent
+ workers from fouling each other.
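The saver/proxy relationship described above can be sketched as follows. `NetCDFWriteProxySketch` and its attribute names are hypothetical stand-ins for the real Saver/NetCDFWriteProxy classes, shown only to illustrate that every write proxy for a file holds a reference to that file's one shared lock:

```python
import threading

class NetCDFWriteProxySketch:
    """Hypothetical stand-in for the real NetCDFWriteProxy (illustration only)."""

    def __init__(self, filepath: str, file_write_lock: threading.Lock):
        # Each proxy records the target file and a reference to the
        # saver's single per-file lock.
        self.path = filepath
        self.lock = file_write_lock

# One lock per output file, created by the (hypothetical) saver ...
saver_file_write_lock = threading.Lock()
# ... and handed to every write proxy targeting that file.
proxies = [
    NetCDFWriteProxySketch("output.nc", saver_file_write_lock) for _ in range(3)
]
# All proxies share the identical lock object.
assert all(p.lock is saver_file_write_lock for p in proxies)
```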

For each chunk written, the NetCDFWriteProxy acquires the common per-file lock;
- opens a Dataset on the file; performs a write to the relevant variable; closes the
- Dataset and then releases the lock. This process is obviously very similar to what the
- NetCDFDataProxy does for reading lazy chunks.
+ opens a Dataset on the file; performs a write to the relevant variable; closes
+ the Dataset and then releases the lock. This process is obviously very similar
+ to what the NetCDFDataProxy does for reading lazy chunks.
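The acquire/open/write/close/release cycle can be sketched with a plain dict standing in for the netCDF4.Dataset, so the example runs without netCDF4 installed; in the real proxy the locked section would open a Dataset on the path, assign into the target variable, and close it again:

```python
import threading

def write_chunk(store, varname, index, value, lock):
    # 1. acquire the common per-file lock
    with lock:
        # 2.-4. "open" the target, write the chunk, "close" again
        # (a dict stands in for opening/writing/closing a netCDF4.Dataset)
        store.setdefault(varname, {})[index] = value
    # 5. the lock is released on exiting the 'with' block

lock = threading.Lock()
store = {}
threads = [
    threading.Thread(target=write_chunk, args=(store, "air_temp", i, i * i, lock))
    for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
# All four chunk writes have landed without fouling each other.
```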

- For a threaded scheduler, the Saver.lock is a simple threading.Lock(). The workers
- (threads) execute tasks which contain a NetCDFWriteProxy, as above. All of those
- contain the common lock, and this is simply **the same object** for all workers, since
- they share an address space.
+ For a threaded scheduler, the Saver.lock is a simple threading.Lock(). The
+ workers (threads) execute tasks which contain a NetCDFWriteProxy, as above.
+ All of those contain the common lock, and this is simply **the same object**
+ for all workers, since they share an address space.
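A minimal demonstration of that same-object property: because the threads share one address space, a single `threading.Lock` closed over by every worker serialises their critical sections, so a read-modify-write that would otherwise be racy stays consistent:

```python
import threading

lock = threading.Lock()
counter = {"writes": 0}

def worker():
    for _ in range(10_000):
        # The read-modify-write below is safe because every thread holds
        # the *same* Lock object while performing it.
        with lock:
            counter["writes"] += 1

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert counter["writes"] == 40_000
```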

For a distributed scheduler, the Saver.lock is a `distributed.Lock()` which is
identified with the output filepath. This is distributed to the workers by
- serialising the task function arguments, which will include the NetCDFWriteProxy.
- A worker behaves like a process, though it may execute on a remote machine. When a
- distributed.Lock is deserialised to reconstruct the worker task, this creates an object
- that communicates with the scheduler. These objects behave as a single common lock,
- as they all have the same string 'identity', so the scheduler implements inter-process
- communication so that they can mutually exclude each other.
+ serialising the task function arguments, which will include the
+ NetCDFWriteProxy. A worker behaves like a process, though it may execute on a
+ remote machine. When a distributed.Lock is deserialised to reconstruct the
+ worker task, this creates an object that communicates with the scheduler.
+ These objects behave as a single common lock, as they all have the same string
+ 'identity', so the scheduler implements inter-process communication so that
+ they can mutually exclude each other.
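The "same string identity means same lock" behaviour can be mimicked in-process with a small registry. This is a toy analogue only: in dask, `distributed.Lock(name)` defers to the scheduler rather than a local dict, but the contract is the same — two lookups with equal identity strings must resolve to one underlying mutex:

```python
import threading

_lock_registry: dict = {}
_registry_guard = threading.Lock()

def named_lock(identity: str) -> threading.Lock:
    # Two lookups with the same identity string yield the *same* mutex --
    # the role the dask scheduler plays for distributed.Lock objects
    # reconstructed on different workers.
    with _registry_guard:
        return _lock_registry.setdefault(identity, threading.Lock())

lock_a = named_lock("/data/output.nc")
lock_b = named_lock("/data/output.nc")
assert lock_a is lock_b
assert named_lock("/data/other.nc") is not lock_a
```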

It is also *conceivable* that multiple processes could write to the same file in
- parallel, if the operating system supports it. However, this also requires that the
- libnetcdf C library is built with parallel access option, which is not common.
- With the "ordinary" libnetcdf build, a process which attempts to open for writing a file
- which is _already_ open for writing simply raises an access error.
- In any case, Iris netcdf saver will not support this mode of operation, at present.
+ parallel, if the operating system supports it. However, this also requires
+ that the libnetcdf C library is built with parallel access option, which is
+ not common. With the "ordinary" libnetcdf build, a process which attempts to
+ open for writing a file which is _already_ open for writing simply raises an
+ access error. In any case, Iris netcdf saver will not support this mode of
+ operation, at present.

We don't currently support a local "processes" type scheduler. If we did, the
- behaviour should be very similar to a distributed scheduler. It would need to use some
- other serialisable shared-lock solution in place of 'distributed.Lock', which requires
- a distributed scheduler to function.
+ behaviour should be very similar to a distributed scheduler. It would need to
+ use some other serialisable shared-lock solution in place of
+ 'distributed.Lock', which requires a distributed scheduler to function.
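One concrete reason a plain `threading.Lock` cannot serve a 'processes' scheduler: it does not survive the pickling step that ships task arguments to worker processes, so some other serialisable shared-lock mechanism would be needed:

```python
import pickle
import threading

try:
    pickle.dumps(threading.Lock())
    lock_is_picklable = True
except TypeError:
    # CPython refuses with "cannot pickle '_thread.lock' object".
    lock_is_picklable = False

assert not lock_is_picklable
```

A `multiprocessing.Manager().Lock()` proxy is one candidate replacement, since its proxies are designed to be shared across processes; whether that would suit an iris 'processes' scheduler is an open design question, not something the source settles.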

"""

import threading

# A dedicated error class, allowing filtering and testing of errors raised here.
- class DaskSchedulerTypeError(ValueError):
+ class DaskSchedulerTypeError(ValueError):  # noqa: D101
    pass


@@ -82,11 +86,13 @@ def get_dask_array_scheduler_type():
    Returns one of 'distributed', 'threads' or 'processes'.
    The return value is a valid argument for dask.config.set(scheduler=<type>).
-   This cannot distinguish between distributed local and remote clusters -- both of
-   those simply return 'distributed'.
+   This cannot distinguish between distributed local and remote clusters --
+   both of those simply return 'distributed'.

-   NOTE: this takes account of how dask is *currently* configured. It will be wrong
-   if the config changes before the compute actually occurs.
+   Notes
+   -----
+   This takes account of how dask is *currently* configured. It will
+   be wrong if the config changes before the compute actually occurs.

    """
    if dask_scheduler_is_distributed():
@@ -114,8 +120,12 @@ def get_worker_lock(identity: str):
    """
    Return a mutex Lock which can be shared by multiple Dask workers.

-   The type of Lock generated depends on the dask scheduler type, which must therefore
-   be set up before this is called.
+   The type of Lock generated depends on the dask scheduler type, which must
+   therefore be set up before this is called.
+
+   Parameters
+   ----------
+   identity : str

    """
    scheduler_type = get_dask_array_scheduler_type()
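Putting the pieces together, the dispatch inside get_worker_lock plausibly runs along these lines. This is a sketch, not the actual iris implementation: the function and message wording are illustrative, and the 'distributed' branch is stubbed out so the example runs without dask.distributed installed:

```python
import threading

class DaskSchedulerTypeError(ValueError):
    pass

def get_worker_lock_sketch(scheduler_type: str, identity: str):
    if scheduler_type == "threads":
        # Shared address space: an ordinary in-process mutex suffices.
        return threading.Lock()
    elif scheduler_type == "distributed":
        # The real code would return distributed.Lock(identity), which the
        # scheduler keys by the identity string (the output filepath).
        raise NotImplementedError("requires dask.distributed")
    else:
        # e.g. 'processes', which (per the module docstring) is unsupported.
        raise DaskSchedulerTypeError(
            f"Cannot make a worker lock for scheduler type {scheduler_type!r}."
        )

lock = get_worker_lock_sketch("threads", "output.nc")
```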