Skip to content

Commit

Permalink
Merge pull request #555 from php-enqueue/loging-improvements
Browse files Browse the repository at this point in the history
[consumption] Logging improvements
  • Loading branch information
makasim authored Oct 9, 2018
2 parents 50237e6 + 3043106 commit 474e855
Show file tree
Hide file tree
Showing 27 changed files with 1,214 additions and 266 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ class FlushSpoolProducerExtension implements PostMessageReceivedExtensionInterfa
*/
private $producer;

/**
* @param SpoolProducer $producer
*/
public function __construct(SpoolProducer $producer)
{
$this->producer = $producer;
Expand Down
69 changes: 69 additions & 0 deletions pkg/enqueue/Client/ConsumptionExtension/LogExtension.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php

namespace Enqueue\Client\ConsumptionExtension;

use Enqueue\Client\Config;
use Enqueue\Consumption\Context\PostMessageReceived;
use Enqueue\Consumption\Result;
use Enqueue\Util\Stringify;
use Psr\Log\LogLevel;

class LogExtension extends \Enqueue\Consumption\Extension\LogExtension
{
public function onPostMessageReceived(PostMessageReceived $context): void
{
$result = $context->getResult();
$message = $context->getMessage();

$logLevel = Result::REJECT == ((string) $result) ? LogLevel::ERROR : LogLevel::INFO;

if ($command = $message->getProperty(Config::COMMAND)) {
$reason = '';
$logMessage = "[client] Processed {command}\t{body}\t{result}";
if ($result instanceof Result && $result->getReason()) {
$reason = $result->getReason();

$logMessage .= ' {reason}';
}

$context->getLogger()->log($logLevel, $logMessage, [
'result' => str_replace('enqueue.', '', $result),
'reason' => $reason,
'command' => $command,
'queueName' => $context->getConsumer()->getQueue()->getQueueName(),
'body' => Stringify::that($message->getBody()),
'properties' => Stringify::that($message->getProperties()),
'headers' => Stringify::that($message->getHeaders()),
]);

return;
}

$topic = $message->getProperty(Config::TOPIC);
$processor = $message->getProperty(Config::PROCESSOR);
if ($topic && $processor) {
$reason = '';
$logMessage = "[client] Processed {topic} -> {processor}\t{body}\t{result}";
if ($result instanceof Result && $result->getReason()) {
$reason = $result->getReason();

$logMessage .= ' {reason}';
}

$context->getLogger()->log($logLevel, $logMessage, [
'result' => str_replace('enqueue.', '', $result),
'reason' => $reason,
'topic' => $topic,
'processor' => $processor,
'queueName' => $context->getConsumer()->getQueue()->getQueueName(),
'body' => Stringify::that($message->getBody()),
'properties' => Stringify::that($message->getProperties()),
'headers' => Stringify::that($message->getHeaders()),
]);

return;
}

parent::onPostMessageReceived($context);
}
}
20 changes: 17 additions & 3 deletions pkg/enqueue/Consumption/ChainExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Enqueue\Consumption;

use Enqueue\Consumption\Context\End;
use Enqueue\Consumption\Context\InitLogger;
use Enqueue\Consumption\Context\MessageReceived;
use Enqueue\Consumption\Context\MessageResult;
use Enqueue\Consumption\Context\PostConsume;
Expand All @@ -15,6 +16,7 @@
class ChainExtension implements ExtensionInterface
{
private $startExtensions;
private $initLoggerExtensions;
private $preSubscribeExtensions;
private $preConsumeExtensions;
private $messageReceivedExtensions;
Expand All @@ -24,12 +26,10 @@ class ChainExtension implements ExtensionInterface
private $postConsumeExtensions;
private $endExtensions;

/**
* @param ExtensionInterface[] $extensions
*/
public function __construct(array $extensions)
{
$this->startExtensions = [];
$this->initLoggerExtensions = [];
$this->preSubscribeExtensions = [];
$this->preConsumeExtensions = [];
$this->messageReceivedExtensions = [];
Expand All @@ -42,6 +42,7 @@ public function __construct(array $extensions)
array_walk($extensions, function ($extension) {
if ($extension instanceof ExtensionInterface) {
$this->startExtensions[] = $extension;
$this->initLoggerExtensions[] = $extension;
$this->preSubscribeExtensions[] = $extension;
$this->preConsumeExtensions[] = $extension;
$this->messageReceivedExtensions[] = $extension;
Expand All @@ -61,6 +62,12 @@ public function __construct(array $extensions)
$extensionValid = true;
}

if ($extension instanceof InitLoggerExtensionInterface) {
$this->initLoggerExtensions[] = $extension;

$extensionValid = true;
}

if ($extension instanceof PreSubscribeExtensionInterface) {
$this->preSubscribeExtensions[] = $extension;

Expand Down Expand Up @@ -115,6 +122,13 @@ public function __construct(array $extensions)
});
}

public function onInitLogger(InitLogger $context): void
{
foreach ($this->initLoggerExtensions as $extension) {
$extension->onInitLogger($context);
}
}

public function onStart(Start $context): void
{
foreach ($this->startExtensions as $extension) {
Expand Down
28 changes: 28 additions & 0 deletions pkg/enqueue/Consumption/Context/InitLogger.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

namespace Enqueue\Consumption\Context;

use Psr\Log\LoggerInterface;

class InitLogger
{
/**
* @var LoggerInterface
*/
private $logger;

public function __construct(LoggerInterface $logger)
{
$this->logger = $logger;
}

public function getLogger(): LoggerInterface
{
return $this->logger;
}

public function changeLogger(LoggerInterface $logger): void
{
$this->logger = $logger;
}
}
13 changes: 13 additions & 0 deletions pkg/enqueue/Consumption/Context/PostMessageReceived.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Enqueue\Consumption\Context;

use Enqueue\Consumption\Result;
use Interop\Queue\Consumer;
use Interop\Queue\Context;
use Interop\Queue\Message;
use Psr\Log\LoggerInterface;
Expand All @@ -14,6 +15,11 @@ final class PostMessageReceived
*/
private $context;

/**
* @var Consumer
*/
private $consumer;

/**
* @var Message
*/
Expand Down Expand Up @@ -41,12 +47,14 @@ final class PostMessageReceived

public function __construct(
Context $context,
Consumer $consumer,
Message $message,
$result,
int $receivedAt,
LoggerInterface $logger
) {
$this->context = $context;
$this->consumer = $consumer;
$this->message = $message;
$this->result = $result;
$this->receivedAt = $receivedAt;
Expand All @@ -60,6 +68,11 @@ public function getContext(): Context
return $this->context;
}

public function getConsumer(): Consumer
{
return $this->consumer;
}

public function getMessage(): Message
{
return $this->message;
Expand Down
5 changes: 0 additions & 5 deletions pkg/enqueue/Consumption/Context/Start.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ public function getLogger(): LoggerInterface
return $this->logger;
}

public function changeLogger(LoggerInterface $logger): void
{
$this->logger = $logger;
}

/**
* In milliseconds.
*/
Expand Down
67 changes: 67 additions & 0 deletions pkg/enqueue/Consumption/Extension/LogExtension.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
<?php

namespace Enqueue\Consumption\Extension;

use Enqueue\Consumption\Context\End;
use Enqueue\Consumption\Context\MessageReceived;
use Enqueue\Consumption\Context\PostMessageReceived;
use Enqueue\Consumption\Context\Start;
use Enqueue\Consumption\EndExtensionInterface;
use Enqueue\Consumption\MessageReceivedExtensionInterface;
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;
use Enqueue\Consumption\Result;
use Enqueue\Consumption\StartExtensionInterface;
use Enqueue\Util\Stringify;
use Psr\Log\LogLevel;

class LogExtension implements StartExtensionInterface, MessageReceivedExtensionInterface, PostMessageReceivedExtensionInterface, EndExtensionInterface
{
public function onStart(Start $context): void
{
$context->getLogger()->debug('Consumption has started');
}

public function onEnd(End $context): void
{
$context->getLogger()->debug('Consumption has ended');
}

public function onMessageReceived(MessageReceived $context): void
{
$message = $context->getMessage();

$context->getLogger()->debug("Received from {queueName}\t{body}", [
'queueName' => $context->getConsumer()->getQueue()->getQueueName(),
'redelivered' => $message->isRedelivered(),
'body' => Stringify::that($message->getBody()),
'properties' => Stringify::that($message->getProperties()),
'headers' => Stringify::that($message->getHeaders()),
]);
}

public function onPostMessageReceived(PostMessageReceived $context): void
{
$message = $context->getMessage();
$queue = $context->getConsumer()->getQueue();
$result = $context->getResult();

$reason = '';
$logMessage = "Processed from {queueName}\t{body}\t{result}";
if ($result instanceof Result && $result->getReason()) {
$reason = $result->getReason();
$logMessage .= ' {reason}';
}
$logContext = [
'result' => str_replace('enqueue.', '', $result),
'reason' => $reason,
'queueName' => $queue->getQueueName(),
'body' => Stringify::that($message->getBody()),
'properties' => Stringify::that($message->getProperties()),
'headers' => Stringify::that($message->getHeaders()),
];

$logLevel = Result::REJECT == ((string) $result) ? LogLevel::ERROR : LogLevel::INFO;

$context->getLogger()->log($logLevel, $logMessage, $logContext);
}
}
66 changes: 8 additions & 58 deletions pkg/enqueue/Consumption/Extension/LoggerExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,11 @@

namespace Enqueue\Consumption\Extension;

use Enqueue\Consumption\Context\PostMessageReceived;
use Enqueue\Consumption\Context\Start;
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;
use Enqueue\Consumption\Result;
use Enqueue\Consumption\StartExtensionInterface;
use Interop\Queue\Message as InteropMessage;
use Enqueue\Consumption\Context\InitLogger;
use Enqueue\Consumption\InitLoggerExtensionInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

class LoggerExtension implements StartExtensionInterface, PostMessageReceivedExtensionInterface
class LoggerExtension implements InitLoggerExtensionInterface
{
/**
* @var LoggerInterface
Expand All @@ -26,59 +21,14 @@ public function __construct(LoggerInterface $logger)
$this->logger = $logger;
}

public function onStart(Start $context): void
public function onInitLogger(InitLogger $context): void
{
if ($context->getLogger() && false == $context->getLogger() instanceof NullLogger) {
$context->getLogger()->debug(sprintf(
'Skip setting context\'s logger "%s". Another one "%s" has already been set.',
get_class($this->logger),
get_class($context->getLogger())
));
} else {
$context->changeLogger($this->logger);
$this->logger->debug(sprintf('Set context\'s logger "%s"', get_class($this->logger)));
}
}

public function onPostMessageReceived(PostMessageReceived $context): void
{
if (false == $context->getResult() instanceof Result) {
return;
}

/** @var $result Result */
$result = $context->getResult();
$previousLogger = $context->getLogger();

switch ($result->getStatus()) {
case Result::REJECT:
case Result::REQUEUE:
if ($result->getReason()) {
$this->logger->error($result->getReason(), $this->messageToLogContext($context->getMessage()));
}

break;
case Result::ACK:
if ($result->getReason()) {
$this->logger->info($result->getReason(), $this->messageToLogContext($context->getMessage()));
}
if ($previousLogger !== $this->logger) {
$context->changeLogger($this->logger);

break;
default:
throw new \LogicException(sprintf('Got unexpected message result. "%s"', $result->getStatus()));
$this->logger->debug(sprintf('Change logger from "%s" to "%s"', get_class($previousLogger), get_class($this->logger)));
}
}

/**
* @param InteropMessage $message
*
* @return array
*/
private function messageToLogContext(InteropMessage $message)
{
return [
'body' => $message->getBody(),
'headers' => $message->getHeaders(),
'properties' => $message->getProperties(),
];
}
}
2 changes: 1 addition & 1 deletion pkg/enqueue/Consumption/ExtensionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

namespace Enqueue\Consumption;

interface ExtensionInterface extends StartExtensionInterface, PreSubscribeExtensionInterface, PreConsumeExtensionInterface, MessageReceivedExtensionInterface, PostMessageReceivedExtensionInterface, MessageResultExtensionInterface, ProcessorExceptionExtensionInterface, PostConsumeExtensionInterface, EndExtensionInterface
interface ExtensionInterface extends StartExtensionInterface, PreSubscribeExtensionInterface, PreConsumeExtensionInterface, MessageReceivedExtensionInterface, PostMessageReceivedExtensionInterface, MessageResultExtensionInterface, ProcessorExceptionExtensionInterface, PostConsumeExtensionInterface, EndExtensionInterface, InitLoggerExtensionInterface
{
}
Loading

0 comments on commit 474e855

Please sign in to comment.