Skip to content

fix(v3): adopt v4 dequeue strategy to increase perf #2436

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 22, 2025

Conversation

ericallam
Copy link
Member

We will now dequeue into a redis list as many messages as we possibly
can from actual queues, trying to dequeue up to 10 runs at a time for
each queue in an env, in separate queue consumers. Then the
SharedQueueConsumer can simply lpop off that list and process a run to
be dequeued very quickly. This should get runs through the platform side
dequeue system very quickly.

We will now dequeue into a redis list as many messages as we possibly 
can from actual queues, trying to dequeue up to 10 runs at a time for 
each queue in an env, in separate queue consumers. Then the 
SharedQueueConsumer can simply `lpop` off that list and process a run to
be dequeued very quickly. This should get runs through the platform side
dequeue system very quickly.
Copy link

changeset-bot bot commented Aug 21, 2025

⚠️ No Changeset found

Latest commit: 1bdf927

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

Copy link
Contributor

coderabbitai bot commented Aug 21, 2025

Walkthrough

Adds two MARQS environment variables for consumer interval and max message count. Refactors MarQS to support batch dequeue (dequeueMessages) and introduces a shared worker queue with new Redis commands (popMessageFromWorkerQueue) and Lua script changes. Implements a background consumer loop to move messages from per-environment queues into the shared worker queue, with new options and logging/trace updates. Renames dequeueMessageInSharedQueue to dequeueMessageFromSharedWorkerQueue. Extends key producer with sharedWorkerQueueKey and updates the MarQS types to require it. SharedQueueConsumer now starts/stops the worker queue consumer and switches to the new dequeue method.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Tip

🔌 Remote MCP (Model Context Protocol) integration is now available!

Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/v3-dequeue-performance

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (1)

177-177: Consider documenting the rationale for the 1-second timeout.

The reduction from 10 seconds to 1 second might affect trace collection completeness. While this change likely improves responsiveness, consider adding a comment explaining why 1 second is now sufficient.

-      traceTimeoutSeconds: options.traceTimeoutSeconds ?? 1,
+      // Reduced from 10s to 1s to improve trace responsiveness with the new dequeue strategy
+      traceTimeoutSeconds: options.traceTimeoutSeconds ?? 1,
apps/webapp/app/env.server.ts (1)

413-414: Document the performance implications of these settings.

The new environment variables control critical performance characteristics of the dequeue system. Consider adding comments to explain:

  1. The trade-off between interval frequency and CPU usage
  2. The impact of max message count on throughput vs latency
+  // Interval between worker queue consumer iterations (ms). Lower values increase throughput but use more CPU.
   MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(250),
+  // Maximum messages to dequeue per iteration. Higher values improve throughput but may increase latency.
   MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT: z.coerce.number().int().default(10),
apps/webapp/app/v3/marqs/index.server.ts (1)

676-734: Consider adding metrics for monitoring consumer health.

The background consumer implementation is solid, but consider adding:

  1. Metrics for successful vs failed iterations
  2. Alerts if the consumer stops unexpectedly
  3. Configurable backoff on repeated failures
 async #startSharedWorkerQueueConsumer(consumerId: string, abortController: AbortController) {
   let lastProcessedAt = Date.now();
   let processedCount = 0;
+  let consecutiveErrors = 0;
+  const MAX_CONSECUTIVE_ERRORS = 10;

   try {
     for await (const _ of setInterval(
       this.options.sharedWorkerQueueConsumerIntervalMs ?? 500,
       null,
       {
         signal: abortController.signal,
       }
     )) {
       // ... existing code ...

       const [error, results] = await tryCatch(this.#processSharedWorkerQueue(consumerId));

       if (error) {
+        consecutiveErrors++;
         logger.error(`Failed to process shared worker queue`, {
           error,
           service: this.name,
           consumerId,
+          consecutiveErrors,
         });

+        if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) {
+          logger.error(`Too many consecutive errors, stopping consumer`, {
+            consumerId,
+            consecutiveErrors,
+          });
+          throw new Error(`Consumer stopped after ${consecutiveErrors} consecutive errors`);
+        }

         continue;
       }

+      consecutiveErrors = 0; // Reset on success
       // ... rest of the code ...
     }
   } catch (error) {
     // ... existing error handling ...
   }
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 5b40c44 and 1bdf927.

📒 Files selected for processing (5)
  • apps/webapp/app/env.server.ts (1 hunks)
  • apps/webapp/app/v3/marqs/index.server.ts (19 hunks)
  • apps/webapp/app/v3/marqs/marqsKeyProducer.ts (2 hunks)
  • apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (4 hunks)
  • apps/webapp/app/v3/marqs/types.ts (1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations

Files:

  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/marqs/types.ts
  • apps/webapp/app/v3/marqs/marqsKeyProducer.ts
  • apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
  • apps/webapp/app/v3/marqs/index.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

We use zod a lot in packages/core and in the webapp

Files:

  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/marqs/types.ts
  • apps/webapp/app/v3/marqs/marqsKeyProducer.ts
  • apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
  • apps/webapp/app/v3/marqs/index.server.ts
apps/webapp/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

apps/webapp/**/*.{ts,tsx}: In the webapp, all environment variables must be accessed through the env export of env.server.ts, instead of directly accessing process.env.
When importing from @trigger.dev/core in the webapp, never import from the root @trigger.dev/core path; always use one of the subpath exports as defined in the package's package.json.

Files:

  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/marqs/types.ts
  • apps/webapp/app/v3/marqs/marqsKeyProducer.ts
  • apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
  • apps/webapp/app/v3/marqs/index.server.ts
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
🔇 Additional comments (18)
apps/webapp/app/v3/marqs/types.ts (1)

39-39: LGTM! Interface extension is well-designed.

The addition of sharedWorkerQueueKey(): string follows the established pattern and integrates cleanly with the existing key producer interface.

apps/webapp/app/v3/marqs/marqsKeyProducer.ts (2)

5-5: LGTM! Constant follows existing naming conventions.

The SHARED_WORKER_QUEUE constant is appropriately placed and follows the established naming pattern.


112-114: LGTM! Clean implementation of the interface requirement.

The method correctly returns the constant and maintains consistency with other similar methods in the class.

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (4)

169-169: LGTM! Proper cleanup function storage.

The addition of _stopWorkerQueueConsumer follows the established pattern for managing lifecycle functions.


260-262: LGTM! Proper initialization and logging.

The consumer startup logic correctly stores the stop function and provides helpful console output.


440-440: API rename is consistent with the new architecture.

The method name change from dequeueMessageInSharedQueue to dequeueMessageFromSharedWorkerQueue better reflects the new dequeue strategy.


237-239: No additional cleanup needed for startup errors

Our investigation shows that marqs.startSharedWorkerQueueConsumer always returns a valid “stop” callback synchronously and internally catches any asynchronous startup errors:

  • In index.server.ts, startSharedWorkerQueueConsumer constructs an AbortController, kicks off the async loop (catching and logging any non-AbortError exceptions), and then immediately returns the cleanup function () => abortController.abort()—so even if the consumer loop fails, the abort callback remains available.
  • In SharedQueueConsumer#enable (sharedQueueConsumer.server.ts), _stopWorkerQueueConsumer is set to that callback before any work begins, and stop() always invokes it before resetting _enabled = false, ensuring proper teardown.

Since there are no synchronous failure paths that bypass assignment of _stopWorkerQueueConsumer, and all async errors are logged and don’t leave the consumer running, no extra error-handling or state cleanup is required.

Likely an incorrect or invalid review comment.

apps/webapp/app/v3/marqs/index.server.ts (11)

45-46: LGTM! Proper imports for the new async functionality.

The addition of setInterval from node:timers/promises and tryCatch utilities supports the new background consumer implementation.


75-76: LGTM! Options properly integrated into MarQS configuration.

The new options for controlling the shared worker queue consumer are well-placed and follow the existing naming conventions.


495-499: Consider handling edge cases in batch dequeue.

When changing from single to batch dequeue, ensure that:

  1. Empty arrays are handled correctly
  2. The transition maintains backward compatibility

566-658: Well-structured implementation of the new dequeue strategy.

The new dequeueMessageFromSharedWorkerQueue method:

  • Properly traces operations with OpenTelemetry
  • Correctly handles the two-step process (pop from worker queue, then read message)
  • Maintains compatibility with the subscriber pattern
  • Includes proper error handling

660-674: Robust consumer lifecycle management.

The startSharedWorkerQueueConsumer method properly:

  • Uses AbortController for clean shutdown
  • Handles errors without crashing
  • Returns a cleanup function

739-871: Excellent implementation of the batch processing logic.

The #processSharedWorkerQueue method:

  • Efficiently batches messages from multiple environments
  • Properly traces all operations
  • Handles errors gracefully per queue
  • Uses RPUSH for O(1) insertion into the worker queue

1478-1550: Clean adaptation to support batch dequeuing.

The changes to #callDequeueMessages:

  • Properly handle the maxCount parameter
  • Correctly parse the flattened array response from Redis
  • Maintain backward compatibility with single message dequeue (maxCount=1)

1980-2062: Well-designed Lua script for batch dequeue operations.

The dequeueMessages Lua script efficiently:

  • Calculates available capacity across both queue and environment limits
  • Atomically dequeues up to the calculated limit
  • Maintains all concurrency tracking correctly
  • Returns results in a flat array format for efficiency

2064-2082: Simple and efficient worker queue pop implementation.

The popMessageFromWorkerQueue Lua script correctly implements LPOP with queue length tracking.


2327-2347: TypeScript declarations properly updated.

The ioredis module augmentation correctly reflects the new method signatures and return types.


2483-2484: Configuration properly wired from environment variables.

The new options are correctly passed from environment variables to the MarQS configuration.

end

-- Remove the messages from the queue and update concurrency
redis.call('ZREM', queueKey, unpack(messageIds))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unpack is cool

@ericallam ericallam merged commit c16cc57 into main Aug 22, 2025
31 checks passed
@ericallam ericallam deleted the fix/v3-dequeue-performance branch August 22, 2025 06:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants