fix(v3): prevent saturated queues from dominating dequeue attempts by adding a cooloff period of successive failed dequeues #2439
Conversation
… adding a cooloff period of successive failed dequeues
Caution: Review failed. The pull request is closed.

Walkthrough: Adds per-queue cooldown controls to the shared worker queue consumer: new MarQSOptions fields sharedWorkerQueueCooloffPeriodMs and sharedWorkerQueueCooloffCountThreshold.

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Actionable comments posted: 2
🧹 Nitpick comments (6)
apps/webapp/app/v3/marqs/index.server.ts (6)
112-115: Cooloff state maps: pruning strategy and lifecycle
- Clearing both maps every 10 minutes resets active cooloffs and counters, which can re-expose saturated queues sooner than intended. Prefer pruning only expired cooloffs while keeping counters (or age-based pruning) to preserve behavior and still bound growth.
- The interval is never cleared. If multiple MarQS instances ever get created (tests, hot reloads), this can leak timers.
Prune precisely-expired entries and keep counts:
```diff
-    this.clearCooloffPeriodInterval = setInterval(() => {
-      this.queueDequeueCooloffCounts.clear();
-      this.queueDequeueCooloffPeriod.clear();
-    }, 60_000 * 10); // 10 minutes
+    this.clearCooloffPeriodInterval = setInterval(() => {
+      const now = Date.now();
+      for (const [queue, until] of this.queueDequeueCooloffPeriod) {
+        if (until <= now) this.queueDequeueCooloffPeriod.delete(queue);
+      }
+      // Optional: cap counts map size
+      if (this.queueDequeueCooloffCounts.size > 50_000) {
+        this.queueDequeueCooloffCounts.clear();
+      }
+    }, 600_000); // 10 minutes
```

And add a small disposer to avoid timer leaks:
```ts
public dispose() {
  clearInterval(this.clearCooloffPeriodInterval);
}
```

(You can call dispose() from your app shutdown hook if available.)
Also applies to: 124-129
844-857: Use .has() to check map membership and avoid truthiness pitfalls
The current truthy check would skip a 0 timestamp (unlikely here), and it reads more clearly to check existence and then compare.
Apply:
```diff
-        const cooloffPeriod = this.queueDequeueCooloffPeriod.get(messageQueue);
-        // If the queue is in a cooloff period, skip attempting to dequeue from it
-        if (cooloffPeriod) {
-          // If the cooloff period is still active, skip attempting to dequeue from it
-          if (cooloffPeriod > Date.now()) {
+        const cooloffUntil = this.queueDequeueCooloffPeriod.get(messageQueue);
+        if (cooloffUntil !== undefined) {
+          if (cooloffUntil > Date.now()) {
            coolOffPeriodCount++;
            continue;
-          } else {
-            // If the cooloff period is over, delete the cooloff period and attempt to dequeue from the queue
-            this.queueDequeueCooloffPeriod.delete(messageQueue);
-          }
-        }
+          }
+          // Cooloff period over; remove and attempt dequeue
+          this.queueDequeueCooloffPeriod.delete(messageQueue);
+        }
```
967-968: Expose threshold/period in telemetry to aid tuning
You already expose cooloff_period_count. Add the active threshold and period to spans so we can correlate tuning with effect.
Example:
```diff
 span.setAttribute("attempted_envs", attemptedEnvs);
 span.setAttribute("message_count", messageCount);
 span.setAttribute("cooloff_period_count", coolOffPeriodCount);
+span.setAttribute("cooloff_threshold", Math.max(10, this.options.sharedWorkerQueueCooloffCountThreshold ?? 10));
+span.setAttribute("cooloff_period_ms", this.options.sharedWorkerQueueCooloffPeriodMs ?? 10_000);
```
844-910: (Optional) Avoid cooling off queues that are about to become available
Empty dequeues can be due to:
- hard concurrency caps (intended to cool off), or
- scheduled messages just a few hundred ms in the future.
Cooling off the latter can delay near-ready work by the full cooloff period. If practical, consider checking the oldest score (e.g., zrange queueKey WITHSCORES 0 0) and only cool off if (oldestScore - now) exceeds a small grace window (e.g., > 2s). This is an extra Redis call, so apply behind a feature flag or only after N consecutive empties.
Happy to sketch a variant that consults oldestScore with a bounded cost and caches it for the scan pass.
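For illustration, here is a minimal sketch of that grace-window check, assuming an ioredis client and a sorted set scored by ready-at timestamp; the key layout, helper name, and 2s grace value are assumptions, not the actual MarQS implementation:

```ts
import { Redis } from "ioredis";

const GRACE_MS = 2_000; // assumed grace window

// Returns true if cooling off the queue is safe: either it is truly empty,
// or its oldest message is not ready within the grace window.
async function shouldCooloff(redis: Redis, queueKey: string): Promise<boolean> {
  // ZRANGE key 0 0 WITHSCORES returns [member, score] for the oldest entry.
  const result = await redis.zrange(queueKey, 0, 0, "WITHSCORES");
  if (result.length === 0) {
    // Queue is truly empty; cooling off is safe.
    return true;
  }
  const oldestScore = Number(result[1]);
  // Skip the cooloff if the oldest message becomes ready within the grace window.
  return oldestScore - Date.now() > GRACE_MS;
}
```

As noted above, this adds a Redis round-trip per scanned queue, so it would belong behind a flag or only after a few consecutive empty dequeues.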
835-910: Cluster-wide cooldown semantics (optional)
The cooldown state is process-local. If multiple webapp instances run the consumer, each will independently probe the same saturated queues. If this becomes an issue at scale, move the cooldown to Redis (e.g., SET queue:cooloff: with PX and NX) so all consumers respect it.
I can provide a minimal Redis-based design (SETNX + TTL) that keeps the same behavior with negligible overhead.
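For illustration, a minimal sketch of such a Redis-backed cooldown, assuming an ioredis client; the key prefix and helper names are illustrative, not the actual design:

```ts
import { Redis } from "ioredis";

const COOLOFF_PREFIX = "queue:cooloff:"; // assumed key prefix

// Start a cooldown shared by all consumers. NX means only the first consumer
// to hit the threshold sets it; PX makes it expire automatically.
async function startCooloff(redis: Redis, queue: string, periodMs: number): Promise<void> {
  await redis.set(`${COOLOFF_PREFIX}${queue}`, "1", "PX", periodMs, "NX");
}

// Every consumer checks the same key, so the cooldown applies cluster-wide.
async function isCoolingOff(redis: Redis, queue: string): Promise<boolean> {
  return (await redis.exists(`${COOLOFF_PREFIX}${queue}`)) === 1;
}
```

Because the TTL handles expiry, no pruning interval is needed in this variant.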
81-83: Naming nit: "cooldown" vs "cooloff"
We use "cooldown" elsewhere in infra contexts; consider standardizing. Variable naming is currently mixed (coolOffPeriodCount vs queueDequeueCooloffPeriod). Purely cosmetic.
Also applies to: 844-910
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
apps/webapp/app/v3/marqs/index.server.ts (9 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
apps/webapp/app/v3/marqs/index.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
We use zod a lot in packages/core and in the webapp
Files:
apps/webapp/app/v3/marqs/index.server.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
apps/webapp/**/*.{ts,tsx}: In the webapp, all environment variables must be accessed through the env export of env.server.ts, instead of directly accessing process.env.
When importing from @trigger.dev/core in the webapp, never import from the root @trigger.dev/core path; always use one of the subpath exports as defined in the package's package.json.
Files:
apps/webapp/app/v3/marqs/index.server.ts
🧬 Code graph analysis (1)
apps/webapp/app/v3/marqs/index.server.ts (1)
internal-packages/run-engine/src/run-queue/index.ts (1)
messageQueue (1348-1443)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (3)
apps/webapp/app/v3/marqs/index.server.ts (3)
46-46: Good switch to timers/promises setInterval
Importing setInterval from node:timers/promises enables an abortable, non-overlapping loop. Nice.
751-757: Non-overlapping consumer loop is safer
for-await over setIntervalAsync with an AbortSignal prevents overlapping ticks. This should reduce contention and surprise concurrency.
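For reference, a minimal sketch of that loop pattern, assuming the node:timers/promises import is aliased locally; the delay, function names, and loop body are illustrative, not the actual consumer code:

```ts
import { setInterval as setIntervalAsync } from "node:timers/promises";

async function runConsumerLoop(signal: AbortSignal): Promise<void> {
  try {
    // Each iteration finishes before the next tick is consumed,
    // so dequeue passes never overlap.
    for await (const _ of setIntervalAsync(1_000, undefined, { signal })) {
      await doOneDequeuePass(); // placeholder for the real dequeue work
    }
  } catch (err) {
    // Aborting the signal ends the loop with an AbortError.
    if (!(err instanceof Error && err.name === "AbortError")) throw err;
  }
}

async function doOneDequeuePass(): Promise<void> {
  // ...dequeue logic would go here...
}
```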
915-916: Resetting the counter on success is correct
Clearing the per-queue counter on successful dequeue prevents inadvertent cooloffs after activity resumes.
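To make the counter lifecycle concrete, here is a standalone sketch of the bookkeeping this PR describes; the names and the 10-attempt / 10s defaults are assumptions, not the real MarQS fields:

```ts
const cooloffCounts = new Map<string, number>();
const cooloffUntil = new Map<string, number>();
const COOLOFF_THRESHOLD = 10; // assumed default
const COOLOFF_PERIOD_MS = 10_000; // assumed default

// Called when a dequeue attempt returns no message.
function recordEmptyDequeue(queue: string): void {
  const count = (cooloffCounts.get(queue) ?? 0) + 1;
  cooloffCounts.set(queue, count);
  if (count >= COOLOFF_THRESHOLD) {
    // Enter the cooloff window and restart the counter.
    cooloffUntil.set(queue, Date.now() + COOLOFF_PERIOD_MS);
    cooloffCounts.delete(queue);
  }
}

// Called on a successful dequeue: stale failures no longer count.
function recordSuccessfulDequeue(queue: string): void {
  cooloffCounts.delete(queue);
}

// Called before attempting a dequeue; expired windows are cleaned up lazily.
function isInCooloff(queue: string): boolean {
  const until = cooloffUntil.get(queue);
  if (until === undefined) return false;
  if (until > Date.now()) return true;
  cooloffUntil.delete(queue);
  return false;
}
```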
```ts
  sharedWorkerQueueCooloffPeriodMs?: number;
  sharedWorkerQueueCooloffCountThreshold?: number;
  eagerDequeuingEnabled?: boolean;
```
💡 Verification agent
🧩 Analysis chain
Public options added — document semantics and (optionally) wire to env/server config
Two new knobs look good. Please:
- Document defaults and intent (e.g., “defaults: threshold min=10, period=10s”) in the type/README.
- Consider wiring these to env so ops can tune without a deploy.
If you want them configurable now, append in getMarQSClient:
```diff
   sharedWorkerQueueConsumerIntervalMs: env.MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS,
   sharedWorkerQueueMaxMessageCount: env.MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT,
+  sharedWorkerQueueCooloffPeriodMs: env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS,
+  sharedWorkerQueueCooloffCountThreshold: env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD,
```
Run to see if env.server.ts defines these keys:
🏁 Script executed:

```bash
#!/bin/bash
rg -nP 'MARQS_SHARED_WORKER_QUEUE_COOLOFF_(PERIOD_MS|COUNT_THRESHOLD)' apps/webapp/app/env.server.ts
```

Length of output: 102
🏁 Script executed:

```bash
#!/bin/bash
# Locate env.server.ts in the repository
echo "Locating env.server.ts files:"
fd -HI 'env.server.ts'

# If found, search for MARQS_SHARED_WORKER_QUEUE entries
ENV_FILE=$(fd -HI 'env.server.ts' | head -n 1)
if [ -n "$ENV_FILE" ] && [ -f "$ENV_FILE" ]; then
  echo -e "\nSearching for MARQS_SHARED_WORKER_QUEUE_ in $ENV_FILE:"
  rg -n 'MARQS_SHARED_WORKER_QUEUE_' -C3 "$ENV_FILE"
else
  echo "env.server.ts not found or path could not be resolved."
fi
```

Length of output: 1186
Add missing environment variables and document their defaults
The two new knobs (sharedWorkerQueueCooloffPeriodMs and sharedWorkerQueueCooloffCountThreshold) aren't defined in your Zod-backed env.server.ts, so they won't be configurable via environment. Please:
- In apps/webapp/app/env.server.ts, alongside the existing MARQS_SHARED_WORKER_QUEUE_* entries, add:

  ```ts
  MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(10000), // 10 s cool-off period
  MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10), // 10 messages before cool-off
  ```
- In apps/webapp/app/v3/marqs/index.server.ts, wire them into getMarQSClient:

  ```diff
     sharedWorkerQueueConsumerIntervalMs: env.MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS,
     sharedWorkerQueueMaxMessageCount: env.MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT,
  +  sharedWorkerQueueCooloffPeriodMs: env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS,
  +  sharedWorkerQueueCooloffCountThreshold: env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD,
  ```
- Update the type/README to document each option's intent and defaults (e.g., "defaults: cool-off period = 10000 ms; count threshold = 10 messages").
🤖 Prompt for AI Agents
In apps/webapp/app/v3/marqs/index.server.ts around lines 81-83 the new options
sharedWorkerQueueCooloffPeriodMs and sharedWorkerQueueCooloffCountThreshold are
present but not exposed via environment; add two entries to
apps/webapp/app/env.server.ts named MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS
and MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD using
z.coerce.number().int() with defaults 10000 and 10 respectively, then read those
env keys and pass their values into getMarQSClient in
apps/webapp/app/v3/marqs/index.server.ts where the other
MARQS_SHARED_WORKER_QUEUE_* options are wired, and finally update the type
definitions/README to document both options and their defaults (cool-off period
= 10000 ms; count threshold = 10 messages).
No description provided.