Skip to content

Commit

Permalink
feat(solace-message-client): allow browsing messages in a queue, with…
Browse files Browse the repository at this point in the history
…out removing/consuming the messages

Added following API for browsing messages sent to a queue: `SolaceMessageClient#browse$`
  • Loading branch information
GreenRover committed Aug 20, 2021
1 parent b377ed5 commit c3bc031
Show file tree
Hide file tree
Showing 5 changed files with 393 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<mat-option [value]="SubscriptionDestinationType.TOPIC" [matTooltip]="tooltips.topic" matTooltipClass="large">Topic</mat-option>
<mat-option [value]="SubscriptionDestinationType.QUEUE" [matTooltip]="tooltips.queue" matTooltipClass="large">Queue</mat-option>
<mat-option [value]="SubscriptionDestinationType.TOPIC_ENDPOINT" [matTooltip]="tooltips.topicEndpoint" matTooltipClass="large">Topic Endpoint</mat-option>
<mat-option [value]="SubscriptionDestinationType.QUEUE_BROWSER" [matTooltip]="tooltips.queueBrowser" matTooltipClass="large">Queue Browser</mat-option>
</mat-select>
</mat-form-field>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ export class SubscriberComponent implements OnDestroy {
case SubscriptionDestinationType.TOPIC_ENDPOINT: {
return this.solaceMessageClient.consume$(destination);
}
case SubscriptionDestinationType.QUEUE_BROWSER: {
return this.solaceMessageClient.browse$(destination);
}
default: {
throw Error(`[UnsupportedDestinationError] Expected '${SubscriptionDestinationType.TOPIC}', '${SubscriptionDestinationType.QUEUE}', or '${SubscriptionDestinationType.TOPIC_ENDPOINT}', but was ${destinationType}`);
throw Error(`[UnsupportedDestinationError] Expected '${SubscriptionDestinationType.TOPIC}', '${SubscriptionDestinationType.QUEUE}', '${SubscriptionDestinationType.TOPIC_ENDPOINT}', or '${SubscriptionDestinationType.QUEUE_BROWSER}', but was ${destinationType}`);
}
}
})();
Expand Down Expand Up @@ -139,11 +142,13 @@ export class SubscriberComponent implements OnDestroy {
Unlike a durable endpoint, the lifecycle of a non-durable endpoint (also known as a private, temporary endpoint) is bound to the client that created it, with an additional 60s in case of unexpected disconnect.
Topic endpoints in particular are useful for not losing messages in the event of short connection interruptions as messages are retained on the broker until they are consumed.`,
queueBrowser: `Browses messages in a queue, without removing/consuming the messages.`,
};
}

export enum SubscriptionDestinationType {
QUEUE = 'QUEUE',
TOPIC = 'TOPIC',
TOPIC_ENDPOINT = 'TOPIC_ENDPOINT',
QUEUE_BROWSER = 'QUEUE_BROWSER',
}
300 changes: 298 additions & 2 deletions projects/solace-message-client/src/lib/solace-message-client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { SolaceSessionProvider } from './solace-session-provider';
import { ObserveCaptor } from '@scion/toolkit/testing';
import { TestBed } from '@angular/core/testing';
import { NgZone } from '@angular/core';
import { Destination, DestinationType, Message, MessageConsumer, MessageConsumerEventName, MessageConsumerProperties, MessageDeliveryModeType, MessageType, QueueType, SDTFieldType, Session, SessionEvent, SessionEventCode, SolaceError } from './solace.model';
import { Destination, DestinationType, Message, MessageConsumer, MessageConsumerEventName, MessageConsumerProperties, MessageDeliveryModeType, MessageType, QueueBrowser, QueueBrowserEventName, QueueBrowserProperties, QueueType, SDTFieldType, Session, SessionEvent, SessionEventCode, SolaceError } from './solace.model';
import { SolaceObjectFactory } from './solace-object-factory';
import { asyncScheduler, noop } from 'rxjs';
import { UUID } from '@scion/toolkit/uuid';
Expand All @@ -24,7 +24,7 @@ describe('SolaceMessageClient', () => {
factoryProperties.profile = solace.SolclientFactoryProfiles.version10;
solace.SolclientFactory.init(factoryProperties);
// Mock the Solace Session
session = createSpyObj('sessionClient', ['on', 'connect', 'subscribe', 'unsubscribe', 'send', 'dispose', 'disconnect', 'createMessageConsumer']);
session = createSpyObj('sessionClient', ['on', 'connect', 'subscribe', 'unsubscribe', 'send', 'dispose', 'disconnect', 'createMessageConsumer', 'createQueueBrowser']);
// Capture Solace lifecycle hooks
session.on.and.callFake((eventCode: SessionEventCode, callback: (event: SessionEvent | Message) => void) => {
sessionEventCallbacks.set(eventCode, callback);
Expand Down Expand Up @@ -1648,6 +1648,230 @@ describe('SolaceMessageClient', () => {
})]);
});
});

describe('SolaceMessageClient#browse$', () => {

it('should connect to a queue if passing a queue \'string\' literal', async () => {
const solaceMessageClient = TestBed.inject(SolaceMessageClient);

// Connect to the broker
solaceMessageClient.connect({url: 'some-url', vpnName: 'some-vpn'});
await simulateLifecycleEvent(SessionEventCode.UP_NOTICE);

// Connect to the queue browser
const queueBrowserMock = installQueueBrowserMock();
solaceMessageClient.browse$('queue').subscribe();
await drainMicrotaskQueue();

// Expect the queue browser to connect to the broker
expect(queueBrowserMock.queueBrowser.connect).toHaveBeenCalledTimes(1);

// Simulate the queue browser to be connected to the broker
await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.UP);

// Expect connected to the queue browser
expect(queueBrowserMock.queueBrowserProperties).toEqual({
queueDescriptor: {
type: QueueType.QUEUE, name: 'queue',
},
});
});

it('should allow connecting to a queue endpoint passing a config', async () => {
const solaceMessageClient = TestBed.inject(SolaceMessageClient);

// Connect to the broker
solaceMessageClient.connect({url: 'some-url', vpnName: 'some-vpn'});
await simulateLifecycleEvent(SessionEventCode.UP_NOTICE);

// Connect to the queue browser
const queueBrowserMock = installQueueBrowserMock();
const properties: QueueBrowserProperties = {
queueDescriptor: {
type: QueueType.QUEUE, name: 'queue',
},
};
solaceMessageClient.browse$(properties).subscribe();
await drainMicrotaskQueue();

// Expect the queue browser to connect to the broker
expect(queueBrowserMock.queueBrowser.connect).toHaveBeenCalledTimes(1);

// Simulate the queue browser to be connected to the broker
await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.UP);

// Expect connected to the queue browser
expect(queueBrowserMock.queueBrowserProperties).toEqual(properties);
});

it('should allow browsing messages', async () => {
const solaceMessageClient = TestBed.inject(SolaceMessageClient);

// Connect to the broker
solaceMessageClient.connect({url: 'some-url', vpnName: 'some-vpn'});
await simulateLifecycleEvent(SessionEventCode.UP_NOTICE);

// Connect to the queue browser
const queueBrowserMock = installQueueBrowserMock();
const messageCaptor = new ObserveCaptor<MessageEnvelope>();
solaceMessageClient.browse$('queue').subscribe(messageCaptor);
await drainMicrotaskQueue();

// Simulate the queue browser to be connected to the broker
await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.UP);

// Simulate to receive a message
const msg1 = createQueueMessage('queue');
const msg2 = createQueueMessage('queue');
const msg3 = createQueueMessage('queue');

await queueBrowserMock.simulateMessage(msg1);
await queueBrowserMock.simulateMessage(msg2);
await queueBrowserMock.simulateMessage(msg3);

expect(messageCaptor.getValues()).toEqual([
jasmine.objectContaining<MessageEnvelope>({message: msg1}),
jasmine.objectContaining<MessageEnvelope>({message: msg2}),
jasmine.objectContaining<MessageEnvelope>({message: msg3}),
]);
});

it('should receive messages inside the Angular zone', async () => {
const solaceMessageClient = TestBed.inject(SolaceMessageClient);

// Connect to the broker
solaceMessageClient.connect({url: 'some-url', vpnName: 'some-vpn'});
await simulateLifecycleEvent(SessionEventCode.UP_NOTICE);

// Connect to the queue browser
const queueBrowserMock = installQueueBrowserMock();
let receivedMessageInsideAngularZone;
solaceMessageClient.browse$('queue').subscribe(() => {
receivedMessageInsideAngularZone = NgZone.isInAngularZone();
});
await drainMicrotaskQueue();

// Simulate the queue browser to be connected to the broker
await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.UP);

// Simulate to receive a message
await queueBrowserMock.simulateMessage(createQueueMessage('queue'));

// Expect message to be received inside the Angular zone
expect(receivedMessageInsideAngularZone).toBeTrue();
});

it('should start the queue browser when connected to the broker (UP)', async () => {
const solaceMessageClient = TestBed.inject(SolaceMessageClient);

// Connect to the broker
solaceMessageClient.connect({url: 'some-url', vpnName: 'some-vpn'});
await simulateLifecycleEvent(SessionEventCode.UP_NOTICE);

// Connect to the queue browser
const queueBrowserMock = installQueueBrowserMock();
solaceMessageClient.browse$('queue').subscribe();
await drainMicrotaskQueue();

// Expect the queue browser to connect to the broker
expect(queueBrowserMock.queueBrowser.connect).toHaveBeenCalledTimes(1);

// Simulate the queue browser to be connected to the broker
await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.UP, new Error());

// Expect the queue browser to be started
expect(queueBrowserMock.queueBrowser.start).toHaveBeenCalledTimes(1);
});

it('should error on connection error (CONNECT_FAILED_ERROR)', async () => {
const solaceMessageClient = TestBed.inject(SolaceMessageClient);

// Connect to the broker
solaceMessageClient.connect({url: 'some-url', vpnName: 'some-vpn'});
await simulateLifecycleEvent(SessionEventCode.UP_NOTICE);

// Connect to the queue browser
const queueBrowserMock = installQueueBrowserMock();
const messageCaptor = new ObserveCaptor<MessageEnvelope>();
solaceMessageClient.browse$('queue').subscribe(messageCaptor);
await drainMicrotaskQueue();

// Expect the queue browser to connect to the broker
expect(queueBrowserMock.queueBrowser.connect).toHaveBeenCalledTimes(1);

// Simulate that the connection cannot be established
await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.CONNECT_FAILED_ERROR, new Error());

// Expect the Observable to error
expect(messageCaptor.hasErrored()).toBeTrue();

// Expect to disconnect from the broker
expect(queueBrowserMock.queueBrowser.stop).toHaveBeenCalledTimes(1);
expect(queueBrowserMock.queueBrowser.disconnect).toHaveBeenCalledTimes(1);
});

it('should complete the Observable when the connection goes down (DOWN), e.g., after a successful session disconnect', async () => {
const solaceMessageClient = TestBed.inject(SolaceMessageClient);

// Connect to the broker
solaceMessageClient.connect({url: 'some-url', vpnName: 'some-vpn'});
await simulateLifecycleEvent(SessionEventCode.UP_NOTICE);

// Connect to the queue browser
const queueBrowserMock = installQueueBrowserMock();
const messageCaptor = new ObserveCaptor<MessageEnvelope>();
solaceMessageClient.browse$('queue').subscribe(messageCaptor);
await drainMicrotaskQueue();

// Expect the queue browser to connect to the broker
expect(queueBrowserMock.queueBrowser.connect).toHaveBeenCalledTimes(1);

// Simulate the queue browser to be connected to the broker
await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.UP);

// Simulate connection going down
await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.DOWN, new Error());

// Expect the Observable to complete
expect(messageCaptor.hasCompleted()).toBeTrue();
});

it('should provide headers contained in the message', async () => {
const solaceMessageClient = TestBed.inject(SolaceMessageClient);

// Connect to the broker
solaceMessageClient.connect({url: 'some-url', vpnName: 'some-vpn'});
await simulateLifecycleEvent(SessionEventCode.UP_NOTICE);

// Connect to the queue browser
const queueBrowserMock = installQueueBrowserMock();
const messageCaptor = new ObserveCaptor<MessageEnvelope>();
solaceMessageClient.browse$('queue').subscribe(messageCaptor);
await drainMicrotaskQueue();

// Simulate the queue browser to be connected to the broker
await queueBrowserMock.simulateLifecycleEvent(QueueBrowserEventName.UP);

// Simulate to receive a message
const message = createQueueMessage('queue');
const userPropertyMap = SolaceObjectFactory.createSDTMapContainer();
userPropertyMap.addField('key1', SDTFieldType.STRING, 'value');
userPropertyMap.addField('key2', SDTFieldType.BOOL, true);
userPropertyMap.addField('key3', SDTFieldType.INT32, 123);
message.setUserPropertyMap(userPropertyMap);

await queueBrowserMock.simulateMessage(message);

// Expect headers to be contained in the envelope
expect(messageCaptor.getValues()).toEqual([jasmine.objectContaining<MessageEnvelope>({
message: message,
headers: new Map()
.set('key1', 'value')
.set('key2', true)
.set('key3', 123),
})]);
});
});
}

/**
Expand Down Expand Up @@ -1690,6 +1914,12 @@ describe('SolaceMessageClient', () => {
return message;
}

function createQueueMessage(queue: string): Message {
const message = SolaceObjectFactory.createMessage();
message.setDestination(SolaceObjectFactory.createDurableQueueDestination(queue));
return message;
}

/**
* Captures the most recent invocation to {@link Session.subscribe}.
*/
Expand Down Expand Up @@ -1733,6 +1963,13 @@ describe('SolaceMessageClient', () => {
function installMessageConsumerMock(): MessageConsumerMock {
return new MessageConsumerMock(session);
}

/**
* Installs a queue browser mock;
*/
function installQueueBrowserMock(): QueueBrowserMock {
return new QueueBrowserMock(session);
}
});

class SessionSubscribeCaptor {
Expand Down Expand Up @@ -1839,3 +2076,62 @@ class MessageConsumerMock {
await drainMicrotaskQueue();
}
}

class QueueBrowserMock {

private _callbacks = new Map<QueueBrowserEventName, (event: Message | SolaceError | void) => void>();

public queueBrowser: SpyObj<QueueBrowser>;
public queueBrowserProperties: QueueBrowserProperties;

constructor(session: SpyObj<Session>) {
this.queueBrowser = createSpyObj('queueBrowser', ['on', 'connect', 'disconnect', 'start', 'stop']);

// Configure session to return a queue browser mock and capture the passed config
session.createQueueBrowser.and.callFake((queueBrowserProperties: QueueBrowserProperties) => {
this.queueBrowserProperties = queueBrowserProperties;
return this.queueBrowser;
});

// Capture Solace lifecycle hooks
this.queueBrowser.on.and.callFake((eventName: QueueBrowserEventName, callback: (event: Message | SolaceError | void) => void) => {
this._callbacks.set(eventName, callback);
});

this.installMockDefaultBehavior();
}

private installMockDefaultBehavior(): void {
// Fire 'DOWN' event when invoking 'disconnect'
this.queueBrowser.disconnect.and.callFake(() => {
this.simulateLifecycleEvent(QueueBrowserEventName.DOWN);
this.simulateLifecycleEvent(QueueBrowserEventName.DISPOSED);
});
}

/**
* Simulates the Solace message broker to send a message to the Solace queue browser.
*/
public async simulateLifecycleEvent(eventName: QueueBrowserEventName, error?: SolaceError): Promise<void> {
await drainMicrotaskQueue();

const callback = this._callbacks.get(eventName);
if (!callback) {
throw Error(`[SpecError] No callback registered for event '${eventName}'`);
}
callback && callback(error);
await drainMicrotaskQueue();
}

/**
* Simulates the Solace message broker to publish a message to the Solace queue browser.
*/
public async simulateMessage(message: Message): Promise<void> {
const callback = this._callbacks.get(QueueBrowserEventName.MESSAGE);
if (!callback) {
throw Error(`[SpecError] No callback registered for event '${QueueBrowserEventName.MESSAGE}'`);
}
callback && callback(message);
await drainMicrotaskQueue();
}
}
Loading

0 comments on commit c3bc031

Please sign in to comment.