-
Notifications
You must be signed in to change notification settings - Fork 41
/
Copy pathstream.ts
50 lines (42 loc) · 1.98 KB
/
stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import { flags, SfdxCommand } from '@salesforce/command';
import { StreamingClient } from '@salesforce/core';
// import { Duration } from '@salesforce/kit';
import * as fs from 'fs-extra';
import { PlatformEvent } from '../../../shared/typeDefs';
const writeJSONOptions = {
spaces: 2
};
export default class EventStream extends SfdxCommand {
public static examples = [
'sfdx shane:events:stream -e SomeEvent__e // subscribe to an event stream',
'sfdx shane:events:stream -e SomeEvent__e -d myDir // stream events to myDir'
];
protected static flagsConfig = {
event: flags.string({ char: 'e', description: `the platform event's api name` }),
dir: flags.directory({ char: 'd', description: 'stream the events to a folder instead of the console' }),
replay: flags.integer({ char: 'r', description: 'replay Id to begin from', default: -1 })
};
protected static requiresUsername = true;
public async run(): Promise<any> {
const streamProcessor = message => {
this.ux.logJson(message);
return { completed: false };
};
const streamProcessorToFile = (message: PlatformEvent) => {
const filename = `${this.flags.dir}/${message.event.replayId}.json`;
fs.outputJSON(filename, message, writeJSONOptions, () => {});
return { completed: false };
};
const endpoint = `/event/${this.flags.event}`;
// create a client
const options = new StreamingClient.DefaultOptions(this.org, endpoint, this.flags.dir ? streamProcessorToFile : streamProcessor);
options.apiVersion = await this.org.retrieveMaxApiVersion();
// options.subscribeTimeout = new Duration(60 * 100);
const client = await StreamingClient.create(options);
client.replay(this.flags.replay);
await client.handshake();
await client.subscribe(async () => {
this.ux.log(`Listening on ${endpoint}... (ctrl-c to exit)`);
});
}
}