Skip to content

Commit

Permalink
Allow Miniflare#reload() with queue consumers set, closes cloudflar…
Browse files Browse the repository at this point in the history
…e#560

Previously, Miniflare would crash with a message saying a queue
consumer was already set if configured on reload. This change
clears consumers before reloading, and then adds them all back.
It also adds some tests for this behaviour.
  • Loading branch information
mrbbot committed May 4, 2023
1 parent 0e48a64 commit 3bfd4ab
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 13 deletions.
3 changes: 3 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,9 @@ export class MiniflareCore<
const newWatchPaths = new Set<string>();
if (this.#wranglerConfigPath) newWatchPaths.add(this.#wranglerConfigPath);

// Reset all queue consumers, they'll be added back in `beforeReload()`s
this.#ctx.queueBroker.resetConsumers();

// Run all before reload hooks, including mounts if we have any
await this.#runAllBeforeReloads();
if (!this.#isMount) {
Expand Down
64 changes: 63 additions & 1 deletion packages/core/test/index.mounts.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,26 @@ import {
MiniflareCore,
MiniflareCoreContext,
MiniflareCoreError,
MiniflareCoreOptions,
ReloadEvent,
} from "@miniflare/core";
import { DurableObjectsPlugin } from "@miniflare/durable-objects";
import { HTTPPlugin, createServer } from "@miniflare/http-server";
import { KVPlugin } from "@miniflare/kv";
import { QueueBroker } from "@miniflare/queues";
import {
MessageBatch,
QueueBroker,
QueueError,
QueuesPlugin,
} from "@miniflare/queues";
import { VMScriptRunner } from "@miniflare/runner-vm";
import { LogLevel, NoOpLog, StoredValueMeta } from "@miniflare/shared";
import {
AsyncTestLog,
MemoryStorageFactory,
TestLog,
TestPlugin,
triggerPromise,
useMiniflare,
useTmp,
waitForReload,
Expand Down Expand Up @@ -526,6 +533,61 @@ test("MiniflareCore: dispatches scheduled event to mount", async (t) => {
t.deepEqual(waitUntil, ["mount", 1000, "* * * * *"]);
});

test("MiniflareCore: consumes queue in mount", async (t) => {
const opts: MiniflareCoreOptions<{
CorePlugin: typeof CorePlugin;
BindingsPlugin: typeof BindingsPlugin;
QueuesPlugin: typeof QueuesPlugin;
}> = {
queueBindings: [{ name: "QUEUE", queueName: "queue" }],
modules: true,
script: `export default {
async fetch(request, env, ctx) {
env.QUEUE.send("message");
return new Response();
}
}`,
mounts: {
a: {
bindings: {
REPORTER(batch: MessageBatch) {
trigger(batch);
},
},
queueConsumers: [{ queueName: "queue", maxWaitMs: 0 }],
modules: true,
script: `export default {
queue(batch, env, ctx) {
env.REPORTER(batch);
}
}`,
},
},
};

// Check consumes messages sent in different mount
let [trigger, promise] = triggerPromise<MessageBatch>();
const mf = useMiniflare({ BindingsPlugin, QueuesPlugin }, opts);
await mf.dispatchFetch("http://localhost");
let batch = await promise;
t.is(batch.messages.length, 1);
t.is(batch.messages[0].body, "message");
// ...even after reload (https://github.com/cloudflare/miniflare/issues/560)
await mf.reload();
[trigger, promise] = triggerPromise<MessageBatch>();
await mf.dispatchFetch("http://localhost");
batch = await promise;
t.is(batch.messages.length, 1);
t.is(batch.messages[0].body, "message");

// Check queue can have at most one consumer
opts.queueConsumers = ["queue"]; // (adding parent as consumer too)
await t.throwsAsync(mf.setOptions(opts), {
instanceOf: QueueError,
code: "ERR_CONSUMER_ALREADY_SET",
});
});

// Shared storage persistence tests
type PersistOptions = Pick<
MiniflareOptions,
Expand Down
13 changes: 12 additions & 1 deletion packages/queues/src/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,14 @@ export class WorkerQueue<Body = unknown> implements QueueInterface<Body> {
}
}

[kSetConsumer](consumer: Consumer) {
[kSetConsumer](consumer?: Consumer) {
if (consumer === undefined) {
clearTimeout(this.#timeout);
this.#pendingFlush = FlushType.NONE;
this.#consumer = undefined;
return;
}

// only allow one subscription per queue (for now)
if (this.#consumer) {
throw new QueueError("ERR_CONSUMER_ALREADY_SET");
Expand Down Expand Up @@ -279,6 +286,10 @@ export class QueueBroker implements QueueBrokerInterface {
return queue;
}

resetConsumers() {
for (const queue of this.#queues.values()) queue[kSetConsumer]();
}

setConsumer(queue: WorkerQueue, consumer: Consumer) {
queue[kSetConsumer](consumer);
}
Expand Down
28 changes: 18 additions & 10 deletions packages/queues/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ export class QueuesPlugin
})
queueConsumers?: (string | ConsumerOptions)[];

readonly #consumers: Consumer[];

constructor(ctx: PluginContext, options?: QueuesOptions) {
super(ctx);
this.assignOptions(options);
Expand All @@ -90,10 +92,8 @@ export class QueuesPlugin
"Queues are experimental. There may be breaking changes in the future."
);
}
}

async setup(_storageFactory: StorageFactory): Promise<SetupResult> {
for (const entry of this.queueConsumers ?? []) {
this.#consumers = (this.queueConsumers ?? []).map((entry) => {
let opts: ConsumerOptions;
if (typeof entry === "string") {
opts = {
Expand All @@ -103,28 +103,36 @@ export class QueuesPlugin
opts = entry;
}

const consumer: Consumer = {
return {
queueName: opts.queueName,
maxBatchSize: opts.maxBatchSize ?? DEFAULT_BATCH_SIZE,
maxWaitMs: opts.maxWaitMs ?? DEFAULT_WAIT_MS,
maxRetries: opts.maxRetries ?? DEFAULT_RETRIES,
deadLetterQueue: opts.deadLetterQueue,
dispatcher: this.ctx.queueEventDispatcher,
};
});
}

const queue = this.ctx.queueBroker.getOrCreateQueue(opts.queueName);
this.ctx.queueBroker.setConsumer(queue, consumer);
}

async setup(_storageFactory: StorageFactory): Promise<SetupResult> {
const bindings: Context = {};
for (const binding of this.queueBindings ?? []) {
bindings[binding.name] = this.ctx.queueBroker.getOrCreateQueue(
binding.queueName
);
}

const requiresModuleExports =
this.queueConsumers !== undefined && this.queueConsumers.length > 0;
const requiresModuleExports = this.#consumers.length > 0;
return { bindings, requiresModuleExports };
}

beforeReload() {
// Register consumers on every reload, we'll reset them all before running
// `beforeReload()` hooks. This allows us to detect duplicate consumers
// across mounts with different `QueuesPlugin` instances.
for (const consumer of this.#consumers) {
const queue = this.ctx.queueBroker.getOrCreateQueue(consumer.queueName);
this.ctx.queueBroker.setConsumer(queue, consumer);
}
}
}
2 changes: 2 additions & 0 deletions packages/queues/test/plugin.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ test("QueuesPlugin: parses options from argv", async (t) => {
const queueBroker = new QueueBroker();
const plugin = new QueuesPlugin({ ...ctx, queueBroker }, options);
await plugin.setup(factory);
await plugin.beforeReload();

const queue1 = queueBroker.getOrCreateQueue("queue1");
t.deepEqual(queue1[kGetConsumer]()?.maxBatchSize, DEFAULT_BATCH_SIZE);
Expand Down Expand Up @@ -103,6 +104,7 @@ test("QueuesPlugin: parses options from wrangler config", async (t) => {
const queueBroker = new QueueBroker();
const plugin = new QueuesPlugin({ ...ctx, queueBroker }, options);
await plugin.setup(factory);
await plugin.beforeReload();

// queue1 uses defaults
const queue1 = queueBroker.getOrCreateQueue("queue1");
Expand Down
3 changes: 2 additions & 1 deletion packages/shared/src/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export interface QueueBroker {
getOrCreateQueue(name: string): Queue;

setConsumer(queue: Queue, consumer: Consumer): void;
resetConsumers(): void;
}

export interface Consumer {
Expand All @@ -33,7 +34,7 @@ export interface Queue<Body = unknown> {
send(message: Body, options?: MessageSendOptions): Promise<void>;
sendBatch(batch: Iterable<MessageSendRequest<Body>>): Promise<void>;

[kSetConsumer](consumer: Consumer): void;
[kSetConsumer](consumer?: Consumer): void;
[kGetConsumer](): Consumer | null;
}

Expand Down

0 comments on commit 3bfd4ab

Please sign in to comment.