fix(v3): prevent saturated queues from dominating dequeue attempts by adding a cooloff period of successive failed dequeues #2439


Merged

2 commits merged into main from fix/v3-dequeue-cooloff on Aug 22, 2025

Conversation

ericallam
Member

No description provided.

… adding a cooloff period of successive failed dequeues

changeset-bot bot commented Aug 22, 2025

⚠️ No Changeset found

Latest commit: dafbf67

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types
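
For reference, a changeset is a small markdown file added under .changeset/ that names the affected packages and their semver bump. A minimal hypothetical example (the package name and patch bump are illustrative only, not what this PR would require):

---
"@trigger.dev/core": patch
---

Prevent saturated queues from dominating dequeue attempts by adding a cooloff period after successive failed dequeues.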


Contributor

coderabbitai bot commented Aug 22, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

Adds per-queue cooldown controls to the shared worker queue consumer. New MarQSOptions fields sharedWorkerQueueCooloffPeriodMs and sharedWorkerQueueCooloffCountThreshold are introduced, and internal maps track per-queue cooldown end timestamps and consecutive empty-dequeue counts. When a queue hits the empty-dequeue threshold it is placed into a cooldown period and skipped until the cooldown expires; counters are cleared on successful dequeues. Also replaces setInterval with setIntervalAsync for periodic loops, adds a periodic cleanup of the cooldown maps, integrates try/catch error handling, and registers SIGTERM/SIGINT shutdown handlers. The environment schema exposes the two new settings.
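
A minimal TypeScript sketch of that cooldown bookkeeping (names, defaults, and structure here are illustrative, not the actual MarQS internals):

// Illustrative sketch; the real implementation lives in apps/webapp/app/v3/marqs/index.server.ts
const queueCooloffUntil = new Map<string, number>(); // queue -> cooldown end timestamp (ms)
const emptyDequeueCounts = new Map<string, number>(); // queue -> consecutive empty dequeues

const COOLOFF_PERIOD_MS = 10_000; // assumed default (sharedWorkerQueueCooloffPeriodMs)
const COOLOFF_COUNT_THRESHOLD = 10; // assumed default (sharedWorkerQueueCooloffCountThreshold)

function shouldSkipQueue(queue: string, now = Date.now()): boolean {
  const until = queueCooloffUntil.get(queue);
  if (until === undefined) return false;
  if (until > now) return true; // still cooling off: skip this queue
  queueCooloffUntil.delete(queue); // cooldown expired: eligible again
  return false;
}

function recordDequeueResult(queue: string, gotMessage: boolean, now = Date.now()): void {
  if (gotMessage) {
    emptyDequeueCounts.delete(queue); // a successful dequeue clears the counter
    return;
  }
  const count = (emptyDequeueCounts.get(queue) ?? 0) + 1;
  emptyDequeueCounts.set(queue, count);
  if (count >= COOLOFF_COUNT_THRESHOLD) {
    queueCooloffUntil.set(queue, now + COOLOFF_PERIOD_MS); // enter cooldown
    emptyDequeueCounts.delete(queue); // counting starts fresh after the cooldown
  }
}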

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes


📜 Recent review details


📥 Commits

Reviewing files that changed from the base of the PR and between 88326b3 and dafbf67.

📒 Files selected for processing (2)
  • apps/webapp/app/env.server.ts (1 hunks)
  • apps/webapp/app/v3/marqs/index.server.ts (12 hunks)

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (6)
apps/webapp/app/v3/marqs/index.server.ts (6)

112-115: Cooloff state maps: pruning strategy and lifecycle

  • Clearing both maps every 10 minutes resets active cooloffs and counters, which can re-expose saturated queues sooner than intended. Prefer pruning only expired cooloffs while keeping counters (or age-based pruning) to preserve behavior and still bound growth.
  • The interval is never cleared. If multiple MarQS instances ever get created (tests, hot reloads), this can leak timers.

Prune precisely-expired entries and keep counts:

- this.clearCooloffPeriodInterval = setInterval(() => {
-   this.queueDequeueCooloffCounts.clear();
-   this.queueDequeueCooloffPeriod.clear();
- }, 60_000 * 10); // 10 minutes
+ this.clearCooloffPeriodInterval = setInterval(() => {
+   const now = Date.now();
+   for (const [queue, until] of this.queueDequeueCooloffPeriod) {
+     if (until <= now) this.queueDequeueCooloffPeriod.delete(queue);
+   }
+   // Optional: cap counts map size
+   if (this.queueDequeueCooloffCounts.size > 50_000) {
+     this.queueDequeueCooloffCounts.clear();
+   }
+ }, 600_000); // 10 minutes

And add a small disposer to avoid timer leaks:

public dispose() {
  clearInterval(this.clearCooloffPeriodInterval);
}

(You can call dispose() from your app shutdown hook if available.)

Also applies to: 124-129


844-857: Use .has() to check map membership and avoid truthiness pitfalls

The current truthy check would skip a 0 timestamp (unlikely here), and it reads clearer to check existence then compare.

Apply:

- const cooloffPeriod = this.queueDequeueCooloffPeriod.get(messageQueue);
- // If the queue is in a cooloff period, skip attempting to dequeue from it
- if (cooloffPeriod) {
-   // If the cooloff period is still active, skip attempting to dequeue from it
-   if (cooloffPeriod > Date.now()) {
+ const cooloffUntil = this.queueDequeueCooloffPeriod.get(messageQueue);
+ if (cooloffUntil !== undefined) {
+   if (cooloffUntil > Date.now()) {
      coolOffPeriodCount++;
      continue;
-   } else {
-     // If the cooloff period is over, delete the cooloff period and attempt to dequeue from the queue
-     this.queueDequeueCooloffPeriod.delete(messageQueue);
-   }
- }
+   }
+   // Cooloff period over; remove and attempt dequeue
+   this.queueDequeueCooloffPeriod.delete(messageQueue);
+ }

967-968: Expose threshold/period in telemetry to aid tuning

You already expose cooloff_period_count. Add the active threshold and period to spans so we can correlate tuning with effect.

Example:

 span.setAttribute("attempted_envs", attemptedEnvs);
 span.setAttribute("message_count", messageCount);
 span.setAttribute("cooloff_period_count", coolOffPeriodCount);
+span.setAttribute("cooloff_threshold", Math.max(10, this.options.sharedWorkerQueueCooloffCountThreshold ?? 10));
+span.setAttribute("cooloff_period_ms", this.options.sharedWorkerQueueCooloffPeriodMs ?? 10_000);

844-910: (Optional) Avoid cooling off queues that are about to become available

Empty dequeues can be due to:

  • hard concurrency caps (intended to cool off), or
  • scheduled messages just a few hundred ms in the future.

Cooling off the latter can delay near-ready work by the full cooloff period. If practical, consider checking the oldest score (e.g., ZRANGE queueKey 0 0 WITHSCORES) and only cool off if (oldestScore - now) exceeds a small grace window (e.g., > 2s). This is an extra Redis call, so apply it behind a feature flag or only after N consecutive empties.

Happy to sketch a variant that consults oldestScore with a bounded cost and caches it for the scan pass.
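
For reference, a hedged sketch of that check, assuming an ioredis client and an illustrative queueKey (not the actual MarQS code):

import Redis from "ioredis";

const redis = new Redis();
const GRACE_WINDOW_MS = 2_000; // the small grace window suggested above

async function oldestMessageIsFarOff(queueKey: string): Promise<boolean> {
  // ZRANGE queueKey 0 0 WITHSCORES returns [member, score] for the oldest entry
  const reply = await redis.zrange(queueKey, 0, 0, "WITHSCORES");
  if (reply.length === 0) return true; // queue is truly empty, so cooling off is safe
  const oldestScore = Number(reply[1]);
  // Only cool off when the oldest message will not become available soon
  return oldestScore - Date.now() > GRACE_WINDOW_MS;
}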


835-910: Cluster-wide cooldown semantics (optional)

The cooldown state is process-local. If multiple webapp instances run the consumer, each will independently probe the same saturated queues. If this becomes an issue at scale, move the cooldown to Redis (e.g., SET queue:cooloff: with PX and NX) so all consumers respect it.

I can provide a minimal Redis-based design (SETNX + TTL) that keeps the same behavior with negligible overhead.
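
A minimal sketch of that design, assuming ioredis and an illustrative key scheme:

import Redis from "ioredis";

const redis = new Redis();

async function tryStartClusterCooloff(queue: string, periodMs: number): Promise<boolean> {
  // NX sets the key only if it is absent; PX expires it after periodMs.
  // Every consumer process checks the same key, so they all respect the cooldown.
  const result = await redis.set(`queue:cooloff:${queue}`, "1", "PX", periodMs, "NX");
  return result === "OK"; // null means another consumer already started the cooldown
}

async function isCoolingOff(queue: string): Promise<boolean> {
  return (await redis.exists(`queue:cooloff:${queue}`)) === 1;
}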


81-83: Naming nit: “cooldown” vs “cooloff”

We use “cooldown” elsewhere in infra contexts; consider standardizing. Variable naming is currently mixed (coolOffPeriodCount vs queueDequeueCooloffPeriod). Purely cosmetic.

Also applies to: 844-910

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 5e4756f and 88326b3.

📒 Files selected for processing (1)
  • apps/webapp/app/v3/marqs/index.server.ts (9 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations

Files:

  • apps/webapp/app/v3/marqs/index.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

We use zod a lot in packages/core and in the webapp

Files:

  • apps/webapp/app/v3/marqs/index.server.ts
apps/webapp/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

apps/webapp/**/*.{ts,tsx}: In the webapp, all environment variables must be accessed through the env export of env.server.ts, instead of directly accessing process.env.
When importing from @trigger.dev/core in the webapp, never import from the root @trigger.dev/core path; always use one of the subpath exports as defined in the package's package.json.

Files:

  • apps/webapp/app/v3/marqs/index.server.ts
🧬 Code graph analysis (1)
apps/webapp/app/v3/marqs/index.server.ts (1)
internal-packages/run-engine/src/run-queue/index.ts (1)
  • messageQueue (1348-1443)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (3)
apps/webapp/app/v3/marqs/index.server.ts (3)

46-46: Good switch to timers/promises setInterval

Importing setInterval from node:timers/promises enables an abortable, non-overlapping loop. Nice.


751-757: Non-overlapping consumer loop is safer

for-await over setIntervalAsync with an AbortSignal prevents overlapping ticks. This should reduce contention and surprise concurrency.
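
A rough sketch of the pattern (the interval and loop body are illustrative):

import { setInterval } from "node:timers/promises";

const abortController = new AbortController();
process.on("SIGTERM", () => abortController.abort());
process.on("SIGINT", () => abortController.abort());

async function runConsumerLoop(intervalMs: number): Promise<void> {
  try {
    // Each iteration awaits the previous body before the next tick is yielded,
    // so passes never overlap even when one runs long.
    for await (const _ of setInterval(intervalMs, undefined, {
      signal: abortController.signal,
    })) {
      await dequeuePass(); // hypothetical stand-in for the consumer body
    }
  } catch (error) {
    // Aborting the signal rejects the iterator with an AbortError on shutdown
    if ((error as Error).name !== "AbortError") throw error;
  }
}

async function dequeuePass(): Promise<void> {
  // ... attempt dequeues across the shared worker queues ...
}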


915-916: Resetting the counter on success is correct

Clearing the per-queue counter on successful dequeue prevents inadvertent cooloffs after activity resumes.

Comment on lines +81 to 83
sharedWorkerQueueCooloffPeriodMs?: number;
sharedWorkerQueueCooloffCountThreshold?: number;
eagerDequeuingEnabled?: boolean;
Contributor


💡 Verification agent

🧩 Analysis chain

Public options added — document semantics and (optionally) wire to env/server config

Two new knobs look good. Please:

  • Document defaults and intent (e.g., “defaults: threshold min=10, period=10s”) in the type/README.
  • Consider wiring these to env so ops can tune without a deploy.

If you want them configurable now, append in getMarQSClient:

     sharedWorkerQueueConsumerIntervalMs: env.MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS,
     sharedWorkerQueueMaxMessageCount: env.MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT,
+    sharedWorkerQueueCooloffPeriodMs: env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS,
+    sharedWorkerQueueCooloffCountThreshold: env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD,

Run to see if env.server.ts defines these keys:


🏁 Script executed:

#!/bin/bash
rg -nP 'MARQS_SHARED_WORKER_QUEUE_COOLOFF_(PERIOD_MS|COUNT_THRESHOLD)' apps/webapp/app/env.server.ts

Length of output: 102


🏁 Script executed:

#!/bin/bash
# Locate env.server.ts in the repository
echo "Locating env.server.ts files:"
fd -HI 'env.server.ts'

# If found, search for MARQS_SHARED_WORKER_QUEUE entries
ENV_FILE=$(fd -HI 'env.server.ts' | head -n 1)
if [ -n "$ENV_FILE" ] && [ -f "$ENV_FILE" ]; then
  echo -e "\nSearching for MARQS_SHARED_WORKER_QUEUE_ in $ENV_FILE:"
  rg -n 'MARQS_SHARED_WORKER_QUEUE_' -C3 "$ENV_FILE"
else
  echo "env.server.ts not found or path could not be resolved."
fi

Length of output: 1186


Add missing environment variables and document their defaults

The two new knobs (sharedWorkerQueueCooloffPeriodMs and sharedWorkerQueueCooloffCountThreshold) aren’t defined in your Zod‐backed env.server.ts, so they won’t be configurable via environment. Please:

  • In apps/webapp/app/env.server.ts, alongside the existing MARQS_SHARED_WORKER_QUEUE_* entries, add:

    MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS:        z.coerce.number().int().default(10000), // 10 s cool-off period
    MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD:  z.coerce.number().int().default(10),    // 10 consecutive empty dequeues before cool-off
  • In apps/webapp/app/v3/marqs/index.server.ts, wire them into getMarQSClient:

         sharedWorkerQueueConsumerIntervalMs:      env.MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS,
         sharedWorkerQueueMaxMessageCount:         env.MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT,
    +    sharedWorkerQueueCooloffPeriodMs:         env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS,
    +    sharedWorkerQueueCooloffCountThreshold:  env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD,
  • Update the type/README to document each option’s intent and defaults (e.g., “defaults: cool-off period = 10000 ms; count threshold = 10 consecutive empty dequeues”).

🤖 Prompt for AI Agents
In apps/webapp/app/v3/marqs/index.server.ts around lines 81-83 the new options
sharedWorkerQueueCooloffPeriodMs and sharedWorkerQueueCooloffCountThreshold are
present but not exposed via environment; add two entries to
apps/webapp/app/env.server.ts named MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS
and MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD using
z.coerce.number().int() with defaults 10000 and 10 respectively, then read those
env keys and pass their values into getMarQSClient in
apps/webapp/app/v3/marqs/index.server.ts where the other
MARQS_SHARED_WORKER_QUEUE_* options are wired, and finally update the type
definitions/README to document both options and their defaults (cool-off period
= 10000 ms; count threshold = 10 consecutive empty dequeues).

@ericallam ericallam merged commit ad26cde into main Aug 22, 2025
26 of 27 checks passed
@ericallam ericallam deleted the fix/v3-dequeue-cooloff branch August 22, 2025 13:37