Skip to content

fix(v3): eagerly dequeue messages from a queue when that queue is added to or removed from (v4 backport) #2438

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 22, 2025

Conversation

ericallam
Copy link
Member

No description provided.

Copy link

changeset-bot bot commented Aug 22, 2025

⚠️ No Changeset found

Latest commit: d44a525

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

Copy link
Contributor

coderabbitai bot commented Aug 22, 2025

Walkthrough

Adds eight MARQS-related environment keys to apps/webapp/app/env.server.ts for worker enablement and tuning (eager dequeuing flag, enable flag, worker count, concurrency limits, tasks-per-worker, poll intervals, shutdown timeout). In apps/webapp/app/v3/marqs/index.server.ts, integrates a Redis-backed Worker with optional eager dequeuing, adds a worker catalog and a processQueueForWorkerQueue job, extends MarQSOptions with workerOptions and eagerDequeuingEnabled, conditionally starts the worker, and updates enqueue/acknowledge flows to schedule worker processing (500ms dedupe). Also adds zod validation and logging and wires RedisOptions from environment variables.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Tip

🔌 Remote MCP (Model Context Protocol) integration is now available!

Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/v3-eager-dequeue

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (5)
apps/webapp/app/env.server.ts (1)

416-423: Optional: consider BoolEnv for the two boolean-like flags for consistency.

Most toggles here are "0"/"1" strings, but a subset uses BoolEnv. If you want stricter typing and easier consumption, you could switch MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED and MARQS_WORKER_ENABLED to BoolEnv and adjust call sites. If maintaining the existing "0"/"1" convention is intentional, ignore this.

apps/webapp/app/v3/marqs/index.server.ts (4)

47-50: Use named zod import for consistency with the codebase.

Other files in this repo/webapp import as import { z } from "zod". Align this to reduce friction.

-import z from "zod";
+import { z } from "zod";

136-138: Optional: expose a stop/close hook.

You start the worker when enabled; consider exposing a stop() that calls this.worker.stop() to support graceful shutdowns in tests or process tear-down.


933-990: Consider draining more per job to reduce scheduling overhead.

Current job enqueues itself again if it hit maxCount, with a fixed 500ms delay. For hot queues this increases Redis ops and latency. Optionally: loop until fewer than maxCount dequeued or a small time budget is exceeded.

Sketch:

-      // If we dequeued the max count, we need to enqueue another job to dequeue the next batch
-      if (dequeuedMessages.length === maxCount) {
+      // If we dequeued the max count, we need to enqueue another job to dequeue the next batch
+      if (dequeuedMessages.length === maxCount) {
         await this.worker.enqueueOnce({
           id: queueKey,
           job: "processQueueForWorkerQueue",
           payload: {
             queueKey,
             parentQueueKey,
           },
-          availableAt: new Date(Date.now() + 500), // 500ms from now
+          availableAt: new Date(Date.now() + EAGER_DEQUEUE_DEBOUNCE_MS),
         });
       }

117-135: Optional: add lightweight instrumentation around enqueueOnce failures.

If Redis is transiently unavailable, these eager-dequeue nudges will be dropped silently. Consider wrapping in a small try/catch with a warn-level log and maybe a counter metric.

Also applies to: 328-341, 933-990, 1022-1034, 2617-2629

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between c16cc57 and 33d134f.

📒 Files selected for processing (2)
  • apps/webapp/app/env.server.ts (1 hunks)
  • apps/webapp/app/v3/marqs/index.server.ts (8 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations

Files:

  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/marqs/index.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

We use zod a lot in packages/core and in the webapp

Files:

  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/marqs/index.server.ts
apps/webapp/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

apps/webapp/**/*.{ts,tsx}: In the webapp, all environment variables must be accessed through the env export of env.server.ts, instead of directly accessing process.env.
When importing from @trigger.dev/core in the webapp, never import from the root @trigger.dev/core path; always use one of the subpath exports as defined in the package's package.json.

Files:

  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/marqs/index.server.ts
🧠 Learnings (1)
📚 Learning: 2025-07-18T17:49:47.180Z
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-07-18T17:49:47.180Z
Learning: Applies to apps/webapp/**/*.{ts,tsx} : In the webapp, all environment variables must be accessed through the `env` export of `env.server.ts`, instead of directly accessing `process.env`.

Applied to files:

  • apps/webapp/app/env.server.ts
🧬 Code graph analysis (1)
apps/webapp/app/v3/marqs/index.server.ts (5)
packages/redis-worker/src/worker.ts (2)
  • WorkerConcurrencyOptions (62-66)
  • Worker (876-876)
internal-packages/run-engine/src/engine/workerCatalog.ts (1)
  • workerCatalog (3-56)
apps/webapp/app/env.server.ts (1)
  • env (1078-1078)
apps/webapp/app/v3/marqs/marqsKeyProducer.ts (1)
  • sharedQueueKey (108-110)
internal-packages/run-engine/src/run-queue/keyProducer.ts (1)
  • workerQueueKey (41-43)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (4)
apps/webapp/app/v3/marqs/index.server.ts (4)

80-88: Worker options shape looks good.

The added eagerDequeuingEnabled and workerOptions provide a clean feature toggle and tuning surface. No issues.


90-98: Catalog entry is well-typed and scoped.

Defining processQueueForWorkerQueue in a local catalog with a 30s visibility timeout makes sense for this workload.


117-135: No remaining direct process.env.MARQS_ references found

A full Ripgrep scan across all .ts and .tsx files did not surface any direct process.env.MARQS_… usages, confirming that all environment‐flag access has been properly routed through the env module.


1022-1034: Gate ack-triggered eager enqueue on worker enabled & extract debounce constant

We should avoid scheduling jobs when the worker is disabled, just as we do on the enqueue path, and replace the magic “500” ms debounce with a named constant.

• Define a new constant in apps/webapp/app/v3/marqs/constants.server.ts

 export const MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS = /*...*/;
+export const MARQS_EAGER_DEQUEUE_DEBOUNCE_IN_MS = 500;

• Update the ack‐triggered eager‐enqueue at lines 1024–1034 of index.server.ts

-        if (this.options.eagerDequeuingEnabled && message.parentQueue === sharedQueueKey) {
+        if (
+          this.options.eagerDequeuingEnabled &&
+          this.options.workerOptions?.enabled &&
+          message.parentQueue === sharedQueueKey
+        ) {
           await this.worker.enqueueOnce({
             id: message.queue,
             job: "processQueueForWorkerQueue",
             payload: {
               queueKey: message.queue,
               parentQueueKey: message.parentQueue,
             },
-            availableAt: new Date(Date.now() + 500), // 500ms from now
+            availableAt: new Date(Date.now() + MARQS_EAGER_DEQUEUE_DEBOUNCE_IN_MS),
           });
         }

• Apply the same guard and constant at the reservation path (around line 328):

-        if (env.type !== "DEVELOPMENT" && this.options.eagerDequeuingEnabled) {
+        if (
+          env.type !== "DEVELOPMENT" &&
+          this.options.eagerDequeuingEnabled &&
+          this.options.workerOptions?.enabled
+        ) {
           // …
           availableAt: new Date(Date.now() + MARQS_EAGER_DEQUEUE_DEBOUNCE_IN_MS),
         }

@ericallam ericallam merged commit 5e4756f into main Aug 22, 2025
26 of 28 checks passed
@ericallam ericallam deleted the fix/v3-eager-dequeue branch August 22, 2025 12:23
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (3)
apps/webapp/app/env.server.ts (1)

416-423: Split “workers” from “limit” and align env var naming with existing conventions.

You’ve added a distinct workers count (good), but it’s named MARQS_WORKER_COUNT, which diverges from the pattern used elsewhere (e.g., COMMON_WORKER_CONCURRENCY_WORKERS, ADMIN_WORKER_CONCURRENCY_WORKERS, etc.). Introduce MARQS_WORKER_CONCURRENCY_WORKERS (with a default that falls back to MARQS_WORKER_COUNT for backward compatibility) and wire the consumer to use it. This prevents accidental coupling between “workers” and “limit” and keeps naming consistent across the codebase.

Apply in this file:

   MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED: z.string().default("0"),
   MARQS_WORKER_ENABLED: z.string().default("0"),
-  MARQS_WORKER_COUNT: z.coerce.number().int().default(2),
+  MARQS_WORKER_COUNT: z.coerce.number().int().default(2), // legacy alias
+  // Preferred name to match existing convention (e.g., *_WORKER_CONCURRENCY_WORKERS)
+  MARQS_WORKER_CONCURRENCY_WORKERS: z.coerce
+    .number()
+    .int()
+    .default(process.env.MARQS_WORKER_COUNT ? parseInt(process.env.MARQS_WORKER_COUNT) : 2),
   MARQS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),
   MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(5),
   MARQS_WORKER_POLL_INTERVAL_MS: z.coerce.number().int().default(100),
   MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS: z.coerce.number().int().default(100),
   MARQS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),

Follow-up: update the consumer in apps/webapp/app/v3/marqs/index.server.ts to read env.MARQS_WORKER_CONCURRENCY_WORKERS first, with a fallback to env.MARQS_WORKER_COUNT.

Run this to find any other MARQS worker vars that should follow the convention:

#!/bin/bash
# Inspect MARQS worker env naming consistency
rg -n --color=never -C2 -e 'MARQS_.*WORKER.*(COUNT|CONCURRENCY_WORKERS|CONCURRENCY_LIMIT|TASKS_PER_WORKER)'
apps/webapp/app/v3/marqs/index.server.ts (2)

2616-2636: Normalize worker keyPrefix and prefer the new workers var with legacy fallback.

  • Worker keys share KEY_PREFIX with core MarQS keys; add “:worker” to isolate the worker’s namespace and avoid potential collisions.
  • Read MARQS_WORKER_CONCURRENCY_WORKERS first (consistent naming), falling back to MARQS_WORKER_COUNT for backward compatibility.
     workerOptions: {
       enabled: env.MARQS_WORKER_ENABLED === "1",
       pollIntervalMs: env.MARQS_WORKER_POLL_INTERVAL_MS,
       immediatePollIntervalMs: env.MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS,
       shutdownTimeoutMs: env.MARQS_WORKER_SHUTDOWN_TIMEOUT_MS,
       concurrency: {
-        workers: env.MARQS_WORKER_COUNT,
+        workers: env.MARQS_WORKER_CONCURRENCY_WORKERS ?? env.MARQS_WORKER_COUNT,
         tasksPerWorker: env.MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER,
         limit: env.MARQS_WORKER_CONCURRENCY_LIMIT,
       },
       redisOptions: {
-        keyPrefix: KEY_PREFIX,
+        // ensure worker keys are isolated and avoid "marqs::worker"
+        keyPrefix: `${KEY_PREFIX.replace(/:$/, "")}:worker`,
         port: env.REDIS_PORT ?? undefined,
         host: env.REDIS_HOST ?? undefined,
         username: env.REDIS_USERNAME ?? undefined,
         password: env.REDIS_PASSWORD ?? undefined,
         enableAutoPipelining: true,
         ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
       },
     },

Quick checks:

#!/bin/bash
# 1) Confirm we don't already use a dedicated marqs worker prefix
rg -n -C2 -F 'marqs::worker' || true
rg -n -C2 -F ':worker' apps/webapp/app/v3/marqs/index.server.ts

# 2) Verify consumers of workers vs limit
rg -n -C2 'MARQS_WORKER_(COUNT|CONCURRENCY_WORKERS|CONCURRENCY_LIMIT|TASKS_PER_WORKER)' apps/webapp/app

# 3) Ensure all 500ms occurrences are replaced
rg -n -C2 'Date\\.now\\(\\) \\+ 500' apps/webapp/app/v3/marqs/index.server.ts

1021-1034: Avoid enqueueing worker jobs when the worker is disabled; reuse debounce constant.

Prevents job backlog if the worker is off, and removes another magic 500.

-        if (this.options.eagerDequeuingEnabled && message.parentQueue === sharedQueueKey) {
+        if (
+          this.options.eagerDequeuingEnabled &&
+          this.options.workerOptions?.enabled &&
+          message.parentQueue === sharedQueueKey
+        ) {
           await this.worker.enqueueOnce({
             id: message.queue,
             job: "processQueueForWorkerQueue",
             payload: {
               queueKey: message.queue,
               parentQueueKey: message.parentQueue,
             },
-            availableAt: new Date(Date.now() + 500), // 500ms from now
+            availableAt: new Date(Date.now() + EAGER_DEQUEUE_DEBOUNCE_MS),
           });
         }
🧹 Nitpick comments (1)
apps/webapp/app/v3/marqs/index.server.ts (1)

932-988: Use the same debounce constant when chaining batches.

Keep the re-enqueue delay consistent with the enqueue path.

       // If we dequeued the max count, we need to enqueue another job to dequeue the next batch
       if (dequeuedMessages.length === maxCount) {
         await this.worker.enqueueOnce({
           id: queueKey,
           job: "processQueueForWorkerQueue",
           payload: {
             queueKey,
             parentQueueKey,
           },
-          availableAt: new Date(Date.now() + 500), // 500ms from now
+          availableAt: new Date(Date.now() + EAGER_DEQUEUE_DEBOUNCE_MS),
         });
       }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 33d134f and d44a525.

📒 Files selected for processing (2)
  • apps/webapp/app/env.server.ts (1 hunks)
  • apps/webapp/app/v3/marqs/index.server.ts (9 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations

Files:

  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/marqs/index.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

We use zod a lot in packages/core and in the webapp

Files:

  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/marqs/index.server.ts
apps/webapp/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

apps/webapp/**/*.{ts,tsx}: In the webapp, all environment variables must be accessed through the env export of env.server.ts, instead of directly accessing process.env.
When importing from @trigger.dev/core in the webapp, never import from the root @trigger.dev/core path; always use one of the subpath exports as defined in the package's package.json.

Files:

  • apps/webapp/app/env.server.ts
  • apps/webapp/app/v3/marqs/index.server.ts
🧬 Code graph analysis (1)
apps/webapp/app/v3/marqs/index.server.ts (2)
packages/redis-worker/src/worker.ts (2)
  • WorkerConcurrencyOptions (62-66)
  • Worker (876-876)
internal-packages/redis/src/index.ts (1)
  • RedisOptions (4-4)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (1)
apps/webapp/app/v3/marqs/index.server.ts (1)

119-137: Worker boot lifecycle looks good.

Worker is constructed with catalog and only started when enabled; jobs route to a private method with schema-validated payload. No issues spotted here.

Comment on lines +327 to +340
if (env.type !== "DEVELOPMENT" && this.options.eagerDequeuingEnabled) {
// This will move the message to the worker queue so it can be dequeued
await this.worker.enqueueOnce({
id: messageQueue, // dedupe by environment, queue, and concurrency key
job: "processQueueForWorkerQueue",
payload: {
queueKey: messageQueue,
parentQueueKey: parentQueue,
},
// Add a small delay to dedupe messages so at most one of these will processed,
// every 500ms per queue, concurrency key, and environment
availableAt: new Date(Date.now() + 500), // 500ms from now
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Gate eager-dequeue behind worker enabled and DRY the 500ms magic number.

If the worker is disabled, enqueueOnce will accumulate jobs that never run. Also, replace the hardcoded 500ms with a shared constant.

Apply:

-        if (env.type !== "DEVELOPMENT" && this.options.eagerDequeuingEnabled) {
+        if (
+          env.type !== "DEVELOPMENT" &&
+          this.options.eagerDequeuingEnabled &&
+          this.options.workerOptions?.enabled
+        ) {
           // This will move the message to the worker queue so it can be dequeued
           await this.worker.enqueueOnce({
             id: messageQueue, // dedupe by environment, queue, and concurrency key
             job: "processQueueForWorkerQueue",
             payload: {
               queueKey: messageQueue,
               parentQueueKey: parentQueue,
             },
             // Add a small delay to dedupe messages so at most one of these will processed,
             // every 500ms per queue, concurrency key, and environment
-            availableAt: new Date(Date.now() + 500), // 500ms from now
+            availableAt: new Date(Date.now() + EAGER_DEQUEUE_DEBOUNCE_MS),
           });
         }

Add once near KEY_PREFIX:

const EAGER_DEQUEUE_DEBOUNCE_MS = 500;
🤖 Prompt for AI Agents
In apps/webapp/app/v3/marqs/index.server.ts around lines 327-340, the
eager-dequeue enqueueOnce call is executed even when the worker is disabled and
uses a hardcoded 500ms; add a constant EAGER_DEQUEUE_DEBOUNCE_MS = 500 near the
existing KEY_PREFIX declaration and replace the inline 500 with that constant,
and expand the condition to require the worker to be enabled (e.g. check
this.worker?.enabled or this.options.workerEnabled depending on the codebase) so
the enqueueOnce is only called when env.type !== "DEVELOPMENT",
eagerDequeuingEnabled is true, and the worker is enabled.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants