Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send millions of messages in RabbitMQ #3

Merged
merged 3 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@
/phpstan.neon
/phpunit.xml

/.idea
/.idea
/.castor.stub.php
2 changes: 1 addition & 1 deletion .php-cs-fixer.dist.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

declare(strict_types=1);

$finder = PhpCsFixer\Finder::create()->in(__DIR__);
$finder = PhpCsFixer\Finder::create()->in(__DIR__ . '/src');

return (new PhpCsFixer\Config())
->setRiskyAllowed(true)
Expand Down
10 changes: 1 addition & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,7 @@ A demo project on how to use the IPC-Shared-Memory package

## Installation

```shell
composer require jeckel/ipc-shared-memory-demo
```

## Running tests

```shell
composer test
```
> This project requires [castor](https://github.com/jolicode/castor) to run.

## License

Expand Down
27 changes: 27 additions & 0 deletions castor-commands/console.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

/**
* @author: Julien Mercier-Rojas <julien@jeckel-lab.fr>
* Created at: 30/10/2023
*/

declare(strict_types=1);

namespace php;

require_once __DIR__ . '/../vendor/autoload.php';

use Castor\Attribute\AsTask;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

use function Castor\run;

#[AsTask(description: 'Execute console command')]
function console(
string $consoleCommand
): void {
run(
'docker-compose -f docker-compose.yml exec demo php -d max_execution_time=120 console.php ' . $consoleCommand
);
}
5 changes: 5 additions & 0 deletions castor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<?php

use function Castor\import;

import(__DIR__ . '/castor-commands');
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
],
"require": {
"php": "~8.2",
"php-amqplib/php-amqplib": "^3.6"
"php-amqplib/php-amqplib": "^3.6",
"symfony/console": "^6.3"
},
"require-dev": {
"friendsofphp/php-cs-fixer": "^3.34",
Expand Down
20 changes: 20 additions & 0 deletions console.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env php
<?php

/**
* @author: Julien Mercier-Rojas <julien@jeckel-lab.fr>
* Created at: 30/10/2023
*/

declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php';

use JeckelLab\IpcSharedMemoryDemo\Console\LoadMessages;
use Symfony\Component\Console\Application;

$application = new Application();

$application->add(new LoadMessages());

$application->run();
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ services:
volumes:
- .:/project
working_dir: /project
environment:
- RABBITMQ_USER=guest
- RABBITMQ_PASS=guest
- RABBITMQ_HOST=rabbitmq
- RABBITMQ_PORT=5672
- RABBITMQ_EXCHANGE=demo.X.incoming
ports:
- "9001:9001" # Supervisor admin panel

Expand Down
28 changes: 14 additions & 14 deletions grumphp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ grumphp:
- composer_require_checker
- phpmd
- phpstan
- phpcsfixer
# - phpcsfixer
tasks:
phpcsfixer:
allow_risky: ~
cache_file: ~
config: ~
rules:
line_ending: true
array_syntax:
syntax: short
using_cache: ~
config_contains_finder: false
verbose: true
diff: false
triggered_by: ['php']
# phpcsfixer:
# allow_risky: ~
# cache_file: ~
# config: ~
# rules:
# line_ending: true
# array_syntax:
# syntax: short
# using_cache: ~
# config_contains_finder: false
# verbose: true
# diff: false
# triggered_by: ['php']
phpmd:
whitelist_patterns:
- /^src\/(.*)/
Expand Down
3 changes: 3 additions & 0 deletions phpstan.neon.dist
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ parameters:
level: max
paths:
- src/
excludePaths:
- castor.php
- castor-commands/*

includes:
- vendor/phpstan/phpstan/conf/bleedingEdge.neon
12 changes: 12 additions & 0 deletions pub/index.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

/**
* @author: Julien Mercier-Rojas <julien@jeckel-lab.fr>
* Created at: 29/10/2023
*/

declare(strict_types=1);

require_once __DIR__ . '/../vendor/autoload.php';

echo "Hello World!\n";
21 changes: 0 additions & 21 deletions pub/load-messages.php

This file was deleted.

101 changes: 101 additions & 0 deletions src/Console/LoadMessages.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
<?php

/**
* @author: Julien Mercier-Rojas <julien@jeckel-lab.fr>
* Created at: 30/10/2023
*/

declare(strict_types=1);

namespace JeckelLab\IpcSharedMemoryDemo\Console;

use Exception;
use JeckelLab\IpcSharedMemoryDemo\Message\Message;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPConnectionBlockedException;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(name: 'demo:load-messages')]
class LoadMessages extends Command
{
private ?AMQPStreamConnection $connection = null;

/**
* @SuppressWarnings(PHPMD.UnusedFormalParameter)
* @throws Exception
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$channel = $this->getChannel();

$exchange = (string) getenv('RABBITMQ_EXCHANGE');

$batch = 100;
$max = 1000 * 1000;
for ($i = 0; $i < $max; $i++) {
$message = $this->getMessage();
$channel->batch_basic_publish($message, $exchange, $message->getRoutingKey());

if ($i % $batch === 0) {
try {
$channel->publish_batch();
} catch (AMQPConnectionBlockedException) {
do {
sleep(10);
} while ($this->getConnection()->isBlocked());
$channel->publish_batch();
}
$output->writeln(sprintf('Published %d messages', $i));
}
}

$channel->publish_batch();
$output->writeln(sprintf('Published %d messages', $max));
return Command::SUCCESS;
}

/**
* @return AMQPChannel
* @throws Exception
*/
protected function getChannel(): AMQPChannel
{
return ($this->getConnection())->channel();
}

/**
* @return Message
* @throws Exception
*/
protected function getMessage(): Message
{
$shard = random_int(0, 9);
return (new Message(
sprintf('Hello world, dummy message for shard %s', $shard),
[
'content_type' => 'text/plain'
]
))->setRoutingKey(sprintf('shard-%d', $shard));
}

/**
* @return AMQPStreamConnection
* @throws Exception
*/
protected function getConnection(): AMQPStreamConnection
{
if (null === $this->connection) {
$this->connection = new AMQPStreamConnection(
(string) getenv('RABBITMQ_HOST'),
(int) getenv('RABBITMQ_PORT'),
(string) getenv('RABBITMQ_USER'),
(string) getenv('RABBITMQ_PASS')
);
}
return $this->connection;
}
}
28 changes: 28 additions & 0 deletions src/Message/Message.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

/**
* @author: Julien Mercier-Rojas <julien@jeckel-lab.fr>
* Created at: 30/10/2023
*/

declare(strict_types=1);

namespace JeckelLab\IpcSharedMemoryDemo\Message;

use PhpAmqpLib\Message\AMQPMessage;

class Message extends AMQPMessage
{
private string $routingKey = '';

public function getRoutingKey(): string
{
return $this->routingKey;
}

public function setRoutingKey(string $routingKey): self
{
$this->routingKey = $routingKey;
return $this;
}
}
25 changes: 0 additions & 25 deletions src/Placeholder.php

This file was deleted.