Skip to content

Commit

Permalink
Add sql.reserve method
Browse files Browse the repository at this point in the history
  • Loading branch information
porsager committed Jul 1, 2023
1 parent ba498fd commit 7f6e0cc
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 10 deletions.
2 changes: 1 addition & 1 deletion cjs/src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
return // Consider opening if able and sent.length < 50

connection.reserved
? x[5] === 73 // I
? !connection.reserved.release && x[5] === 73 // I
? ending
? terminate()
: (connection.reserved = null, onopen(connection))
Expand Down
42 changes: 40 additions & 2 deletions cjs/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ function Postgres(a, b) {
END: CLOSE,
PostgresError,
options,
reserve,
listen,
begin,
close,
Expand Down Expand Up @@ -199,6 +200,36 @@ function Postgres(a, b) {
return await sql`select pg_notify(${ channel }, ${ '' + payload })`
}

async function reserve() {
const q = Queue()
const c = open.length
? open.shift()
: await new Promise(r => {
queries.push({ reserve: r })
closed.length && connect(closed.shift())
})

move(c, reserved)
c.reserved = () => q.length
? c.execute(q.shift())
: move(c, reserved)
c.reserved.release = true

const sql = Sql(handler)
sql.release = () => {
c.reserved = null
onopen(c)
}

return sql

function handler(q) {
c.queue === full
? q.push(q)
: c.execute(q) || move(c, full)
}
}

async function begin(options, fn) {
!fn && (fn = options, options = '')
const queries = Queue()
Expand Down Expand Up @@ -270,6 +301,7 @@ function Postgres(a, b) {
queue === open
? c.idleTimer.start()
: c.idleTimer.cancel()
return c
}

function json(x) {
Expand Down Expand Up @@ -348,6 +380,7 @@ function Postgres(a, b) {
function connect(c, query) {
move(c, connecting)
c.connect(query)
return c
}

function onend(c) {
Expand All @@ -361,8 +394,13 @@ function Postgres(a, b) {
let max = Math.ceil(queries.length / (connecting.length + 1))
, ready = true

while (ready && queries.length && max-- > 0)
ready = c.execute(queries.shift())
while (ready && queries.length && max-- > 0) {
const query = queries.shift()
if (query.reserve)
return query.reserve(c)

ready = c.execute(query)
}

ready
? move(c, busy)
Expand Down
20 changes: 20 additions & 0 deletions cjs/tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2499,3 +2499,23 @@ t('concurrent cursors multiple connections', async() => {

return ['12233445566778', xs.sort().join('')]
})

t('reserve connection', async() => {
const reserved = await sql.reserve()

setTimeout(() => reserved.release(), 500)

const xs = await Promise.all([
reserved`select 1 as x`.then(([{ x }]) => ({ time: Date.now(), x })),
sql`select 2 as x`.then(([{ x }]) => ({ time: Date.now(), x })),
reserved`select 3 as x`.then(([{ x }]) => ({ time: Date.now(), x }))
])

if (xs[1].time - xs[2].time < 500)
throw new Error('Wrong time')

return [
'123',
xs.map(x => x.x).join('')
]
})
2 changes: 1 addition & 1 deletion deno/src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
return // Consider opening if able and sent.length < 50

connection.reserved
? x[5] === 73 // I
? !connection.reserved.release && x[5] === 73 // I
? ending
? terminate()
: (connection.reserved = null, onopen(connection))
Expand Down
42 changes: 40 additions & 2 deletions deno/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ function Postgres(a, b) {
END: CLOSE,
PostgresError,
options,
reserve,
listen,
begin,
close,
Expand Down Expand Up @@ -200,6 +201,36 @@ function Postgres(a, b) {
return await sql`select pg_notify(${ channel }, ${ '' + payload })`
}

async function reserve() {
const q = Queue()
const c = open.length
? open.shift()
: await new Promise(r => {
queries.push({ reserve: r })
closed.length && connect(closed.shift())
})

move(c, reserved)
c.reserved = () => q.length
? c.execute(q.shift())
: move(c, reserved)
c.reserved.release = true

const sql = Sql(handler)
sql.release = () => {
c.reserved = null
onopen(c)
}

return sql

function handler(q) {
c.queue === full
? q.push(q)
: c.execute(q) || move(c, full)
}
}

async function begin(options, fn) {
!fn && (fn = options, options = '')
const queries = Queue()
Expand Down Expand Up @@ -271,6 +302,7 @@ function Postgres(a, b) {
queue === open
? c.idleTimer.start()
: c.idleTimer.cancel()
return c
}

function json(x) {
Expand Down Expand Up @@ -349,6 +381,7 @@ function Postgres(a, b) {
function connect(c, query) {
move(c, connecting)
c.connect(query)
return c
}

function onend(c) {
Expand All @@ -362,8 +395,13 @@ function Postgres(a, b) {
let max = Math.ceil(queries.length / (connecting.length + 1))
, ready = true

while (ready && queries.length && max-- > 0)
ready = c.execute(queries.shift())
while (ready && queries.length && max-- > 0) {
const query = queries.shift()
if (query.reserve)
return query.reserve(c)

ready = c.execute(query)
}

ready
? move(c, busy)
Expand Down
20 changes: 20 additions & 0 deletions deno/tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2502,4 +2502,24 @@ t('concurrent cursors multiple connections', async() => {
return ['12233445566778', xs.sort().join('')]
})

t('reserve connection', async() => {
const reserved = await sql.reserve()

setTimeout(() => reserved.release(), 500)

const xs = await Promise.all([
reserved`select 1 as x`.then(([{ x }]) => ({ time: Date.now(), x })),
sql`select 2 as x`.then(([{ x }]) => ({ time: Date.now(), x })),
reserved`select 3 as x`.then(([{ x }]) => ({ time: Date.now(), x }))
])

if (xs[1].time - xs[2].time < 500)
throw new Error('Wrong time')

return [
'123',
xs.map(x => x.x).join('')
]
})

;window.addEventListener("unload", () => Deno.exit(process.exitCode))
4 changes: 2 additions & 2 deletions src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
return // Consider opening if able and sent.length < 50

connection.reserved
? x[5] === 73 // I
? !connection.reserved.release && x[5] === 73 // I
? ending
? terminate()
: (connection.reserved = null, onopen(connection))
Expand All @@ -571,7 +571,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
final && (final(), final = null)

if (result.command === 'BEGIN' && max !== 1 && !connection.reserved)
return errored(Errors.generic('UNSAFE_TRANSACTION', 'Only use sql.begin or max: 1'))
return errored(Errors.generic('UNSAFE_TRANSACTION', 'Only use sql.begin, sql.reserved or max: 1'))

if (query.options.simple)
return BindComplete()
Expand Down
42 changes: 40 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ function Postgres(a, b) {
END: CLOSE,
PostgresError,
options,
reserve,
listen,
begin,
close,
Expand Down Expand Up @@ -199,6 +200,36 @@ function Postgres(a, b) {
return await sql`select pg_notify(${ channel }, ${ '' + payload })`
}

async function reserve() {
const q = Queue()
const c = open.length
? open.shift()
: await new Promise(r => {
queries.push({ reserve: r })
closed.length && connect(closed.shift())
})

move(c, reserved)
c.reserved = () => q.length
? c.execute(q.shift())
: move(c, reserved)
c.reserved.release = true

const sql = Sql(handler)
sql.release = () => {
c.reserved = null
onopen(c)
}

return sql

function handler(q) {
c.queue === full
? q.push(q)
: c.execute(q) || move(c, full)
}
}

async function begin(options, fn) {
!fn && (fn = options, options = '')
const queries = Queue()
Expand Down Expand Up @@ -270,6 +301,7 @@ function Postgres(a, b) {
queue === open
? c.idleTimer.start()
: c.idleTimer.cancel()
return c
}

function json(x) {
Expand Down Expand Up @@ -348,6 +380,7 @@ function Postgres(a, b) {
function connect(c, query) {
move(c, connecting)
c.connect(query)
return c
}

function onend(c) {
Expand All @@ -361,8 +394,13 @@ function Postgres(a, b) {
let max = Math.ceil(queries.length / (connecting.length + 1))
, ready = true

while (ready && queries.length && max-- > 0)
ready = c.execute(queries.shift())
while (ready && queries.length && max-- > 0) {
const query = queries.shift()
if (query.reserve)
return query.reserve(c)

ready = c.execute(query)
}

ready
? move(c, busy)
Expand Down
20 changes: 20 additions & 0 deletions tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2499,3 +2499,23 @@ t('concurrent cursors multiple connections', async() => {

return ['12233445566778', xs.sort().join('')]
})

t('reserve connection', async() => {
const reserved = await sql.reserve()

setTimeout(() => reserved.release(), 510)

const xs = await Promise.all([
reserved`select 1 as x`.then(([{ x }]) => ({ time: Date.now(), x })),
sql`select 2 as x`.then(([{ x }]) => ({ time: Date.now(), x })),
reserved`select 3 as x`.then(([{ x }]) => ({ time: Date.now(), x }))
])

if (xs[1].time - xs[2].time < 500)
throw new Error('Wrong time')

return [
'123',
xs.map(x => x.x).join('')
]
})

1 comment on commit 7f6e0cc

@stephane-klein
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@porsager ❤️

Please sign in to comment.