|
3 | 3 | namespace React\Async; |
4 | 4 |
|
5 | 5 | use React\EventLoop\Loop; |
| 6 | +use React\Promise\CancellablePromiseInterface; |
6 | 7 | use React\Promise\Deferred; |
7 | 8 | use React\Promise\PromiseInterface; |
8 | 9 |
|
@@ -96,48 +97,44 @@ function ($error) use (&$exception, &$rejected, &$wait) { |
96 | 97 | */ |
97 | 98 | function parallel(array $tasks) |
98 | 99 | { |
| 100 | + $pending = array(); |
99 | 101 | $deferred = new Deferred(); |
100 | 102 | $results = array(); |
101 | | - $errors = array(); |
102 | | - |
103 | | - $done = function () use (&$results, &$errors, $deferred) { |
104 | | - if (count($errors)) { |
105 | | - $deferred->reject(reset($errors)); |
106 | | - return; |
107 | | - } |
108 | | - |
109 | | - $deferred->resolve($results); |
110 | | - }; |
| 103 | + $errored = false; |
111 | 104 |
|
112 | 105 | $numTasks = count($tasks); |
113 | | - |
114 | 106 | if (0 === $numTasks) { |
115 | | - $done(); |
| 107 | + $deferred->resolve($results); |
116 | 108 | } |
117 | 109 |
|
118 | | - $checkDone = function () use (&$results, &$errors, $numTasks, $done) { |
119 | | - if ($numTasks === count($results) || count($errors)) { |
120 | | - $done(); |
121 | | - } |
122 | | - }; |
| 110 | + $taskErrback = function ($error) use (&$pending, $deferred, &$errored) { |
| 111 | + $errored = true; |
| 112 | + $deferred->reject($error); |
123 | 113 |
|
124 | | - $taskErrback = function ($error) use (&$errors, $checkDone) { |
125 | | - $errors[] = $error; |
126 | | - $checkDone(); |
| 114 | + foreach ($pending as $promise) { |
| 115 | + if ($promise instanceof CancellablePromiseInterface) { |
| 116 | + $promise->cancel(); |
| 117 | + } |
| 118 | + } |
| 119 | + $pending = array(); |
127 | 120 | }; |
128 | 121 |
|
129 | 122 | foreach ($tasks as $i => $task) { |
130 | | - $taskCallback = function ($result) use (&$results, $i, $checkDone) { |
| 123 | + $taskCallback = function ($result) use (&$results, &$pending, $numTasks, $i, $deferred) { |
131 | 124 | $results[$i] = $result; |
132 | | - $checkDone(); |
| 125 | + |
| 126 | + if (count($results) === $numTasks) { |
| 127 | + $deferred->resolve($results); |
| 128 | + } |
133 | 129 | }; |
134 | 130 |
|
135 | 131 | $promise = call_user_func($task); |
136 | 132 | assert($promise instanceof PromiseInterface); |
| 133 | + $pending[$i] = $promise; |
137 | 134 |
|
138 | 135 | $promise->then($taskCallback, $taskErrback); |
139 | 136 |
|
140 | | - if ($errors) { |
| 137 | + if ($errored) { |
141 | 138 | break; |
142 | 139 | } |
143 | 140 | } |
|
0 commit comments