-
-
Notifications
You must be signed in to change notification settings - Fork 791
fix(v3): eagerly dequeue messages from a queue when that queue is added to or removed from (v4 backport) #2438
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ed to or removed from (v4 backport)
|
WalkthroughAdds eight MARQS-related environment keys to apps/webapp/app/env.server.ts for worker enablement and tuning (eager dequeuing flag, enable flag, worker count, concurrency limits, tasks-per-worker, poll intervals, shutdown timeout). In apps/webapp/app/v3/marqs/index.server.ts, integrates a Redis-backed Worker with optional eager dequeuing, adds a worker catalog and a processQueueForWorkerQueue job, extends MarQSOptions with workerOptions and eagerDequeuingEnabled, conditionally starts the worker, and updates enqueue/acknowledge flows to schedule worker processing (500ms dedupe). Also adds zod validation and logging and wires RedisOptions from environment variables. Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Nitpick comments (5)
apps/webapp/app/env.server.ts (1)
416-423
: Optional: consider BoolEnv for the two boolean-like flags for consistency.Most toggles here are
"0"/"1"
strings, but a subset usesBoolEnv
. If you want stricter typing and easier consumption, you could switchMARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED
andMARQS_WORKER_ENABLED
toBoolEnv
and adjust call sites. If maintaining the existing"0"/"1"
convention is intentional, ignore this.apps/webapp/app/v3/marqs/index.server.ts (4)
47-50
: Use named zod import for consistency with the codebase.Other files in this repo/webapp import as
import { z } from "zod"
. Align this to reduce friction.-import z from "zod"; +import { z } from "zod";
136-138
: Optional: expose a stop/close hook.You start the worker when
enabled
; consider exposing astop()
that callsthis.worker.stop()
to support graceful shutdowns in tests or process tear-down.
933-990
: Consider draining more per job to reduce scheduling overhead.Current job enqueues itself again if it hit
maxCount
, with a fixed 500ms delay. For hot queues this increases Redis ops and latency. Optionally: loop until fewer thanmaxCount
dequeued or a small time budget is exceeded.Sketch:
- // If we dequeued the max count, we need to enqueue another job to dequeue the next batch - if (dequeuedMessages.length === maxCount) { + // If we dequeued the max count, we need to enqueue another job to dequeue the next batch + if (dequeuedMessages.length === maxCount) { await this.worker.enqueueOnce({ id: queueKey, job: "processQueueForWorkerQueue", payload: { queueKey, parentQueueKey, }, - availableAt: new Date(Date.now() + 500), // 500ms from now + availableAt: new Date(Date.now() + EAGER_DEQUEUE_DEBOUNCE_MS), }); }
117-135
: Optional: add lightweight instrumentation aroundenqueueOnce
failures.If Redis is transiently unavailable, these eager-dequeue nudges will be dropped silently. Consider wrapping in a small try/catch with a warn-level log and maybe a counter metric.
Also applies to: 328-341, 933-990, 1022-1034, 2617-2629
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
apps/webapp/app/env.server.ts
(1 hunks)apps/webapp/app/v3/marqs/index.server.ts
(8 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}
: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
apps/webapp/app/env.server.ts
apps/webapp/app/v3/marqs/index.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
We use zod a lot in packages/core and in the webapp
Files:
apps/webapp/app/env.server.ts
apps/webapp/app/v3/marqs/index.server.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
apps/webapp/**/*.{ts,tsx}
: In the webapp, all environment variables must be accessed through theenv
export ofenv.server.ts
, instead of directly accessingprocess.env
.
When importing from@trigger.dev/core
in the webapp, never import from the root@trigger.dev/core
path; always use one of the subpath exports as defined in the package's package.json.
Files:
apps/webapp/app/env.server.ts
apps/webapp/app/v3/marqs/index.server.ts
🧠 Learnings (1)
📚 Learning: 2025-07-18T17:49:47.180Z
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-07-18T17:49:47.180Z
Learning: Applies to apps/webapp/**/*.{ts,tsx} : In the webapp, all environment variables must be accessed through the `env` export of `env.server.ts`, instead of directly accessing `process.env`.
Applied to files:
apps/webapp/app/env.server.ts
🧬 Code graph analysis (1)
apps/webapp/app/v3/marqs/index.server.ts (5)
packages/redis-worker/src/worker.ts (2)
WorkerConcurrencyOptions
(62-66)Worker
(876-876)internal-packages/run-engine/src/engine/workerCatalog.ts (1)
workerCatalog
(3-56)apps/webapp/app/env.server.ts (1)
env
(1078-1078)apps/webapp/app/v3/marqs/marqsKeyProducer.ts (1)
sharedQueueKey
(108-110)internal-packages/run-engine/src/run-queue/keyProducer.ts (1)
workerQueueKey
(41-43)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (4)
apps/webapp/app/v3/marqs/index.server.ts (4)
80-88
: Worker options shape looks good.The added
eagerDequeuingEnabled
andworkerOptions
provide a clean feature toggle and tuning surface. No issues.
90-98
: Catalog entry is well-typed and scoped.Defining
processQueueForWorkerQueue
in a local catalog with a 30s visibility timeout makes sense for this workload.
117-135
: No remaining directprocess.env.MARQS_
references foundA full Ripgrep scan across all
.ts
and.tsx
files did not surface any directprocess.env.MARQS_…
usages, confirming that all environment‐flag access has been properly routed through theenv
module.
1022-1034
: Gate ack-triggered eager enqueue on worker enabled & extract debounce constantWe should avoid scheduling jobs when the worker is disabled, just as we do on the enqueue path, and replace the magic “500” ms debounce with a named constant.
• Define a new constant in
apps/webapp/app/v3/marqs/constants.server.ts
export const MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS = /*...*/; +export const MARQS_EAGER_DEQUEUE_DEBOUNCE_IN_MS = 500;
• Update the ack‐triggered eager‐enqueue at lines 1024–1034 of
index.server.ts
- if (this.options.eagerDequeuingEnabled && message.parentQueue === sharedQueueKey) { + if ( + this.options.eagerDequeuingEnabled && + this.options.workerOptions?.enabled && + message.parentQueue === sharedQueueKey + ) { await this.worker.enqueueOnce({ id: message.queue, job: "processQueueForWorkerQueue", payload: { queueKey: message.queue, parentQueueKey: message.parentQueue, }, - availableAt: new Date(Date.now() + 500), // 500ms from now + availableAt: new Date(Date.now() + MARQS_EAGER_DEQUEUE_DEBOUNCE_IN_MS), }); }• Apply the same guard and constant at the reservation path (around line 328):
- if (env.type !== "DEVELOPMENT" && this.options.eagerDequeuingEnabled) { + if ( + env.type !== "DEVELOPMENT" && + this.options.eagerDequeuingEnabled && + this.options.workerOptions?.enabled + ) { // … availableAt: new Date(Date.now() + MARQS_EAGER_DEQUEUE_DEBOUNCE_IN_MS), }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (3)
apps/webapp/app/env.server.ts (1)
416-423
: Split “workers” from “limit” and align env var naming with existing conventions.You’ve added a distinct workers count (good), but it’s named MARQS_WORKER_COUNT, which diverges from the pattern used elsewhere (e.g., COMMON_WORKER_CONCURRENCY_WORKERS, ADMIN_WORKER_CONCURRENCY_WORKERS, etc.). Introduce MARQS_WORKER_CONCURRENCY_WORKERS (with a default that falls back to MARQS_WORKER_COUNT for backward compatibility) and wire the consumer to use it. This prevents accidental coupling between “workers” and “limit” and keeps naming consistent across the codebase.
Apply in this file:
MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED: z.string().default("0"), MARQS_WORKER_ENABLED: z.string().default("0"), - MARQS_WORKER_COUNT: z.coerce.number().int().default(2), + MARQS_WORKER_COUNT: z.coerce.number().int().default(2), // legacy alias + // Preferred name to match existing convention (e.g., *_WORKER_CONCURRENCY_WORKERS) + MARQS_WORKER_CONCURRENCY_WORKERS: z.coerce + .number() + .int() + .default(process.env.MARQS_WORKER_COUNT ? parseInt(process.env.MARQS_WORKER_COUNT) : 2), MARQS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50), MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(5), MARQS_WORKER_POLL_INTERVAL_MS: z.coerce.number().int().default(100), MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS: z.coerce.number().int().default(100), MARQS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),Follow-up: update the consumer in apps/webapp/app/v3/marqs/index.server.ts to read env.MARQS_WORKER_CONCURRENCY_WORKERS first, with a fallback to env.MARQS_WORKER_COUNT.
Run this to find any other MARQS worker vars that should follow the convention:
#!/bin/bash # Inspect MARQS worker env naming consistency rg -n --color=never -C2 -e 'MARQS_.*WORKER.*(COUNT|CONCURRENCY_WORKERS|CONCURRENCY_LIMIT|TASKS_PER_WORKER)'apps/webapp/app/v3/marqs/index.server.ts (2)
2616-2636
: Normalize worker keyPrefix and prefer the new workers var with legacy fallback.
- Worker keys share KEY_PREFIX with core MarQS keys; add “:worker” to isolate the worker’s namespace and avoid potential collisions.
- Read MARQS_WORKER_CONCURRENCY_WORKERS first (consistent naming), falling back to MARQS_WORKER_COUNT for backward compatibility.
workerOptions: { enabled: env.MARQS_WORKER_ENABLED === "1", pollIntervalMs: env.MARQS_WORKER_POLL_INTERVAL_MS, immediatePollIntervalMs: env.MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS, shutdownTimeoutMs: env.MARQS_WORKER_SHUTDOWN_TIMEOUT_MS, concurrency: { - workers: env.MARQS_WORKER_COUNT, + workers: env.MARQS_WORKER_CONCURRENCY_WORKERS ?? env.MARQS_WORKER_COUNT, tasksPerWorker: env.MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER, limit: env.MARQS_WORKER_CONCURRENCY_LIMIT, }, redisOptions: { - keyPrefix: KEY_PREFIX, + // ensure worker keys are isolated and avoid "marqs::worker" + keyPrefix: `${KEY_PREFIX.replace(/:$/, "")}:worker`, port: env.REDIS_PORT ?? undefined, host: env.REDIS_HOST ?? undefined, username: env.REDIS_USERNAME ?? undefined, password: env.REDIS_PASSWORD ?? undefined, enableAutoPipelining: true, ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }, },Quick checks:
#!/bin/bash # 1) Confirm we don't already use a dedicated marqs worker prefix rg -n -C2 -F 'marqs::worker' || true rg -n -C2 -F ':worker' apps/webapp/app/v3/marqs/index.server.ts # 2) Verify consumers of workers vs limit rg -n -C2 'MARQS_WORKER_(COUNT|CONCURRENCY_WORKERS|CONCURRENCY_LIMIT|TASKS_PER_WORKER)' apps/webapp/app # 3) Ensure all 500ms occurrences are replaced rg -n -C2 'Date\\.now\\(\\) \\+ 500' apps/webapp/app/v3/marqs/index.server.ts
1021-1034
: Avoid enqueueing worker jobs when the worker is disabled; reuse debounce constant.Prevents job backlog if the worker is off, and removes another magic 500.
- if (this.options.eagerDequeuingEnabled && message.parentQueue === sharedQueueKey) { + if ( + this.options.eagerDequeuingEnabled && + this.options.workerOptions?.enabled && + message.parentQueue === sharedQueueKey + ) { await this.worker.enqueueOnce({ id: message.queue, job: "processQueueForWorkerQueue", payload: { queueKey: message.queue, parentQueueKey: message.parentQueue, }, - availableAt: new Date(Date.now() + 500), // 500ms from now + availableAt: new Date(Date.now() + EAGER_DEQUEUE_DEBOUNCE_MS), }); }
🧹 Nitpick comments (1)
apps/webapp/app/v3/marqs/index.server.ts (1)
932-988
: Use the same debounce constant when chaining batches.Keep the re-enqueue delay consistent with the enqueue path.
// If we dequeued the max count, we need to enqueue another job to dequeue the next batch if (dequeuedMessages.length === maxCount) { await this.worker.enqueueOnce({ id: queueKey, job: "processQueueForWorkerQueue", payload: { queueKey, parentQueueKey, }, - availableAt: new Date(Date.now() + 500), // 500ms from now + availableAt: new Date(Date.now() + EAGER_DEQUEUE_DEBOUNCE_MS), }); }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
apps/webapp/app/env.server.ts
(1 hunks)apps/webapp/app/v3/marqs/index.server.ts
(9 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}
: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
apps/webapp/app/env.server.ts
apps/webapp/app/v3/marqs/index.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
We use zod a lot in packages/core and in the webapp
Files:
apps/webapp/app/env.server.ts
apps/webapp/app/v3/marqs/index.server.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
apps/webapp/**/*.{ts,tsx}
: In the webapp, all environment variables must be accessed through theenv
export ofenv.server.ts
, instead of directly accessingprocess.env
.
When importing from@trigger.dev/core
in the webapp, never import from the root@trigger.dev/core
path; always use one of the subpath exports as defined in the package's package.json.
Files:
apps/webapp/app/env.server.ts
apps/webapp/app/v3/marqs/index.server.ts
🧬 Code graph analysis (1)
apps/webapp/app/v3/marqs/index.server.ts (2)
packages/redis-worker/src/worker.ts (2)
WorkerConcurrencyOptions
(62-66)Worker
(876-876)internal-packages/redis/src/index.ts (1)
RedisOptions
(4-4)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (1)
apps/webapp/app/v3/marqs/index.server.ts (1)
119-137
: Worker boot lifecycle looks good.Worker is constructed with catalog and only started when enabled; jobs route to a private method with schema-validated payload. No issues spotted here.
if (env.type !== "DEVELOPMENT" && this.options.eagerDequeuingEnabled) { | ||
// This will move the message to the worker queue so it can be dequeued | ||
await this.worker.enqueueOnce({ | ||
id: messageQueue, // dedupe by environment, queue, and concurrency key | ||
job: "processQueueForWorkerQueue", | ||
payload: { | ||
queueKey: messageQueue, | ||
parentQueueKey: parentQueue, | ||
}, | ||
// Add a small delay to dedupe messages so at most one of these will processed, | ||
// every 500ms per queue, concurrency key, and environment | ||
availableAt: new Date(Date.now() + 500), // 500ms from now | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gate eager-dequeue behind worker enabled and DRY the 500ms magic number.
If the worker is disabled, enqueueOnce will accumulate jobs that never run. Also, replace the hardcoded 500ms with a shared constant.
Apply:
- if (env.type !== "DEVELOPMENT" && this.options.eagerDequeuingEnabled) {
+ if (
+ env.type !== "DEVELOPMENT" &&
+ this.options.eagerDequeuingEnabled &&
+ this.options.workerOptions?.enabled
+ ) {
// This will move the message to the worker queue so it can be dequeued
await this.worker.enqueueOnce({
id: messageQueue, // dedupe by environment, queue, and concurrency key
job: "processQueueForWorkerQueue",
payload: {
queueKey: messageQueue,
parentQueueKey: parentQueue,
},
// Add a small delay to dedupe messages so at most one of these will processed,
// every 500ms per queue, concurrency key, and environment
- availableAt: new Date(Date.now() + 500), // 500ms from now
+ availableAt: new Date(Date.now() + EAGER_DEQUEUE_DEBOUNCE_MS),
});
}
Add once near KEY_PREFIX:
const EAGER_DEQUEUE_DEBOUNCE_MS = 500;
🤖 Prompt for AI Agents
In apps/webapp/app/v3/marqs/index.server.ts around lines 327-340, the
eager-dequeue enqueueOnce call is executed even when the worker is disabled and
uses a hardcoded 500ms; add a constant EAGER_DEQUEUE_DEBOUNCE_MS = 500 near the
existing KEY_PREFIX declaration and replace the inline 500 with that constant,
and expand the condition to require the worker to be enabled (e.g. check
this.worker?.enabled or this.options.workerEnabled depending on the codebase) so
the enqueueOnce is only called when env.type !== "DEVELOPMENT",
eagerDequeuingEnabled is true, and the worker is enabled.
No description provided.