Skip to content

Commit

Permalink
cancel duplicate operations with an error; allows the user to react t…
Browse files Browse the repository at this point in the history
…o / clean up after a deduped operation
  • Loading branch information
Jonas committed Aug 1, 2018
1 parent cffe1d7 commit b17db2c
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 17 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ const offlineLink = new QueueLink({
});

this.link = ApolloLink.from([
new RetryLink({
attempts: {
// We don't want to retry deduped operations
retryIf: (error, _operation) => !!error && error.name !== 'DedupedByQueueError'
}
}),
offlineLink,
new BatchHttpLink({ uri: URI_TO_YOUR_GRAPHQL_SERVER }),
]);
Expand Down
6 changes: 5 additions & 1 deletion src/QueueLink.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import QueueLink from './QueueLink';
import QueueLink, {
DedupedByQueueError
} from './QueueLink';
import {
assertObservableSequence,
executeMultiple,
Expand Down Expand Up @@ -112,6 +114,7 @@ describe('OnOffLink', () => {
return assertObservableSequence(
executeMultiple(myLink, op, op2, op),
[
{ type: 'error', value: new DedupedByQueueError()},
{ type: 'next', value: testResponse2 },
{ type: 'next', value: testResponse },
{ type: 'complete' },
Expand All @@ -132,6 +135,7 @@ describe('OnOffLink', () => {
return assertObservableSequence(
executeMultiple(myLink, op, op2, op),
[
{ type: 'error', value: new DedupedByQueueError()},
{ type: 'next', value: testResponse },
{ type: 'next', value: testResponse2 },
{ type: 'complete' },
Expand Down
11 changes: 9 additions & 2 deletions src/QueueLink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ const defaultOptions: QueueLink.Options = {
isDuplicate: (a: Operation, b: Operation) => a.toKey() === b.toKey()
};

export class DedupedByQueueError extends Error {
constructor() {
super('Operation got deduplicated by apollo-link-queue.');
Object.defineProperty(this, 'name', { value: 'DedupedByQueueError' });
}
}

export default class QueueLink extends ApolloLink {
private opQueue: OperationQueueEntry[] = [];
private isOpen: boolean = true;
Expand Down Expand Up @@ -93,7 +100,7 @@ export default class QueueLink extends ApolloLink {
const alreadyInQueue = this.opQueue.some(isDuplicate);
if (alreadyInQueue) {
// if there is already a duplicate entry the new one gets ignored
entry.observer.complete()
entry.observer.error(new DedupedByQueueError());
} else {
this.opQueue.push(entry);
}
Expand All @@ -103,7 +110,7 @@ export default class QueueLink extends ApolloLink {
if (index !== -1) {
// if there is already a duplicate entry it gets removed
const [duplicate] = this.opQueue.splice(index, 1);
duplicate.observer.complete();
duplicate.observer.error(new DedupedByQueueError());
}
this.opQueue.push(entry);
break;
Expand Down
28 changes: 14 additions & 14 deletions src/TestUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export interface Unsubscribable {
}

export const assertObservableSequence = (
observable: Observable<ExecutionResult>,
observable: Observable<ExecutionResult | Error>,
sequence: ObservableValue[],
initializer: (sub: Unsubscribable) => void = () => undefined,
): Promise<boolean | Error> => {
Expand All @@ -54,7 +54,8 @@ export const assertObservableSequence = (
return new Promise((resolve, reject) => {
const sub = observable.subscribe({
next: (value) => {
expect({ type: 'next', value }).toEqual(sequence[index]);
const type = value instanceof Error ? 'error' : 'next';
expect({ type, value }).toEqual(sequence[index]);
index++;
if (index === sequence.length) {
resolve(true);
Expand Down Expand Up @@ -82,16 +83,15 @@ export const assertObservableSequence = (
};

export function executeMultiple(link: ApolloLink, ...operations: GraphQLRequest[]) {
return new Observable(sub => {
let i = 0;
const s = {
next: (v: any) => sub.next(v),
error: (e: any) => sub.error(e),
complete() {
i++;
if (i === operations.length) sub.complete()
}
};
operations.forEach(op => execute(link, op).subscribe(s))
});
return Observable.from(operations.map(op => execute(link, op)))
.flatMap(o => new Observable(sub => {
o.subscribe({
next: (v: any) => sub.next(v),
error: (e: any) => {
sub.next(e);
sub.complete();
},
complete: () => sub.complete()
})
}));
}

0 comments on commit b17db2c

Please sign in to comment.