Skip to content

Commit

Permalink
fix: do not wait for stream reads and writes at the same time (#2290)
Browse files Browse the repository at this point in the history
achingbrain/it#109 will change the behaviour
of byte streams to wait for the first read before resolving the promise
returned from the first write in order to have guarentees that once
the promise has resolved, the data has been sent so update the tests
etc to remove deadlocks.
  • Loading branch information
achingbrain authored Dec 2, 2023
1 parent 09dd029 commit 10ea197
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 24 deletions.
33 changes: 18 additions & 15 deletions packages/connection-encrypter-plaintext/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,26 @@ class Plaintext implements ConnectionEncrypter {
type = KeyType.Secp256k1
}

// Encode the public key and write it to the remote peer
await pb.write({
id: localId.toBytes(),
pubkey: {
Type: type,
Data: localId.publicKey ?? new Uint8Array(0)
}
}, {
signal
})

this.log('write pubkey exchange to peer %p', remoteId)

// Get the Exchange message
const response = await pb.read({
signal
})
const [
, response
] = await Promise.all([
// Encode the public key and write it to the remote peer
pb.write({
id: localId.toBytes(),
pubkey: {
Type: type,
Data: localId.publicKey ?? new Uint8Array(0)
}
}, {
signal
}),
// Get the Exchange message
pb.read({
signal
})
])

let peerId
try {
Expand Down
3 changes: 2 additions & 1 deletion packages/multistream-select/test/dialer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { logger } from '@libp2p/logger'
import { expect } from 'aegir/chai'
import randomBytes from 'iso-random-stream/src/random.js'
import all from 'it-all'
import drain from 'it-drain'
import { duplexPair } from 'it-pair/duplex'
import { pipe } from 'it-pipe'
import pTimeout from 'p-timeout'
Expand All @@ -29,7 +30,7 @@ describe('Dialer', () => {

// Ensure stream is usable after selection - send data outgoing -> incoming
const input = [randomBytes(10), randomBytes(64), randomBytes(3)]
void pipe(input, selection.stream)
void pipe(input, selection.stream, drain)

// wait for incoming end to have completed negotiation
await handled
Expand Down
16 changes: 10 additions & 6 deletions packages/pnet/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,17 @@ class PreSharedKeyConnectionProtector implements ConnectionProtector {
const signal = AbortSignal.timeout(this.timeout)

const bytes = byteStream(connection)
await bytes.write(localNonce, {
signal
})

const result = await bytes.read(NONCE_LENGTH, {
signal
})
const [
, result
] = await Promise.all([
bytes.write(localNonce, {
signal
}),
bytes.read(NONCE_LENGTH, {
signal
})
])

const remoteNonce = result.subarray()

Expand Down
2 changes: 1 addition & 1 deletion packages/protocol-identify/test/push.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ describe('identify (push)', () => {
const updatedAddress = multiaddr('/ip4/127.0.0.1/tcp/48322')

const pb = pbStream(stream)
await pb.write({
void pb.write({
publicKey: remotePeer.publicKey,
protocols: [
updatedProtocol
Expand Down
2 changes: 1 addition & 1 deletion packages/protocol-ping/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ describe('ping', () => {
const input = Uint8Array.from([0, 1, 2, 3, 4])

const b = byteStream(outgoingStream)
await b.write(input)
void b.write(input)

const output = await b.read()

Expand Down

0 comments on commit 10ea197

Please sign in to comment.