Skip to content

Commit 6d1e29c

Browse files
committed
feat: sqs + direct mode
1 parent 5948893 commit 6d1e29c

File tree

3 files changed

+41
-3
lines changed

3 files changed

+41
-3
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ SpaceCat Task Processor is a Node.js service that processes messages from the AW
2727

2828
## Usage
2929
- The service is designed to run as a serverless function or background worker.
30-
- It listens for messages on the SQS queue and processes them automatically.
30+
- It can be invoked in two ways:
31+
- **SQS mode:** listens to the `SPACECAT-TASK-PROCESSOR-JOBS` queue and processes messages automatically (default path for existing workflows).
32+
- **Direct mode:** the Lambda entrypoint auto-detects single-message payloads (e.g., from AWS Step Functions) and executes the corresponding handler synchronously. This is used by the new agent workflows to obtain immediate results before triggering follow-up actions.
3133

3234
## Development
3335
- To run tests:

src/index.js

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ function getElapsedSeconds(startTime) {
5454
* @param {UniversalContext} context the context of the universal serverless function
5555
* @returns {Response} a response
5656
*/
57-
async function run(message, context) {
57+
async function processTask(message, context) {
5858
const { log } = context;
5959
const { type, siteId } = message;
6060

@@ -80,9 +80,30 @@ async function run(message, context) {
8080
}
8181
}
8282

83-
export const main = wrap(run)
83+
const runSQS = wrap(processTask)
8484
.with(dataAccess)
8585
.with(sqsEventAdapter)
8686
.with(imsClientWrapper)
8787
.with(secrets, { name: getSecretName })
8888
.with(helixStatus);
89+
90+
const runDirect = wrap(processTask)
91+
.with(dataAccess)
92+
.with(imsClientWrapper)
93+
.with(secrets, { name: getSecretName })
94+
.with(helixStatus);
95+
96+
function isSqsEvent(event, context) {
97+
if (Array.isArray(event?.Records)) {
98+
return true;
99+
}
100+
if (Array.isArray(context?.invocation?.event?.Records)) {
101+
return true;
102+
}
103+
return typeof event?.type !== 'string';
104+
}
105+
106+
export const main = async (event, context) => {
107+
const handler = isSqsEvent(event, context) ? runSQS : runDirect;
108+
return handler(event, context);
109+
};

test/index.test.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,4 +180,19 @@ describe('Index Tests', () => {
180180
expect(resp.status).to.equal(500); // Should return internal server error
181181
expect(context.log.error.calledWithMatch(sinon.match('demo-url-processor task for test-site failed after'))).to.be.true;
182182
});
183+
184+
it('processes direct invocation events without SQS adapter', async () => {
185+
const directContext = {
186+
...context,
187+
invocation: undefined,
188+
};
189+
const directEvent = {
190+
type: 'dummy',
191+
siteId: 'direct-site',
192+
};
193+
194+
const resp = await main(directEvent, directContext);
195+
expect(resp.status).to.equal(200);
196+
expect(directContext.log.info.calledWith('Found task handler for type: dummy')).to.be.true;
197+
});
183198
});

0 commit comments

Comments
 (0)