Skip to content

Commit

Permalink
[feat] Support buffer mechanism in standalone process of metric (#6030)
Browse files Browse the repository at this point in the history

Co-authored-by: 李铭昕 <715557344@qq.com>
  • Loading branch information
albertcht and limingxinleo authored Aug 17, 2023
1 parent 1de58b1 commit 5668bd5
Show file tree
Hide file tree
Showing 11 changed files with 335 additions and 45 deletions.
3 changes: 3 additions & 0 deletions publish/metric.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
'use_standalone_process' => env('METRIC_USE_STANDALONE_PROCESS', true),
'enable_default_metric' => env('METRIC_ENABLE_DEFAULT_METRIC', true),
'default_metric_interval' => env('DEFAULT_METRIC_INTERVAL', 5),
// only available when use_standalone_process is true
'buffer_interval' => env('METRIC_BUFFER_INTERVAL', 5),
'buffer_size' => env('METRIC_BUFFER_SIZE', 200),
'metric' => [
'prometheus' => [
'driver' => Hyperf\Metric\Adapter\Prometheus\MetricFactory::class,
Expand Down
11 changes: 6 additions & 5 deletions src/Adapter/RemoteProxy/Counter.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@
*/
namespace Hyperf\Metric\Adapter\RemoteProxy;

use Hyperf\Context\ApplicationContext;
use Hyperf\Metric\Contract\CounterInterface;
use Hyperf\Process\ProcessCollector;
use Hyperf\Metric\Contract\MetricCollectorInterface;

class Counter implements CounterInterface
{
protected const TARGET_PROCESS_NAME = 'metric';

/**
* @var string[]
*/
Expand All @@ -38,7 +37,9 @@ public function with(string ...$labelValues): static
public function add(int $delta): void
{
$this->delta = $delta;
$process = ProcessCollector::get(static::TARGET_PROCESS_NAME)[0];
$process->write(serialize($this));

ApplicationContext::getContainer()
->get(MetricCollectorInterface::class)
->add($this);
}
}
17 changes: 10 additions & 7 deletions src/Adapter/RemoteProxy/Gauge.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@
*/
namespace Hyperf\Metric\Adapter\RemoteProxy;

use Hyperf\Context\ApplicationContext;
use Hyperf\Metric\Contract\GaugeInterface;
use Hyperf\Process\ProcessCollector;
use Hyperf\Metric\Contract\MetricCollectorInterface;

class Gauge implements GaugeInterface
{
protected const TARGET_PROCESS_NAME = 'metric';

/**
* @var string[]
*/
Expand All @@ -41,15 +40,19 @@ public function set(float $value): void
{
$this->value = $value;
$this->delta = null;
$process = ProcessCollector::get(static::TARGET_PROCESS_NAME)[0];
$process->write(serialize($this));

ApplicationContext::getContainer()
->get(MetricCollectorInterface::class)
->add($this);
}

public function add(float $delta): void
{
$this->delta = $delta;
$this->value = null;
$process = ProcessCollector::get(static::TARGET_PROCESS_NAME)[0];
$process->write(serialize($this));

ApplicationContext::getContainer()
->get(MetricCollectorInterface::class)
->add($this);
}
}
11 changes: 6 additions & 5 deletions src/Adapter/RemoteProxy/Histogram.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@
*/
namespace Hyperf\Metric\Adapter\RemoteProxy;

use Hyperf\Context\ApplicationContext;
use Hyperf\Metric\Contract\HistogramInterface;
use Hyperf\Process\ProcessCollector;
use Hyperf\Metric\Contract\MetricCollectorInterface;

class Histogram implements HistogramInterface
{
protected const TARGET_PROCESS_NAME = 'metric';

/**
* @var string[]
*/
Expand All @@ -38,7 +37,9 @@ public function with(string ...$labelValues): static
public function put(float $sample): void
{
$this->sample = $sample;
$process = ProcessCollector::get(static::TARGET_PROCESS_NAME)[0];
$process->write(serialize($this));

ApplicationContext::getContainer()
->get(MetricCollectorInterface::class)
->add($this);
}
}
49 changes: 49 additions & 0 deletions src/Adapter/RemoteProxy/MetricCollector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Metric\Adapter\RemoteProxy;

use Hyperf\Metric\Contract\MetricCollectorInterface;
use Hyperf\Process\ProcessCollector;

class MetricCollector implements MetricCollectorInterface
{
protected const TARGET_PROCESS_NAME = 'metric';

protected array $buffer = [];

public function __construct(
protected int $bufferSize = 200
) {
}

public function add(object $data): void
{
$this->buffer[] = $data;

if (count($this->buffer) >= $this->bufferSize) {
$this->flush();
}
}

public function flush(): void
{
$process = ProcessCollector::get(static::TARGET_PROCESS_NAME)[0];
$buffer = $this->buffer;
$this->buffer = [];
$process->write(serialize($buffer));
}

public function getBuffer(): array
{
return $this->buffer;
}
}
34 changes: 34 additions & 0 deletions src/Adapter/RemoteProxy/MetricCollectorFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Metric\Adapter\RemoteProxy;

use Hyperf\Contract\ConfigInterface;
use Hyperf\Metric\Contract\MetricCollectorInterface;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;

class MetricCollectorFactory
{
/**
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function __invoke(ContainerInterface $container): MetricCollectorInterface
{
$config = $container->get(ConfigInterface::class);

return new MetricCollector(
(int) $config->get('metric.buffer_size', 200)
);
}
}
5 changes: 5 additions & 0 deletions src/ConfigProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@

use Domnikl\Statsd\Connection;
use Domnikl\Statsd\Connection\UdpSocket;
use Hyperf\Metric\Adapter\RemoteProxy\MetricCollectorFactory;
use Hyperf\Metric\Aspect\CounterAnnotationAspect;
use Hyperf\Metric\Aspect\HistogramAnnotationAspect;
use Hyperf\Metric\Contract\MetricCollectorInterface;
use Hyperf\Metric\Contract\MetricFactoryInterface;
use Hyperf\Metric\Listener\MetricBufferWatcher;
use Hyperf\Metric\Listener\OnBeforeHandle;
use Hyperf\Metric\Listener\OnCoroutineServerStart;
use Hyperf\Metric\Listener\OnMetricFactoryReady;
Expand All @@ -37,6 +40,7 @@ public function __invoke(): array
Adapter::class => InMemory::class,
Connection::class => UdpSocket::class,
DriverInterface::class => Guzzle::class,
MetricCollectorInterface::class => MetricCollectorFactory::class,
],
'aspects' => [
CounterAnnotationAspect::class,
Expand All @@ -56,6 +60,7 @@ public function __invoke(): array
OnBeforeHandle::class,
OnWorkerStart::class,
OnCoroutineServerStart::class,
MetricBufferWatcher::class,
],
'processes' => [
MetricProcess::class,
Expand Down
19 changes: 19 additions & 0 deletions src/Contract/MetricCollectorInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Metric\Contract;

interface MetricCollectorInterface
{
public function add(object $data): void;

public function flush(): void;
}
80 changes: 80 additions & 0 deletions src/Listener/MetricBufferWatcher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Metric\Listener;

use Hyperf\Contract\ConfigInterface;
use Hyperf\Coordinator\Constants;
use Hyperf\Coordinator\CoordinatorManager;
use Hyperf\Coordinator\Timer;
use Hyperf\Coroutine\Coroutine;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BeforeWorkerStart;
use Hyperf\Metric\Contract\MetricCollectorInterface;
use Psr\Container\ContainerInterface;

/**
* Collect and handle metrics before worker start.
* Only used for swoole process mode.
*/
class MetricBufferWatcher implements ListenerInterface
{
private ConfigInterface $config;

private MetricCollectorInterface $collector;

private Timer $timer;

public function __construct(protected ContainerInterface $container)
{
$this->config = $container->get(ConfigInterface::class);
$this->collector = $container->get(MetricCollectorInterface::class);
$this->timer = new Timer();
}

/**
* @return string[] returns the events that you want to listen
*/
public function listen(): array
{
return [
BeforeWorkerStart::class,
];
}

/**
* Handle the Event when the event is triggered, all listeners will
* complete before the event is returned to the EventDispatcher.
*/
public function process(object $event): void
{
if ($event->workerId === null) {
return;
}

/*
* Only start buffer watcher in standalone process mode
*/
if (! $this->config->get('metric.use_standalone_process', true)) {
return;
}

$timerInterval = $this->config->get('metric.buffer_interval', 5);
$timerId = $this->timer->tick($timerInterval, function () {
$this->collector->flush();
});
// Clean up timer on worker exit;
Coroutine::create(function () use ($timerId) {
CoordinatorManager::until(Constants::WORKER_EXIT)->yield();
$this->timer->clear($timerId);
});
}
}
Loading

0 comments on commit 5668bd5

Please sign in to comment.