-
-
Notifications
You must be signed in to change notification settings - Fork 790
fix(v3): adopt v4 dequeue strategy to increase perf #2436
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
We will now dequeue into a redis list as many messages as we possibly can from actual queues, trying to dequeue up to 10 runs at a time for each queue in an env, in separate queue consumers. Then the SharedQueueConsumer can simply `lpop` off that list and process a run to be dequeued very quickly. This should get runs through the platform side dequeue system very quickly.
|
WalkthroughAdds two MARQS environment variables for consumer interval and max message count. Refactors MarQS to support batch dequeue (dequeueMessages) and introduces a shared worker queue with new Redis commands (popMessageFromWorkerQueue) and Lua script changes. Implements a background consumer loop to move messages from per-environment queues into the shared worker queue, with new options and logging/trace updates. Renames dequeueMessageInSharedQueue to dequeueMessageFromSharedWorkerQueue. Extends key producer with sharedWorkerQueueKey and updates the MarQS types to require it. SharedQueueConsumer now starts/stops the worker queue consumer and switches to the new dequeue method. Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (1)
177-177
: Consider documenting the rationale for the 1-second timeout.The reduction from 10 seconds to 1 second might affect trace collection completeness. While this change likely improves responsiveness, consider adding a comment explaining why 1 second is now sufficient.
- traceTimeoutSeconds: options.traceTimeoutSeconds ?? 1, + // Reduced from 10s to 1s to improve trace responsiveness with the new dequeue strategy + traceTimeoutSeconds: options.traceTimeoutSeconds ?? 1,apps/webapp/app/env.server.ts (1)
413-414
: Document the performance implications of these settings.The new environment variables control critical performance characteristics of the dequeue system. Consider adding comments to explain:
- The trade-off between interval frequency and CPU usage
- The impact of max message count on throughput vs latency
+ // Interval between worker queue consumer iterations (ms). Lower values increase throughput but use more CPU. MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(250), + // Maximum messages to dequeue per iteration. Higher values improve throughput but may increase latency. MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT: z.coerce.number().int().default(10),apps/webapp/app/v3/marqs/index.server.ts (1)
676-734
: Consider adding metrics for monitoring consumer health.The background consumer implementation is solid, but consider adding:
- Metrics for successful vs failed iterations
- Alerts if the consumer stops unexpectedly
- Configurable backoff on repeated failures
async #startSharedWorkerQueueConsumer(consumerId: string, abortController: AbortController) { let lastProcessedAt = Date.now(); let processedCount = 0; + let consecutiveErrors = 0; + const MAX_CONSECUTIVE_ERRORS = 10; try { for await (const _ of setInterval( this.options.sharedWorkerQueueConsumerIntervalMs ?? 500, null, { signal: abortController.signal, } )) { // ... existing code ... const [error, results] = await tryCatch(this.#processSharedWorkerQueue(consumerId)); if (error) { + consecutiveErrors++; logger.error(`Failed to process shared worker queue`, { error, service: this.name, consumerId, + consecutiveErrors, }); + if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) { + logger.error(`Too many consecutive errors, stopping consumer`, { + consumerId, + consecutiveErrors, + }); + throw new Error(`Consumer stopped after ${consecutiveErrors} consecutive errors`); + } continue; } + consecutiveErrors = 0; // Reset on success // ... rest of the code ... } } catch (error) { // ... existing error handling ... } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
apps/webapp/app/env.server.ts
(1 hunks)apps/webapp/app/v3/marqs/index.server.ts
(19 hunks)apps/webapp/app/v3/marqs/marqsKeyProducer.ts
(2 hunks)apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
(4 hunks)apps/webapp/app/v3/marqs/types.ts
(1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}
: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
apps/webapp/app/env.server.ts
apps/webapp/app/v3/marqs/types.ts
apps/webapp/app/v3/marqs/marqsKeyProducer.ts
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
apps/webapp/app/v3/marqs/index.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
We use zod a lot in packages/core and in the webapp
Files:
apps/webapp/app/env.server.ts
apps/webapp/app/v3/marqs/types.ts
apps/webapp/app/v3/marqs/marqsKeyProducer.ts
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
apps/webapp/app/v3/marqs/index.server.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
apps/webapp/**/*.{ts,tsx}
: In the webapp, all environment variables must be accessed through theenv
export ofenv.server.ts
, instead of directly accessingprocess.env
.
When importing from@trigger.dev/core
in the webapp, never import from the root@trigger.dev/core
path; always use one of the subpath exports as defined in the package's package.json.
Files:
apps/webapp/app/env.server.ts
apps/webapp/app/v3/marqs/types.ts
apps/webapp/app/v3/marqs/marqsKeyProducer.ts
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
apps/webapp/app/v3/marqs/index.server.ts
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
🔇 Additional comments (18)
apps/webapp/app/v3/marqs/types.ts (1)
39-39
: LGTM! Interface extension is well-designed.The addition of
sharedWorkerQueueKey(): string
follows the established pattern and integrates cleanly with the existing key producer interface.apps/webapp/app/v3/marqs/marqsKeyProducer.ts (2)
5-5
: LGTM! Constant follows existing naming conventions.The
SHARED_WORKER_QUEUE
constant is appropriately placed and follows the established naming pattern.
112-114
: LGTM! Clean implementation of the interface requirement.The method correctly returns the constant and maintains consistency with other similar methods in the class.
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (4)
169-169
: LGTM! Proper cleanup function storage.The addition of
_stopWorkerQueueConsumer
follows the established pattern for managing lifecycle functions.
260-262
: LGTM! Proper initialization and logging.The consumer startup logic correctly stores the stop function and provides helpful console output.
440-440
: API rename is consistent with the new architecture.The method name change from
dequeueMessageInSharedQueue
todequeueMessageFromSharedWorkerQueue
better reflects the new dequeue strategy.
237-239
: No additional cleanup needed for startup errorsOur investigation shows that
marqs.startSharedWorkerQueueConsumer
always returns a valid “stop” callback synchronously and internally catches any asynchronous startup errors:
- In
index.server.ts
,startSharedWorkerQueueConsumer
constructs anAbortController
, kicks off the async loop (catching and logging any non-AbortError
exceptions), and then immediately returns the cleanup function() => abortController.abort()
—so even if the consumer loop fails, the abort callback remains available.- In
SharedQueueConsumer#enable
(sharedQueueConsumer.server.ts),_stopWorkerQueueConsumer
is set to that callback before any work begins, andstop()
always invokes it before resetting_enabled = false
, ensuring proper teardown.Since there are no synchronous failure paths that bypass assignment of
_stopWorkerQueueConsumer
, and all async errors are logged and don’t leave the consumer running, no extra error-handling or state cleanup is required.Likely an incorrect or invalid review comment.
apps/webapp/app/v3/marqs/index.server.ts (11)
45-46
: LGTM! Proper imports for the new async functionality.The addition of
setInterval
fromnode:timers/promises
andtryCatch
utilities supports the new background consumer implementation.
75-76
: LGTM! Options properly integrated into MarQS configuration.The new options for controlling the shared worker queue consumer are well-placed and follow the existing naming conventions.
495-499
: Consider handling edge cases in batch dequeue.When changing from single to batch dequeue, ensure that:
- Empty arrays are handled correctly
- The transition maintains backward compatibility
566-658
: Well-structured implementation of the new dequeue strategy.The new
dequeueMessageFromSharedWorkerQueue
method:
- Properly traces operations with OpenTelemetry
- Correctly handles the two-step process (pop from worker queue, then read message)
- Maintains compatibility with the subscriber pattern
- Includes proper error handling
660-674
: Robust consumer lifecycle management.The
startSharedWorkerQueueConsumer
method properly:
- Uses AbortController for clean shutdown
- Handles errors without crashing
- Returns a cleanup function
739-871
: Excellent implementation of the batch processing logic.The
#processSharedWorkerQueue
method:
- Efficiently batches messages from multiple environments
- Properly traces all operations
- Handles errors gracefully per queue
- Uses RPUSH for O(1) insertion into the worker queue
1478-1550
: Clean adaptation to support batch dequeuing.The changes to
#callDequeueMessages
:
- Properly handle the maxCount parameter
- Correctly parse the flattened array response from Redis
- Maintain backward compatibility with single message dequeue (maxCount=1)
1980-2062
: Well-designed Lua script for batch dequeue operations.The
dequeueMessages
Lua script efficiently:
- Calculates available capacity across both queue and environment limits
- Atomically dequeues up to the calculated limit
- Maintains all concurrency tracking correctly
- Returns results in a flat array format for efficiency
2064-2082
: Simple and efficient worker queue pop implementation.The
popMessageFromWorkerQueue
Lua script correctly implements LPOP with queue length tracking.
2327-2347
: TypeScript declarations properly updated.The ioredis module augmentation correctly reflects the new method signatures and return types.
2483-2484
: Configuration properly wired from environment variables.The new options are correctly passed from environment variables to the MarQS configuration.
end | ||
|
||
-- Remove the messages from the queue and update concurrency | ||
redis.call('ZREM', queueKey, unpack(messageIds)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unpack is cool
We will now dequeue into a redis list as many messages as we possibly
can from actual queues, trying to dequeue up to 10 runs at a time for
each queue in an env, in separate queue consumers. Then the
SharedQueueConsumer can simply
lpop
off that list and process a run tobe dequeued very quickly. This should get runs through the platform side
dequeue system very quickly.