Skip to content

[consumption] Logging improvements #555

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 9, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[consumption] add InitLogger extension point. Before start.
  • Loading branch information
makasim committed Oct 8, 2018
commit 10e05d6ad8a791731a1440b7d2c59f1e7f3fa4b4
20 changes: 17 additions & 3 deletions pkg/enqueue/Consumption/ChainExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Enqueue\Consumption;

use Enqueue\Consumption\Context\End;
use Enqueue\Consumption\Context\InitLogger;
use Enqueue\Consumption\Context\MessageReceived;
use Enqueue\Consumption\Context\MessageResult;
use Enqueue\Consumption\Context\PostConsume;
Expand All @@ -15,6 +16,7 @@
class ChainExtension implements ExtensionInterface
{
private $startExtensions;
private $initLoggerExtensions;
private $preSubscribeExtensions;
private $preConsumeExtensions;
private $messageReceivedExtensions;
Expand All @@ -24,12 +26,10 @@ class ChainExtension implements ExtensionInterface
private $postConsumeExtensions;
private $endExtensions;

/**
* @param ExtensionInterface[] $extensions
*/
public function __construct(array $extensions)
{
$this->startExtensions = [];
$this->initLoggerExtensions = [];
$this->preSubscribeExtensions = [];
$this->preConsumeExtensions = [];
$this->messageReceivedExtensions = [];
Expand All @@ -42,6 +42,7 @@ public function __construct(array $extensions)
array_walk($extensions, function ($extension) {
if ($extension instanceof ExtensionInterface) {
$this->startExtensions[] = $extension;
$this->initLoggerExtensions[] = $extension;
$this->preSubscribeExtensions[] = $extension;
$this->preConsumeExtensions[] = $extension;
$this->messageReceivedExtensions[] = $extension;
Expand All @@ -61,6 +62,12 @@ public function __construct(array $extensions)
$extensionValid = true;
}

if ($extension instanceof InitLoggerExtensionInterface) {
$this->initLoggerExtensions[] = $extension;

$extensionValid = true;
}

if ($extension instanceof PreSubscribeExtensionInterface) {
$this->preSubscribeExtensions[] = $extension;

Expand Down Expand Up @@ -115,6 +122,13 @@ public function __construct(array $extensions)
});
}

public function onInitLogger(InitLogger $context): void
{
foreach ($this->endExtensions as $extension) {
$extension->onInitLogger($context);
}
}

public function onStart(Start $context): void
{
foreach ($this->startExtensions as $extension) {
Expand Down
28 changes: 28 additions & 0 deletions pkg/enqueue/Consumption/Context/InitLogger.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

namespace Enqueue\Consumption\Context;

use Psr\Log\LoggerInterface;

class InitLogger
{
/**
* @var LoggerInterface
*/
private $logger;

public function __construct(LoggerInterface $logger)
{
$this->logger = $logger;
}

public function getLogger(): LoggerInterface
{
return $this->logger;
}

public function changeLogger(LoggerInterface $logger): void
{
$this->logger = $logger;
}
}
5 changes: 0 additions & 5 deletions pkg/enqueue/Consumption/Context/Start.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ public function getLogger(): LoggerInterface
return $this->logger;
}

public function changeLogger(LoggerInterface $logger): void
{
$this->logger = $logger;
}

/**
* In milliseconds.
*/
Expand Down
66 changes: 8 additions & 58 deletions pkg/enqueue/Consumption/Extension/LoggerExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,11 @@

namespace Enqueue\Consumption\Extension;

use Enqueue\Consumption\Context\PostMessageReceived;
use Enqueue\Consumption\Context\Start;
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;
use Enqueue\Consumption\Result;
use Enqueue\Consumption\StartExtensionInterface;
use Interop\Queue\Message as InteropMessage;
use Enqueue\Consumption\Context\InitLogger;
use Enqueue\Consumption\InitLoggerExtensionInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

class LoggerExtension implements StartExtensionInterface, PostMessageReceivedExtensionInterface
class LoggerExtension implements InitLoggerExtensionInterface
{
/**
* @var LoggerInterface
Expand All @@ -26,59 +21,14 @@ public function __construct(LoggerInterface $logger)
$this->logger = $logger;
}

public function onStart(Start $context): void
public function onInitLogger(InitLogger $context): void
{
if ($context->getLogger() && false == $context->getLogger() instanceof NullLogger) {
$context->getLogger()->debug(sprintf(
'Skip setting context\'s logger "%s". Another one "%s" has already been set.',
get_class($this->logger),
get_class($context->getLogger())
));
} else {
$context->changeLogger($this->logger);
$this->logger->debug(sprintf('Set context\'s logger "%s"', get_class($this->logger)));
}
}

public function onPostMessageReceived(PostMessageReceived $context): void
{
if (false == $context->getResult() instanceof Result) {
return;
}

/** @var $result Result */
$result = $context->getResult();
$previousLogger = $context->getLogger();

switch ($result->getStatus()) {
case Result::REJECT:
case Result::REQUEUE:
if ($result->getReason()) {
$this->logger->error($result->getReason(), $this->messageToLogContext($context->getMessage()));
}

break;
case Result::ACK:
if ($result->getReason()) {
$this->logger->info($result->getReason(), $this->messageToLogContext($context->getMessage()));
}
if ($previousLogger !== $this->logger) {
$context->changeLogger($this->logger);

break;
default:
throw new \LogicException(sprintf('Got unexpected message result. "%s"', $result->getStatus()));
$this->logger->debug(sprintf('Change logger from "%s" to "%s"', get_class($previousLogger), get_class($this->logger)));
}
}

/**
* @param InteropMessage $message
*
* @return array
*/
private function messageToLogContext(InteropMessage $message)
{
return [
'body' => $message->getBody(),
'headers' => $message->getHeaders(),
'properties' => $message->getProperties(),
];
}
}
2 changes: 1 addition & 1 deletion pkg/enqueue/Consumption/ExtensionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

namespace Enqueue\Consumption;

interface ExtensionInterface extends StartExtensionInterface, PreSubscribeExtensionInterface, PreConsumeExtensionInterface, MessageReceivedExtensionInterface, PostMessageReceivedExtensionInterface, MessageResultExtensionInterface, ProcessorExceptionExtensionInterface, PostConsumeExtensionInterface, EndExtensionInterface
interface ExtensionInterface extends StartExtensionInterface, PreSubscribeExtensionInterface, PreConsumeExtensionInterface, MessageReceivedExtensionInterface, PostMessageReceivedExtensionInterface, MessageResultExtensionInterface, ProcessorExceptionExtensionInterface, PostConsumeExtensionInterface, EndExtensionInterface, InitLoggerExtensionInterface
{
}
14 changes: 14 additions & 0 deletions pkg/enqueue/Consumption/InitLoggerExtensionInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

namespace Enqueue\Consumption;

use Enqueue\Consumption\Context\InitLogger;

interface InitLoggerExtensionInterface
{
/**
* Executed only once at the very beginning of the QueueConsumer::consume method call.
* BEFORE onStart extension method.
*/
public function onInitLogger(InitLogger $context): void;
}
47 changes: 24 additions & 23 deletions pkg/enqueue/Consumption/QueueConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Enqueue\Consumption;

use Enqueue\Consumption\Context\End;
use Enqueue\Consumption\Context\InitLogger;
use Enqueue\Consumption\Context\MessageReceived;
use Enqueue\Consumption\Context\MessageResult;
use Enqueue\Consumption\Context\PostConsume;
Expand Down Expand Up @@ -32,7 +33,7 @@ final class QueueConsumer implements QueueConsumerInterface
private $interopContext;

/**
* @var ExtensionInterface|ChainExtension
* @var ExtensionInterface
*/
private $staticExtension;

Expand All @@ -46,11 +47,6 @@ final class QueueConsumer implements QueueConsumerInterface
*/
private $receiveTimeout;

/**
* @var ExtensionInterface|ChainExtension
*/
private $extension;

/**
* @var LoggerInterface
*/
Expand Down Expand Up @@ -128,11 +124,16 @@ public function bindCallback($queue, callable $processor): QueueConsumerInterfac

public function consume(ExtensionInterface $runtimeExtension = null): void
{
$this->extension = $runtimeExtension ?
$extension = $runtimeExtension ?
new ChainExtension([$this->staticExtension, $runtimeExtension]) :
$this->staticExtension
;

$initLogger = new InitLogger($this->logger);
$extension->onInitLogger($initLogger);

$this->logger = $initLogger->getLogger();

$startTime = (int) (microtime(true) * 1000);

$start = new Start(
Expand All @@ -143,10 +144,10 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
$startTime
);

$this->extension->onStart($start);
$extension->onStart($start);

if ($start->isExecutionInterrupted()) {
$this->onEnd($startTime);
$this->onEnd($extension, $startTime);

return;
}
Expand Down Expand Up @@ -176,7 +177,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
$receivedMessagesCount = 0;
$interruptExecution = false;

$callback = function (InteropMessage $message, Consumer $consumer) use (&$receivedMessagesCount, &$interruptExecution) {
$callback = function (InteropMessage $message, Consumer $consumer) use (&$receivedMessagesCount, &$interruptExecution, $extension) {
++$receivedMessagesCount;

$receivedAt = (int) (microtime(true) * 1000);
Expand All @@ -188,19 +189,19 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
$processor = $this->boundProcessors[$queue->getQueueName()]->getProcessor();

$messageReceived = new MessageReceived($this->interopContext, $consumer, $message, $processor, $receivedAt, $this->logger);
$this->extension->onMessageReceived($messageReceived);
$extension->onMessageReceived($messageReceived);
$result = $messageReceived->getResult();
$processor = $messageReceived->getProcessor();
if (null === $result) {
try {
$result = $processor->process($message, $this->interopContext);
} catch (\Exception $e) {
$result = $this->onProcessorException($message, $e, $receivedAt);
$result = $this->onProcessorException($extension, $message, $e, $receivedAt);
}
}

$messageResult = new MessageResult($this->interopContext, $message, $result, $receivedAt, $this->logger);
$this->extension->onResult($messageResult);
$extension->onResult($messageResult);
$result = $messageResult->getResult();

switch ($result) {
Expand All @@ -220,7 +221,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
}

$postMessageReceived = new PostMessageReceived($this->interopContext, $message, $result, $receivedAt, $this->logger);
$this->extension->onPostMessageReceived($postMessageReceived);
$extension->onPostMessageReceived($postMessageReceived);

if ($postMessageReceived->isExecutionInterrupted()) {
$interruptExecution = true;
Expand All @@ -241,7 +242,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
$this->logger
);

$this->extension->onPreSubscribe($preSubscribe);
$extension->onPreSubscribe($preSubscribe);

$subscriptionConsumer->subscribe($consumer, $callback);
}
Expand All @@ -252,21 +253,21 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
$interruptExecution = false;

$preConsume = new PreConsume($this->interopContext, $subscriptionConsumer, $this->logger, $cycle, $this->receiveTimeout, $startTime);
$this->extension->onPreConsume($preConsume);
$extension->onPreConsume($preConsume);

if ($preConsume->isExecutionInterrupted()) {
$this->onEnd($startTime, $subscriptionConsumer);
$this->onEnd($extension, $startTime, $subscriptionConsumer);

return;
}

$subscriptionConsumer->consume($this->receiveTimeout);

$postConsume = new PostConsume($this->interopContext, $subscriptionConsumer, $receivedMessagesCount, $cycle, $startTime, $this->logger);
$this->extension->onPostConsume($postConsume);
$extension->onPostConsume($postConsume);

if ($interruptExecution || $postConsume->isExecutionInterrupted()) {
$this->onEnd($startTime, $subscriptionConsumer);
$this->onEnd($extension, $startTime, $subscriptionConsumer);

return;
}
Expand All @@ -285,11 +286,11 @@ public function setFallbackSubscriptionConsumer(SubscriptionConsumer $fallbackSu
$this->fallbackSubscriptionConsumer = $fallbackSubscriptionConsumer;
}

private function onEnd(int $startTime, SubscriptionConsumer $subscriptionConsumer = null): void
private function onEnd(ExtensionInterface $extension, int $startTime, SubscriptionConsumer $subscriptionConsumer = null): void
{
$endTime = (int) (microtime(true) * 1000);

$this->extension->onEnd(new End($this->interopContext, $startTime, $endTime, $this->logger));
$extension->onEnd(new End($this->interopContext, $startTime, $endTime, $this->logger));

if ($subscriptionConsumer) {
$subscriptionConsumer->unsubscribeAll();
Expand All @@ -301,12 +302,12 @@ private function onEnd(int $startTime, SubscriptionConsumer $subscriptionConsume
*
* https://github.com/symfony/symfony/blob/cbe289517470eeea27162fd2d523eb29c95f775f/src/Symfony/Component/HttpKernel/EventListener/ExceptionListener.php#L77
*/
private function onProcessorException(Message $message, \Exception $exception, int $receivedAt)
private function onProcessorException(ExtensionInterface $extension, Message $message, \Exception $exception, int $receivedAt)
{
$processorException = new ProcessorException($this->interopContext, $message, $exception, $receivedAt, $this->logger);

try {
$this->extension->onProcessorException($processorException);
$extension->onProcessorException($processorException);

$result = $processorException->getResult();
if (null === $result) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/enqueue/Consumption/QueueConsumerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public function bindCallback($queue, callable $processor): self;
* Runtime extension - is an extension or a collection of extensions which could be set on runtime.
* Here's a good example: @see LimitsExtensionsCommandTrait.
*
* @param ExtensionInterface|ChainExtension|null $runtimeExtension
* @param ExtensionInterface|null $runtimeExtension
*
* @throws \Exception
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@

trait LimitsExtensionsCommandTrait
{
/**
* {@inheritdoc}
*/
protected function configureLimitsExtensions()
{
$this
Expand Down
Loading