Skip to content

Commit 4ed39cd

Browse files
committed
Concurrency
1 parent 043f250 commit 4ed39cd

File tree

4 files changed

+46
-11
lines changed

4 files changed

+46
-11
lines changed

lib/Extension/Core/Stage/Aggregator/DescribeAggregator.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public function __invoke(): Generator
2121
$samples[$hash] = [];
2222
}
2323

24-
foreach ($config['describe'] as $field) {
24+
foreach ((array) $config['describe'] as $field) {
2525
if (false === isset($samples[$hash][$field])) {
2626
$samples[$hash][$field] = [];
2727
}
@@ -44,7 +44,7 @@ public function configure(Schema $schema)
4444
private function buildHash(array $row, array $config): string
4545
{
4646
$hash = [];
47-
foreach ($config['group_by'] as $groupBy) {
47+
foreach ((array) $config['group_by'] as $groupBy) {
4848
if (!isset($row[$groupBy])) {
4949
throw new InvalidArgumentException(sprintf(
5050
'Group by field "%s" does not exist in input with fields "%s"',
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
3+
namespace PhpBench\Pipeline\Extension\Core\Stage\Parameter;
4+
5+
use PhpBench\Pipeline\Core\Stage;
6+
use Generator;
7+
use PhpBench\Pipeline\Core\Schema;
8+
9+
class CounterParameter implements Stage
10+
{
11+
public function __invoke(): Generator
12+
{
13+
list($config, $data) = yield;
14+
15+
$count = 0;
16+
while (true) {
17+
list($config, $data) = yield [ $config['name'] => $count += $config['step'] ];
18+
}
19+
}
20+
21+
public function configure(Schema $schema)
22+
{
23+
$schema->setDefaults([
24+
'name' => 'count',
25+
'step' => 1
26+
]);
27+
}
28+
}

lib/Extension/Core/Stage/Sampler/CurlSampler.php

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,10 @@ public function __invoke(): Generator
2323
$this->activeRequests++;
2424
}
2525

26-
$status = curl_multi_exec($this->multiHandle, $active);
2726
$multiInfo = curl_multi_info_read($this->multiHandle);
2827

2928
if (false !== $multiInfo) {
30-
$info = curl_getinfo($multiInfo['handle']);
31-
curl_multi_remove_handle($this->multiHandle, $multiInfo['handle']);
32-
curl_close($multiInfo['handle']);
29+
$info = $this->closeHandle($multiInfo);
3330
$this->activeRequests--;
3431
list($config, $data) = yield $info;
3532
}
@@ -63,5 +60,14 @@ private function sampleUrl(array $config): void
6360
}
6461

6562
curl_multi_add_handle($this->multiHandle, $handle);
63+
curl_multi_exec($this->multiHandle, $active);
64+
}
65+
66+
private function closeHandle($multiInfo)
67+
{
68+
$info = curl_getinfo($multiInfo['handle']);
69+
curl_multi_remove_handle($this->multiHandle, $multiInfo['handle']);
70+
curl_close($multiInfo['handle']);
71+
return $info;
6672
}
6773
}

tests/Unit/Extension/Core/Stage/Sampler/CurlSamplerTest.php

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,15 @@ public function testConcurrentRequests()
8787
->stage('sampler/curl', [
8888
'url' => self::SAMPLE_URL,
8989
'concurrency' => 4,
90-
'headers' => [
91-
'X-Header1' => 'Yes',
92-
'X-Header2' => 'No',
93-
], ])
90+
])
91+
->stage('aggregator/describe', [ 'group_by' => 'url', 'describe' => 'total_time' ])
92+
->stage('parameter/counter')
9493
->run();
9594

9695
$requests = $this->requests();
97-
$this->assertCount(4, $requests);
96+
97+
// although we specified a concurrency of 4, 6 requests were made?
98+
$this->assertCount(6, $requests);
9899
}
99100

100101
private function requests(): array

0 commit comments

Comments
 (0)