Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugins/pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ If you would like to extend the functionality of the existing PubSub client or c

Here is an overview of the functions provided by the PubSubClient class:

- `initialize()` - Initializes a WebSocket connection to the specified URL.
- `initialize()` - Initializes a WebSocket connection to the specified URL. By default, it points to 'ws://localhost:8080'.

- `publish(topic, message)` - Publishes a message to the specified topic.

Expand Down
9 changes: 9 additions & 0 deletions src/App.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ export default class App extends Base {
process.env.REPORTINGID = reportingId;
process.env.STANDALONE = standalone;
process.env.STANDALONE_PREFIX = standalonePrefix;

// Set the pubSub URL if present
process.env.PUB_SUB_URL = new URLSearchParams(window.location.search).get('pubSubUrl');

if (platform) {
process.env.PLATFORM = platform;
} else {
Expand Down Expand Up @@ -407,6 +411,11 @@ export default class App extends Base {
logger.error('No Mac Address Found in Parameter Initialization response...', 'getParameterInitializationValues');
}

// Set the pubSub URL if present
if (query.params.pubSubUrl) {
process.env.PUB_SUB_URL = query.params.pubSubUrl;
}

if (query.task) {
setTimeout(() => {
const intentReader = new IntentReader();
Expand Down
49 changes: 33 additions & 16 deletions src/pubSubClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@
*
* SPDX-License-Identifier: Apache-2.0
*/

const logger = require('../src/utils/Logger')('pubSubClient.js');
require('dotenv').config({ override: true });

const defaultWsUrl = 'ws://your-ws-url-here.com';

class PubSubClient {
constructor() {
this.ws = null;
this.url = defaultWsUrl;
this.url = process.env.PUB_SUB_URL ? process.env.PUB_SUB_URL : 'ws://localhost:8080';
this.PUBSUB_SUBSCRIBE_TOPIC_SUFFIX = '_FCS';
this.PUBSUB_PUBLISH_TOPIC_SUFFIX = '_FCA';
}
Expand All @@ -35,6 +32,7 @@ class PubSubClient {
const appUrl = window.location;
const pubSubTopicUUID = new URLSearchParams(appUrl.search).get('pubsub_uuid');
const macAddress = process.env.MACADDRESS;
const appId = process.env.CURRENT_APPID;

// Priority #1: Use pubSubTopicUUID if it's available
if (pubSubTopicUUID) {
Expand All @@ -51,12 +49,12 @@ class PubSubClient {
console.warn(`WARNING: No pubsub_uuid query parameter or MAC address found. Using default value: ${pubSubTopic}`);
}

process.env.PUBSUB_SUBSCRIBE_TOPIC = pubSubTopic + this.PUBSUB_SUBSCRIBE_TOPIC_SUFFIX;
process.env.PUBSUB_PUBLISH_TOPIC = pubSubTopic + this.PUBSUB_PUBLISH_TOPIC_SUFFIX;
process.env.PUBSUB_SUBSCRIBE_TOPIC = pubSubTopic + '_' + appId + this.PUBSUB_SUBSCRIBE_TOPIC_SUFFIX;
process.env.PUBSUB_PUBLISH_TOPIC = pubSubTopic + '_' + appId + this.PUBSUB_PUBLISH_TOPIC_SUFFIX;

// Establish WS Connection
this.ws = new WebSocket(this.url);
logger.info('Establishing a WS connection...', 'initialize');
logger.info(`Establishing a WS connection to ${this.url}...`, 'initialize');

return new Promise((resolve, reject) => {
this.ws.addEventListener('open', (event) => {
Expand All @@ -65,13 +63,13 @@ class PubSubClient {
});

this.ws.addEventListener('error', (event) => {
if (this.url === defaultWsUrl) {
logger.error('WARNING: WebSocket connections will fail to initialize. The file has not been properly configured. Please update the URL to point to your WebSocket server for communication to work.');
} else {
logger.error('Failed to initialize a WS connection...', 'initialize');
}
logger.info('Failed to initialize a WS connection...', event);
this.ws = null; // Ensure ws is null if connection fails
reject(false);
});
}).catch((error) => {
logger.info('Continuing without PubSub due to WS connection failure.');
return false;
});
}

Expand All @@ -82,6 +80,11 @@ class PubSubClient {
return false;
}

if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
logger.error('WS connection is not open. Cannot publish message.');
return false;
}

const publishMsg = {
operation: 'pub',
topic,
Expand All @@ -92,7 +95,7 @@ class PubSubClient {

// If headers are passed in, add them to the payload object
if (headers) {
payload.payload.headers = headers;
publishMsg.payload.headers = headers;
}

logger.info('Publishing message: ', JSON.stringify(publishMsg));
Expand All @@ -109,6 +112,11 @@ class PubSubClient {

// Subscribe to a topic
subscribe(topic, callback) {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
logger.error('WS connection is not open. Cannot subscribe to topic.');
return false;
}

const subscribeMsg = {
operation: 'sub',
topic,
Expand Down Expand Up @@ -146,6 +154,11 @@ class PubSubClient {

// Unsubscribe to a topic
unsubscribe(topic) {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
logger.error('WS connection is not open. Cannot unsubscribe from topic.');
return false;
}

const payload = {
operation: 'unsub',
topic,
Expand All @@ -164,8 +177,8 @@ class PubSubClient {
// Checks WebSocket connection status
isConnected() {
let status = false;
if (this.ws && this.ws.readyState == this.ws.OPEN) {
logger.info('WS connection already Established');
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
logger.info('WS connection already established');
status = true;
}
return status;
Expand All @@ -174,7 +187,11 @@ class PubSubClient {

const getClient = async () => {
const pubSubClient = new PubSubClient();
await pubSubClient.initialize();
try {
await pubSubClient.initialize();
} catch (error) {
logger.error(error);
}
return pubSubClient;
};

Expand Down