Skip to content

Commit 17f6bdf

Browse files
committed
Add a routed message producer to produce to multiple connections
1 parent 2df7480 commit 17f6bdf

File tree

6 files changed

+207
-63
lines changed

6 files changed

+207
-63
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
<?php
2+
3+
namespace spec\ContinuousPipe\Message\Router;
4+
5+
use ContinuousPipe\Message\Delay\DelayedMessage;
6+
use ContinuousPipe\Message\Message;
7+
use ContinuousPipe\Message\MessageException;
8+
use ContinuousPipe\Message\MessageProducer;
9+
use ContinuousPipe\River\Message\OperationalMessage;
10+
use PhpSpec\ObjectBehavior;
11+
use Prophecy\Argument;
12+
13+
class RoutedMessageProducerSpec extends ObjectBehavior
14+
{
15+
function let(MessageProducer $operationalProducer, MessageProducer $delayedProducer, MessageProducer $wildcardProducer)
16+
{
17+
$this->beConstructedWith([
18+
DelayedMessage::class => $delayedProducer,
19+
OperationalMessage::class => $operationalProducer,
20+
'*' => $wildcardProducer,
21+
]);
22+
}
23+
24+
function it_matches_interfaces_and_call_the_correct_producer(DelayedMessage $message, MessageProducer $delayedProducer, MessageProducer $wildcardProducer)
25+
{
26+
$delayedProducer->produce($message)->shouldBeCalled();
27+
$wildcardProducer->produce(Argument::any())->shouldNotBeCalled();
28+
29+
$this->produce($message);
30+
}
31+
32+
function it_supports_a_wildcard(Message $message, MessageProducer $wildcardProducer)
33+
{
34+
$wildcardProducer->produce($message)->shouldBeCalled();
35+
36+
$this->produce($message);
37+
}
38+
39+
function it_throws_an_exception_if_no_producer_is_matched(Message $message)
40+
{
41+
$this->beConstructedWith([]);
42+
43+
$this->shouldThrow(MessageException::class)->duringProduce($message);
44+
}
45+
}

spec/ContinuousPipe/Message/Transaction/ExtendDeadlineDuringTransactionSpec.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ function it_runs_the_message(TransactionManager $transactionManager, PulledMessa
2828
$this->run($pulledMessage, function() {});
2929
}
3030

31-
function it_extends_the_message_dealine_when_it_is_a_long_running_with_an_exception(TransactionManager $transactionManager, PulledMessage $pulledMessage, LongRunningMessage $message, MessageDeadlineExtenderFactory $extenderFactory, MessageDeadlineExtender $extender)
31+
function it_extends_the_message_dealine_when_it_is_a_long_running_with_an_exception(TransactionManager $transactionManager, PulledMessage $pulledMessage, LongRunningMessage $longRunningMessage, MessageDeadlineExtenderFactory $extenderFactory, MessageDeadlineExtender $extender)
3232
{
33-
$pulledMessage->getMessage()->willReturn($message);
33+
$pulledMessage->getMessage()->willReturn($longRunningMessage);
3434

3535
$extenderFactory->forMessage($pulledMessage)->willReturn($extender);
3636
$transactionManager->run($pulledMessage, Argument::any())->willThrow(new \RuntimeException('Something went wrong'));

src/ContinuousPipe/Message/MessageProducer.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,10 @@
44

55
interface MessageProducer
66
{
7+
/**
8+
* @param Message $message
9+
*
10+
* @throws MessageException
11+
*/
712
public function produce(Message $message);
813
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
namespace ContinuousPipe\Message\Router;
4+
5+
use ContinuousPipe\Message\Message;
6+
use ContinuousPipe\Message\MessageException;
7+
use ContinuousPipe\Message\MessageProducer;
8+
9+
class RoutedMessageProducer implements MessageProducer
10+
{
11+
/**
12+
* @var array<string,MessageProducer>
13+
*/
14+
private $messageToProducerMapping;
15+
16+
/**
17+
* @param array<string,MessageProducer> $messageToProducerMapping
18+
*/
19+
public function __construct(array $messageToProducerMapping)
20+
{
21+
$this->messageToProducerMapping = $messageToProducerMapping;
22+
}
23+
24+
public function produce(Message $message)
25+
{
26+
foreach ($this->messageToProducerMapping as $messageType => $producer) {
27+
if ($this->messageMatches($message, $messageType)) {
28+
return $producer->produce($message);
29+
}
30+
}
31+
32+
throw new MessageException(sprintf(
33+
'Message of type "%s" did not match any producer',
34+
get_class($message)
35+
));
36+
}
37+
38+
private function messageMatches(Message $message, string $messageType) : bool
39+
{
40+
return is_a($message, $messageType)
41+
|| in_array($messageType, class_implements($message))
42+
|| $messageType == '*';
43+
}
44+
}

src/ContinuousPipe/MessageBundle/DependencyInjection/Configuration.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,22 @@ public function getConfigTreeBuilder()
4141
->scalarNode('subscription')->isRequired()->end()
4242
->end()
4343
->end()
44+
->arrayNode('router')
45+
->children()
46+
->arrayNode('message_to_connection_mapping')
47+
->useAttributeAsKey('message_class')
48+
->prototype('array')
49+
->beforeNormalization()
50+
->ifString()
51+
->then(function ($v) { return array('connection' => $v); })
52+
->end()
53+
->children()
54+
->scalarNode('connection')->isRequired()->end()
55+
->end()
56+
->end()
57+
->end()
58+
->end()
59+
->end()
4460
->end()
4561
->end()
4662
->booleanNode('debug')->defaultFalse()->end()

src/ContinuousPipe/MessageBundle/DependencyInjection/MessageExtension.php

Lines changed: 95 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use ContinuousPipe\Message\GooglePubSub\PubSubMessageProducer;
99
use ContinuousPipe\Message\GooglePubSub\PubSubMessagePuller;
1010
use ContinuousPipe\Message\InMemory\ArrayMessagePuller;
11+
use ContinuousPipe\Message\Router\RoutedMessageProducer;
1112
use Google\Cloud\ServiceBuilder;
1213
use Symfony\Component\Config\FileLocator;
1314
use Symfony\Component\DependencyInjection\ContainerBuilder;
@@ -26,13 +27,13 @@ public function load(array $configs, ContainerBuilder $container)
2627
$configuration = new Configuration();
2728
$config = $this->processConfiguration($configuration, $configs);
2829

29-
$loader = new Loader\XmlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
30+
$loader = new Loader\XmlFileLoader($container, new FileLocator(__DIR__ . '/../Resources/config'));
3031

3132
if (isset($config['simple_bus'])) {
3233
$loader->load('simple-bus.xml');
3334

3435
$container->getDefinition('continuouspipe.message.simple_bus.producer')->replaceArgument(0, new Reference(
35-
'continuouspipe.message.'.$config['simple_bus']['connection'].'.message_producer'
36+
'continuouspipe.message.' . $config['simple_bus']['connection'] . '.message_producer'
3637
));
3738
}
3839

@@ -49,8 +50,7 @@ public function load(array $configs, ContainerBuilder $container)
4950

5051
$container
5152
->getDefinition('continuouspipe.message.command.pull_and_consumer')
52-
->replaceArgument(0, new Reference('continuouspipe.message.'.$config['command']['connection'].'.message_puller'))
53-
;
53+
->replaceArgument(0, new Reference('continuouspipe.message.' . $config['command']['connection'] . '.message_puller'));
5454
}
5555

5656
$drivers = [];
@@ -61,7 +61,7 @@ public function load(array $configs, ContainerBuilder $container)
6161
}
6262

6363
foreach ($drivers as $driver) {
64-
if (file_exists($filePath = 'drivers/'.$driver.'.xml')) {
64+
if (file_exists($filePath = 'drivers/' . $driver . '.xml')) {
6565
$loader->load($filePath);
6666
}
6767
}
@@ -77,70 +77,104 @@ private function createConnection(ContainerBuilder $container, string $name, arr
7777
{
7878
$driverConfiguration = $configuration['driver'];
7979

80-
$pullerName = 'continuouspipe.message.' . $name . '.message_puller';
81-
$producerName = 'continuouspipe.message.' . $name . '.message_producer';
80+
$pullerName = $this->getConnectionPullerName($name);
81+
$producerName = $this->getConnectionProducerName($name);
8282

8383
if (array_key_exists('direct', $driverConfiguration)) {
84-
$container->setDefinition(
85-
$pullerName,
86-
new Definition(ArrayMessagePuller::class)
87-
);
88-
89-
$container->setDefinition(
90-
$producerName,
91-
new Definition(FromProducerToConsumer::class, [
92-
new Reference('continuouspipe.message.message_consumer'),
93-
new Reference('jms_serializer'),
94-
])
95-
);
96-
97-
$container->setDefinition(
98-
$producerName.'.delayed_messages_buffer',
99-
(new Definition(DelayedMessagesBuffer::class, [
100-
new Reference($producerName.'.delayed_messages_buffer.inner'),
101-
]))->setDecoratedService($producerName)
102-
);
103-
}
104-
105-
if (array_key_exists('google_pub_sub', $driverConfiguration)) {
106-
$container->setDefinition(
107-
$pullerName,
108-
new Definition(PubSubMessagePuller::class, [
109-
new Reference('jms_serializer'),
110-
new Reference('logger'),
111-
$driverConfiguration['google_pub_sub']['project_id'],
112-
$driverConfiguration['google_pub_sub']['service_account_path'],
113-
$driverConfiguration['google_pub_sub']['topic'],
114-
$driverConfiguration['google_pub_sub']['subscription']
115-
])
116-
);
117-
118-
$container->setDefinition(
119-
$producerName.'.service_builder',
120-
new Definition(ServiceBuilder::class, [
121-
[
122-
'projectId' => $driverConfiguration['google_pub_sub']['project_id'],
123-
'keyFilePath' => $driverConfiguration['google_pub_sub']['service_account_path'],
124-
]
125-
])
126-
);
127-
128-
$container->setDefinition(
129-
$producerName,
130-
new Definition(PubSubMessageProducer::class, [
131-
new Reference('jms_serializer'),
132-
new Reference($producerName.'.service_builder'),
133-
$driverConfiguration['google_pub_sub']['topic']
134-
])
135-
);
84+
$this->createDirectConnection($container, $pullerName, $producerName);
85+
} elseif (array_key_exists('google_pub_sub', $driverConfiguration)) {
86+
$this->createGooglePubSubConnection($container, $pullerName, $producerName, $driverConfiguration);
87+
} elseif (array_key_exists('router', $driverConfiguration)) {
88+
$this->createRouterConnection($container, $pullerName, $producerName, $driverConfiguration);
89+
} else {
90+
throw new \RuntimeException(sprintf(
91+
'Driver not found with the following configuration keys: %s',
92+
implode(', ', array_keys($driverConfiguration))
93+
));
13694
}
13795

13896
if ($configuration['debug']) {
139-
$container->setDefinition($producerName.'.traced', (new Definition(
97+
$container->setDefinition($producerName . '.traced', (new Definition(
14098
TracedMessageProducer::class, [
141-
new Reference($producerName.'.traced.inner')
99+
new Reference($producerName . '.traced.inner')
142100
]
143101
))->setDecoratedService($producerName));
144102
}
145103
}
104+
105+
private function createDirectConnection(ContainerBuilder $container, string $pullerName, string $producerName)
106+
{
107+
$container->setDefinition(
108+
$pullerName,
109+
new Definition(ArrayMessagePuller::class)
110+
);
111+
112+
$container->setDefinition(
113+
$producerName,
114+
new Definition(FromProducerToConsumer::class, [
115+
new Reference('continuouspipe.message.message_consumer'),
116+
new Reference('jms_serializer'),
117+
])
118+
);
119+
120+
$container->setDefinition(
121+
$producerName . '.delayed_messages_buffer',
122+
(new Definition(DelayedMessagesBuffer::class, [
123+
new Reference($producerName . '.delayed_messages_buffer.inner'),
124+
]))->setDecoratedService($producerName)
125+
);
126+
}
127+
128+
private function createGooglePubSubConnection(ContainerBuilder $container, string $pullerName, string $producerName, array $driverConfiguration)
129+
{
130+
$container->setDefinition(
131+
$pullerName,
132+
new Definition(PubSubMessagePuller::class, [
133+
new Reference('jms_serializer'),
134+
new Reference('logger'),
135+
$driverConfiguration['google_pub_sub']['project_id'],
136+
$driverConfiguration['google_pub_sub']['service_account_path'],
137+
$driverConfiguration['google_pub_sub']['topic'],
138+
$driverConfiguration['google_pub_sub']['subscription']
139+
])
140+
);
141+
142+
$container->setDefinition(
143+
$producerName . '.service_builder',
144+
new Definition(ServiceBuilder::class, [
145+
[
146+
'projectId' => $driverConfiguration['google_pub_sub']['project_id'],
147+
'keyFilePath' => $driverConfiguration['google_pub_sub']['service_account_path'],
148+
]
149+
])
150+
);
151+
152+
$container->setDefinition(
153+
$producerName,
154+
new Definition(PubSubMessageProducer::class, [
155+
new Reference('jms_serializer'),
156+
new Reference($producerName . '.service_builder'),
157+
$driverConfiguration['google_pub_sub']['topic']
158+
])
159+
);
160+
}
161+
162+
private function createRouterConnection(ContainerBuilder $container, string $pullerName, string $producerName, array $driverConfiguration)
163+
{
164+
$container->setDefinition($producerName, new Definition(RoutedMessageProducer::class, [
165+
array_map(function(string $connectionName) {
166+
return new Reference($this->getConnectionProducerName($connectionName));
167+
}, $driverConfiguration['message_to_connection_mapping']),
168+
]));
169+
}
170+
171+
private function getConnectionPullerName(string $name): string
172+
{
173+
return 'continuouspipe.message.' . $name . '.message_puller';
174+
}
175+
176+
private function getConnectionProducerName(string $name): string
177+
{
178+
return 'continuouspipe.message.' . $name . '.message_producer';
179+
}
146180
}

0 commit comments

Comments
 (0)