- Use version
1.0-dev
for last updated. Alias ofdev-master
. - Use last stable version tag to stay in a stable release.
Installing Redis
wget http://download.redis.io/redis-stable.tar.gz
tar xvzf redis-stable.tar.gz
cd redis-stable
make
Installing PHPRedis
phpredis extension is necessary to be installed in your server.
Otherwise composer will alert you.
git clone git://github.com/nicolasff/phpredis.git
cd phpredis
phpize
./configure
make
sudo make install
cd ..
echo "extension=redis.so" >> `php --ini | grep "Loaded Configuration" | sed -e "s|.*:\s*||"`
Installing RSQueue
You have to add require line into you composer.json file
"require": {
"php": ">=5.3.3",
"symfony/symfony": "2.3.*",
...
"mmoreram/rsqueue-bundle": "dev-master"
},
Then you have to use composer to update your project dependencies
php composer.phar update
And register the bundle in your appkernel.php file
return array(
// ...
new Mmoreram\RSQueueBundle\RSQueueBundle(),
// ...
);
In this first version, all conections are localhost:6379, but as soon as posible connections will be configurable.
You need to configure all queues and serializer.
By default serializer has the value 'Json', but also 'PHP' value can be used. Also custom serializer can be implemented by extending default serializer interface. Then you need to add namespace of class into the rs_queue.serializer parameter.
rs_queue:
# Queues definition
queues:
videos: "queues:videos"
audios: "queues:audios"
# Serializer definition
serializer: ~
# Server configuration. By default, these values
server:
redis:
host: 127.0.0.1
port: 6379
database: ~
Producer/consumer model allows you to produce elements into one/many queues by using default rsqueue producer service.
One element is pushed into one queue so one and only one consumer will pop and treat this element.
$this->container->get("rs_queue.producer")->produce("videos", "this is my video");
$this->container->get("rs_queue.producer")->produce("audios", "this is my audio");
Then you should extend ConsumerCommand so that in this way you can define which queues listen, and in each case, which action execute.
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Mmoreram\RSQueueBundle\Command\ConsumerCommand;
/**
* Testing consumer command
*/
class TestConsumerCommand extends ConsumerCommand
{
/**
* Configuration method
*/
protected function configure()
{
$this
->setName('test:consumer')
->setDescription('Testing consumer command');
;
parent::configure();
}
/**
* Relates queue name with appropiated method
*/
public function define()
{
$this->addQueue('videos', 'consumeVideo');
}
/**
* If many queues are defined, as Redis respects order of queues, you can shuffle them
* just overwritting method shuffleQueues() and returning true
*
* @return boolean Shuffle before passing to Gearman
*/
public function shuffleQueues()
{
return true;
}
/**
* Consume method with retrieved queue value
*
* @param InputInterface $input An InputInterface instance
* @param OutputInterface $output An OutputInterface instance
* @param Mixed $payload Data retrieved and unserialized from queue
*/
protected function consumeVideo(InputInterface $input, OutputInterface $output, $payload)
{
$output->writeln($payload);
}
}
This model allows data broadcasting. This means that one or more Subscribers will treat all elements of the queue, but only if they are listening just in the moment publisher publish them.
$this->container->get("rs_queue.publisher")->publish("audios", "this is my audio");
And, as consumers, subscribers must define which channels they want to listen
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Mmoreram\RSQueueBundle\Command\SubscriberCommand;
/**
* Testing subscriber command
*/
class TestSubscriberCommand extends SubscriberCommand
{
/**
* Configuration method
*/
protected function configure()
{
$this
->setName('test:subscriber:audios')
->setDescription('Testing subscriber audios command');
;
parent::configure();
}
/**
* Relates queue name with appropiated method
*/
public function define()
{
$this->addChannel('audios', 'consumeAudio');
}
/**
* If many queues are defined, as Redis respects order of queues, you can shuffle them
* just overwritting method shuffleQueues() and returning true
*
* @return boolean Shuffle before passing to Gearman
*/
public function shuffleQueues()
{
return true;
}
/**
* subscriber method with retrieved queue value
*
* @param InputInterface $input An InputInterface instance
* @param OutputInterface $output An OutputInterface instance
* @param Mixed $payload Data retrieved and unserialized from queue
*/
protected function consumeAudio(InputInterface $input, OutputInterface $output, $payload)
{
$output->writeln($payload);
}
}
By extending PSubscriberCommand you can define patterns instead of queue names.
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Mmoreram\RSQueueBundle\Command\PSubscriberCommand;
/**
* Testing PSubscriber command
*/
class TestPSubscriberCommand extends PSubscriberCommand
{
/**
* Configuration method
*/
protected function configure()
{
$this
->setName('test:psubscriber')
->setDescription('Testing psubscriber command');
;
parent::configure();
}
/**
* Relates queue name with appropiated method
*/
public function define()
{
$this->addPattern('*', 'consumeAll');
}
/**
* If many queues are defined, as Redis respects order of queues, you can shuffle them
* just overwritting method shuffleQueues() and returning true
*
* @return boolean Shuffle before passing to Gearman
*/
public function shuffleQueues()
{
return true;
}
/**
* Consume method with retrieved queue value
*
* @param InputInterface $input An InputInterface instance
* @param OutputInterface $output An OutputInterface instance
* @param Mixed $payload Data retrieved and unserialized from queue
*/
protected function consumeAll(InputInterface $input, OutputInterface $output, $payload)
{
$output->writeln($payload);
}
}
Custom events are used in this bundle.
/**
* The rs_queue.consumer is thrown each time a job is consumed by consumer
*
* The event listener recieves an
* Mmoreram\RSQueueBundle\Event\RSQueueConsumerEvent instance
*
* @var string
*/
const RSQUEUE_CONSUMER = 'rs_queue.consumer';
/**
* The rs_queue.subscriber is thrown each time a job is consumed by subscriber
*
* The event listener recieves an
* Mmoreram\RSQueueBundle\Event\RSQueueSubscriberEvent instance
*
* @var string
*/
const RSQUEUE_SUBSCRIBER = 'rs_queue.subscriber';
/**
* The rs_queue.producer is thrown each time a job is consumed by producer
*
* The event listener recieves an
* Mmoreram\RSQueueBundle\Event\RSQueueProducerEvent instance
*
* @var string
*/
const RSQUEUE_PRODUCER = 'rs_queue.producer';
/**
* The rs_queue.publisher is thrown each time a job is consumed by publisher
*
* The event listener recieves an
* Mmoreram\RSQueueBundle\Event\RSQueuePublisherEvent instance
*
* @var string
*/
const RSQUEUE_PUBLISHER = 'rs_queue.publisher';
All code is Symfony2 Code formatted, so every pull request must validate phpcs standards. You should read Symfony2 coding standards and install this CodeSniffer to check all code is validated.
There is also a policy for contributing to this project. All pull request must be all explained step by step, to make us more understandable and easier to merge pull request. All new features must be tested with PHPUnit.
If you'd like to contribute, please read the Contributing Code part of the documentation. If you're submitting a pull request, please follow the guidelines in the Submitting a Patch section and use the Pull Request Template.