Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[consumption] Do not close context. #183

Merged
merged 1 commit into from
Aug 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions pkg/enqueue/Consumption/QueueConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ public function consume(ExtensionInterface $runtimeExtension = null)
$context->setExecutionInterrupted(true);

$extension->onInterrupted($context);
$this->psrContext->close();

return;
} catch (\Exception $exception) {
Expand All @@ -158,10 +157,8 @@ public function consume(ExtensionInterface $runtimeExtension = null)

try {
$this->onInterruptionByException($extension, $context);
$this->psrContext->close();
} catch (\Exception $e) {
// for some reason finally does not work here on php5.5
$this->psrContext->close();

throw $e;
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
$this->consumer->bind($queue, $this->processor);
}

try {
$this->consumer->consume($this->getRuntimeExtensions($input, $output));
} finally {
$this->consumer->getPsrContext()->close();
}
$this->consumer->consume($this->getRuntimeExtensions($input, $output));
}

/**
Expand Down
6 changes: 1 addition & 5 deletions pkg/enqueue/Symfony/Consumption/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ protected function execute(InputInterface $input, OutputInterface $output)

$runtimeExtensions = new ChainExtension($extensions);

try {
$this->consumer->consume($runtimeExtensions);
} finally {
$this->consumer->getPsrContext()->close();
}
$this->consumer->consume($runtimeExtensions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,10 @@ protected function execute(InputInterface $input, OutputInterface $output)

$runtimeExtensions = new ChainExtension($extensions);

try {
foreach ($queues as $queue) {
$this->consumer->bind($queue, $processor);
}

$this->consumer->consume($runtimeExtensions);
} finally {
$this->consumer->getPsrContext()->close();
foreach ($queues as $queue) {
$this->consumer->bind($queue, $processor);
}

$this->consumer->consume($runtimeExtensions);
}
}
8 changes: 4 additions & 4 deletions pkg/enqueue/Tests/Consumption/QueueConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -652,13 +652,13 @@ public function testShouldAllowInterruptConsumingOnIdle()
$queueConsumer->consume();
}

public function testShouldCloseSessionWhenConsumptionInterrupted()
public function testShouldNotCloseContextWhenConsumptionInterrupted()
{
$messageConsumerStub = $this->createMessageConsumerStub($message = null);

$contextStub = $this->createPsrContextStub($messageConsumerStub);
$contextStub
->expects($this->once())
->expects($this->never())
->method('close')
;

Expand All @@ -681,15 +681,15 @@ public function testShouldCloseSessionWhenConsumptionInterrupted()
$queueConsumer->consume();
}

public function testShouldCloseSessionWhenConsumptionInterruptedByException()
public function testShouldNotCloseContextWhenConsumptionInterruptedByException()
{
$expectedException = new \Exception();

$messageConsumerStub = $this->createMessageConsumerStub($message = $this->createMessageMock());

$contextStub = $this->createPsrContextStub($messageConsumerStub);
$contextStub
->expects($this->once())
->expects($this->never())
->method('close')
;

Expand Down
14 changes: 2 additions & 12 deletions pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public function testShouldExecuteConsumptionAndUseDefaultQueueName()

$context = $this->createPsrContextMock();
$context
->expects($this->once())
->expects($this->never())
->method('close')
;

Expand All @@ -106,11 +106,6 @@ public function testShouldExecuteConsumptionAndUseDefaultQueueName()
->method('consume')
->with($this->isInstanceOf(ChainExtension::class))
;
$consumer
->expects($this->once())
->method('getPsrContext')
->will($this->returnValue($context))
;

$queueMetaRegistry = $this->createQueueMetaRegistry([
'default' => [],
Expand Down Expand Up @@ -138,7 +133,7 @@ public function testShouldExecuteConsumptionAndUseCustomClientDestinationName()

$context = $this->createPsrContextMock();
$context
->expects($this->once())
->expects($this->never())
->method('close')
;

Expand All @@ -153,11 +148,6 @@ public function testShouldExecuteConsumptionAndUseCustomClientDestinationName()
->method('consume')
->with($this->isInstanceOf(ChainExtension::class))
;
$consumer
->expects($this->once())
->method('getPsrContext')
->will($this->returnValue($context))
;

$queueMetaRegistry = $this->createQueueMetaRegistry([
'non-default-queue' => [],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public function testShouldExecuteConsumption()
{
$context = $this->createContextMock();
$context
->expects($this->once())
->expects($this->never())
->method('close')
;

Expand All @@ -58,11 +58,6 @@ public function testShouldExecuteConsumption()
->method('consume')
->with($this->isInstanceOf(ChainExtension::class))
;
$consumer
->expects($this->once())
->method('getPsrContext')
->will($this->returnValue($context))
;

$command = new ConsumeMessagesCommand($consumer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,13 @@ public function testThrowIfNeitherQueueOptionNorProcessorImplementsQueueSubscrib
]);
}

public function testShouldExecuteConsumptionWithExplisitlySetQueueViaQueueOption()
public function testShouldExecuteConsumptionWithExplicitlySetQueueViaQueueOption()
{
$processor = $this->createProcessor();

$context = $this->createContextMock();
$context
->expects($this->once())
->expects($this->never())
->method('close')
;

Expand All @@ -118,11 +118,6 @@ public function testShouldExecuteConsumptionWithExplisitlySetQueueViaQueueOption
->method('consume')
->with($this->isInstanceOf(ChainExtension::class))
;
$consumer
->expects($this->exactly(1))
->method('getPsrContext')
->will($this->returnValue($context))
;

$container = new Container();
$container->set('processor-service', $processor);
Expand All @@ -143,7 +138,7 @@ public function testShouldExecuteConsumptionWhenProcessorImplementsQueueSubscrib

$context = $this->createContextMock();
$context
->expects($this->once())
->expects($this->never())
->method('close')
;

Expand All @@ -163,11 +158,6 @@ public function testShouldExecuteConsumptionWhenProcessorImplementsQueueSubscrib
->method('consume')
->with($this->isInstanceOf(ChainExtension::class))
;
$consumer
->expects($this->at(3))
->method('getPsrContext')
->will($this->returnValue($context))
;

$container = new Container();
$container->set('processor-service', $processor);
Expand Down