-
Notifications
You must be signed in to change notification settings - Fork 41
/
Copy pathstream.ts
54 lines (46 loc) · 2.11 KB
/
stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import { flags, SfdxCommand } from '@salesforce/command';
import { StreamingClient } from '@salesforce/core';
import * as fs from 'fs-extra';
import { CDCEvent } from '../../../shared/typeDefs';
import { replay, dir } from '../../../shared/flags';
const writeJSONOptions = {
spaces: 2
};
export default class CDCStream extends SfdxCommand {
public static examples = [
'sfdx shane:cdc:stream // get all the change events',
'sfdx shane:cdc:stream -o Account // get all the change events on a single object',
'sfdx shane:cdc:stream -d myDir // stream change events to myDir/cdc, organized into folders by object api type'
];
protected static flagsConfig = {
object: flags.string({
char: 'o',
description: 'subscribe to change events for only a single object (api name, including __c)'
}),
dir,
replay
};
protected static requiresUsername = true;
public async run(): Promise<any> {
const streamProcessor = message => {
this.ux.logJson(message);
return { completed: false };
};
const streamProcessorToFile = (message: CDCEvent) => {
const filename = `${this.flags.dir}/cdc/records/${message.payload.ChangeEventHeader.entityName}/${message.event.replayId}.json`;
fs.outputJSON(filename, message, writeJSONOptions, () => {});
return { completed: false };
};
const endpoint = this.flags.object ? `/data/${this.flags.object.replace('__c', '__')}ChangeEvent` : '/data/ChangeEvents';
// create a client
const options = new StreamingClient.DefaultOptions(this.org, endpoint, this.flags.dir ? streamProcessorToFile : streamProcessor);
options.apiVersion = await this.org.retrieveMaxApiVersion();
// options.subscribeTimeout = new Duration(60 * 100);
const client = await StreamingClient.create(options);
client.replay(this.flags.replay);
await client.handshake();
await client.subscribe(async () => {
this.ux.log(`Listening on ${endpoint}... (ctrl-c to exit)`);
});
}
}