diff --git a/CHANGELOG.md b/CHANGELOG.md index e62627c8..a1f52dd4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ ## NEXT +- (Fix) - Peers will now only initiate a single transfer for many documents with + the same attachment, fixing a case which could cause syncing to hang + indefinitely. +- (Fix) - Peers will now attempt to download attachments for documents which + they already possess prior to syncing but are missing attachments for. +- (Improvement) - Syncer will cancel itself if it does not receive anything from + the other peer within a ten second window. +- (Improvement) - Syncer will cancel an attachment transfer if it doesn't + receive anything from the other side within a ten second window. - (Improvement) - Warn in the console when a replica could not ingest an attachment during sync. - (Improvement) - Better error messages for web syncing failures (e.g. 404, diff --git a/src/syncer/attachment_transfer.ts b/src/syncer/attachment_transfer.ts index a839d8f2..c51b6a60 100644 --- a/src/syncer/attachment_transfer.ts +++ b/src/syncer/attachment_transfer.ts @@ -2,6 +2,8 @@ import { deferred } from "../../deps.ts"; import { BlockingBus } from "../streams/stream_utils.ts"; import { DocBase, ShareAddress } from "../util/doc-types.ts"; import { isErr, ValidationError } from "../util/errors.ts"; +import { BumpingTimeout } from "./bumping_timeout.ts"; +import { TIMEOUT_MS } from "./constants.ts"; import { MultiDeferred } from "./multi_deferred.ts"; import { AttachmentTransferOpts, @@ -64,6 +66,8 @@ export class AttachmentTransfer { } }; + let bumpingTimeout: BumpingTimeout | null = null; + const counterStream = new ReadableStream({ async start(controller) { const newReader = stream.getReader(); @@ -71,9 +75,16 @@ export class AttachmentTransfer { // @ts-ignore Node's ReadableStream types does not like this for some reason. reader = newReader; + bumpingTimeout = new BumpingTimeout(() => { + controller.error("Attachment download timed out."); + }, TIMEOUT_MS); + while (true) { const { done, value } = await newReader.read(); + // Clear timeout here. + bumpingTimeout.bump(); + if (done) { break; } @@ -83,6 +94,7 @@ export class AttachmentTransfer { controller.enqueue(value); } + bumpingTimeout.close(); controller.close(); }, }); diff --git a/src/syncer/bumping_timeout.ts b/src/syncer/bumping_timeout.ts new file mode 100644 index 00000000..e9ccb1c6 --- /dev/null +++ b/src/syncer/bumping_timeout.ts @@ -0,0 +1,29 @@ + +/** A timeout which can be 'bumped' manually, restarting the timer. */ +export class BumpingTimeout { + private timeout: number; + private cb: () => void; + private ms: number; + private closed = false; + + constructor(cb: () => void, ms: number) { + this.cb = cb; + this.ms = ms; + this.timeout = setTimeout(cb, ms); + } + + bump() { + if (this.closed) { + return; + } + + clearTimeout(this.timeout); + this.timeout = setTimeout(this.cb, this.ms); + } + + close() { + this.closed = true; + + clearTimeout(this.timeout); + } +} diff --git a/src/syncer/constants.ts b/src/syncer/constants.ts new file mode 100644 index 00000000..a31a8fa6 --- /dev/null +++ b/src/syncer/constants.ts @@ -0,0 +1,3 @@ +// Research indicates 10 seconds is the limit for holding someone's attention without any kind of feedback. +/** The maximum number of milliseconds to wait for some kind of communication from the other peer. */ +export const TIMEOUT_MS = 10000; diff --git a/src/syncer/sync_agent.ts b/src/syncer/sync_agent.ts index c4d75b77..7a013c6b 100644 --- a/src/syncer/sync_agent.ts +++ b/src/syncer/sync_agent.ts @@ -19,6 +19,7 @@ import { SyncerManager } from "./syncer_manager.ts"; import { TransferManager } from "./transfer_manager.ts"; import { isErr } from "../util/errors.ts"; import { MultiformatReplica } from "../replica/multiformat_replica.ts"; +import { QuerySourceEvent } from "../replica/replica-types.ts"; /** Mediates synchronisation on behalf of a `Replica`. */ @@ -39,6 +40,7 @@ export class SyncAgent { private hasPrepared = deferred(); private hasReconciled = deferred(); + private hasCheckedAllExistingDocsForAttachments = deferred(); private requestedCount = 0; private sentDocsCount = 0; @@ -111,6 +113,16 @@ export class SyncAgent { } })(); + const existingDocAttachmentChecker = new ExistingDocAttachmentChecker({ + outboundEventQueue: this.outboundEventQueue, + formats: opts.formats, + replica: opts.replica, + counterpartId: this.counterpartId, + transferManager: opts.transferManager, + hasCheckedAllExistingDocsForAttachments: + this.hasCheckedAllExistingDocsForAttachments, + }); + const wantTracker = new WantTracker(); const reconciler = new SyncAgentReconciler({ @@ -160,7 +172,10 @@ export class SyncAgent { }, }); - gossiper.isDone.then(() => { + // Resolves when this gossiper has everything it wants and its counterpart is fulfilled. Never resolves if appetite is 'continuous'. + gossiper.isDone.then(async () => { + await this.hasCheckedAllExistingDocsForAttachments; + this.isDoneMultiDeferred.resolve(); }); @@ -704,6 +719,88 @@ class SyncAgentReconciler { } } +type ExistingDocAttachmentCheckerOpts = { + outboundEventQueue: AsyncQueue; + formats: FormatsArg | undefined; + replica: MultiformatReplica; + counterpartId: string; + transferManager: TransferManager; + hasCheckedAllExistingDocsForAttachments: Deferred; +}; + +class ExistingDocAttachmentChecker { + constructor(opts: ExistingDocAttachmentCheckerOpts) { + const existingDocsStream = opts.replica.getQueryStream( + { orderBy: "localIndex ASC" }, + "existing", + opts.formats, + ); + + const formatLookup = getFormatLookup(opts.formats); + const handleDownload = opts.transferManager.handleDownload.bind( + opts.transferManager, + ); + + existingDocsStream.pipeTo( + new WritableStream< + QuerySourceEvent> + >({ + start(controller) { + opts.replica.onEvent((event) => { + if (event.kind === "willClose") { + controller.error(); + } + }); + }, + async write(event) { + if (event.kind === "existing" || event.kind === "success") { + // Get the right format here... + const format = formatLookup[event.doc.format]; + + const res = await opts.replica.getAttachment(event.doc, format); + + if (isErr(res)) { + // This doc can't have an attachment attached. Do nothing. + return; + } else if (res === undefined) { + const downloadResult = await handleDownload( + event.doc, + opts.replica, + opts.counterpartId, + ); + + // Direct download not supported, send an upload request instead. + if (isErr(downloadResult)) { + const attachmentInfo = format.getAttachmentInfo( + event.doc, + ) as { size: number; hash: string }; + + opts.transferManager.registerExpectedTransfer( + opts.replica.share, + attachmentInfo.hash, + ); + + opts.outboundEventQueue.push({ + kind: "WANT_ATTACHMENT", + attachmentHash: attachmentInfo.hash, + doc: event.doc, + shareAddress: opts.replica.share, + }); + } + } + } + }, + }), + ).then(() => { + // We are done checking existing docs + opts.hasCheckedAllExistingDocsForAttachments.resolve(); + }).catch(() => { + // The replica was closed (probably because syncing was cancelled). + opts.hasCheckedAllExistingDocsForAttachments.resolve(); + }); + } +} + /** Tracks which documents a SyncAgent has sent WANT messages for and how many have actually been received. */ class WantTracker { private isSealed = false; diff --git a/src/syncer/syncer.ts b/src/syncer/syncer.ts index 3cc6819b..de8a7406 100644 --- a/src/syncer/syncer.ts +++ b/src/syncer/syncer.ts @@ -23,6 +23,8 @@ import { TransferManager } from "./transfer_manager.ts"; import { MultiDeferred } from "./multi_deferred.ts"; import { AsyncQueue, deferred } from "../../deps.ts"; import { SyncerManager } from "./syncer_manager.ts"; +import { BumpingTimeout } from "./bumping_timeout.ts"; +import { TIMEOUT_MS } from "./constants.ts"; /** Syncs the data of a Peer's replicas with that of another peer. * @@ -56,6 +58,8 @@ export class Syncer { private partnerIsFulfilled = deferred(); private isDoneMultiDeferred = new MultiDeferred(); private heartbeatInterval: number; + // Used to timeout when we haven't heard from the other peer in a while. + private bumpingTimeout: BumpingTimeout; constructor(opts: SyncerOpts) { // Have to do this because we'll be using these values in a context where 'this' is different @@ -124,6 +128,8 @@ export class Syncer { await this.partner.closeConnection(); + this.bumpingTimeout.close(); + this.isDoneMultiDeferred.resolve(); }); @@ -158,6 +164,12 @@ export class Syncer { }); }); }); + + this.bumpingTimeout = new BumpingTimeout(() => { + this.cancel( + "No communication from the other peer in the last three seconds.", + ); + }, TIMEOUT_MS); } private addShare( @@ -226,6 +238,8 @@ export class Syncer { /** Handle inbound events from the other peer. */ private async handleIncomingEvent(event: SyncerEvent) { + this.bumpingTimeout.bump(); + switch (event.kind) { // Handle an incoming salted handsake case "DISCLOSE": { @@ -349,6 +363,8 @@ export class Syncer { /** Stop syncing. */ async cancel(reason?: Error | string) { + this.bumpingTimeout.close(); + this.isDoneMultiDeferred.reject(reason); for (const [_addr, agent] of this.syncAgents) { diff --git a/src/syncer/transfer_manager.ts b/src/syncer/transfer_manager.ts index 0e335629..605d4213 100644 --- a/src/syncer/transfer_manager.ts +++ b/src/syncer/transfer_manager.ts @@ -5,7 +5,6 @@ import { FormatsArg, } from "../formats/format_types.ts"; import { getFormatLookup } from "../formats/util.ts"; -import { QuerySourceEvent } from "../replica/replica-types.ts"; import { Replica } from "../replica/replica.ts"; import { BlockingBus } from "../streams/stream_utils.ts"; import { AuthorAddress, Path, ShareAddress } from "../util/doc-types.ts"; @@ -62,44 +61,6 @@ export class TransferManager { // pass a syncagent's isDone to this registerSyncAgent(agent: SyncAgent) { - // create a sealed thing for when all syncagents are done syncing docs. - - const existingDocsStream = agent.replica.getQueryStream( - { orderBy: "localIndex ASC" }, - "existing", - this.formats, - ); - - const { formatsLookup } = this; - const handleDownload = this.handleDownload.bind(this); - - const pipedExistingDocsToManager = existingDocsStream.pipeTo( - new WritableStream< - QuerySourceEvent> - >({ - async write(event) { - if (event.kind === "existing" || event.kind === "success") { - // Get the right format here... - const format = formatsLookup[event.doc.format]; - - const res = await agent.replica.getAttachment(event.doc, format); - - if (isErr(res)) { - // This doc can't have an attachment attached. Do nothing. - return; - } else if (res === undefined) { - await handleDownload( - event.doc, - agent.replica, - agent.counterpartId, - ); - } - } - }, - }), - ); - - this.madeAllAttachmentRequestsEnroller.enrol(pipedExistingDocsToManager); this.madeAllAttachmentRequestsEnroller.enrol(agent.isDone()); } @@ -110,10 +71,15 @@ export class TransferManager { } registerExpectedTransfer(share: ShareAddress, hash: string) { - const promise = deferred(); const key = `${share}_${hash}`; - this.expectedTransferPromises.set(key, promise); + if (this.expectedTransferPromises.has(key)) { + // We're already expecting this transfer, no need to add another promise. + return; + } + + const promise = deferred(); + this.expectedTransferPromises.set(key, promise); this.receivedAllExpectedTransfersEnroller.enrol(promise); } @@ -150,14 +116,16 @@ export class TransferManager { attachmentHash: attachmentInfo.hash, }); + // The partner doesn't have it. if (result === undefined) { + /* I don't think this is needed... const key = `${replica.share}_${attachmentInfo.hash}`; const promise = deferred(); this.expectedTransferPromises.set(key, promise); promise.resolve(); + */ - // The sync agent will send a blob req. return "no_attachment"; } diff --git a/src/test/syncer/syncer.test.ts b/src/test/syncer/syncer.test.ts index 03292d1c..881032d2 100644 --- a/src/test/syncer/syncer.test.ts +++ b/src/test/syncer/syncer.test.ts @@ -20,6 +20,7 @@ import { } from "../test-utils.ts"; import DefaultCryptoDriver from "../../crypto/default_driver.ts"; import { sleep } from "../../util/misc.ts"; +import { notErr } from "../../util/errors.ts"; setGlobalCryptoDriver(DefaultCryptoDriver); @@ -50,6 +51,153 @@ const scenarios: MultiplyScenarioOutput<{ scenarios: setOverlap, }); +Deno.test("Sync a single document", async (test) => { + const authorKeypair = await Crypto.generateAuthorKeypair( + "test", + ) as AuthorKeypair; + + const shareKeypair = await Crypto.generateShareKeypair( + "apples", + ) as ShareKeypair; + + for (const driverScenario of syncDriverScenarios) { + await test.step({ + name: `Finishes and syncs (${driverScenario.name})`, + fn: async () => { + const replicaA = new Replica({ + driver: new ReplicaDriverMemory(shareKeypair.shareAddress), + shareSecret: shareKeypair.secret, + }); + + const replicaB = new Replica({ + driver: new ReplicaDriverMemory(shareKeypair.shareAddress), + shareSecret: shareKeypair.secret, + }); + + await replicaA.set(authorKeypair, { + path: "/test/path", + text: "Hello", + }); + + const peerA = new Peer(); + peerA.addReplica(replicaA); + + const peerB = new Peer(); + peerB.addReplica(replicaB); + + const syncDriverScenario = driverScenario.item( + [FormatEs5], + "once", + ); + + // Initiate sync for each peer using the driver. + const [syncerA, syncerB] = await syncDriverScenario.setup(peerA, peerB); + + // Check that the sync finishes. + await Promise.all([syncerA.isDone(), syncerB.isDone()]); + + // Check that all replicas fully synced. + + await syncDriverScenario.teardown(); + + assert( + await replicaDocsAreSynced([replicaA, replicaB]), + `+a docs are in sync`, + ); + + assert( + await replicaAttachmentsAreSynced([replicaA, replicaB]), + `+a attachments are in sync`, + ); + + await replicaA.close(true); + await replicaB.close(true); + }, + }); + } +}); + +Deno.test("Sync a single document (with attachment)", async (test) => { + const authorKeypair = await Crypto.generateAuthorKeypair( + "test", + ) as AuthorKeypair; + + const shareKeypair = await Crypto.generateShareKeypair( + "apples", + ) as ShareKeypair; + + for (const driverScenario of syncDriverScenarios) { + await test.step({ + name: `Finishes and syncs (${driverScenario.name})`, + fn: async () => { + const replicaA = new Replica({ + driver: new ReplicaDriverMemory(shareKeypair.shareAddress), + shareSecret: shareKeypair.secret, + }); + + const replicaB = new Replica({ + driver: new ReplicaDriverMemory(shareKeypair.shareAddress), + shareSecret: shareKeypair.secret, + }); + + const randomBytes = crypto.getRandomValues( + new Uint8Array(32 * 32 * 32), + ); + + const multiplicationFactor = 32; + + const bytes = new Uint8Array( + randomBytes.length * multiplicationFactor, + ); + + for (let i = 0; i < multiplicationFactor - 1; i++) { + bytes.set(randomBytes, i * randomBytes.length); + } + + await replicaA.set(authorKeypair, { + path: "/test/path.txt", + text: "Hello", + attachment: bytes, + }); + + const peerA = new Peer(); + peerA.addReplica(replicaA); + + const peerB = new Peer(); + peerB.addReplica(replicaB); + + const syncDriverScenario = driverScenario.item( + [FormatEs5], + "once", + ); + + // Initiate sync for each peer using the driver. + const [syncerA, syncerB] = await syncDriverScenario.setup(peerA, peerB); + + // Check that the sync finishes. + await Promise.all([syncerA.isDone(), syncerB.isDone()]); + + // Check that all replicas fully synced. + + await syncDriverScenario.teardown(); + + assert( + await replicaDocsAreSynced([replicaA, replicaB]), + `+a docs are in sync`, + ); + + assert( + await replicaAttachmentsAreSynced([replicaA, replicaB]), + `+a attachments are in sync`, + ); + + await replicaA.close(true); + await replicaB.close(true); + }, + }); + } +}); + Deno.test("Syncing (appetite 'once')", async (test) => { const authorKeypair = await Crypto.generateAuthorKeypair( "test", @@ -566,3 +714,196 @@ Deno.test({ } }, }); + +Deno.test("Syncs attachments for docs which were synced in previous sessions", async (test) => { + const authorKeypair = await Crypto.generateAuthorKeypair( + "test", + ) as AuthorKeypair; + + const shareKeypair = await Crypto.generateShareKeypair( + "apples", + ) as ShareKeypair; + + for (const driverScenario of syncDriverScenarios) { + await test.step({ + name: `Finishes and syncs (${driverScenario.name})`, + fn: async () => { + const replicaA = new Replica({ + driver: new ReplicaDriverMemory(shareKeypair.shareAddress), + shareSecret: shareKeypair.secret, + }); + + const replicaB = new Replica({ + driver: new ReplicaDriverMemory(shareKeypair.shareAddress), + shareSecret: shareKeypair.secret, + }); + + const randomBytes = crypto.getRandomValues( + new Uint8Array(32 * 32 * 32), + ); + + const multiplicationFactor = 32; + + const bytes = new Uint8Array( + randomBytes.length * multiplicationFactor, + ); + + for (let i = 0; i < multiplicationFactor - 1; i++) { + bytes.set(randomBytes, i * randomBytes.length); + } + + // Create a new document with an attachment + const newDocRes = await FormatEs5.generateDocument({ + share: shareKeypair.shareAddress, + config: { + shareSecret: shareKeypair.secret, + }, + keypair: authorKeypair, + timestamp: Date.now() * 1000, + input: { + text: "Test attachment", + attachment: bytes, + format: "es.5", + path: "/test-attachment", + }, + }); + + assert(notErr(newDocRes)); + assert(newDocRes.attachment instanceof Uint8Array); + + // Give replica A the doc AND attachment to ingest + await replicaA.ingest(FormatEs5, newDocRes.doc, "local"); + await replicaA.ingestAttachment( + FormatEs5, + newDocRes.doc, + newDocRes.attachment, + "local", + ); + + // Only give replica B the doc WITHOUT the attachment + await replicaB.ingest(FormatEs5, newDocRes.doc, "local"); + + const peerA = new Peer(); + peerA.addReplica(replicaA); + + const peerB = new Peer(); + peerB.addReplica(replicaB); + + const syncDriverScenario = driverScenario.item( + [FormatEs5], + "once", + ); + + // Initiate sync for each peer using the driver. + const [syncerA, syncerB] = await syncDriverScenario.setup(peerA, peerB); + + // Check that the sync finishes. + await Promise.all([syncerA.isDone(), syncerB.isDone()]); + + // Check that all replicas fully synced. + + await syncDriverScenario.teardown(); + + assert( + await replicaDocsAreSynced([replicaA, replicaB]), + `+a docs are in sync`, + ); + + assert( + await replicaAttachmentsAreSynced([replicaA, replicaB]), + `+a attachments are in sync`, + ); + + await replicaA.close(true); + await replicaB.close(true); + }, + }); + } +}); + +Deno.test("Only initiates a single attachment transfer for two documents with the same attachment", async (test) => { + const authorKeypair = await Crypto.generateAuthorKeypair( + "test", + ) as AuthorKeypair; + + const shareKeypair = await Crypto.generateShareKeypair( + "apples", + ) as ShareKeypair; + + for (const driverScenario of syncDriverScenarios) { + await test.step({ + name: `Finishes and syncs (${driverScenario.name})`, + fn: async () => { + const replicaA = new Replica({ + driver: new ReplicaDriverMemory(shareKeypair.shareAddress), + shareSecret: shareKeypair.secret, + }); + + const replicaB = new Replica({ + driver: new ReplicaDriverMemory(shareKeypair.shareAddress), + shareSecret: shareKeypair.secret, + }); + + const randomBytes = crypto.getRandomValues( + new Uint8Array(32 * 32 * 32), + ); + + const multiplicationFactor = 32; + + const bytes = new Uint8Array( + randomBytes.length * multiplicationFactor, + ); + + for (let i = 0; i < multiplicationFactor - 1; i++) { + bytes.set(randomBytes, i * randomBytes.length); + } + + await replicaA.set(authorKeypair, { + path: "/test/path.txt", + text: "Hello", + attachment: bytes, + }); + + await replicaA.set(authorKeypair, { + path: "/test/path2.txt", + text: "Hello again!", + attachment: bytes, + }); + + const peerA = new Peer(); + peerA.addReplica(replicaA); + + const peerB = new Peer(); + peerB.addReplica(replicaB); + + const syncDriverScenario = driverScenario.item( + [FormatEs5], + "once", + ); + + // Initiate sync for each peer using the driver. + const [syncerA, syncerB] = await syncDriverScenario.setup(peerA, peerB); + + // Check that the sync finishes. + await Promise.all([syncerA.isDone(), syncerB.isDone()]); + + // Check that all replicas fully synced. + + await syncDriverScenario.teardown(); + + assert( + await replicaDocsAreSynced([replicaA, replicaB]), + `+a docs are in sync`, + ); + + assert( + await replicaAttachmentsAreSynced([replicaA, replicaB]), + `+a attachments are in sync`, + ); + + await replicaA.close(true); + await replicaB.close(true); + }, + }); + } +}); diff --git a/src/test/test-utils.ts b/src/test/test-utils.ts index bca98870..dc83b7b3 100644 --- a/src/test/test-utils.ts +++ b/src/test/test-utils.ts @@ -162,16 +162,26 @@ export function writeRandomDocs( ) { const fstRand = randomId(); - const hasAttachment = Math.random() >= 0.8; + const hasAttachment = Math.random() >= 0.5; const setPromises = Array.from({ length: n }, () => { const rand = randomId(); if (hasAttachment) { - const bytes = crypto.getRandomValues( + const randomBytes = crypto.getRandomValues( new Uint8Array((Math.random() + 0.1) * 32 * 32 * 32), ); + const multiplicationFactor = Math.random() * 32; + + const bytes = new Uint8Array( + randomBytes.length * multiplicationFactor, + ); + + for (let i = 0; i < multiplicationFactor - 1; i++) { + bytes.set(randomBytes, i * randomBytes.length); + } + return storage.set(keypair, { text: `${rand}`, path: `/${fstRand}/${rand}.txt`,