Skip to content

Commit

Permalink
Merge pull request #3 from Jeckel-Lab/feature/send-millions-messages
Browse files Browse the repository at this point in the history
Send millions of messages in RabbitMQ
  • Loading branch information
jeckel authored Oct 30, 2023
2 parents f5395c6 + c0bedae commit 0a45d1c
Show file tree
Hide file tree
Showing 15 changed files with 222 additions and 72 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@
/phpstan.neon
/phpunit.xml

/.idea
/.idea
/.castor.stub.php
2 changes: 1 addition & 1 deletion .php-cs-fixer.dist.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

declare(strict_types=1);

$finder = PhpCsFixer\Finder::create()->in(__DIR__);
$finder = PhpCsFixer\Finder::create()->in(__DIR__ . '/src');

return (new PhpCsFixer\Config())
->setRiskyAllowed(true)
Expand Down
10 changes: 1 addition & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,7 @@ A demo project on how to use the IPC-Shared-Memory package
## Installation

```shell
composer require jeckel/ipc-shared-memory-demo
```

## Running tests

```shell
composer test
```
> This project requires [castor](https://github.com/jolicode/castor) to run.
## License

Expand Down
27 changes: 27 additions & 0 deletions castor-commands/console.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

/**
* @author: Julien Mercier-Rojas <julien@jeckel-lab.fr>
* Created at: 30/10/2023
*/

declare(strict_types=1);

namespace php;

require_once __DIR__ . '/../vendor/autoload.php';

use Castor\Attribute\AsTask;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

use function Castor\run;

#[AsTask(description: 'Execute console command')]
function console(
string $consoleCommand
): void {
run(
'docker-compose -f docker-compose.yml exec demo php -d max_execution_time=120 console.php ' . $consoleCommand
);
}
5 changes: 5 additions & 0 deletions castor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<?php

use function Castor\import;

import(__DIR__ . '/castor-commands');
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
],
"require": {
"php": "~8.2",
"php-amqplib/php-amqplib": "^3.6"
"php-amqplib/php-amqplib": "^3.6",
"symfony/console": "^6.3"
},
"require-dev": {
"friendsofphp/php-cs-fixer": "^3.34",
Expand Down
20 changes: 20 additions & 0 deletions console.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env php
<?php

/**
* @author: Julien Mercier-Rojas <julien@jeckel-lab.fr>
* Created at: 30/10/2023
*/

declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php';

use JeckelLab\IpcSharedMemoryDemo\Console\LoadMessages;
use Symfony\Component\Console\Application;

$application = new Application();

$application->add(new LoadMessages());

$application->run();
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ services:
volumes:
- .:/project
working_dir: /project
environment:
- RABBITMQ_USER=guest
- RABBITMQ_PASS=guest
- RABBITMQ_HOST=rabbitmq
- RABBITMQ_PORT=5672
- RABBITMQ_EXCHANGE=demo.X.incoming
ports:
- "9001:9001" # Supervisor admin panel

Expand Down
28 changes: 14 additions & 14 deletions grumphp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ grumphp:
- composer_require_checker
- phpmd
- phpstan
- phpcsfixer
# - phpcsfixer
tasks:
phpcsfixer:
allow_risky: ~
cache_file: ~
config: ~
rules:
line_ending: true
array_syntax:
syntax: short
using_cache: ~
config_contains_finder: false
verbose: true
diff: false
triggered_by: ['php']
# phpcsfixer:
# allow_risky: ~
# cache_file: ~
# config: ~
# rules:
# line_ending: true
# array_syntax:
# syntax: short
# using_cache: ~
# config_contains_finder: false
# verbose: true
# diff: false
# triggered_by: ['php']
phpmd:
whitelist_patterns:
- /^src\/(.*)/
Expand Down
3 changes: 3 additions & 0 deletions phpstan.neon.dist
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ parameters:
level: max
paths:
- src/
excludePaths:
- castor.php
- castor-commands/*

includes:
- vendor/phpstan/phpstan/conf/bleedingEdge.neon
12 changes: 12 additions & 0 deletions pub/index.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

/**
* @author: Julien Mercier-Rojas <julien@jeckel-lab.fr>
* Created at: 29/10/2023
*/

declare(strict_types=1);

require_once __DIR__ . '/../vendor/autoload.php';

echo "Hello World!\n";
21 changes: 0 additions & 21 deletions pub/load-messages.php

This file was deleted.

101 changes: 101 additions & 0 deletions src/Console/LoadMessages.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
<?php

/**
* @author: Julien Mercier-Rojas <julien@jeckel-lab.fr>
* Created at: 30/10/2023
*/

declare(strict_types=1);

namespace JeckelLab\IpcSharedMemoryDemo\Console;

use Exception;
use JeckelLab\IpcSharedMemoryDemo\Message\Message;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPConnectionBlockedException;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(name: 'demo:load-messages')]
class LoadMessages extends Command
{
private ?AMQPStreamConnection $connection = null;

/**
* @SuppressWarnings(PHPMD.UnusedFormalParameter)
* @throws Exception
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$channel = $this->getChannel();

$exchange = (string) getenv('RABBITMQ_EXCHANGE');

$batch = 100;
$max = 1000 * 1000;
for ($i = 0; $i < $max; $i++) {
$message = $this->getMessage();
$channel->batch_basic_publish($message, $exchange, $message->getRoutingKey());

if ($i % $batch === 0) {
try {
$channel->publish_batch();
} catch (AMQPConnectionBlockedException) {
do {
sleep(10);
} while ($this->getConnection()->isBlocked());
$channel->publish_batch();
}
$output->writeln(sprintf('Published %d messages', $i));
}
}

$channel->publish_batch();
$output->writeln(sprintf('Published %d messages', $max));
return Command::SUCCESS;
}

/**
* @return AMQPChannel
* @throws Exception
*/
protected function getChannel(): AMQPChannel
{
return ($this->getConnection())->channel();
}

/**
* @return Message
* @throws Exception
*/
protected function getMessage(): Message
{
$shard = random_int(0, 9);
return (new Message(
sprintf('Hello world, dummy message for shard %s', $shard),
[
'content_type' => 'text/plain'
]
))->setRoutingKey(sprintf('shard-%d', $shard));
}

/**
* @return AMQPStreamConnection
* @throws Exception
*/
protected function getConnection(): AMQPStreamConnection
{
if (null === $this->connection) {
$this->connection = new AMQPStreamConnection(
(string) getenv('RABBITMQ_HOST'),
(int) getenv('RABBITMQ_PORT'),
(string) getenv('RABBITMQ_USER'),
(string) getenv('RABBITMQ_PASS')
);
}
return $this->connection;
}
}
28 changes: 28 additions & 0 deletions src/Message/Message.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

/**
* @author: Julien Mercier-Rojas <julien@jeckel-lab.fr>
* Created at: 30/10/2023
*/

declare(strict_types=1);

namespace JeckelLab\IpcSharedMemoryDemo\Message;

use PhpAmqpLib\Message\AMQPMessage;

class Message extends AMQPMessage
{
private string $routingKey = '';

public function getRoutingKey(): string
{
return $this->routingKey;
}

public function setRoutingKey(string $routingKey): self
{
$this->routingKey = $routingKey;
return $this;
}
}
25 changes: 0 additions & 25 deletions src/Placeholder.php

This file was deleted.

0 comments on commit 0a45d1c

Please sign in to comment.