Skip to content

Commit

Permalink
feat: Implement Google PubSub receiver
Browse files Browse the repository at this point in the history
TODO:

- [ ] Rename custom attributes to match original CE names.
- [ ] Implement
  • Loading branch information
gnarea committed Jul 7, 2023
1 parent afa29b5 commit de66de9
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 1 deletion.
6 changes: 5 additions & 1 deletion src/lib/googlePubSub.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { CloudEvent, EmitterFunction } from 'cloudevents';
import type { CloudEvent, EmitterFunction, type CloudEventV1, type Headers } from 'cloudevents';
import { PubSub } from '@google-cloud/pubsub';
import { google } from '@google-cloud/pubsub/build/protos/protos.js';
import { getUnixTime } from 'date-fns';
Expand Down Expand Up @@ -73,3 +73,7 @@ export function makeGooglePubSubEmitter(): EmitterFunction {
await topic.publishMessage(message);
};
}

export function convertGooglePubSubMessage(headers: Headers, body: Buffer): CloudEventV1<Buffer> {
throw new Error(`Not implemented${headers}${body}`);
}
17 changes: 17 additions & 0 deletions src/lib/receivers.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ jest.unstable_mockModule('./ceBinary.js', () => {
return { convertCeBinaryMessage: mockCeBinaryConverter };
});

const mockGooglePubSubConverter = Symbol('mockGooglePubSubConverter');
let wasGooglePubSubImported = false;
jest.unstable_mockModule('./googlePubSub.ts', () => {
wasGooglePubSubImported = true;
return {
convertGooglePubSubMessage: mockGooglePubSubConverter,
};
});

const { makeReceiver } = await import('./receivers.js');

describe('makeReceiver', () => {
Expand All @@ -15,12 +24,20 @@ describe('makeReceiver', () => {

await makeReceiver('ce-http-binary');
expect(wasCeImported).toBeTrue();
expect(wasGooglePubSubImported).toBeFalse();

await makeReceiver('google-pubsub');
expect(wasGooglePubSubImported).toBeTrue();
});

test('CloudEvents binary receiver should be returned if requested', async () => {
await expect(makeReceiver('ce-http-binary')).resolves.toBe(mockCeBinaryConverter);
});

test('Google PubSub receiver should be returned if requested', async () => {
await expect(makeReceiver('google-pubsub')).resolves.toBe(mockGooglePubSubConverter);
});

test('Unsupported transport should be refused', async () => {
await expect(makeReceiver('unsupported')).rejects.toThrowWithMessage(
Error,
Expand Down
3 changes: 3 additions & 0 deletions src/lib/receivers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ export async function makeReceiver(transport: string): Promise<Receiver> {
if (transport === 'ce-http-binary') {
const { convertCeBinaryMessage } = await import('./ceBinary.js');
receiver = convertCeBinaryMessage;
} else if (transport === 'google-pubsub') {
const { convertGooglePubSubMessage } = await import('./googlePubSub.js');
receiver = convertGooglePubSubMessage;
} else {
throw new Error(`Unsupported receiver type (${transport})`);
}
Expand Down

0 comments on commit de66de9

Please sign in to comment.