Skip to content

Commit 0f6bd3c

Browse files
committed
Fork -- probably better to do this with the existing pipeline
1 parent ca80a66 commit 0f6bd3c

File tree

3 files changed

+49
-14
lines changed

3 files changed

+49
-14
lines changed

example/http_sampler.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717
[ 'pipeline', [ 'stages' => [
1818
[ 'filter/keys', [ 'keys' => [ 'url', 'total_time', 'connect_time', 'concurrency']]],
1919
[ 'aggregator/collector', ['limit' => 2] ],
20+
//[ 'aggregator/describe', [ 'group_by' => 'url', 'describe' => 'total_time' ] ],
2021
[ 'encoder/json', [ 'pretty' => true ] ],
21-
[ 'console/redraw' ],
22+
'console/redraw',
2223
[ 'output/stream' ],
2324
]]],
2425
[ 'pipeline', [ 'stages' => [
2526
[ 'encoder/json' ],
26-
[ 'output/stream', [ 'stream' => 'report.json' ]],
27+
[ 'output/stream', [ 'stream' => 'report.json', 'mode' => 'a' ]],
2728
]]]
2829
]]],
2930
]);

lib/Extension/Core/Stage/Distribution/Fork.php

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,20 @@ public function __invoke(): Generator
1414
{
1515
list($config, $data) = yield;
1616

17-
/**
18-
* @var GeneratorFactory $generatorFactory */
17+
/** @var GeneratorFactory $generatorFactory */
1918
$generatorFactory = $config['generator_factory'];
2019

20+
$configuredGenerators = [];
21+
foreach ($config['stages'] as $stage) {
22+
$configuredGenerators[] = $generatorFactory->generatorFor($stage);
23+
}
24+
2125
while (true) {
22-
foreach ($config['stages'] as $stage) {
23-
$generatorFactory->generatorFor($stage);
24-
yield;
26+
foreach ($configuredGenerators as $configuredGenerator) {
27+
$configuredGenerator->generator()->send([$configuredGenerator->config(), $data]);
2528
}
29+
30+
list($config, $data) = yield $data;
2631
}
2732

2833
}
@@ -33,6 +38,5 @@ public function configure(Schema $schema)
3338
'stages' => []
3439
]);
3540
$schema->setRequired([ 'generator_factory' ]);
36-
3741
}
3842
}

tests/Unit/Extension/Core/Stage/Distribution/ForkTest.php

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,52 @@ public function testForksDataToAStage()
1111
{
1212
$forked = [];
1313
$result = $this->pipeline()
14+
->stage('valve/take', ['quantity' => 1 ])
1415
->stage('distribution/fork', [
1516
'stages' => [
1617
function () use (&$forked) {
1718
$data = yield;
18-
$forked[] = $data;
19-
$data[] = 'foobar';
20-
yield $data;
19+
$forked[] = 'Forked Data';
2120
}
2221
],
2322
])
24-
->generator(['Foobar'])
25-
->current();
23+
->run(['Mainline Data']);
2624

2725
$this->assertEquals([
28-
'Foobar',
26+
'Mainline Data',
2927
], $result, 'Main pipeline data is not affected');
3028

29+
$this->assertEquals([
30+
'Forked Data',
31+
], $forked);
32+
}
33+
34+
public function testForksDataToMultipleStages()
35+
{
36+
$forked = [];
37+
$result = $this->pipeline()
38+
->stage('valve/take', ['quantity' => 1 ])
39+
->stage('distribution/fork', [
40+
'stages' => [
41+
function () use (&$forked) {
42+
list($config, $data) = yield;
43+
$forked[] = 'Forked Data 1';
44+
},
45+
function () use (&$forked) {
46+
list($config, $data) = yield;
47+
$forked[] = 'Forked Data 2';
48+
}
49+
],
50+
])
51+
->run(['Mainline Data']);
52+
53+
$this->assertEquals([
54+
'Mainline Data',
55+
], $result, 'Main pipeline data is not affected');
56+
57+
$this->assertEquals([
58+
'Forked Data 1',
59+
'Forked Data 2',
60+
], $forked);
3161
}
3262
}

0 commit comments

Comments
 (0)