Skip to content

solcloud/consumer

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

17 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Consumer

Consumer needs AMQPChannel channel as only dependency.

Channel setup

If you have channel instance in your project already than you can skip this, otherwise lets setup rabbitmq connection, we recommend to use container for this.

$config = new \Solcloud\Consumer\QueueConfig();
$config
    ->setHost('solcloud_rabbitmq')
    ->setVhost('/')
    #->setHeartbeatSec(5)
    ->setUsername('dev')
    ->setPassword('dev')
;
$connectionFactory = new \Solcloud\Consumer\QueueConnectionFactory($config);
$connection = $connectionFactory->createSocketConnection();
#(new \PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender($connection))->register(); // if heartbeat and pcntl_async_signals() is available

Create channel from connection or use your own channel

/** @var \PhpAmqpLib\Channel\AMQPChannel $channel */
$channel = $connection->channel();

Worker

Create worker (consumer) class for your business logic and inject $channel dependency. You can extend AbstractConsumer for lightweight abstraction or use "solcloud standard" BaseConsumer. We will use BaseConsumer in this example

$worker = new class($channel) extends \Solcloud\Consumer\BaseConsumer {

    protected function run(): void
    {
        // Your hard work here
        echo "Processing message: " . $this->data->id . PHP_EOL;
    }

};

Start consuming message from queue using blocking method wait

$worker->consume($consumeQueueName);
while ($worker->hasCallback()) {
    try {
        // While we have callback lets enter event loop with some timeout
        $worker->wait(rand(8, 11));
    } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $ex) {
        echo $ex->getMessage() . PHP_EOL;
    }
}
$worker->closeChannel();

Message publishing

For message publish you can use $worker directly or use rabbitmq management plugin or different scripts

$worker->publishMessage(
    $worker->createMessageHelper([], ["id" => 1]),
    '',
    $consumeQueueName
); // OR open rabbitmq management and publish: {"meta":[],"data":{"id":1}}

Logging

Worker can log to Psr\Log\LoggerInterface compatible logger.

$worker->setLogger(new YourPsrLogger());
$worker->getLogger()->info('Something');

Examples

For complete example for this readme see example.php

About

RabbitMQ base worker for queue consuming

Topics

Resources

Stars

Watchers

Forks

Languages