@@ -13,6 +13,7 @@ import 'asset_node.dart';
1313import 'asset_set.dart' ;
1414import 'errors.dart' ;
1515import 'phase_input.dart' ;
16+ import 'phase_output.dart' ;
1617import 'stream_pool.dart' ;
1718import 'transformer.dart' ;
1819import 'utils.dart' ;
@@ -44,15 +45,8 @@ class Phase {
4445 /// phases, they will be the outputs from the previous phase.
4546 final _inputs = new Map <AssetId , PhaseInput >();
4647
47- /// A map of output ids to the asset node outputs for those ids and the
48- /// transforms that produced those asset nodes.
49- ///
50- /// Usually there's only one node for a given output id. However, it's
51- /// possible for multiple transformers to output an asset with the same id. In
52- /// that case, the chronologically first output emitted is passed forward. We
53- /// keep track of the other nodes so that if that output is removed, we know
54- /// which asset to replace it with.
55- final _outputs = new Map <AssetId , Queue <AssetNode >>();
48+ /// The outputs for this phase.
49+ final _outputs = new Map <AssetId , PhaseOutput >();
5650
5751 /// A stream that emits an event whenever this phase becomes dirty and needs
5852 /// to be run.
@@ -77,7 +71,7 @@ class Phase {
7771 /// Returns all currently-available output assets for this phase.
7872 AssetSet get availableOutputs {
7973 return new AssetSet .from (_outputs.values
80- .map ((queue ) => queue.first )
74+ .map ((output ) => output.output )
8175 .where ((node) => node.state.isAvailable)
8276 .map ((node) => node.asset));
8377 }
@@ -125,7 +119,7 @@ class Phase {
125119 return newFuture (() {
126120 if (id.package != cascade.package) return cascade.graph.getAssetNode (id);
127121 if (! _outputs.containsKey (id)) return null ;
128- return _outputs[id].first ;
122+ return _outputs[id].output ;
129123 });
130124 }
131125
@@ -145,89 +139,71 @@ class Phase {
145139 Phase addPhase (Iterable <Transformer > transformers) {
146140 assert (_next == null );
147141 _next = new Phase (cascade, transformers);
148- for (var outputs in _outputs.values) {
149- _next.addInput (outputs.first);
142+ for (var output in _outputs.values.toList ()) {
143+ // Remove [output]'s listeners because now they should get the asset from
144+ // [_next], rather than this phase. Any transforms consuming [output] will
145+ // be re-run and will consume the output from the new final phase.
146+ output.removeListeners ();
147+
148+ // Removing [output]'s listeners will cause it to be removed from
149+ // [_outputs], so we have to put it back.
150+ _outputs[output.output.id] = output;
151+ _next.addInput (output.output);
150152 }
151153 return _next;
152154 }
153155
156+ /// Mark this phase as removed.
157+ ///
158+ /// This will remove all the phase's outputs and all following phases.
159+ void remove () {
160+ removeFollowing ();
161+ for (var input in _inputs.values.toList ()) {
162+ input.remove ();
163+ }
164+ _onDirtyPool.close ();
165+ }
166+
167+ /// Remove all phases after this one.
168+ Phase removeFollowing () {
169+ if (_next == null ) return ;
170+ _next.remove ();
171+ _next = null ;
172+ }
173+
154174 /// Processes this phase.
155175 ///
156176 /// Returns a future that completes when processing is done. If there is
157177 /// nothing to process, returns `null` .
158178 Future process () {
159179 if (! _inputs.values.any ((input) => input.isDirty)) return null ;
160180
181+ var outputIds = new Set <AssetId >();
161182 return Future .wait (_inputs.values.map ((input) {
162183 if (! input.isDirty) return new Future .value (new Set ());
163184 return input.process ().then ((outputs) {
164- return outputs.where (_addOutput).map ((output) => output.id).toSet ();
185+ for (var asset in outputs) {
186+ outputIds.add (asset.id);
187+ if (_outputs.containsKey (asset.id)) {
188+ _outputs[asset.id].add (asset);
189+ } else {
190+ _outputs[asset.id] = new PhaseOutput (this , asset);
191+ _outputs[asset.id].output.whenRemoved
192+ .then ((_) => _outputs.remove (asset.id));
193+ if (_next != null ) _next.addInput (_outputs[asset.id].output);
194+ }
195+ }
165196 });
166- })).then ((collisionsList ) {
197+ })).then ((_ ) {
167198 // Report collisions in a deterministic order.
168- var collisions = unionAll (collisionsList).toList ();
169- collisions.sort ((a, b) => a.compareTo (b));
170- for (var collision in collisions) {
171- // Ensure that there's still a collision. It's possible it was resolved
172- // while another transform was running.
173- if (_outputs[collision].length <= 1 ) continue ;
174- cascade.reportError (new AssetCollisionException (
175- _outputs[collision].where ((asset) => asset.transform != null )
176- .map ((asset) => asset.transform.info),
177- collision));
178- }
179- });
180- }
181-
182- /// Add [output] as an output of this phase, forwarding it to the next phase
183- /// if necessary.
184- ///
185- /// Returns whether or not [output] collides with another pre-existing output.
186- bool _addOutput (AssetNode output) {
187- _handleOutputRemoval (output);
188-
189- if (_outputs.containsKey (output.id)) {
190- _outputs[output.id].add (output);
191- return true ;
192- }
193-
194- _outputs[output.id] = new Queue <AssetNode >.from ([output]);
195- if (_next != null ) _next.addInput (output);
196- return false ;
197- }
198-
199- /// Properly resolve collisions when [output] is removed.
200- void _handleOutputRemoval (AssetNode output) {
201- output.whenRemoved.then ((_) {
202- var assets = _outputs[output.id];
203- if (assets.length == 1 ) {
204- assert (assets.single == output);
205- _outputs.remove (output.id);
206- return ;
207- }
208-
209- // If there was more than one asset, we're resolving a collision --
210- // possibly partially.
211- var wasFirst = assets.first == output;
212- assets.remove (output);
213-
214- // If this was the first asset, we need to pass the next asset
215- // (chronologically) to the next phase. Pump the event queue first to give
216- // [_next] a chance to handle the removal of its input before getting a
217- // new input.
218- if (wasFirst && _next != null ) {
219- newFuture (() => _next.addInput (assets.first));
220- }
221-
222- // If there's still a collision, report it. This lets the user know
223- // if they've successfully resolved the collision or not.
224- if (assets.length > 1 ) {
225- // Pump the event queue to ensure that the removal of the input triggers
226- // a new build to which we can attach the error.
227- newFuture (() => cascade.reportError (new AssetCollisionException (
228- assets.where ((asset) => asset.transform != null )
229- .map ((asset) => asset.transform.info),
230- output.id)));
199+ outputIds = outputIds.toList ();
200+ outputIds.sort ((a, b) => a.compareTo (b));
201+ for (var id in outputIds) {
202+ // It's possible the output was removed before other transforms in this
203+ // phase finished.
204+ if (! _outputs.containsKey (id)) continue ;
205+ var exception = _outputs[id].collisionException;
206+ if (exception != null ) cascade.reportError (exception);
231207 }
232208 });
233209 }
0 commit comments