Skip to content

Commit ae69633

Browse files
example: updates
1 parent 93a454a commit ae69633

File tree

5 files changed

+203
-38
lines changed

5 files changed

+203
-38
lines changed

dist/simpleClient.js

Lines changed: 68 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,39 +7,65 @@ const workergroup_service_1 = require("@pylonbot/pylon-gateway-protobuf/dist/gat
77
// test client wip
88
class WorkerGroupClient {
99
constructor(dsn) {
10-
this.isReady = false;
1110
this.recvQueue = [];
1211
this.recvPromise = new Promise((r) => {
1312
this.recvInterrupt = r;
1413
});
14+
this.drained = false;
1515
// todo: derive fields from dsn string
1616
this.dsn = dsn;
1717
this.consumerGroup = "test";
1818
this.consumerId = "worker-1";
1919
this.client = new workergroup_service_1.GatewayWorkerGroupClient("localhost:4021", grpc_js_1.ChannelCredentials.createInsecure());
2020
this.connect();
21+
this.installSignalHandler();
2122
}
2223
connect() {
24+
if (this.drained) {
25+
// we shouldn't connect to a drained stream
26+
return;
27+
}
2328
const meta = new grpc_js_1.Metadata();
24-
meta.set("x-pylon-shard-key", "00000000000000000-0-1");
29+
meta.set("x-pylon-shard-key", "621224863100829716-0-1");
30+
meta.add("x-pylon-event-types", "MESSAGE_CREATE");
2531
const stream = this.client.workerStream(meta);
2632
this.stream = stream;
2733
stream.once("close", () => {
28-
console.log("stream closed...");
34+
console.log("stream closed... reconnecting in 5s");
2935
this.stream = undefined;
36+
if (this.drainResolve) {
37+
this.drainResolve();
38+
}
39+
setTimeout(() => {
40+
this.connect();
41+
}, 5000);
3042
});
3143
stream.once("error", (e) => {
32-
console.error(e);
44+
console.error(`stream error: ${e.message}`);
45+
stream.destroy();
3346
});
3447
stream.on("data", (data) => {
35-
console.log("data");
36-
this.recvQueue.push(data);
37-
this.recvInterrupt?.();
38-
this.recvPromise = new Promise((r) => {
39-
this.recvInterrupt = r;
40-
});
48+
switch (data.payload?.$case) {
49+
case "identifyResponse":
50+
const { routerTicket } = data.payload.identifyResponse;
51+
console.log(`Authenticated! routerTicket: ${routerTicket}`);
52+
break;
53+
case "eventEnvelope":
54+
this.sequence = data.payload.eventEnvelope.header?.seq;
55+
this.recvQueue.push(data.payload.eventEnvelope);
56+
this.recvInterrupt?.();
57+
this.recvPromise = new Promise((r) => {
58+
this.recvInterrupt = r;
59+
});
60+
break;
61+
case "streamClosed":
62+
console.info(`received graceful close: ${data.payload.streamClosed.reason}`);
63+
stream.destroy();
64+
default:
65+
console.log(`unhandled server message: ${data.payload?.$case}`);
66+
}
4167
});
42-
console.log("Authenticating");
68+
console.log("Authenticating...");
4369
stream.write(workergroup_1.WorkerStreamClientMessage.fromPartial({
4470
payload: {
4571
$case: "identifyRequest",
@@ -51,6 +77,37 @@ class WorkerGroupClient {
5177
},
5278
}));
5379
}
80+
installSignalHandler() {
81+
process.on("SIGTERM", async () => {
82+
console.info("received SIGTERM signal, draining + waiting 30s for shutdown");
83+
setTimeout(() => {
84+
console.warn("waited 30s, shutting down");
85+
process.exit(1);
86+
}, 30000);
87+
await this.drain();
88+
process.exit(0);
89+
});
90+
}
91+
async drain() {
92+
if (!this.stream || this.drained) {
93+
console.warn("request to drain non-ready stream ignored");
94+
return;
95+
}
96+
this.drained = true;
97+
const seq = this.sequence || 0;
98+
console.info(`draining stream, sequence: ${seq}`);
99+
const drainPromise = new Promise((r) => (this.drainResolve = r));
100+
this.stream.write(workergroup_1.WorkerStreamClientMessage.fromPartial({
101+
payload: {
102+
$case: "drainRequest",
103+
drainRequest: {
104+
sequence: `${seq}`,
105+
},
106+
},
107+
}));
108+
await drainPromise;
109+
console.info("drain complete");
110+
}
54111
async *events() {
55112
if (!this.stream) {
56113
this.connect();

dist/test.js

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,20 @@
11
"use strict";
22
Object.defineProperty(exports, "__esModule", { value: true });
33
const simpleClient_1 = require("./simpleClient");
4+
function messageCreate(message) {
5+
console.log(`new message from ${message.author?.username}: ${message.content}`);
6+
}
47
new Promise(async (r) => {
5-
const client = new simpleClient_1.WorkerGroupClient("pylon://not-a-real-endpoint-yet");
8+
const client = new simpleClient_1.WorkerGroupClient("pylon://auth-token@pylon-router-endpoint/worker-group-id");
9+
// crude event handler that works off the raw proto structs
610
for await (const event of client.events()) {
7-
console.log(event);
11+
switch (event.eventData?.$case) {
12+
case "messageCreateEvent":
13+
const { messageData } = event.eventData.messageCreateEvent;
14+
if (messageData) {
15+
messageCreate(messageData);
16+
}
17+
break;
18+
}
819
}
920
});

src/simpleClient.ts

Lines changed: 96 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
ClientDuplexStream,
44
Metadata,
55
} from "@grpc/grpc-js";
6+
import { EventEnvelope } from "@pylonbot/pylon-gateway-protobuf/dist/discord/v1/event";
67

78
import {
89
WorkerStreamClientMessage,
@@ -22,14 +23,18 @@ export class WorkerGroupClient {
2223
private stream:
2324
| ClientDuplexStream<WorkerStreamClientMessage, WorkerStreamServerMessage>
2425
| undefined;
25-
private isReady = false;
2626

27-
recvQueue: any[] = [];
28-
recvInterrupt?: (value?: unknown) => void;
29-
recvPromise = new Promise((r) => {
27+
private recvQueue: EventEnvelope[] = [];
28+
private recvInterrupt?: (value?: unknown) => void;
29+
private recvPromise = new Promise((r) => {
3030
this.recvInterrupt = r;
3131
});
3232

33+
private drained = false;
34+
private drainResolve?: Function;
35+
36+
private sequence?: string;
37+
3338
constructor(dsn: string) {
3439
// todo: derive fields from dsn string
3540
this.dsn = dsn;
@@ -42,40 +47,71 @@ export class WorkerGroupClient {
4247
);
4348

4449
this.connect();
50+
this.installSignalHandler();
4551
}
4652

4753
private connect() {
54+
if (this.drained) {
55+
// we shouldn't connect to a drained stream
56+
return;
57+
}
58+
4859
const meta = new Metadata();
49-
meta.set("x-pylon-shard-key", "00000000000000000-0-1");
60+
// you can call .add() multiple times per key
61+
meta.add("x-pylon-shard-key", "000000000000000-0-1");
62+
63+
meta.add("x-pylon-event-types", "MESSAGE_CREATE");
5064

5165
const stream = this.client.workerStream(meta);
5266
this.stream = stream;
5367

5468
stream.once("close", () => {
55-
console.log("stream closed...");
69+
console.log("stream closed... reconnecting in 5s");
5670
this.stream = undefined;
71+
if (this.drainResolve) {
72+
this.drainResolve();
73+
}
74+
setTimeout(() => {
75+
this.connect();
76+
}, 5000);
5777
});
5878

5979
stream.once("error", (e: any) => {
60-
console.error(e);
80+
console.error(`stream error: ${e.message}`);
81+
stream.destroy();
6182
});
6283

6384
stream.on("data", (data: WorkerStreamServerMessage) => {
64-
console.log("data");
65-
this.recvQueue.push(data);
66-
this.recvInterrupt?.();
67-
this.recvPromise = new Promise((r) => {
68-
this.recvInterrupt = r;
69-
});
85+
switch (data.payload?.$case) {
86+
case "identifyResponse":
87+
const { routerTicket } = data.payload.identifyResponse;
88+
console.log(`Authenticated! routerTicket: ${routerTicket}`);
89+
break;
90+
case "eventEnvelope":
91+
this.sequence = data.payload.eventEnvelope.header?.seq;
92+
this.recvQueue.push(data.payload.eventEnvelope);
93+
this.recvInterrupt?.();
94+
this.recvPromise = new Promise((r) => {
95+
this.recvInterrupt = r;
96+
});
97+
break;
98+
case "streamClosed":
99+
console.info(
100+
`received graceful close: ${data.payload.streamClosed.reason}`
101+
);
102+
stream.destroy();
103+
default:
104+
console.log(`unhandled server message: ${data.payload?.$case}`);
105+
}
70106
});
71107

72-
console.log("Authenticating");
108+
console.log("Authenticating...");
73109
stream.write(
74110
WorkerStreamClientMessage.fromPartial({
75111
payload: {
76112
$case: "identifyRequest",
77113
identifyRequest: {
78-
authToken: "noauth",
114+
authToken: "noauth", // todo: read params from connection string
79115
consumerGroup: this.consumerGroup,
80116
consumerId: this.consumerId,
81117
},
@@ -84,7 +120,51 @@ export class WorkerGroupClient {
84120
);
85121
}
86122

87-
public async *events() {
123+
private installSignalHandler() {
124+
process.on("SIGTERM", async () => {
125+
console.info(
126+
"received SIGTERM signal, draining + waiting 30s for shutdown"
127+
);
128+
setTimeout(() => {
129+
console.warn("waited 30s, shutting down");
130+
process.exit(1);
131+
}, 30000);
132+
133+
await this.drain();
134+
135+
process.exit(0);
136+
});
137+
}
138+
139+
public async drain() {
140+
if (!this.stream || this.drained) {
141+
console.warn("request to drain non-ready stream ignored");
142+
return;
143+
}
144+
145+
this.drained = true;
146+
const seq = this.sequence || 0;
147+
148+
console.info(`draining stream, sequence: ${seq}`);
149+
150+
const drainPromise = new Promise((r) => (this.drainResolve = r));
151+
this.stream.write(
152+
WorkerStreamClientMessage.fromPartial({
153+
payload: {
154+
$case: "drainRequest",
155+
drainRequest: {
156+
sequence: `${seq}`,
157+
},
158+
},
159+
})
160+
);
161+
162+
await drainPromise;
163+
164+
console.info("drain complete");
165+
}
166+
167+
public async *events(): AsyncGenerator<EventEnvelope, unknown, undefined> {
88168
if (!this.stream) {
89169
this.connect();
90170
}

src/test.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,26 @@
1+
import { MessageData } from "@pylonbot/pylon-gateway-protobuf/dist/discord/v1/model";
12
import { WorkerGroupClient } from "./simpleClient";
23

4+
function messageCreate(message: MessageData) {
5+
console.log(
6+
`new message from ${message.author?.username}: ${message.content}`
7+
);
8+
}
9+
310
new Promise(async (r) => {
4-
const client = new WorkerGroupClient("pylon://not-a-real-endpoint-yet");
11+
const client = new WorkerGroupClient(
12+
"pylon://auth-token@pylon-router-endpoint/worker-group-id"
13+
);
514

15+
// crude event handler that works off the raw proto structs
616
for await (const event of client.events()) {
7-
console.log(event);
17+
switch (event.eventData?.$case) {
18+
case "messageCreateEvent":
19+
const { messageData } = event.eventData.messageCreateEvent;
20+
if (messageData) {
21+
messageCreate(messageData);
22+
}
23+
break;
24+
}
825
}
926
});

yarn.lock

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666

6767
"@pylonbot/pylon-gateway-protobuf@https://github.com/pylonbot/pylon-gateway-protobuf-typescript":
6868
version "0.0.1"
69-
resolved "https://github.com/pylonbot/pylon-gateway-protobuf-typescript#68b5e633bfc0eaecb742dd292a80fb913b3ceab1"
69+
resolved "https://github.com/pylonbot/pylon-gateway-protobuf-typescript#fcb2eed99f8a07d5da18fc1461276c367a790502"
7070
dependencies:
7171
"@grpc/grpc-js" "^1.2.12"
7272
protobufjs "^6.8.8"
@@ -77,14 +77,14 @@
7777
integrity sha512-5tXH6Bx/kNGd3MgffdmP4dy2Z+G4eaXw0SE81Tq3BNadtnMR5/ySMzX4SLEzHJzSmPNn4HIdpQsBvXMUykr58w==
7878

7979
"@types/node@>=12.12.47":
80-
version "14.14.37"
81-
resolved "https://registry.yarnpkg.com/@types/node/-/node-14.14.37.tgz#a3dd8da4eb84a996c36e331df98d82abd76b516e"
82-
integrity sha512-XYmBiy+ohOR4Lh5jE379fV2IU+6Jn4g5qASinhitfyO71b/sCo6MKsMLF5tc7Zf2CE8hViVQyYSobJNke8OvUw==
80+
version "14.14.41"
81+
resolved "https://registry.yarnpkg.com/@types/node/-/node-14.14.41.tgz#d0b939d94c1d7bd53d04824af45f1139b8c45615"
82+
integrity sha512-dueRKfaJL4RTtSa7bWeTK1M+VH+Gns73oCgzvYfHZywRCoPSd8EkXBL0mZ9unPTveBn+D9phZBaxuzpwjWkW0g==
8383

8484
"@types/node@^13.7.0":
85-
version "13.13.48"
86-
resolved "https://registry.yarnpkg.com/@types/node/-/node-13.13.48.tgz#46a3df718aed5217277f2395a682e055a487e341"
87-
integrity sha512-z8wvSsgWQzkr4sVuMEEOvwMdOQjiRY2Y/ZW4fDfjfe3+TfQrZqFKOthBgk2RnVEmtOKrkwdZ7uTvsxTBLjKGDQ==
85+
version "13.13.50"
86+
resolved "https://registry.yarnpkg.com/@types/node/-/node-13.13.50.tgz#bc8ebf70c392a98ffdba7aab9b46989ea96c1c62"
87+
integrity sha512-y7kkh+hX/0jZNxMyBR/6asG0QMSaPSzgeVK63dhWHl4QAXCQB8lExXmzLL6SzmOgKHydtawpMnNhlDbv7DXPEA==
8888

8989
abort-controller@^3.0.0:
9090
version "3.0.0"

0 commit comments

Comments
 (0)