diff --git a/.gitignore b/.gitignore index ff935b4..899e37b 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,5 @@ /phpstan.neon /phpunit.xml -/.idea \ No newline at end of file +/.idea +/.castor.stub.php diff --git a/.php-cs-fixer.dist.php b/.php-cs-fixer.dist.php index 930007e..d436ac7 100644 --- a/.php-cs-fixer.dist.php +++ b/.php-cs-fixer.dist.php @@ -2,7 +2,7 @@ declare(strict_types=1); -$finder = PhpCsFixer\Finder::create()->in(__DIR__); +$finder = PhpCsFixer\Finder::create()->in(__DIR__ . '/src'); return (new PhpCsFixer\Config()) ->setRiskyAllowed(true) diff --git a/README.md b/README.md index dd11438..2087a2e 100644 --- a/README.md +++ b/README.md @@ -16,15 +16,7 @@ A demo project on how to use the IPC-Shared-Memory package ## Installation -```shell -composer require jeckel/ipc-shared-memory-demo -``` - -## Running tests - -```shell -composer test -``` +> This project requires [castor](https://github.com/jolicode/castor) to run. ## License diff --git a/castor-commands/console.php b/castor-commands/console.php new file mode 100644 index 0000000..3152bd6 --- /dev/null +++ b/castor-commands/console.php @@ -0,0 +1,27 @@ + + * Created at: 30/10/2023 + */ + +declare(strict_types=1); + +namespace php; + +require_once __DIR__ . '/../vendor/autoload.php'; + +use Castor\Attribute\AsTask; +use PhpAmqpLib\Connection\AMQPStreamConnection; +use PhpAmqpLib\Message\AMQPMessage; + +use function Castor\run; + +#[AsTask(description: 'Execute console command')] +function console( + string $consoleCommand +): void { + run( + 'docker-compose -f docker-compose.yml exec demo php -d max_execution_time=120 console.php ' . $consoleCommand + ); +} diff --git a/castor.php b/castor.php new file mode 100644 index 0000000..55e5ed6 --- /dev/null +++ b/castor.php @@ -0,0 +1,5 @@ + + * Created at: 30/10/2023 + */ + +declare(strict_types=1); + +require __DIR__ . '/vendor/autoload.php'; + +use JeckelLab\IpcSharedMemoryDemo\Console\LoadMessages; +use Symfony\Component\Console\Application; + +$application = new Application(); + +$application->add(new LoadMessages()); + +$application->run(); diff --git a/docker-compose.yml b/docker-compose.yml index 90fbc5d..61005b3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,6 +12,12 @@ services: volumes: - .:/project working_dir: /project + environment: + - RABBITMQ_USER=guest + - RABBITMQ_PASS=guest + - RABBITMQ_HOST=rabbitmq + - RABBITMQ_PORT=5672 + - RABBITMQ_EXCHANGE=demo.X.incoming ports: - "9001:9001" # Supervisor admin panel diff --git a/grumphp.yml b/grumphp.yml index 895c792..27ad682 100644 --- a/grumphp.yml +++ b/grumphp.yml @@ -5,21 +5,21 @@ grumphp: - composer_require_checker - phpmd - phpstan - - phpcsfixer +# - phpcsfixer tasks: - phpcsfixer: - allow_risky: ~ - cache_file: ~ - config: ~ - rules: - line_ending: true - array_syntax: - syntax: short - using_cache: ~ - config_contains_finder: false - verbose: true - diff: false - triggered_by: ['php'] +# phpcsfixer: +# allow_risky: ~ +# cache_file: ~ +# config: ~ +# rules: +# line_ending: true +# array_syntax: +# syntax: short +# using_cache: ~ +# config_contains_finder: false +# verbose: true +# diff: false +# triggered_by: ['php'] phpmd: whitelist_patterns: - /^src\/(.*)/ diff --git a/phpstan.neon.dist b/phpstan.neon.dist index ce95b91..1531801 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -2,6 +2,9 @@ parameters: level: max paths: - src/ + excludePaths: + - castor.php + - castor-commands/* includes: - vendor/phpstan/phpstan/conf/bleedingEdge.neon diff --git a/pub/index.php b/pub/index.php new file mode 100644 index 0000000..17622ef --- /dev/null +++ b/pub/index.php @@ -0,0 +1,12 @@ + + * Created at: 29/10/2023 + */ + +declare(strict_types=1); + +require_once __DIR__ . '/../vendor/autoload.php'; + +echo "Hello World!\n"; diff --git a/pub/load-messages.php b/pub/load-messages.php deleted file mode 100644 index 4b482f5..0000000 --- a/pub/load-messages.php +++ /dev/null @@ -1,21 +0,0 @@ - - * Created at: 29/10/2023 - */ - -declare(strict_types=1); - -require_once __DIR__ . '/../vendor/autoload.php'; - -use PhpAmqpLib\Connection\AMQPStreamConnection; -use PhpAmqpLib\Message\AMQPMessage; - -$connection = new AMQPStreamConnection('rabbitmq', 5672, 'guest', 'guest'); -$channel = $connection->channel(); - -$msg = new AMQPMessage('Hello World!'); -$channel->basic_publish($msg, 'demo.X.incoming', 'type1'); - -printf(" [x] Sent %s\n", $msg->getBody()); diff --git a/src/Console/LoadMessages.php b/src/Console/LoadMessages.php new file mode 100644 index 0000000..1810513 --- /dev/null +++ b/src/Console/LoadMessages.php @@ -0,0 +1,101 @@ + + * Created at: 30/10/2023 + */ + +declare(strict_types=1); + +namespace JeckelLab\IpcSharedMemoryDemo\Console; + +use Exception; +use JeckelLab\IpcSharedMemoryDemo\Message\Message; +use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Connection\AMQPStreamConnection; +use PhpAmqpLib\Exception\AMQPConnectionBlockedException; +use Symfony\Component\Console\Attribute\AsCommand; +use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Output\OutputInterface; + +#[AsCommand(name: 'demo:load-messages')] +class LoadMessages extends Command +{ + private ?AMQPStreamConnection $connection = null; + + /** + * @SuppressWarnings(PHPMD.UnusedFormalParameter) + * @throws Exception + */ + protected function execute(InputInterface $input, OutputInterface $output): int + { + $channel = $this->getChannel(); + + $exchange = (string) getenv('RABBITMQ_EXCHANGE'); + + $batch = 100; + $max = 1000 * 1000; + for ($i = 0; $i < $max; $i++) { + $message = $this->getMessage(); + $channel->batch_basic_publish($message, $exchange, $message->getRoutingKey()); + + if ($i % $batch === 0) { + try { + $channel->publish_batch(); + } catch (AMQPConnectionBlockedException) { + do { + sleep(10); + } while ($this->getConnection()->isBlocked()); + $channel->publish_batch(); + } + $output->writeln(sprintf('Published %d messages', $i)); + } + } + + $channel->publish_batch(); + $output->writeln(sprintf('Published %d messages', $max)); + return Command::SUCCESS; + } + + /** + * @return AMQPChannel + * @throws Exception + */ + protected function getChannel(): AMQPChannel + { + return ($this->getConnection())->channel(); + } + + /** + * @return Message + * @throws Exception + */ + protected function getMessage(): Message + { + $shard = random_int(0, 9); + return (new Message( + sprintf('Hello world, dummy message for shard %s', $shard), + [ + 'content_type' => 'text/plain' + ] + ))->setRoutingKey(sprintf('shard-%d', $shard)); + } + + /** + * @return AMQPStreamConnection + * @throws Exception + */ + protected function getConnection(): AMQPStreamConnection + { + if (null === $this->connection) { + $this->connection = new AMQPStreamConnection( + (string) getenv('RABBITMQ_HOST'), + (int) getenv('RABBITMQ_PORT'), + (string) getenv('RABBITMQ_USER'), + (string) getenv('RABBITMQ_PASS') + ); + } + return $this->connection; + } +} diff --git a/src/Message/Message.php b/src/Message/Message.php new file mode 100644 index 0000000..5aecf24 --- /dev/null +++ b/src/Message/Message.php @@ -0,0 +1,28 @@ + + * Created at: 30/10/2023 + */ + +declare(strict_types=1); + +namespace JeckelLab\IpcSharedMemoryDemo\Message; + +use PhpAmqpLib\Message\AMQPMessage; + +class Message extends AMQPMessage +{ + private string $routingKey = ''; + + public function getRoutingKey(): string + { + return $this->routingKey; + } + + public function setRoutingKey(string $routingKey): self + { + $this->routingKey = $routingKey; + return $this; + } +} diff --git a/src/Placeholder.php b/src/Placeholder.php deleted file mode 100644 index 3ffee5b..0000000 --- a/src/Placeholder.php +++ /dev/null @@ -1,25 +0,0 @@ - - * Created at: 2023-10-28 - */ - -declare(strict_types=1); - -namespace JeckelLab\IpcSharedMemoryDemo; - -final class Placeholder -{ - private string $prefix; - - public function __construct(string $prefix) - { - $this->prefix = $prefix; - } - - public function echo(string $value): string - { - return $this->prefix . $value; - } -}