Skip to content

Commit

Permalink
use .async
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Nov 14, 2024
1 parent ed25c82 commit 89ac67a
Showing 1 changed file with 25 additions and 26 deletions.
51 changes: 25 additions & 26 deletions packages/sql-sqlite-wasm/src/SqliteClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ export const makeOpfs = (
const transformRows = Statement.defaultTransforms(
options.transformResultNames!
).array
const pending = new Map<number, Deferred.Deferred<[columns: Array<string>, rows: Array<any>], SqlError>>()
const pending = new Map<number, (effect: Exit.Exit<[Array<string>, Array<any>], SqlError>) => void>()

const acquireWorker = Effect.gen(function*() {
const scope = yield* Effect.scope
Expand All @@ -236,16 +236,13 @@ export const makeOpfs = (
Deferred.unsafeDone(readyDeferred, Exit.void)
return
}
const deferred = pending.get(id)
if (!deferred) return
const resume = pending.get(id)
if (!resume) return
pending.delete(id)
if (error) {
Deferred.unsafeDone(
deferred,
Exit.fail(new SqlError({ cause: error as string, message: "Failed to execute statement" }))
)
resume(Exit.fail(new SqlError({ cause: error as string, message: "Failed to execute statement" })))
} else {
Deferred.unsafeDone(deferred, Exit.succeed(results))
resume(Exit.succeed(results))
}
}
port.addEventListener("message", onMessage)
Expand Down Expand Up @@ -275,25 +272,24 @@ export const makeOpfs = (
sql: string,
params: ReadonlyArray<Statement.Primitive> = [],
rowMode: "object" | "array" = "object"
): Effect.Effect<Array<any>, SqlError, never> =>
Effect.gen(function*() {
const fiber = Option.getOrThrow(Fiber.getCurrentFiber())
const deferred = yield* Deferred.make<[columns: Array<string>, rows: Array<Array<any>>], SqlError>()
const worker = yield* ScopedRef.get(workerRef)

const id = currentId++
pending.set(id, deferred)
const tranferables = fiber.getFiberRef(currentTransferables)
worker.postMessage([id, sql, params], tranferables as any)

const [columns, rows] = yield* Deferred.await(deferred)
return rowMode === "object" ?
rows.map((row) => rowToObject(columns, row))
: rows
})
): Effect.Effect<Array<any>, SqlError, never> => {
const rows = Effect.flatMap(ScopedRef.get(workerRef), (worker) =>
Effect.async<[Array<string>, Array<any>], SqlError>((resume) => {
const fiber = Option.getOrThrow(Fiber.getCurrentFiber())

const id = currentId++
pending.set(id, resume)
const tranferables = fiber.getFiberRef(currentTransferables)
worker.postMessage([id, sql, params], tranferables as any)
}))
return rowMode === "object"
? Effect.map(rows, extractObject)
: Effect.map(rows, extractRows)
}

const runTransform = options.transformResultNames
? (sql: string, params?: ReadonlyArray<Statement.Primitive>) => Effect.map(run(sql, params), transformRows)
? (sql: string, params?: ReadonlyArray<Statement.Primitive>) =>
Effect.map(run(sql, params), transformRows)
: run

return identity<SqliteConnection>({
Expand Down Expand Up @@ -328,7 +324,8 @@ export const makeOpfs = (
restore(semaphore.take(1)),
Effect.tap(
Effect.scope,
(scope) => Scope.addFinalizer(scope, semaphore.release(1))
(scope) =>
Scope.addFinalizer(scope, semaphore.release(1))
)
),
connection
Expand Down Expand Up @@ -359,6 +356,8 @@ function rowToObject(columns: Array<string>, row: Array<any>) {
}
return obj
}
const extractObject = (rows: [Array<string>, Array<any>]) => rows[1].map((row) => rowToObject(rows[0], row))
const extractRows = (rows: [Array<string>, Array<any>]) => rows[1]

/**
* @category tranferables
Expand Down

0 comments on commit 89ac67a

Please sign in to comment.