-
Notifications
You must be signed in to change notification settings - Fork 11
Open
Description
Currently we use Prooph Event Sourcing for our Aggregate Roots. Since the Prooph Service Bus was deprecated we use the Symfony Messenger instead.
services:
_defaults:
public: false
Prooph\EventStoreBusBridge\EventPublisher:
class: Acme\Shared\Infrastructure\Prooph\EventPublisher
arguments:
- '@event.bus'
tags:
- { name: 'prooph_event_store.default.plugin' }
<?php
namespace Acme\Shared\Infrastructure\Prooph;
use Iterator;
use Prooph\Common\Event\ActionEvent;
use Prooph\EventStore\ActionEventEmitterEventStore;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Plugin\AbstractPlugin;
use Prooph\EventStore\TransactionalActionEventEmitterEventStore;
use Symfony\Component\Messenger\MessageBusInterface;
final class EventPublisher extends AbstractPlugin
{
private MessageBusInterface $eventBus;
/**
* @var Iterator[]
*/
private array $cachedEventStreams = [];
public function __construct(MessageBusInterface $eventBus)
{
$this->eventBus = $eventBus;
}
public function attachToEventStore(ActionEventEmitterEventStore $eventStore): void
{
$this->listenerHandlers[] = $eventStore->attach(
ActionEventEmitterEventStore::EVENT_APPEND_TO,
function (ActionEvent $event) use ($eventStore): void {
$recordedEvents = $event->getParam('streamEvents', new \ArrayIterator());
if (! $this->inTransaction($eventStore)) {
if ($event->getParam('streamNotFound', false)
|| $event->getParam('concurrencyException', false)
) {
return;
}
foreach ($recordedEvents as $recordedEvent) {
$this->eventBus->dispatch($recordedEvent);
}
} else {
$this->cachedEventStreams[] = $recordedEvents;
}
}
);
$this->listenerHandlers[] = $eventStore->attach(
ActionEventEmitterEventStore::EVENT_CREATE,
function (ActionEvent $event) use ($eventStore): void {
$stream = $event->getParam('stream');
$recordedEvents = $stream->streamEvents();
if (! $this->inTransaction($eventStore)) {
if ($event->getParam('streamExistsAlready', false)) {
return;
}
foreach ($recordedEvents as $recordedEvent) {
$this->eventBus->dispatch($recordedEvent);
}
} else {
$this->cachedEventStreams[] = $recordedEvents;
}
}
);
if ($eventStore instanceof TransactionalActionEventEmitterEventStore) {
$this->listenerHandlers[] = $eventStore->attach(
TransactionalActionEventEmitterEventStore::EVENT_COMMIT,
function (ActionEvent $event): void {
foreach ($this->cachedEventStreams as $stream) {
foreach ($stream as $recordedEvent) {
$this->eventBus->dispatch($recordedEvent);
}
}
$this->cachedEventStreams = [];
}
);
$this->listenerHandlers[] = $eventStore->attach(
TransactionalActionEventEmitterEventStore::EVENT_ROLLBACK,
function (ActionEvent $event): void {
$this->cachedEventStreams = [];
}
);
}
}
private function inTransaction(EventStore $eventStore): bool
{
return $eventStore instanceof TransactionalActionEventEmitterEventStore
&& $eventStore->inTransaction();
}
}We have some legacy projects using Doctrine Entities. We don't want to use Event Sourcing. But we would like to publish Domain Events from the Entity to the Messenger Bus.
Has anybody implemented a Domain Publisher using Doctrine Event Subscribers or Lifecycle Callbacks for instance? I guess flushing on the entity manager and publishing the domain event should happen in the same transaction?
Metadata
Metadata
Assignees
Labels
No labels