Skip to content

Commit

Permalink
refactor(stream): change sending into reusable functions
Browse files Browse the repository at this point in the history
  • Loading branch information
justmoon committed Oct 29, 2024
1 parent 046d8d9 commit 071c8d9
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 82 deletions.
26 changes: 26 additions & 0 deletions packages/lib-protocol-stream/src/stream/add-send-amount.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { isFailure } from "@dassie/lib-type-utils"

import { assertConnectionCanSendMoney } from "../connection/assert-can-send"
import type { ConnectionState } from "../connection/state"
import type { StreamState } from "./state"

interface AddSendAmountOptions {
connectionState: ConnectionState
state: StreamState
amount: bigint
}

export function addSendAmount({
connectionState,
state,
amount,
}: AddSendAmountOptions) {
{
const result = assertConnectionCanSendMoney(connectionState)
if (isFailure(result)) return result
}

state.sendMaximum += amount

return
}
88 changes: 88 additions & 0 deletions packages/lib-protocol-stream/src/stream/send-and-await.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { createDeferred, createScope } from "@dassie/lib-reactive"
import { isFailure } from "@dassie/lib-type-utils"

import { NoExchangeRateFailure } from "../connection/failures/no-exchange-rate-failure"
import { NoRemoteAddressFailure } from "../connection/failures/no-remote-address-failure"
import { sendUntilDone } from "../connection/send-until-done"
import type { ConnectionState } from "../connection/state"
import { addSendAmount } from "./add-send-amount"
import {
SEND_INCOMPLETE_FAILURE,
SEND_TIMEOUT_FAILURE,
SendFailure,
} from "./failures/send-failure"
import type { RemoteMoneyEvent, StreamState } from "./state"

const DEFAULT_TIMEOUT = 30_000

interface SendAndAwaitOptions {
connectionState: ConnectionState
state: StreamState
amount: bigint
timeout?: number | undefined
}

export function sendAndAwait({
connectionState,
state,
amount,
timeout = DEFAULT_TIMEOUT,
}: SendAndAwaitOptions): Promise<
void | SendFailure | NoRemoteAddressFailure | NoExchangeRateFailure
> {
const scope = createScope("stream-send")

{
const result = addSendAmount({ connectionState, state, amount })
if (isFailure(result)) return Promise.resolve(result)
}

const deferred = createDeferred<void | SendFailure>()
const targetAmount = state.sendMaximum

const handleSendCompleted = (result: void | SendFailure = undefined) => {
scope.dispose().catch((error: unknown) => {
connectionState.context.logger.error("error disposing send scope", {
error,
})
})
connectionState.context.clock.clearTimeout(timeoutId)
deferred.resolve(result)
}

const timeoutId = connectionState.context.clock.setTimeout(() => {
handleSendCompleted(SEND_TIMEOUT_FAILURE)
}, timeout)

const sentListener = () => {
if (state.sentAmount >= targetAmount) {
handleSendCompleted()
}
}
state.topics.moneySent.on(scope, sentListener)

const remoteListener = (event: RemoteMoneyEvent) => {
if (
event.receiveMaximum - event.receivedAmount <
connectionState.context.policy.deMinimisAmount
) {
handleSendCompleted()
}
}
state.topics.remoteMoney.on(scope, remoteListener)

sendUntilDone(connectionState)
.catch((error: unknown) => {
connectionState.context.logger.error(
"unexpected error returned by send loop",
{
error,
},
)
})
.finally(() => {
handleSendCompleted(SEND_INCOMPLETE_FAILURE)
})

return deferred
}
99 changes: 17 additions & 82 deletions packages/lib-protocol-stream/src/stream/stream.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,20 @@
import {
type Listener,
type Topic,
createDeferred,
createScope,
} from "@dassie/lib-reactive"
import { isFailure } from "@dassie/lib-type-utils"
import { type Listener, type Topic } from "@dassie/lib-reactive"

import { assertConnectionCanSendMoney } from "../connection/assert-can-send"
import type { NoExchangeRateFailure } from "../connection/failures/no-exchange-rate-failure"
import type { NoRemoteAddressFailure } from "../connection/failures/no-remote-address-failure"
import { sendUntilDone } from "../connection/send-until-done"
import type { ConnectionState } from "../connection/state"
import type { EventEmitter } from "../types/event-emitter"
import { addSendAmount } from "./add-send-amount"
import { closeStream } from "./close"
import {
SEND_INCOMPLETE_FAILURE,
SEND_TIMEOUT_FAILURE,
type SendFailure,
} from "./failures/send-failure"
import type { RemoteMoneyEvent, StreamEvents, StreamState } from "./state"
import { type SendFailure } from "./failures/send-failure"
import { sendAndAwait } from "./send-and-await"
import type { StreamEvents, StreamState } from "./state"

export interface SendOptions {
amount: bigint
timeout?: number
}

const DEFAULT_TIMEOUT = 30_000

export class Stream implements EventEmitter<StreamEvents> {
constructor(
private readonly connectionState: ConnectionState,
Expand All @@ -43,66 +31,16 @@ export class Stream implements EventEmitter<StreamEvents> {
*/
send({
amount,
timeout = DEFAULT_TIMEOUT,
timeout,
}: SendOptions): Promise<
void | SendFailure | NoRemoteAddressFailure | NoExchangeRateFailure
> {
const scope = createScope("stream-send")

{
const result = this.addSendAmount(amount)
if (isFailure(result)) return Promise.resolve(result)
}

const deferred = createDeferred<void | SendFailure>()
const targetAmount = this.state.sendMaximum

const handleSendCompleted = (result: void | SendFailure = undefined) => {
scope.dispose().catch((error: unknown) => {
this.connectionState.context.logger.error(
"error disposing send scope",
{ error },
)
})
this.connectionState.context.clock.clearTimeout(timeoutId)
deferred.resolve(result)
}

const timeoutId = this.connectionState.context.clock.setTimeout(() => {
handleSendCompleted(SEND_TIMEOUT_FAILURE)
}, timeout)

const sentListener = () => {
if (this.state.sentAmount >= targetAmount) {
handleSendCompleted()
}
}
this.state.topics.moneySent.on(scope, sentListener)

const remoteListener = (event: RemoteMoneyEvent) => {
if (
event.receiveMaximum - event.receivedAmount <
this.connectionState.context.policy.deMinimisAmount
) {
handleSendCompleted()
}
}
this.state.topics.remoteMoney.on(scope, remoteListener)

sendUntilDone(this.connectionState)
.catch((error: unknown) => {
this.connectionState.context.logger.error(
"unexpected error returned by send loop",
{
error,
},
)
})
.finally(() => {
handleSendCompleted(SEND_INCOMPLETE_FAILURE)
})

return deferred
return sendAndAwait({
connectionState: this.connectionState,
state: this.state,
amount,
timeout,
})
}

/**
Expand All @@ -116,14 +54,11 @@ export class Stream implements EventEmitter<StreamEvents> {
}

addSendAmount(amount: bigint) {
{
const result = assertConnectionCanSendMoney(this.connectionState)
if (isFailure(result)) return result
}

this.state.sendMaximum += amount

return
return addSendAmount({
connectionState: this.connectionState,
state: this.state,
amount,
})
}

addReceiveAmount(amount: bigint) {
Expand Down

0 comments on commit 071c8d9

Please sign in to comment.