Skip to content

Commit e506309

Browse files
committed
Delay valve
1 parent 3d30d56 commit e506309

File tree

5 files changed

+60
-4
lines changed

5 files changed

+60
-4
lines changed

lib/Core/Pipeline.php

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,27 @@ public function __invoke(): Generator
3030
foreach ($configuredGenerators as $configuredGenerator) {
3131
$generatorConfig = $configuredGenerator->config();
3232
$generatorConfig = $this->replaceTokens($generatorConfig, $data);
33-
$data = $configuredGenerator->generator()->send([$generatorConfig, $data]);
33+
$response = $configuredGenerator->generator()->send([$generatorConfig, $data]);
3434

3535
if (false === $configuredGenerator->generator()->valid()) {
3636
break 2;
3737
}
3838

39-
if (false === is_array($data)) {
39+
if (false === $response instanceof Signal && false === is_array($response)) {
4040
throw new InvalidYieldedValue(sprintf(
41-
'All yielded values must bne arrays, got "%s"',
41+
'All yielded values must be arrays or Signals, got "%s"',
4242
gettype($data)
4343
));
4444
}
45+
46+
if ($response instanceof Signal) {
47+
switch ($response) {
48+
case Signal::continue():
49+
break 2;
50+
}
51+
}
52+
53+
$data = $response;
4554
}
4655

4756
yield $data;

lib/Core/Signal.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
namespace PhpBench\Pipeline\Core;
4+
5+
final class Signal
6+
{
7+
private $signal;
8+
9+
private function __construct(string $signal)
10+
{
11+
$this->signal = $signal;
12+
}
13+
14+
public static function continue()
15+
{
16+
return new self('continue');
17+
}
18+
}

lib/Extension/Core/CoreExtension.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
use PhpBench\Pipeline\Extension\Core\Stage\Filter\KeysFilter;
1414
use PhpBench\Pipeline\Extension\Core\Stage\Aggregator\DescribeAggregator;
1515
use PhpBench\Pipeline\Extension\Core\Stage\Parameter\CounterParameter;
16+
use PhpBench\Pipeline\Extension\Core\Stage\Valve\DelayValve;
1617

1718
class CoreExtension implements PipelineExtension
1819
{
@@ -32,6 +33,7 @@ public function __construct()
3233
'parameter/counter' => new CounterParameter(),
3334
'sampler/callable' => new CallableSampler(),
3435
'sampler/curl' => new CurlSampler(),
36+
'valve/delay' => new DelayValve(),
3537
'valve/take' => new TakeValve(),
3638
];
3739
}

lib/Extension/Core/Stage/Sampler/CurlSampler.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public function __invoke(): Generator
3232
list($config, $data) = yield $info;
3333
}
3434

35-
yield Signal::continue();
35+
list($config, $data) = yield Signal::continue();
3636
}
3737

3838
curl_multi_close($this->multiHandle);
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
namespace PhpBench\Pipeline\Extension\Core\Stage\Valve;
4+
5+
use PhpBench\Pipeline\Core\Stage;
6+
use Generator;
7+
use PhpBench\Pipeline\Core\Schema;
8+
9+
class DelayValve implements Stage
10+
{
11+
public function __invoke(): Generator
12+
{
13+
list($config, $data) = yield;
14+
15+
while (true) {
16+
usleep($config['time']);
17+
list($config, $data) = yield $data;
18+
}
19+
}
20+
21+
public function configure(Schema $schema)
22+
{
23+
$schema->setDefaults([
24+
'time' => 10000
25+
]);
26+
}
27+
}

0 commit comments

Comments
 (0)