forked from drush-ops/drush
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbatch.inc
359 lines (321 loc) · 12 KB
/
batch.inc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
<?php
/**
* @file
* Drush batch API.
*
* This file contains a fork of the Drupal Batch API that has been drastically
* simplified and tailored to Drush's unique use case.
*
* The existing API is very targeted towards environments that are web accessible,
* and would frequently attempt to redirect the user which would result in the
* drush process being completely destroyed with no hope of recovery.
*
* While the original API does offer a 'non progressive' mode which simply
* calls each operation in sequence within the current process, in most
* implementations (D6), it would still attempt to redirect
* unless very specific conditions were met.
*
* When operating in 'non progressive' mode, Drush would experience the problems
* that the API was written to solve in the first place, specifically that processes
* would exceed the available memory and exit with an error.
*
* Each major release of Drupal has also had slightly different implementations
* of the batch API, and this provides a uniform interface to all of these
* implementations.
*/
use Drush\Log\LogLevel;
/**
* Class extending ArrayObject to allow the batch API to perform logging when
* some keys of the array change.
*
* It is used to wrap batch's $context array and set log messages when values
* are assigned to keys 'message' or 'error_message'.
*
* @see _drush_batch_worker().
*/
class DrushBatchContext extends ArrayObject {
function offsetSet($name, $value) {
if ($name == 'message') {
drush_log(strip_tags($value), LogLevel::OK);
}
elseif ($name == 'error_message') {
drush_set_error('DRUSH_BATCH_ERROR', strip_tags($value));
}
parent::offsetSet($name, $value);
}
}
/**
* Process a Drupal batch by spawning multiple Drush processes.
*
* This function will include the correct batch engine for the current
* major version of Drupal, and will make use of the drush_backend_invoke
* system to spawn multiple worker threads to handle the processing of
* the current batch, while keeping track of available memory.
*
* The batch system will process as many batch sets as possible until
* the entire batch has been completed or half of the available memory
* has been used.
*
* This function is a drop in replacement for the existing batch_process()
* function of Drupal.
*
* @param string $command
* (optional) The command to call for the back end process. By default this will be
* the 'batch-process' command, but some commands such as updatedb will
* have special initialization requirements, and will need to define and
* use their own command.
* @param array $args
* (optional)
* @param array $options
* (optional)
*/
function drush_backend_batch_process($command = 'batch-process', $args = array(), $options = array()) {
// Command line options to pass to the command.
$options['u'] = \Drupal::currentUser()->id();
_drush_backend_batch_process($command, $args, $options);
}
/**
* Process sets from the specified batch.
*
* This function is called by the worker process that is spawned by the
* drush_backend_batch_process function.
*
* The command called needs to call this function after it's special bootstrap
* requirements have been taken care of.
*
* @param int $id
* The batch ID of the batch being processed.
*/
function drush_batch_command($id) {
include_once(DRUSH_DRUPAL_CORE . '/includes/batch.inc');
_drush_batch_command($id);
}
/**
* Main loop for the Drush batch API.
*
* Saves a record of the batch into the database, and progressively call $command to
* process the operations.
*
* @param command
* The command to call to process the batch.
*
*/
function _drush_backend_batch_process($command = 'batch-process', $args, $options) {
$batch =& batch_get();
if (isset($batch)) {
$process_info = array(
'current_set' => 0,
);
$batch += $process_info;
// The batch is now completely built. Allow other modules to make changes
// to the batch so that it is easier to reuse batch processes in other
// enviroments.
\Drupal::moduleHandler()->alter('batch', $batch);
// Assign an arbitrary id: don't rely on a serial column in the 'batch'
// table, since non-progressive batches skip database storage completely.
$batch['id'] = db_next_id();
$args[] = $batch['id'];
$batch['progressive'] = TRUE;
// Move operations to a job queue. Non-progressive batches will use a
// memory-based queue.
foreach ($batch['sets'] as $key => $batch_set) {
_batch_populate_queue($batch, $key);
}
// Store the batch.
/** @var \Drupal\Core\Batch\BatchStorage $batch_storage */
$batch_storage = \Drupal::service('batch.storage');
$batch_storage->create($batch);
$finished = FALSE;
while (!$finished) {
$data = drush_invoke_process('@self', $command, $args);
$finished = drush_get_error() || !$data || (isset($data['context']['drush_batch_process_finished']) && $data['context']['drush_batch_process_finished'] == TRUE);
}
}
}
/**
* Initialize the batch command and call the worker function.
*
* Loads the batch record from the database and sets up the requirements
* for the worker, such as registering the shutdown function.
*
* @param id
* The batch id of the batch being processed.
*/
function _drush_batch_command($id) {
$batch =& batch_get();
$data = db_query("SELECT batch FROM {batch} WHERE bid = :bid", array(
':bid' => $id))->fetchField();
if ($data) {
$batch = unserialize($data);
}
else {
return FALSE;
}
if (!isset($batch['running'])) {
$batch['running'] = TRUE;
}
// Register database update for end of processing.
register_shutdown_function('_drush_batch_shutdown');
if (_drush_batch_worker()) {
_drush_batch_finished();
}
}
/**
* Process batch operations
*
* Using the current $batch process each of the operations until the batch
* has been completed or half of the available memory for the process has been
* reached.
*/
function _drush_batch_worker() {
$batch =& batch_get();
$current_set =& _batch_current_set();
$set_changed = TRUE;
if (empty($current_set['start'])) {
$current_set['start'] = microtime(TRUE);
}
$queue = _batch_queue($current_set);
while (!$current_set['success']) {
// If this is the first time we iterate this batch set in the current
// request, we check if it requires an additional file for functions
// definitions.
if ($set_changed && isset($current_set['file']) && is_file($current_set['file'])) {
include_once DRUPAL_ROOT . '/' . $current_set['file'];
}
$task_message = '';
// Assume a single pass operation and set the completion level to 1 by
// default.
$finished = 1;
if ($item = $queue->claimItem()) {
list($function, $args) = $item->data;
// Build the 'context' array and execute the function call.
$batch_context = array(
'sandbox' => &$current_set['sandbox'],
'results' => &$current_set['results'],
'finished' => &$finished,
'message' => &$task_message,
);
// Magic wrap to catch changes to 'message' key.
$batch_context = new DrushBatchContext($batch_context);
// Tolerate recoverable errors.
// See https://github.com/drush-ops/drush/issues/1930
$halt_on_error = drush_get_option('halt-on-error', TRUE);
drush_set_option('halt-on-error', FALSE);
call_user_func_array($function, array_merge($args, array(&$batch_context)));
drush_set_option('halt-on-error', $halt_on_error);
$finished = $batch_context['finished'];
if ($finished >= 1) {
// Make sure this step is not counted twice when computing $current.
$finished = 0;
// Remove the processed operation and clear the sandbox.
$queue->deleteItem($item);
$current_set['count']--;
$current_set['sandbox'] = array();
}
}
// When all operations in the current batch set are completed, browse
// through the remaining sets, marking them 'successfully processed'
// along the way, until we find a set that contains operations.
// _batch_next_set() executes form submit handlers stored in 'control'
// sets (see form_execute_handlers()), which can in turn add new sets to
// the batch.
$set_changed = FALSE;
$old_set = $current_set;
while (empty($current_set['count']) && ($current_set['success'] = TRUE) && _batch_next_set()) {
$current_set = &_batch_current_set();
$current_set['start'] = microtime(TRUE);
$set_changed = TRUE;
}
// At this point, either $current_set contains operations that need to be
// processed or all sets have been completed.
$queue = _batch_queue($current_set);
// If we are in progressive mode, break processing after 1 second.
if (drush_memory_limit() > 0 && (memory_get_usage() * 2) >= drush_memory_limit()) {
drush_log(dt("Batch process has consumed in excess of 50% of available memory. Starting new thread"), LogLevel::BATCH);
// Record elapsed wall clock time.
$current_set['elapsed'] = round((microtime(TRUE) - $current_set['start']) * 1000, 2);
break;
}
}
// Reporting 100% progress will cause the whole batch to be considered
// processed. If processing was paused right after moving to a new set,
// we have to use the info from the new (unprocessed) set.
if ($set_changed && isset($current_set['queue'])) {
// Processing will continue with a fresh batch set.
$remaining = $current_set['count'];
$total = $current_set['total'];
$progress_message = $current_set['init_message'];
$task_message = '';
}
else {
// Processing will continue with the current batch set.
$remaining = $old_set['count'];
$total = $old_set['total'];
$progress_message = $old_set['progress_message'];
}
$current = $total - $remaining + $finished;
$percentage = _batch_api_percentage($total, $current);
return ($percentage == 100);
}
/**
* End the batch processing:
* Call the 'finished' callbacks to allow custom handling of results,
* and resolve page redirection.
*/
function _drush_batch_finished() {
$batch = &batch_get();
// Execute the 'finished' callbacks for each batch set, if defined.
foreach ($batch['sets'] as $batch_set) {
if (isset($batch_set['finished'])) {
// Check if the set requires an additional file for function definitions.
if (isset($batch_set['file']) && is_file($batch_set['file'])) {
include_once DRUPAL_ROOT . '/' . $batch_set['file'];
}
if (is_callable($batch_set['finished'])) {
$queue = _batch_queue($batch_set);
$operations = $queue->getAllItems();
$elapsed = $batch_set['elapsed'] / 1000;
$elapsed = drush_drupal_major_version() >=8 ? \Drupal::service('date.formatter')->formatInterval($elapsed) : format_interval($elapsed);
$batch_set['finished']($batch_set['success'], $batch_set['results'], $operations, $elapsed);
}
}
}
// Clean up the batch table and unset the static $batch variable.
if (drush_drupal_major_version() >= 8) {
/** @var \Drupal\Core\Batch\BatchStorage $batch_storage */
$batch_storage = \Drupal::service('batch.storage');
$batch_storage->delete($batch['id']);
}
else {
db_delete('batch')
->condition('bid', $batch['id'])
->execute();
}
foreach ($batch['sets'] as $batch_set) {
if ($queue = _batch_queue($batch_set)) {
$queue->deleteQueue();
}
}
$_batch = $batch;
$batch = NULL;
drush_set_option('drush_batch_process_finished', TRUE);
}
/**
* Shutdown function: store the batch data for next request,
* or clear the table if the batch is finished.
*/
function _drush_batch_shutdown() {
if ($batch = batch_get()) {
if (drush_drupal_major_version() >= 8) {
/** @var \Drupal\Core\Batch\BatchStorage $batch_storage */
$batch_storage = \Drupal::service('batch.storage');
$batch_storage->update($batch);
}
else {
db_update('batch')
->fields(array('batch' => serialize($batch)))
->condition('bid', $batch['id'])
->execute();
}
}
}