diff --git a/apps/solace-message-client-testing-app/src/app/subscriber/subscriber.component.html b/apps/solace-message-client-testing-app/src/app/subscriber/subscriber.component.html index 5479874..6057771 100644 --- a/apps/solace-message-client-testing-app/src/app/subscriber/subscriber.component.html +++ b/apps/solace-message-client-testing-app/src/app/subscriber/subscriber.component.html @@ -13,6 +13,7 @@ Topic Queue Topic Endpoint + Queue Browser diff --git a/apps/solace-message-client-testing-app/src/app/subscriber/subscriber.component.ts b/apps/solace-message-client-testing-app/src/app/subscriber/subscriber.component.ts index 3052038..bce9d85 100644 --- a/apps/solace-message-client-testing-app/src/app/subscriber/subscriber.component.ts +++ b/apps/solace-message-client-testing-app/src/app/subscriber/subscriber.component.ts @@ -65,8 +65,11 @@ export class SubscriberComponent implements OnDestroy { case SubscriptionDestinationType.TOPIC_ENDPOINT: { return this.solaceMessageClient.consume$(destination); } + case SubscriptionDestinationType.QUEUE_BROWSER: { + return this.solaceMessageClient.browse$(destination); + } default: { - throw Error(`[UnsupportedDestinationError] Expected '${SubscriptionDestinationType.TOPIC}', '${SubscriptionDestinationType.QUEUE}', or '${SubscriptionDestinationType.TOPIC_ENDPOINT}', but was ${destinationType}`); + throw Error(`[UnsupportedDestinationError] Expected '${SubscriptionDestinationType.TOPIC}', '${SubscriptionDestinationType.QUEUE}', '${SubscriptionDestinationType.TOPIC_ENDPOINT}', or '${SubscriptionDestinationType.QUEUE_BROWSER}', but was ${destinationType}`); } } })(); @@ -139,6 +142,7 @@ export class SubscriberComponent implements OnDestroy { Unlike a durable endpoint, the lifecycle of a non-durable endpoint (also known as a private, temporary endpoint) is bound to the client that created it, with an additional 60s in case of unexpected disconnect. Topic endpoints in particular are useful for not losing messages in the event of short connection interruptions as messages are retained on the broker until they are consumed.`, + queueBrowser: `Browses messages in a queue, without removing/consuming the messages.`, }; } @@ -146,4 +150,5 @@ export enum SubscriptionDestinationType { QUEUE = 'QUEUE', TOPIC = 'TOPIC', TOPIC_ENDPOINT = 'TOPIC_ENDPOINT', + QUEUE_BROWSER = 'QUEUE_BROWSER', } diff --git a/projects/solace-message-client/src/lib/solace-message-client.spec.ts b/projects/solace-message-client/src/lib/solace-message-client.spec.ts index 47add2a..7441f05 100644 --- a/projects/solace-message-client/src/lib/solace-message-client.spec.ts +++ b/projects/solace-message-client/src/lib/solace-message-client.spec.ts @@ -5,7 +5,7 @@ import { SolaceSessionProvider } from './solace-session-provider'; import { ObserveCaptor } from '@scion/toolkit/testing'; import { TestBed } from '@angular/core/testing'; import { NgZone } from '@angular/core'; -import { Destination, DestinationType, Message, MessageConsumer, MessageConsumerEventName, MessageConsumerProperties, MessageDeliveryModeType, MessageType, QueueType, SDTFieldType, Session, SessionEvent, SessionEventCode, SolaceError } from './solace.model'; +import { Destination, DestinationType, Message, MessageConsumer, MessageConsumerEventName, MessageConsumerProperties, MessageDeliveryModeType, MessageType, QueueBrowser, QueueBrowserEventName, QueueBrowserProperties, QueueType, SDTFieldType, Session, SessionEvent, SessionEventCode, SolaceError } from './solace.model'; import { SolaceObjectFactory } from './solace-object-factory'; import { asyncScheduler, noop } from 'rxjs'; import { UUID } from '@scion/toolkit/uuid'; @@ -24,7 +24,7 @@ describe('SolaceMessageClient', () => { factoryProperties.profile = solace.SolclientFactoryProfiles.version10; solace.SolclientFactory.init(factoryProperties); // Mock the Solace Session - session = createSpyObj('sessionClient', ['on', 'connect', 'subscribe', 'unsubscribe', 'send', 'dispose', 'disconnect', 'createMessageConsumer']); + session = createSpyObj('sessionClient', ['on', 'connect', 'subscribe', 'unsubscribe', 'send', 'dispose', 'disconnect', 'createMessageConsumer', 'createQueueBrowser']); // Capture Solace lifecycle hooks session.on.and.callFake((eventCode: SessionEventCode, callback: (event: SessionEvent | Message) => void) => { sessionEventCallbacks.set(eventCode, callback); @@ -1648,6 +1648,230 @@ describe('SolaceMessageClient', () => { })]); }); }); + + describe('SolaceMessageClient#browse$', () => { + + it('should connect to a queue if passing a queue \'string\' literal', async () => { + const solaceMessageClient = TestBed.inject(SolaceMessageClient); + + // Connect to the broker + solaceMessageClient.connect({url: 'some-url', vpnName: 'some-vpn'}); + await simulateLifecycleEvent(SessionEventCode.UP_NOTICE); + + // Connect to the queue browser + const queueBrowserMock = installQueueBrowserMock(); + solaceMessageClient.browse$('queue').subscribe(); + await drainMicrotaskQueue(); + + // Expect the queue browser to connect to the broker + expect(queueBrowserMock.queueBrowser.connect).toHaveBeenCalledTimes(1); + + // Simulate the queue browser to be connected to the broker + await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.UP); + + // Expect connected to the queue browser + expect(queueBrowserMock.queueBrowserProperties).toEqual({ + queueDescriptor: { + type: QueueType.QUEUE, name: 'queue', + }, + }); + }); + + it('should allow connecting to a queue endpoint passing a config', async () => { + const solaceMessageClient = TestBed.inject(SolaceMessageClient); + + // Connect to the broker + solaceMessageClient.connect({url: 'some-url', vpnName: 'some-vpn'}); + await simulateLifecycleEvent(SessionEventCode.UP_NOTICE); + + // Connect to the queue browser + const queueBrowserMock = installQueueBrowserMock(); + const properties: QueueBrowserProperties = { + queueDescriptor: { + type: QueueType.QUEUE, name: 'queue', + }, + }; + solaceMessageClient.browse$(properties).subscribe(); + await drainMicrotaskQueue(); + + // Expect the queue browser to connect to the broker + expect(queueBrowserMock.queueBrowser.connect).toHaveBeenCalledTimes(1); + + // Simulate the queue browser to be connected to the broker + await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.UP); + + // Expect connected to the queue browser + expect(queueBrowserMock.queueBrowserProperties).toEqual(properties); + }); + + it('should allow browsing messages', async () => { + const solaceMessageClient = TestBed.inject(SolaceMessageClient); + + // Connect to the broker + solaceMessageClient.connect({url: 'some-url', vpnName: 'some-vpn'}); + await simulateLifecycleEvent(SessionEventCode.UP_NOTICE); + + // Connect to the queue browser + const queueBrowserMock = installQueueBrowserMock(); + const messageCaptor = new ObserveCaptor(); + solaceMessageClient.browse$('queue').subscribe(messageCaptor); + await drainMicrotaskQueue(); + + // Simulate the queue browser to be connected to the broker + await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.UP); + + // Simulate to receive a message + const msg1 = createQueueMessage('queue'); + const msg2 = createQueueMessage('queue'); + const msg3 = createQueueMessage('queue'); + + await queueBrowserMock.simulateMessage(msg1); + await queueBrowserMock.simulateMessage(msg2); + await queueBrowserMock.simulateMessage(msg3); + + expect(messageCaptor.getValues()).toEqual([ + jasmine.objectContaining({message: msg1}), + jasmine.objectContaining({message: msg2}), + jasmine.objectContaining({message: msg3}), + ]); + }); + + it('should receive messages inside the Angular zone', async () => { + const solaceMessageClient = TestBed.inject(SolaceMessageClient); + + // Connect to the broker + solaceMessageClient.connect({url: 'some-url', vpnName: 'some-vpn'}); + await simulateLifecycleEvent(SessionEventCode.UP_NOTICE); + + // Connect to the queue browser + const queueBrowserMock = installQueueBrowserMock(); + let receivedMessageInsideAngularZone; + solaceMessageClient.browse$('queue').subscribe(() => { + receivedMessageInsideAngularZone = NgZone.isInAngularZone(); + }); + await drainMicrotaskQueue(); + + // Simulate the queue browser to be connected to the broker + await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.UP); + + // Simulate to receive a message + await queueBrowserMock.simulateMessage(createQueueMessage('queue')); + + // Expect message to be received inside the Angular zone + expect(receivedMessageInsideAngularZone).toBeTrue(); + }); + + it('should start the queue browser when connected to the broker (UP)', async () => { + const solaceMessageClient = TestBed.inject(SolaceMessageClient); + + // Connect to the broker + solaceMessageClient.connect({url: 'some-url', vpnName: 'some-vpn'}); + await simulateLifecycleEvent(SessionEventCode.UP_NOTICE); + + // Connect to the queue browser + const queueBrowserMock = installQueueBrowserMock(); + solaceMessageClient.browse$('queue').subscribe(); + await drainMicrotaskQueue(); + + // Expect the queue browser to connect to the broker + expect(queueBrowserMock.queueBrowser.connect).toHaveBeenCalledTimes(1); + + // Simulate the queue browser to be connected to the broker + await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.UP, new Error()); + + // Expect the queue browser to be started + expect(queueBrowserMock.queueBrowser.start).toHaveBeenCalledTimes(1); + }); + + it('should error on connection error (CONNECT_FAILED_ERROR)', async () => { + const solaceMessageClient = TestBed.inject(SolaceMessageClient); + + // Connect to the broker + solaceMessageClient.connect({url: 'some-url', vpnName: 'some-vpn'}); + await simulateLifecycleEvent(SessionEventCode.UP_NOTICE); + + // Connect to the queue browser + const queueBrowserMock = installQueueBrowserMock(); + const messageCaptor = new ObserveCaptor(); + solaceMessageClient.browse$('queue').subscribe(messageCaptor); + await drainMicrotaskQueue(); + + // Expect the queue browser to connect to the broker + expect(queueBrowserMock.queueBrowser.connect).toHaveBeenCalledTimes(1); + + // Simulate that the connection cannot be established + await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.CONNECT_FAILED_ERROR, new Error()); + + // Expect the Observable to error + expect(messageCaptor.hasErrored()).toBeTrue(); + + // Expect to disconnect from the broker + expect(queueBrowserMock.queueBrowser.stop).toHaveBeenCalledTimes(1); + expect(queueBrowserMock.queueBrowser.disconnect).toHaveBeenCalledTimes(1); + }); + + it('should complete the Observable when the connection goes down (DOWN), e.g., after a successful session disconnect', async () => { + const solaceMessageClient = TestBed.inject(SolaceMessageClient); + + // Connect to the broker + solaceMessageClient.connect({url: 'some-url', vpnName: 'some-vpn'}); + await simulateLifecycleEvent(SessionEventCode.UP_NOTICE); + + // Connect to the queue browser + const queueBrowserMock = installQueueBrowserMock(); + const messageCaptor = new ObserveCaptor(); + solaceMessageClient.browse$('queue').subscribe(messageCaptor); + await drainMicrotaskQueue(); + + // Expect the queue browser to connect to the broker + expect(queueBrowserMock.queueBrowser.connect).toHaveBeenCalledTimes(1); + + // Simulate the queue browser to be connected to the broker + await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.UP); + + // Simulate connection going down + await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.DOWN, new Error()); + + // Expect the Observable to complete + expect(messageCaptor.hasCompleted()).toBeTrue(); + }); + + it('should provide headers contained in the message', async () => { + const solaceMessageClient = TestBed.inject(SolaceMessageClient); + + // Connect to the broker + solaceMessageClient.connect({url: 'some-url', vpnName: 'some-vpn'}); + await simulateLifecycleEvent(SessionEventCode.UP_NOTICE); + + // Connect to the queue browser + const queueBrowserMock = installQueueBrowserMock(); + const messageCaptor = new ObserveCaptor(); + solaceMessageClient.browse$('queue').subscribe(messageCaptor); + await drainMicrotaskQueue(); + + // Simulate the queue browser to be connected to the broker + await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.UP); + + // Simulate to receive a message + const message = createQueueMessage('queue'); + const userPropertyMap = SolaceObjectFactory.createSDTMapContainer(); + userPropertyMap.addField('key1', SDTFieldType.STRING, 'value'); + userPropertyMap.addField('key2', SDTFieldType.BOOL, true); + userPropertyMap.addField('key3', SDTFieldType.INT32, 123); + message.setUserPropertyMap(userPropertyMap); + + await queueBrowserMock.simulateMessage(message); + + // Expect headers to be contained in the envelope + expect(messageCaptor.getValues()).toEqual([jasmine.objectContaining({ + message: message, + headers: new Map() + .set('key1', 'value') + .set('key2', true) + .set('key3', 123), + })]); + }); + }); } /** @@ -1690,6 +1914,12 @@ describe('SolaceMessageClient', () => { return message; } + function createQueueMessage(queue: string): Message { + const message = SolaceObjectFactory.createMessage(); + message.setDestination(SolaceObjectFactory.createDurableQueueDestination(queue)); + return message; + } + /** * Captures the most recent invocation to {@link Session.subscribe}. */ @@ -1733,6 +1963,13 @@ describe('SolaceMessageClient', () => { function installMessageConsumerMock(): MessageConsumerMock { return new MessageConsumerMock(session); } + + /** + * Installs a queue browser mock; + */ + function installQueueBrowserMock(): QueueBrowserMock { + return new QueueBrowserMock(session); + } }); class SessionSubscribeCaptor { @@ -1839,3 +2076,62 @@ class MessageConsumerMock { await drainMicrotaskQueue(); } } + +class QueueBrowserMock { + + private _callbacks = new Map void>(); + + public queueBrowser: SpyObj; + public queueBrowserProperties: QueueBrowserProperties; + + constructor(session: SpyObj) { + this.queueBrowser = createSpyObj('queueBrowser', ['on', 'connect', 'disconnect', 'start', 'stop']); + + // Configure session to return a queue browser mock and capture the passed config + session.createQueueBrowser.and.callFake((queueBrowserProperties: QueueBrowserProperties) => { + this.queueBrowserProperties = queueBrowserProperties; + return this.queueBrowser; + }); + + // Capture Solace lifecycle hooks + this.queueBrowser.on.and.callFake((eventName: QueueBrowserEventName, callback: (event: Message | SolaceError | void) => void) => { + this._callbacks.set(eventName, callback); + }); + + this.installMockDefaultBehavior(); + } + + private installMockDefaultBehavior(): void { + // Fire 'DOWN' event when invoking 'disconnect' + this.queueBrowser.disconnect.and.callFake(() => { + this.simulateLifecycleEvent(QueueBrowserEventName.DOWN); + this.simulateLifecycleEvent(QueueBrowserEventName.DISPOSED); + }); + } + + /** + * Simulates the Solace message broker to send a message to the Solace queue browser. + */ + public async simulateLifecycleEvent(eventName: QueueBrowserEventName, error?: SolaceError): Promise { + await drainMicrotaskQueue(); + + const callback = this._callbacks.get(eventName); + if (!callback) { + throw Error(`[SpecError] No callback registered for event '${eventName}'`); + } + callback && callback(error); + await drainMicrotaskQueue(); + } + + /** + * Simulates the Solace message broker to publish a message to the Solace queue browser. + */ + public async simulateMessage(message: Message): Promise { + const callback = this._callbacks.get(QueueBrowserEventName.MESSAGE); + if (!callback) { + throw Error(`[SpecError] No callback registered for event '${QueueBrowserEventName.MESSAGE}'`); + } + callback && callback(message); + await drainMicrotaskQueue(); + } +} diff --git a/projects/solace-message-client/src/lib/solace-message-client.ts b/projects/solace-message-client/src/lib/solace-message-client.ts index eb3045f..1596b80 100644 --- a/projects/solace-message-client/src/lib/solace-message-client.ts +++ b/projects/solace-message-client/src/lib/solace-message-client.ts @@ -1,7 +1,7 @@ // tslint:disable:no-redundant-jsdoc import { Injectable } from '@angular/core'; import { BehaviorSubject, EMPTY, noop, Observable, OperatorFunction } from 'rxjs'; -import { Message, MessageConsumerProperties, MessageDeliveryModeType, MessageType, SDTField, Session } from './solace.model'; +import { Message, MessageConsumerProperties, MessageDeliveryModeType, MessageType, QueueBrowserProperties, SDTField, Session } from './solace.model'; import { map } from 'rxjs/operators'; import { SolaceMessageClientConfig } from './solace-message-client.config'; @@ -152,6 +152,14 @@ export abstract class SolaceMessageClient { */ public abstract consume$(topicOrDescriptor: string | MessageConsumerProperties): Observable; + /** + * Browses messages in a queue, without removing/consuming the messages. + * + * @param queueOrDescriptor - Specifies the queue to browse, or a descriptor object describing how to connect to the queue browser. + * @return Observable that emits spooled messages in the specified queue. The Observable never completes. If not connected to the broker yet, or if the connect attempt failed, the Observable errors. + */ + public abstract browse$(queueOrDescriptor: string | QueueBrowserProperties): Observable; + /** * Publishes a message to the given topic destination. The message is transported to all consumers subscribed to the topic. * @@ -247,6 +255,10 @@ export class NullSolaceMessageClient implements SolaceMessageClient { return EMPTY; } + public browse$(queueOrDescriptor: string | QueueBrowserProperties): Observable { + return EMPTY; + } + public publish(topic: string, data?: ArrayBufferLike | DataView | string | SDTField | Message, options?: PublishOptions): Promise { return Promise.resolve(); } diff --git "a/projects/solace-message-client/src/lib/\311\265solace-message-client.ts" "b/projects/solace-message-client/src/lib/\311\265solace-message-client.ts" index 3d1b906..6229f2c 100644 --- "a/projects/solace-message-client/src/lib/\311\265solace-message-client.ts" +++ "b/projects/solace-message-client/src/lib/\311\265solace-message-client.ts" @@ -8,7 +8,7 @@ import { TopicMatcher } from './topic-matcher'; import { observeInside } from '@scion/toolkit/operators'; import { SolaceSessionProvider } from './solace-session-provider'; import { SolaceMessageClientConfig } from './solace-message-client.config'; -import { Destination, Message, MessageConsumer, MessageConsumerEventName, MessageConsumerProperties, MessageDeliveryModeType, OperationError, QueueType, SDTField, SDTFieldType, Session, SessionEvent, SessionEventCode, SessionProperties } from './solace.model'; +import { Destination, Message, MessageConsumer, MessageConsumerEventName, MessageConsumerProperties, MessageDeliveryModeType, OperationError, QueueBrowser, QueueBrowserEventName, QueueBrowserProperties, QueueType, SDTField, SDTFieldType, Session, SessionEvent, SessionEventCode, SessionProperties } from './solace.model'; import { TopicSubscriptionCounter } from './topic-subscription-counter'; import { SerialExecutor } from './serial-executor.service'; import { SolaceObjectFactory } from './solace-object-factory'; @@ -355,6 +355,80 @@ export class ɵSolaceMessageClient implements SolaceMessageClient, OnDestroy { / ); } + public browse$(queueOrDescriptor: string | QueueBrowserProperties): Observable { + if (queueOrDescriptor === undefined) { + throw Error('[SolaceMessageClient] Missing required queue or descriptor.'); + } + + // If passed a `string` literal, connect to the given queue using default 'browsing' options. + if (typeof queueOrDescriptor === 'string') { + return this.createQueueBrowser$({ + queueDescriptor: {type: QueueType.QUEUE, name: queueOrDescriptor}, + }); + } + + return this.createQueueBrowser$(queueOrDescriptor); + } + + private createQueueBrowser$(queueBrowserProperties: QueueBrowserProperties): Observable { + return new Observable((observer: Observer): TeardownLogic => { + let queueBrowser: QueueBrowser = undefined; + let disposed = false; + this.session + .then(session => { + queueBrowser = session.createQueueBrowser(queueBrowserProperties); + + // Define browser event listeners + queueBrowser.on(QueueBrowserEventName.UP, () => { + console.debug(`[SolaceMessageClient] solclientjs queue browser event: QueueBrowserEventName.UP`); // tslint:disable-line:no-console + queueBrowser.start(); + }); + queueBrowser.on(QueueBrowserEventName.CONNECT_FAILED_ERROR, (error: OperationError) => { + console.debug(`[SolaceMessageClient] solclientjs queue browser event: QueueBrowserEventName.CONNECT_FAILED_ERROR`, error); // tslint:disable-line:no-console + observer.error(error); + }); + queueBrowser.on(QueueBrowserEventName.DOWN_ERROR, (error: OperationError) => { + console.debug(`[SolaceMessageClient] solclientjs queue browser event: QueueBrowserEventName.DOWN_ERROR`, error); // tslint:disable-line:no-console + observer.error(error); + }); + queueBrowser.on(QueueBrowserEventName.DOWN, (error: OperationError) => { // event emitted after successful disconnect request + console.debug(`[SolaceMessageClient] solclientjs queue browser event: QueueBrowserEventName.DOWN`, error); // tslint:disable-line:no-console + observer.complete(); + }); + queueBrowser.on(QueueBrowserEventName.DISPOSED, (error: OperationError) => { // event emitted after successful disconnect request + console.debug(`[SolaceMessageClient] solclientjs queue browser event: QueueBrowserEventName.DOWN`, error); // tslint:disable-line:no-console + disposed = true; + observer.complete(); + }); + + // Define browser event listener + queueBrowser.on(QueueBrowserEventName.MESSAGE, (message: Message) => { + console.debug(`[SolaceMessageClient] solclientjs queue browser event: QueueBrowserEventName.MESSAGE`, message); // tslint:disable-line:no-console + NgZone.assertNotInAngularZone(); + observer.next(message); + }); + + // Connect the browser + queueBrowser.connect(); + }) + .catch(error => { + observer.error(error); + }); + + return (): void => { + // Initiate an orderly disconnection of the browser. In turn, we will receive a `QueueBrowserEventName#DOWN` event and dispose the consumer. + if (queueBrowser && !disposed) { + queueBrowser.stop(); + queueBrowser.disconnect(); + } + }; + }) + .pipe( + mapToMessageEnvelope(), + observeInside(continueFn => this._zone.run(continueFn)), + ); + } + public publish(topic: string, data?: Data | Message, options?: PublishOptions): Promise { const destination = SolaceObjectFactory.createTopicDestination(topic); return this.sendToDestination(destination, data, options);