Skip to content

Commit 9a50aa1

Browse files
Tighten up CF Socket state handling
1 parent c24c3de commit 9a50aa1

File tree

1 file changed

+22
-10
lines changed

1 file changed

+22
-10
lines changed

packages/pg/lib/stream.js

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,16 +66,7 @@ class CFSocket extends EventEmitter {
6666
const { connect } = await import('cloudflare:sockets')
6767
this._cfSocket = connect(`${host}:${port}`, options)
6868
this._cfWriter = this._cfSocket.writable.getWriter()
69-
70-
this._cfSocket.closed.then(() => {
71-
if (!this._upgrading) {
72-
log('CF socket closed')
73-
this._cfSocket = null
74-
this.emit('close')
75-
} else {
76-
this._upgrading = false
77-
}
78-
})
69+
this._addClosedHandler()
7970

8071
this._cfReader = this._cfSocket.readable.getReader()
8172
if (this.ssl) {
@@ -149,14 +140,35 @@ class CFSocket extends EventEmitter {
149140
}
150141

151142
startTls(options) {
143+
if (this._upgraded) {
144+
// Don't try to upgrade again.
145+
this.emit('error', 'Cannot call `startTls()` more than once on a socket')
146+
return
147+
}
152148
this._cfWriter.releaseLock()
153149
this._cfReader.releaseLock()
154150
this._upgrading = true
155151
this._cfSocket = this._cfSocket.startTls(options)
156152
this._cfWriter = this._cfSocket.writable.getWriter()
157153
this._cfReader = this._cfSocket.readable.getReader()
154+
this._addClosedHandler()
158155
this._listen().catch((e) => this.emit('error', e))
159156
}
157+
158+
_addClosedHandler() {
159+
this._cfSocket.closed
160+
.then(() => {
161+
if (!this._upgrading) {
162+
log('CF socket closed')
163+
this._cfSocket = null
164+
this.emit('close')
165+
} else {
166+
this._upgrading = false
167+
this._upgraded = true
168+
}
169+
})
170+
.catch((e) => this.emit('error', e))
171+
}
160172
}
161173

162174
const debug = false

0 commit comments

Comments
 (0)