Skip to content

Commit

Permalink
Merge pull request #302 from earthstar-project/sync-test-net-widen
Browse files Browse the repository at this point in the history
Fix attachments for existing docs not being synced, prevent syncing hangs, add timeouts
  • Loading branch information
sgwilym authored Jan 18, 2023
2 parents 4f2c969 + 772c8fd commit 2fae6f9
Show file tree
Hide file tree
Showing 9 changed files with 530 additions and 45 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

## NEXT

- (Fix) - Peers will now only initiate a single transfer for many documents with
the same attachment, fixing a case which could cause syncing to hang
indefinitely.
- (Fix) - Peers will now attempt to download attachments for documents which
they already possess prior to syncing but are missing attachments for.
- (Improvement) - Syncer will cancel itself if it does not receive anything from
the other peer within a ten second window.
- (Improvement) - Syncer will cancel an attachment transfer if it doesn't
receive anything from the other side within a ten second window.
- (Improvement) - Warn in the console when a replica could not ingest an
attachment during sync.
- (Improvement) - Better error messages for web syncing failures (e.g. 404,
Expand Down
12 changes: 12 additions & 0 deletions src/syncer/attachment_transfer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { deferred } from "../../deps.ts";
import { BlockingBus } from "../streams/stream_utils.ts";
import { DocBase, ShareAddress } from "../util/doc-types.ts";
import { isErr, ValidationError } from "../util/errors.ts";
import { BumpingTimeout } from "./bumping_timeout.ts";
import { TIMEOUT_MS } from "./constants.ts";
import { MultiDeferred } from "./multi_deferred.ts";
import {
AttachmentTransferOpts,
Expand Down Expand Up @@ -64,16 +66,25 @@ export class AttachmentTransfer<F> {
}
};

let bumpingTimeout: BumpingTimeout | null = null;

const counterStream = new ReadableStream<Uint8Array>({
async start(controller) {
const newReader = stream.getReader();

// @ts-ignore Node's ReadableStream types does not like this for some reason.
reader = newReader;

bumpingTimeout = new BumpingTimeout(() => {
controller.error("Attachment download timed out.");
}, TIMEOUT_MS);

while (true) {
const { done, value } = await newReader.read();

// Clear timeout here.
bumpingTimeout.bump();

if (done) {
break;
}
Expand All @@ -83,6 +94,7 @@ export class AttachmentTransfer<F> {
controller.enqueue(value);
}

bumpingTimeout.close();
controller.close();
},
});
Expand Down
29 changes: 29 additions & 0 deletions src/syncer/bumping_timeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@

/** A timeout which can be 'bumped' manually, restarting the timer. */
export class BumpingTimeout {
private timeout: number;
private cb: () => void;
private ms: number;
private closed = false;

constructor(cb: () => void, ms: number) {
this.cb = cb;
this.ms = ms;
this.timeout = setTimeout(cb, ms);
}

bump() {
if (this.closed) {
return;
}

clearTimeout(this.timeout);
this.timeout = setTimeout(this.cb, this.ms);
}

close() {
this.closed = true;

clearTimeout(this.timeout);
}
}
3 changes: 3 additions & 0 deletions src/syncer/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Research indicates 10 seconds is the limit for holding someone's attention without any kind of feedback.
/** The maximum number of milliseconds to wait for some kind of communication from the other peer. */
export const TIMEOUT_MS = 10000;
99 changes: 98 additions & 1 deletion src/syncer/sync_agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { SyncerManager } from "./syncer_manager.ts";
import { TransferManager } from "./transfer_manager.ts";
import { isErr } from "../util/errors.ts";
import { MultiformatReplica } from "../replica/multiformat_replica.ts";
import { QuerySourceEvent } from "../replica/replica-types.ts";

/** Mediates synchronisation on behalf of a `Replica`.
*/
Expand All @@ -39,6 +40,7 @@ export class SyncAgent<F> {

private hasPrepared = deferred();
private hasReconciled = deferred();
private hasCheckedAllExistingDocsForAttachments = deferred();

private requestedCount = 0;
private sentDocsCount = 0;
Expand Down Expand Up @@ -111,6 +113,16 @@ export class SyncAgent<F> {
}
})();

const existingDocAttachmentChecker = new ExistingDocAttachmentChecker({
outboundEventQueue: this.outboundEventQueue,
formats: opts.formats,
replica: opts.replica,
counterpartId: this.counterpartId,
transferManager: opts.transferManager,
hasCheckedAllExistingDocsForAttachments:
this.hasCheckedAllExistingDocsForAttachments,
});

const wantTracker = new WantTracker();

const reconciler = new SyncAgentReconciler({
Expand Down Expand Up @@ -160,7 +172,10 @@ export class SyncAgent<F> {
},
});

gossiper.isDone.then(() => {
// Resolves when this gossiper has everything it wants and its counterpart is fulfilled. Never resolves if appetite is 'continuous'.
gossiper.isDone.then(async () => {
await this.hasCheckedAllExistingDocsForAttachments;

this.isDoneMultiDeferred.resolve();
});

Expand Down Expand Up @@ -704,6 +719,88 @@ class SyncAgentReconciler<F> {
}
}

type ExistingDocAttachmentCheckerOpts<F> = {
outboundEventQueue: AsyncQueue<SyncAgentEvent>;
formats: FormatsArg<F> | undefined;
replica: MultiformatReplica;
counterpartId: string;
transferManager: TransferManager<F, unknown>;
hasCheckedAllExistingDocsForAttachments: Deferred<unknown>;
};

class ExistingDocAttachmentChecker<F> {
constructor(opts: ExistingDocAttachmentCheckerOpts<F>) {
const existingDocsStream = opts.replica.getQueryStream(
{ orderBy: "localIndex ASC" },
"existing",
opts.formats,
);

const formatLookup = getFormatLookup(opts.formats);
const handleDownload = opts.transferManager.handleDownload.bind(
opts.transferManager,
);

existingDocsStream.pipeTo(
new WritableStream<
QuerySourceEvent<FormatDocType<F>>
>({
start(controller) {
opts.replica.onEvent((event) => {
if (event.kind === "willClose") {
controller.error();
}
});
},
async write(event) {
if (event.kind === "existing" || event.kind === "success") {
// Get the right format here...
const format = formatLookup[event.doc.format];

const res = await opts.replica.getAttachment(event.doc, format);

if (isErr(res)) {
// This doc can't have an attachment attached. Do nothing.
return;
} else if (res === undefined) {
const downloadResult = await handleDownload(
event.doc,
opts.replica,
opts.counterpartId,
);

// Direct download not supported, send an upload request instead.
if (isErr(downloadResult)) {
const attachmentInfo = format.getAttachmentInfo(
event.doc,
) as { size: number; hash: string };

opts.transferManager.registerExpectedTransfer(
opts.replica.share,
attachmentInfo.hash,
);

opts.outboundEventQueue.push({
kind: "WANT_ATTACHMENT",
attachmentHash: attachmentInfo.hash,
doc: event.doc,
shareAddress: opts.replica.share,
});
}
}
}
},
}),
).then(() => {
// We are done checking existing docs
opts.hasCheckedAllExistingDocsForAttachments.resolve();
}).catch(() => {
// The replica was closed (probably because syncing was cancelled).
opts.hasCheckedAllExistingDocsForAttachments.resolve();
});
}
}

/** Tracks which documents a SyncAgent has sent WANT messages for and how many have actually been received. */
class WantTracker {
private isSealed = false;
Expand Down
16 changes: 16 additions & 0 deletions src/syncer/syncer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import { TransferManager } from "./transfer_manager.ts";
import { MultiDeferred } from "./multi_deferred.ts";
import { AsyncQueue, deferred } from "../../deps.ts";
import { SyncerManager } from "./syncer_manager.ts";
import { BumpingTimeout } from "./bumping_timeout.ts";
import { TIMEOUT_MS } from "./constants.ts";

/** Syncs the data of a Peer's replicas with that of another peer.
*
Expand Down Expand Up @@ -56,6 +58,8 @@ export class Syncer<IncomingTransferSourceType, FormatsType = DefaultFormats> {
private partnerIsFulfilled = deferred<true>();
private isDoneMultiDeferred = new MultiDeferred();
private heartbeatInterval: number;
// Used to timeout when we haven't heard from the other peer in a while.
private bumpingTimeout: BumpingTimeout;

constructor(opts: SyncerOpts<FormatsType, IncomingTransferSourceType>) {
// Have to do this because we'll be using these values in a context where 'this' is different
Expand Down Expand Up @@ -124,6 +128,8 @@ export class Syncer<IncomingTransferSourceType, FormatsType = DefaultFormats> {

await this.partner.closeConnection();

this.bumpingTimeout.close();

this.isDoneMultiDeferred.resolve();
});

Expand Down Expand Up @@ -158,6 +164,12 @@ export class Syncer<IncomingTransferSourceType, FormatsType = DefaultFormats> {
});
});
});

this.bumpingTimeout = new BumpingTimeout(() => {
this.cancel(
"No communication from the other peer in the last three seconds.",
);
}, TIMEOUT_MS);
}

private addShare(
Expand Down Expand Up @@ -226,6 +238,8 @@ export class Syncer<IncomingTransferSourceType, FormatsType = DefaultFormats> {

/** Handle inbound events from the other peer. */
private async handleIncomingEvent(event: SyncerEvent) {
this.bumpingTimeout.bump();

switch (event.kind) {
// Handle an incoming salted handsake
case "DISCLOSE": {
Expand Down Expand Up @@ -349,6 +363,8 @@ export class Syncer<IncomingTransferSourceType, FormatsType = DefaultFormats> {

/** Stop syncing. */
async cancel(reason?: Error | string) {
this.bumpingTimeout.close();

this.isDoneMultiDeferred.reject(reason);

for (const [_addr, agent] of this.syncAgents) {
Expand Down
52 changes: 10 additions & 42 deletions src/syncer/transfer_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
FormatsArg,
} from "../formats/format_types.ts";
import { getFormatLookup } from "../formats/util.ts";
import { QuerySourceEvent } from "../replica/replica-types.ts";
import { Replica } from "../replica/replica.ts";
import { BlockingBus } from "../streams/stream_utils.ts";
import { AuthorAddress, Path, ShareAddress } from "../util/doc-types.ts";
Expand Down Expand Up @@ -62,44 +61,6 @@ export class TransferManager<FormatsType, IncomingAttachmentSourceType> {

// pass a syncagent's isDone to this
registerSyncAgent(agent: SyncAgent<FormatsType>) {
// create a sealed thing for when all syncagents are done syncing docs.

const existingDocsStream = agent.replica.getQueryStream(
{ orderBy: "localIndex ASC" },
"existing",
this.formats,
);

const { formatsLookup } = this;
const handleDownload = this.handleDownload.bind(this);

const pipedExistingDocsToManager = existingDocsStream.pipeTo(
new WritableStream<
QuerySourceEvent<FormatDocType<FormatsType>>
>({
async write(event) {
if (event.kind === "existing" || event.kind === "success") {
// Get the right format here...
const format = formatsLookup[event.doc.format];

const res = await agent.replica.getAttachment(event.doc, format);

if (isErr(res)) {
// This doc can't have an attachment attached. Do nothing.
return;
} else if (res === undefined) {
await handleDownload(
event.doc,
agent.replica,
agent.counterpartId,
);
}
}
},
}),
);

this.madeAllAttachmentRequestsEnroller.enrol(pipedExistingDocsToManager);
this.madeAllAttachmentRequestsEnroller.enrol(agent.isDone());
}

Expand All @@ -110,10 +71,15 @@ export class TransferManager<FormatsType, IncomingAttachmentSourceType> {
}

registerExpectedTransfer(share: ShareAddress, hash: string) {
const promise = deferred<void>();
const key = `${share}_${hash}`;
this.expectedTransferPromises.set(key, promise);

if (this.expectedTransferPromises.has(key)) {
// We're already expecting this transfer, no need to add another promise.
return;
}

const promise = deferred<void>();
this.expectedTransferPromises.set(key, promise);
this.receivedAllExpectedTransfersEnroller.enrol(promise);
}

Expand Down Expand Up @@ -150,14 +116,16 @@ export class TransferManager<FormatsType, IncomingAttachmentSourceType> {
attachmentHash: attachmentInfo.hash,
});

// The partner doesn't have it.
if (result === undefined) {
/* I don't think this is needed...
const key = `${replica.share}_${attachmentInfo.hash}`;
const promise = deferred<void>();
this.expectedTransferPromises.set(key, promise);
promise.resolve();
*/

// The sync agent will send a blob req.
return "no_attachment";
}

Expand Down
Loading

0 comments on commit 2fae6f9

Please sign in to comment.