diff --git a/drush.services.yml b/drush.services.yml index 61735ee1..25017c35 100644 --- a/drush.services.yml +++ b/drush.services.yml @@ -7,12 +7,6 @@ services: - '@datetime.time' tags: - { name: drush.command } - helfi_api_base.pubsub_commands: - class: \Drupal\helfi_api_base\Commands\PubSubCommands - arguments: - - '@helfi_api_base.pubsub_manager' - tags: - - { name: drush.command } helfi_api_base.deploy_commands: class: \Drupal\helfi_api_base\Commands\DeployCommands arguments: ['@event_dispatcher'] diff --git a/helfi_api_base.services.yml b/helfi_api_base.services.yml index 1c4e2ffa..86b93760 100644 --- a/helfi_api_base.services.yml +++ b/helfi_api_base.services.yml @@ -108,53 +108,15 @@ services: arguments: - '@config.factory' - Drupal\helfi_api_base\Azure\PubSub\SettingsFactory: '@helfi_api_base.pubsub_settings_factory' - helfi_api_base.pubsub_settings_factory: - class: Drupal\helfi_api_base\Azure\PubSub\SettingsFactory - arguments: - - '@helfi_api_base.vault_manager' - - Drupal\helfi_api_base\Azure\PubSub\PubSubClientFactory: '@helfi_api_base.pubsub_client_factory' - helfi_api_base.pubsub_client_factory: - class: Drupal\helfi_api_base\Azure\PubSub\PubSubClientFactory - - helfi_api_base.pubsub_client: - public: false - class: \WebSocket\Client - factory: ['@helfi_api_base.pubsub_client_factory', 'create'] - arguments: - - '@helfi_api_base.pubsub_settings' - - '@datetime.time' - - Drupal\helfi_api_base\Azure\PubSub\Settings: '@helfi_api_base.pubsub_settings' - helfi_api_base.pubsub_settings: - class: Drupal\helfi_api_base\Azure\PubSub\Settings - factory: ['@helfi_api_base.pubsub_settings_factory', 'create'] - - Drupal\helfi_api_base\Azure\PubSub\PubSubManager: '@helfi_api_base.pubsub_manager' - helfi_api_base.pubsub_manager: - class: Drupal\helfi_api_base\Azure\PubSub\PubSubManager - arguments: - - '@helfi_api_base.pubsub_client' - - '@event_dispatcher' - - '@datetime.time' - - '@helfi_api_base.pubsub_settings' - - Drupal\helfi_api_base\Cache\CacheTagInvalidatorInterface: '@helfi_api_base.cache_tag_invalidator' - Drupal\helfi_api_base\Cache\CacheTagInvalidator: '@helfi_api_base.cache_tag_invalidator' - helfi_api_base.cache_tag_invalidator: - class: Drupal\helfi_api_base\Cache\CacheTagInvalidator - arguments: - - '@helfi_api_base.pubsub_manager' - - Drupal\helfi_api_base\EventSubscriber\CacheTagInvalidatorSubscriber: '@helfi_api_base.cache_tag_invalidator_subscriber' - helfi_api_base.cache_tag_invalidator_subscriber: - class: Drupal\helfi_api_base\EventSubscriber\CacheTagInvalidatorSubscriber - arguments: - - '@cache_tags.invalidator' - - '@helfi_api_base.environment_resolver' - tags: - - { name: event_subscriber } + Drupal\helfi_api_base\Azure\PubSub\PubSubClientFactory: ~ + Drupal\helfi_api_base\Azure\PubSub\Settings: + factory: ['Drupal\helfi_api_base\Azure\PubSub\SettingsFactory', 'create'] + Drupal\helfi_api_base\Azure\PubSub\PubSubManager: ~ + Drupal\helfi_api_base\Azure\PubSub\PubSubManagerInterface: '@Drupal\helfi_api_base\Azure\PubSub\PubSubManager' + Drupal\helfi_api_base\Cache\CacheTagInvalidatorInterface: '@Drupal\helfi_api_base\Cache\CacheTagInvalidator' + Drupal\helfi_api_base\Cache\CacheTagInvalidator: ~ + + Drupal\helfi_api_base\EventSubscriber\CacheTagInvalidatorSubscriber: ~ Drupal\helfi_api_base\Entity\Revision\RevisionManager: '@helfi_api_base.revision_manager' helfi_api_base.revision_manager: diff --git a/src/Azure/PubSub/AccessTokenType.php b/src/Azure/PubSub/AccessTokenType.php new file mode 100644 index 00000000..dced4b5f --- /dev/null +++ b/src/Azure/PubSub/AccessTokenType.php @@ -0,0 +1,13 @@ +endpoint, '/'), $settings->hub); + public function create(AccessTokenType $type) : Client { + $url = sprintf('wss://%s/client/hubs/%s', rtrim($this->settings->endpoint, '/'), $this->settings->hub); + + $accessKey = $this->settings->accessKey; + if ($type === AccessTokenType::Secondary) { + $accessKey = $this->settings->secondaryAccessKey; + } $authorizationToken = JWT::encode([ 'aud' => $url, - 'iat' => $time->getCurrentTime(), - 'exp' => $time->getCurrentTime() + 3600, + 'iat' => $this->time->getCurrentTime(), + 'exp' => $this->time->getCurrentTime() + 3600, 'role' => [ 'webpubsub.sendToGroup', 'webpubsub.joinLeaveGroup', ], - ], $settings->accessKey, 'HS256'); + ], $accessKey, 'HS256'); return new Client($url, [ 'headers' => [ diff --git a/src/Azure/PubSub/PubSubManager.php b/src/Azure/PubSub/PubSubManager.php index 0658782b..b7f09d03 100644 --- a/src/Azure/PubSub/PubSubManager.php +++ b/src/Azure/PubSub/PubSubManager.php @@ -6,7 +6,6 @@ use Drupal\Component\Datetime\TimeInterface; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; -use WebSocket\Client; use WebSocket\ConnectionException; /** @@ -24,8 +23,6 @@ final class PubSubManager implements PubSubManagerInterface { /** * Constructs a new instance. * - * @param \WebSocket\Client $client - * The websocket client. * @param \Symfony\Contracts\EventDispatcher\EventDispatcherInterface $eventDispatcher * The event dispatcher service. * @param \Drupal\Component\Datetime\TimeInterface $time @@ -34,13 +31,41 @@ final class PubSubManager implements PubSubManagerInterface { * The PubSub settings. */ public function __construct( - private readonly Client $client, + private readonly PubSubClientFactory $clientFactory, private readonly EventDispatcherInterface $eventDispatcher, private readonly TimeInterface $time, private readonly Settings $settings, ) { } + private function clientReceive() : string { + try { + return (string) $this->clientFactory + ->create(AccessTokenType::Primary) + ->receive(); + } + catch (ConnectionException) { + } + return (string) $this->clientFactory + ->create(AccessTokenType::Secondary) + ->receive(); + } + + private function clientText(array $message) : void { + $message = $this->encodeMessage($message); + + try { + $this->clientFactory + ->create(AccessTokenType::Primary) + ->text($message); + } + catch (ConnectionException) { + } + $this->clientFactory + ->create(AccessTokenType::Secondary) + ->text($message); + } + /** * Joins a group. * @@ -52,16 +77,14 @@ private function joinGroup() : void { if ($this->joinedGroup) { return; } - $this->client->text( - $this->encodeMessage([ - 'type' => 'joinGroup', - 'group' => $this->settings->group, - ]) - ); + $this->clientText([ + 'type' => 'joinGroup', + 'group' => $this->settings->group, + ]); try { // Wait until we've actually joined the group. - $message = $this->decodeMessage((string) $this->client->receive()); + $message = $this->decodeMessage($this->clientReceive()); if (isset($message['event']) && $message['event'] === 'connected') { $this->joinedGroup = TRUE; @@ -135,17 +158,15 @@ public function sendMessage(array $message) : self { $this->assertSettings(); $this->joinGroup(); - $this->client - ->text( - $this->encodeMessage([ - 'type' => 'sendToGroup', - 'group' => $this->settings->group, - 'dataType' => 'json', - 'data' => $message + [ - 'timestamp' => $this->time->getCurrentTime(), - ], - ]) - ); + $this + ->clientText([ + 'type' => 'sendToGroup', + 'group' => $this->settings->group, + 'dataType' => 'json', + 'data' => $message + [ + 'timestamp' => $this->time->getCurrentTime(), + ], + ]); return $this; } @@ -157,7 +178,7 @@ public function receive() : string { $this->assertSettings(); $this->joinGroup(); - $message = (string) $this->client->receive(); + $message = $this->clientReceive(); $json = $this->decodeMessage($message); $this->eventDispatcher diff --git a/src/Azure/PubSub/Settings.php b/src/Azure/PubSub/Settings.php index 166e844b..fcd0d6aa 100644 --- a/src/Azure/PubSub/Settings.php +++ b/src/Azure/PubSub/Settings.php @@ -19,13 +19,16 @@ final class Settings { * @param string $endpoint * The API endpoint. * @param string $accessKey - * The API access token. + * The API access key. + * @param string $secondaryAccessKey + * The secondary API access key. */ public function __construct( public readonly string $hub, public readonly string $group, public readonly string $endpoint, public readonly string $accessKey, + public readonly string $secondaryAccessKey, ) { } diff --git a/src/Azure/PubSub/SettingsFactory.php b/src/Azure/PubSub/SettingsFactory.php index f7ef26e9..715f5769 100644 --- a/src/Azure/PubSub/SettingsFactory.php +++ b/src/Azure/PubSub/SettingsFactory.php @@ -34,6 +34,7 @@ public function create() : Settings { 'group' => '', 'endpoint' => '', 'access_key' => '', + 'secondary_access_key' => '', ]; if ($settings = $this->vaultManager->get('pubsub')) { @@ -50,7 +51,8 @@ public function create() : Settings { $data->hub ?: '', $data->group ?: '', $data->endpoint ?: '', - $data->access_key ?: '' + $data->access_key ?: '', + $data->secondary_access_key ?: '', ); } diff --git a/src/Commands/PubSubCommands.php b/src/Commands/PubSubCommands.php index 7fd0619e..d0f9a94b 100644 --- a/src/Commands/PubSubCommands.php +++ b/src/Commands/PubSubCommands.php @@ -4,6 +4,7 @@ namespace Drupal\helfi_api_base\Commands; +use Drupal\Core\DependencyInjection\AutowireTrait; use Drupal\helfi_api_base\Azure\PubSub\PubSubManagerInterface; use Drush\Attributes\Command; use Drush\Commands\DrushCommands; @@ -20,6 +21,8 @@ */ final class PubSubCommands extends DrushCommands { + use AutowireTrait; + public const MAX_MESSAGES = 100; public const CLIENT_TIMEOUT = 120;