sqsiphon provides a framework for writing Amazon SQS polling applications. It is designed to poll for and process messages as quickly as possible. FIFO queues are supported.
- sqsiphon polls for up to 10 messages (the SQS allowed maximum).
- The retrieved messages are inspected to determine if any are tagged as being members of a FIFO queue.
- Messages that are not tagged for a FIFO queue are placed into a general processing batch. Any FIFO tagged messages are added to a batch specifically for the tagged FIFO queue. For example, consider a poll event that returns three messages: message `A` is untagged, message `B` is tagged for "foo", and message `C` is tagged for "bar". Message `A` will be put on the general processing batch and two new batches will be created: "foo" and "bar", each with one message added for processing.
- All available processing batches are processed: the general batch's messages are processed concurrently, and each FIFO batch is processed sequentially.
- Messages that generate an error during processing are left on the queue.
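
The batching step described above can be sketched roughly as follows. This is a minimal illustration, not sqsiphon's actual implementation: the `partitionMessages` helper is hypothetical, and it assumes that a message's FIFO tag is exposed as `Attributes.MessageGroupId`, which is where SQS reports the message group when that attribute is requested.

```javascript
// Hypothetical helper: split one poll result into a general batch and
// one batch per FIFO group. Assumes the group is exposed as
// `Attributes.MessageGroupId` (an assumption, not confirmed by sqsiphon).
function partitionMessages(messages) {
  const general = [];
  const fifo = new Map(); // group id -> messages in received order

  for (const message of messages) {
    const groupId = (message.Attributes || {}).MessageGroupId;
    if (groupId === undefined) {
      general.push(message);
      continue;
    }
    if (fifo.has(groupId) === false) {
      fifo.set(groupId, []);
    }
    fifo.get(groupId).push(message);
  }

  return { general, fifo };
}

// The three-message example from the list above.
const polled = [
  { MessageId: 'A', Body: 'a' },
  { MessageId: 'B', Body: 'b', Attributes: { MessageGroupId: 'foo' } },
  { MessageId: 'C', Body: 'c', Attributes: { MessageGroupId: 'bar' } }
];
const batches = partitionMessages(polled);
console.log(batches.general.length, [...batches.fifo.keys()]);
```

Using a `Map` keeps the FIFO batches in the order their groups were first seen, and each batch preserves the order in which its messages were received.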
FIFO Errors: When a message on a FIFO queue cannot be processed successfully, the message, and any remaining messages in the batch, will be left on the queue. It is recommended that a corresponding dead letter queue be configured so that these messages will be moved there by SQS.
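
One way to attach a dead letter queue is through the queue's `RedrivePolicy` attribute. The sketch below only builds the parameters for `sqs.setQueueAttributes`; the `buildRedriveParams` helper, the queue URL, the ARN, and the `maxReceiveCount` of 3 are illustrative assumptions. Note that SQS requires the dead letter queue of a FIFO queue to also be a FIFO queue.

```javascript
// Hypothetical helper: build the parameters for `sqs.setQueueAttributes`
// that attach a dead letter queue to an existing queue.
function buildRedriveParams(queueUrl, deadLetterQueueArn, maxReceiveCount) {
  return {
    QueueUrl: queueUrl,
    Attributes: {
      // SQS expects the redrive policy as a JSON string.
      RedrivePolicy: JSON.stringify({
        deadLetterTargetArn: deadLetterQueueArn,
        maxReceiveCount: maxReceiveCount
      })
    }
  };
}

// Placeholder values; substitute the real queue URL and ARN.
const redriveParams = buildRedriveParams(
  'https://sqs.us-east-1.amazonaws.com/123456789012/orders.fifo',
  'arn:aws:sqs:us-east-1:123456789012:orders-dlq.fifo',
  3
);

// With a configured client:
//   await sqs.setQueueAttributes(redriveParams).promise();
console.log(redriveParams.Attributes.RedrivePolicy);
```

With a policy like this, a message that keeps failing is moved to the dead letter queue once it has been received more than `maxReceiveCount` times.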
const { SQS } = require('aws-sdk');
const sqs = new SQS({
  apiVersion: '2012-11-05',
  region: 'us-east-1'
});

const sqsiphonFactory = require('@knockaway/sqsiphon');
const app = sqsiphonFactory({
  sqs,
  queueUrl: 'url for the sqs queue',
  handler: messageHandler
});

function shutdown(signal) {
  app.stop();
}
['SIGTERM', 'SIGINT'].forEach(signal => process.on(signal, shutdown));

if ((require.main === module) === true) {
  app.start();
}

async function messageHandler(message) {
  // `message` is an SQS message as returned by the `aws-sdk`
  // ...
  // Do something with the message or `throw Error('failed')`
}
This module exports a factory function which accepts an options object with the following properties:
- `logger` (optional): An object that follows the Log4j logger interface. The default is an instance of `abstract-logging`.
- `sqs` (required): An instance of `SQS` from the `aws-sdk`.
- `queueUrl` (required): A string URL pointing to the SQS instance to poll.
- `handler` (required): A function to handle received messages. Must be an `async function`. The function will receive one parameter: `message`. The parameter is an instance of SQS Message. If the message cannot be processed for any reason, an instance of `Error` should be thrown. If no error is thrown, the message is considered to have been successfully processed.
- `tracer` (optional): an OpenTracing compliant tracer instance.
- `receiveMessageParameters` (optional): an object that conforms to the object described by `sqs.receiveMessage`. The default has: `AttributeNames: ['All']`, `MaxNumberOfMessages: 10`, `MessageAttributeNames: ['All']`, and `VisibilityTimeout: 30`. The `QueueUrl` is always overridden by the passed in `queueUrl` value.
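
As a rough illustration of the optional properties, the sketch below builds an options object with a console-backed logger and custom receive parameters. The `consoleLogger` object and the `VisibilityTimeout` and `WaitTimeSeconds` values are assumptions chosen for the example, and the factory call is left commented out so the snippet runs without AWS credentials. The description above does not say whether a custom `receiveMessageParameters` object is merged with the defaults or replaces them, so the example spells out every value it relies on.

```javascript
// Minimal logger exposing the Log4j-style level methods
// (trace, debug, info, warn, error, fatal), backed by the console.
const consoleLogger = {
  trace: (...args) => console.debug(...args),
  debug: (...args) => console.debug(...args),
  info: (...args) => console.info(...args),
  warn: (...args) => console.warn(...args),
  error: (...args) => console.error(...args),
  fatal: (...args) => console.error(...args)
};

async function messageHandler(message) {
  // Throwing signals that the message could not be processed.
  if (typeof message.Body !== 'string') {
    throw Error('message has no body');
  }
}

const options = {
  // sqs: an `SQS` client instance from the `aws-sdk` goes here.
  queueUrl: 'url for the sqs queue',
  handler: messageHandler,
  logger: consoleLogger,
  receiveMessageParameters: {
    AttributeNames: ['All'],
    MaxNumberOfMessages: 10,
    MessageAttributeNames: ['All'],
    VisibilityTimeout: 60, // illustrative: more time per message than the default 30
    WaitTimeSeconds: 20 // illustrative: long polling at the SQS maximum
  }
};

// const app = require('@knockaway/sqsiphon')({ sqs, ...options });
console.log(Object.keys(options.receiveMessageParameters));
```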
The factory returns an application instance. The application instance is an event emitter. It exposes the following properties and methods:
- `isRunning` (boolean): indicates if the application is polling for messages or not.
- `start()`: initiates the application to start polling for messages.
- `stop()`: initiates the application to stop polling for messages. Any messages currently being processed will be completed.
The application instance emits the following events:
- `error`: fired when an unexpected error occurs. Receives an `Error` object.
- `request-error`: fired when a communication error occurs. Receives an `Error` object.
- `processing-error`: fired when an error occurs while processing a message. Receives an object with `error` and `message` properties.
- `fifo-processing-aborted`: fired when a FIFO batch stops processing due to an error. Receives an object with `message` and `messages` properties. This event will fire subsequent to a `processing-error` event.
- `received-messages`: fired when a new batch of messages has been received. Receives an array of SQS message objects.
- `handled-message`: fired when a message has been successfully handled. Receives an SQS message object.
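
Listeners for these events might be wired up as in the sketch below. To keep it runnable without AWS access, a plain Node.js `EventEmitter` stands in for the application instance and the events are emitted by hand. The `attachListeners` helper and the simulated payloads are hypothetical; in particular, the example assumes the `messages` property of `fifo-processing-aborted` holds the messages that were not processed, which the description above does not confirm.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: attach a listener to each documented event and
// collect a short description of everything that fires.
function attachListeners(app, report) {
  app.on('error', error => report.push(`error: ${error.message}`));
  app.on('request-error', error => report.push(`request-error: ${error.message}`));
  app.on('processing-error', ({ error, message }) => {
    report.push(`processing-error: ${message.MessageId} (${error.message})`);
  });
  app.on('fifo-processing-aborted', ({ message, messages }) => {
    report.push(`fifo-processing-aborted: ${message.MessageId}, messages: ${messages.length}`);
  });
  app.on('received-messages', messages => report.push(`received-messages: ${messages.length}`));
  app.on('handled-message', message => report.push(`handled-message: ${message.MessageId}`));
}

// Stand-in for the instance returned by the sqsiphon factory.
const app = new EventEmitter();
const report = [];
attachListeners(app, report);

// Simulate a FIFO batch in which the second message fails.
const first = { MessageId: '1' };
const second = { MessageId: '2' };
const third = { MessageId: '3' };
app.emit('received-messages', [first, second, third]);
app.emit('handled-message', first);
app.emit('processing-error', { error: Error('failed'), message: second });
// Assumption: `messages` carries the messages left unprocessed.
app.emit('fifo-processing-aborted', { message: second, messages: [third] });

console.log(report.join('\n'));
```

Registering an `error` listener is worthwhile in any case: a Node.js event emitter throws when an `error` event is emitted and no listener is attached.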