Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"require-dev": {
"phpunit/phpunit": "~3.7",
"aws/aws-sdk-php": "~2.5",
"iron-io/iron_mq": "~1.5",
"iron-io/iron_mq": "^4.0",
"symfony/finder": "~2.3",
"symfony/filesystem": "~2.3"
},
Expand Down
2 changes: 1 addition & 1 deletion docs/iron-mq-provider.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ and needs to have the library included in your ``composer.json`` file.

{
require: {
"iron-io/iron_mq": "~1.5"
"iron-io/iron_mq": "^4.0"
}
}

Expand Down
22 changes: 15 additions & 7 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,17 @@ private function getProvidersNode()
->scalarNode('project_id')->end()
->scalarNode('service')->end()
->enumNode('host')
->defaultValue('mq-aws-us-east-1')
->defaultValue('mq-aws-eu-west-1-1')
->values([
'mq-aws-us-east-1',
'mq-aws-eu-west-1',
'mq-rackspace-ord',
'mq-rackspace-lon',
'mq-aws-eu-west-1-1',
'mq-aws-us-east-1-1',
])
->end()
->scalarNode('port')
->defaultValue('443')
->end()
->scalarNode('api_version')
->defaultValue(1)
->defaultValue(3)
->end()
// AWS
->scalarNode('key')->end()
Expand Down Expand Up @@ -145,12 +143,17 @@ private function getQueuesNode()
->defaultFalse()
->info('Whether notifications are sent to the subscribers')
->end()
->scalarNode('push_type')
->defaultValue('multicast')
->info('Whether the push queue is multicast or unicast')
->example('unicast')
->end()
->scalarNode('notification_retries')
->defaultValue(3)
->info('How many attempts the Push Notifications are retried if the Subscriber returns an error')
->example(3)
->end()
->scalarNode('notification_retry_delay')
->scalarNode('notification_retries_delay')
->defaultValue(60)
->info('Delay between each Push Notification retry in seconds')
->example(3)
Expand Down Expand Up @@ -180,6 +183,11 @@ private function getQueuesNode()
->info('How many seconds to Long Poll when requesting messages - if supported')
->example(3)
->end()
->scalarNode('rate_limit')
->defaultValue(-1)
->info('How many push requests per second will be triggered. -1 for unlimited, 0 disables push')
->example(1)
->end()
->append($this->getSubscribersNode())
->end()
->end()
Expand Down
4 changes: 2 additions & 2 deletions src/DependencyInjection/UecodeQPushExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,13 @@ private function createIronMQClient($config, ContainerBuilder $container, $name)

if (!$container->hasDefinition($service)) {

if (!class_exists('IronMQ')) {
if (!class_exists('IronMQ\IronMQ')) {
throw new \RuntimeException(
'You must require "iron-io/iron_mq" to use the Iron MQ provider.'
);
}

$ironmq = new Definition('IronMQ');
$ironmq = new Definition('IronMQ\IronMQ');
$ironmq->setArguments([
[
'token' => $config['token'],
Expand Down
77 changes: 70 additions & 7 deletions src/Provider/IronMqProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

namespace Uecode\Bundle\QPushBundle\Provider;

use IronMQ;
use IronMQ\IronMQ;
use Doctrine\Common\Cache\Cache;
use Symfony\Bridge\Monolog\Logger;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
Expand Down Expand Up @@ -71,10 +71,13 @@ public function create()
{
if ($this->options['push_notifications']) {
$params = [
'push_type' => 'multicast',
'retries' => $this->options['notification_retries'],
'retry_delay' => $this->options['notification_retry_delay'],
'subscribers' => []
'type' => $this->options['push_type'],
'push' => [
'rate_limit' => $this->options['rate_limit'],
'retries' => $this->options['notification_retries'],
'retries_delay' => $this->options['notification_retries_delay'],
'subscribers' => []
]
];

foreach ($this->options['subscribers'] as $subscriber) {
Expand All @@ -84,14 +87,14 @@ public function create()
);
}

$params['subscribers'][] = ['url' => $subscriber['endpoint']];
$params['push']['subscribers'][] = ['url' => $subscriber['endpoint']];
}

} else {
$params = ['push_type' => 'pull'];
}

$result = $this->ironmq->updateQueue($this->getNameWithPrefix(), $params);
$result = $this->ironmq->createQueue($this->getNameWithPrefix(), $params);
$this->queue = $result;

$key = $this->getNameWithPrefix();
Expand Down Expand Up @@ -301,4 +304,64 @@ public function onMessageReceived(MessageEvent $event)

$event->stopPropagation();
}

/**
* Get queue info
*
* This allows to get queue size. Allowing to know if processing is finished or not
*
* @return stdObject|null
*/
public function queueInfo()
{
if ($this->queueExists()) {
$key = $this->getNameWithPrefix();
$this->queue = $this->ironmq->getQueue($key);

return $this->queue;
}

return null;
}

/**
* Publishes multiple message at once
*
* @param array $messages
* @param array $options
*
* @return array
*/
public function publishMessages(array $messages, array $options = [])
{
$options = $this->mergeOptions($options);
$publishStart = microtime(true);

if (!$this->queueExists()) {
$this->create();
}

$encodedMessages = [];
foreach ($messages as $message) {
$encodedMessages[] = json_encode($message + ['_qpush_queue' => $this->name]);
}

$result = $this->ironmq->postMessages(
$this->getNameWithPrefix(),
$encodedMessages,
[
'timeout' => $options['message_timeout'],
'delay' => $options['message_delay'],
'expires_in' => $options['message_expiration']
]
);

$context = [
'message_ids' => $result->ids,
'publish_time' => microtime(true) - $publishStart
];
$this->log(200, "Messages have been published.", $context);

return $result->ids;
}
}
14 changes: 13 additions & 1 deletion tests/MockClient/IronMqMockClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class IronMqMockClient
{
private $deleteCount = 0;

public function updateQueue($queue, array $params = [])
public function createQueue($queue, array $params = [])
{
$response = new \stdClass;
$response->id = '530295fe3c94fbcf0c79cffe';
Expand Down Expand Up @@ -97,4 +97,16 @@ public function deleteMessage($queue, $id)

return $response;
}

public function getQueue($queue)
{
$response = new \stdClass;
$response->id = '530295fe3c94fbcf0c79cffe';
$response->name = 'test';
$response->size = 0;
$response->total_messages = 0;
$response->project_id = '52f67d032001c00005000057';

return $response;
}
}
33 changes: 23 additions & 10 deletions tests/Provider/IronMqProviderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,18 @@ private function getIronMqProvider(array $options = [])
{
$options = array_merge(
[
'logging_enabled' => false,
'push_notifications' => true,
'notification_retries' => 3,
'notification_retry_delay' => 60,
'message_delay' => 0,
'message_timeout' => 30,
'message_expiration' => 604800,
'messages_to_receive' => 1,
'receive_wait_time' => 3,
'subscribers' => [
'logging_enabled' => false,
'push_notifications' => true,
'push_type' => 'multicast',
'notification_retries' => 3,
'notification_retries_delay' => 60,
'message_delay' => 0,
'message_timeout' => 30,
'message_expiration' => 604800,
'messages_to_receive' => 1,
'rate_limit' => -1,
'receive_wait_time' => 3,
'subscribers' => [
[ 'protocol' => 'http', 'endpoint' => 'http://fake.com' ]
]
],
Expand Down Expand Up @@ -195,4 +197,15 @@ public function testOnMessageReceived()
new Message(123, ['foo' => 'bar'], [])
));
}

public function testQueueInfo()
{
$this->assertNull($this->provider->queueInfo());

$this->provider->create();
$queue = $this->provider->queueInfo();
$this->assertEquals('530295fe3c94fbcf0c79cffe', $queue->id);
$this->assertEquals('test', $queue->name);
$this->assertEquals('52f67d032001c00005000057', $queue->project_id);
}
}