Skip to content

Commit 33d134f

Browse files
committed
fix(v3): eagerly dequeue messages from a queue when that queue is added to or removed from (v4 backport)
1 parent c16cc57 commit 33d134f

File tree

2 files changed

+152
-0
lines changed

2 files changed

+152
-0
lines changed

apps/webapp/app/env.server.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,14 @@ const EnvironmentSchema = z.object({
413413
MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(250),
414414
MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT: z.coerce.number().int().default(10),
415415

416+
MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED: z.string().default("0"),
417+
MARQS_WORKER_ENABLED: z.string().default("0"),
418+
MARQS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(10),
419+
MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
420+
MARQS_WORKER_POLL_INTERVAL_MS: z.coerce.number().int().default(100),
421+
MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS: z.coerce.number().int().default(100),
422+
MARQS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
423+
416424
PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),
417425

418426
VERBOSE_GRAPHILE_LOGGING: z.string().default("false"),

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ import {
4444
} from "./constants.server";
4545
import { setInterval } from "node:timers/promises";
4646
import { tryCatch } from "@trigger.dev/core/utils";
47+
import { Worker, type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker";
48+
import z from "zod";
49+
import { Logger } from "@trigger.dev/core/logger";
4750

4851
const KEY_PREFIX = "marqs:";
4952

@@ -74,6 +77,24 @@ export type MarQSOptions = {
7477
subscriber?: MessageQueueSubscriber;
7578
sharedWorkerQueueConsumerIntervalMs?: number;
7679
sharedWorkerQueueMaxMessageCount?: number;
80+
eagerDequeuingEnabled?: boolean;
81+
workerOptions: {
82+
pollIntervalMs?: number;
83+
immediatePollIntervalMs?: number;
84+
shutdownTimeoutMs?: number;
85+
concurrency?: WorkerConcurrencyOptions;
86+
enabled?: boolean;
87+
};
88+
};
89+
90+
const workerCatalog = {
91+
processQueueForWorkerQueue: {
92+
schema: z.object({
93+
queueKey: z.string(),
94+
parentQueueKey: z.string(),
95+
}),
96+
visibilityTimeoutMs: 30_000,
97+
},
7798
};
7899

79100
/**
@@ -83,6 +104,7 @@ export class MarQS {
83104
private redis: Redis;
84105
public keys: MarQSKeyProducer;
85106
#rebalanceWorkers: Array<AsyncWorker> = [];
107+
private worker: Worker<typeof workerCatalog>;
86108

87109
constructor(private readonly options: MarQSOptions) {
88110
this.redis = options.redis;
@@ -91,6 +113,29 @@ export class MarQS {
91113

92114
this.#startRebalanceWorkers();
93115
this.#registerCommands();
116+
117+
this.worker = new Worker({
118+
name: "marqs-worker",
119+
redisOptions: {
120+
...options.redis.options,
121+
keyPrefix: `${options.redis.options.keyPrefix}:worker`,
122+
},
123+
catalog: workerCatalog,
124+
concurrency: options.workerOptions?.concurrency,
125+
pollIntervalMs: options.workerOptions?.pollIntervalMs ?? 1000,
126+
immediatePollIntervalMs: options.workerOptions?.immediatePollIntervalMs ?? 100,
127+
shutdownTimeoutMs: options.workerOptions?.shutdownTimeoutMs ?? 10_000,
128+
logger: new Logger("MarQSWorker", "info"),
129+
jobs: {
130+
processQueueForWorkerQueue: async (job) => {
131+
await this.#processQueueForWorkerQueue(job.payload.queueKey, job.payload.parentQueueKey);
132+
},
133+
},
134+
});
135+
136+
if (options.workerOptions?.enabled) {
137+
this.worker.start();
138+
}
94139
}
95140

96141
get name() {
@@ -280,6 +325,21 @@ export class MarQS {
280325
span.setAttribute("reserve_recursive_queue", reserve.recursiveQueue);
281326
}
282327

328+
if (env.type !== "DEVELOPMENT" && this.options.eagerDequeuingEnabled) {
329+
// This will move the message to the worker queue so it can be dequeued
330+
await this.worker.enqueueOnce({
331+
id: messageQueue, // dedupe by environment, queue, and concurrency key
332+
job: "processQueueForWorkerQueue",
333+
payload: {
334+
queueKey: messageQueue,
335+
parentQueueKey: parentQueue,
336+
},
337+
// Add a small delay to dedupe messages so at most one of these will processed,
338+
// every 500ms per queue, concurrency key, and environment
339+
availableAt: new Date(Date.now() + 500), // 500ms from now
340+
});
341+
}
342+
283343
const result = await this.#callEnqueueMessage(messagePayload, reserve);
284344

285345
if (result) {
@@ -870,6 +930,64 @@ export class MarQS {
870930
);
871931
}
872932

933+
async #processQueueForWorkerQueue(queueKey: string, parentQueueKey: string) {
934+
return this.#trace("processQueueForWorkerQueue", async (span) => {
935+
span.setAttributes({
936+
[SemanticAttributes.QUEUE]: queueKey,
937+
[SemanticAttributes.PARENT_QUEUE]: parentQueueKey,
938+
});
939+
940+
const maxCount = this.options.sharedWorkerQueueMaxMessageCount ?? 10;
941+
942+
const dequeuedMessages = await this.#callDequeueMessages({
943+
messageQueue: queueKey,
944+
parentQueue: parentQueueKey,
945+
maxCount,
946+
});
947+
948+
if (!dequeuedMessages || dequeuedMessages.length === 0) {
949+
return;
950+
}
951+
952+
await this.#trace(
953+
"addToWorkerQueue",
954+
async (addToWorkerQueueSpan) => {
955+
const workerQueueKey = this.keys.sharedWorkerQueueKey();
956+
957+
addToWorkerQueueSpan.setAttributes({
958+
message_count: dequeuedMessages.length,
959+
[SemanticAttributes.PARENT_QUEUE]: workerQueueKey,
960+
});
961+
962+
await this.redis.rpush(
963+
workerQueueKey,
964+
...dequeuedMessages.map((message) => message.messageId)
965+
);
966+
},
967+
{
968+
kind: SpanKind.INTERNAL,
969+
attributes: {
970+
[SEMATTRS_MESSAGING_OPERATION]: "receive",
971+
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
972+
},
973+
}
974+
);
975+
976+
// If we dequeued the max count, we need to enqueue another job to dequeue the next batch
977+
if (dequeuedMessages.length === maxCount) {
978+
await this.worker.enqueueOnce({
979+
id: queueKey,
980+
job: "processQueueForWorkerQueue",
981+
payload: {
982+
queueKey,
983+
parentQueueKey,
984+
},
985+
availableAt: new Date(Date.now() + 500), // 500ms from now
986+
});
987+
}
988+
});
989+
}
990+
873991
public async acknowledgeMessage(messageId: string, reason: string = "unknown") {
874992
return this.#trace(
875993
"acknowledgeMessage",
@@ -901,6 +1019,20 @@ export class MarQS {
9011019
messageId,
9021020
});
9031021

1022+
const sharedQueueKey = this.keys.sharedQueueKey();
1023+
1024+
if (this.options.eagerDequeuingEnabled && message.parentQueue === sharedQueueKey) {
1025+
await this.worker.enqueueOnce({
1026+
id: message.queue,
1027+
job: "processQueueForWorkerQueue",
1028+
payload: {
1029+
queueKey: message.queue,
1030+
parentQueueKey: message.parentQueue,
1031+
},
1032+
availableAt: new Date(Date.now() + 500), // 500ms from now
1033+
});
1034+
}
1035+
9041036
await this.options.subscriber?.messageAcked(message);
9051037
},
9061038
{
@@ -2482,5 +2614,17 @@ function getMarQSClient() {
24822614
subscriber: concurrencyTracker,
24832615
sharedWorkerQueueConsumerIntervalMs: env.MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS,
24842616
sharedWorkerQueueMaxMessageCount: env.MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT,
2617+
eagerDequeuingEnabled: env.MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED === "1",
2618+
workerOptions: {
2619+
enabled: env.MARQS_WORKER_ENABLED === "1",
2620+
pollIntervalMs: env.MARQS_WORKER_POLL_INTERVAL_MS,
2621+
immediatePollIntervalMs: env.MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS,
2622+
shutdownTimeoutMs: env.MARQS_WORKER_SHUTDOWN_TIMEOUT_MS,
2623+
concurrency: {
2624+
workers: env.MARQS_WORKER_CONCURRENCY_LIMIT,
2625+
tasksPerWorker: env.MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER,
2626+
limit: env.MARQS_WORKER_CONCURRENCY_LIMIT,
2627+
},
2628+
},
24852629
});
24862630
}

0 commit comments

Comments
 (0)