@@ -44,6 +44,9 @@ import {
44
44
} from "./constants.server" ;
45
45
import { setInterval } from "node:timers/promises" ;
46
46
import { tryCatch } from "@trigger.dev/core/utils" ;
47
+ import { Worker , type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker" ;
48
+ import z from "zod" ;
49
+ import { Logger } from "@trigger.dev/core/logger" ;
47
50
48
51
const KEY_PREFIX = "marqs:" ;
49
52
@@ -74,6 +77,24 @@ export type MarQSOptions = {
74
77
subscriber ?: MessageQueueSubscriber ;
75
78
sharedWorkerQueueConsumerIntervalMs ?: number ;
76
79
sharedWorkerQueueMaxMessageCount ?: number ;
80
+ eagerDequeuingEnabled ?: boolean ;
81
+ workerOptions : {
82
+ pollIntervalMs ?: number ;
83
+ immediatePollIntervalMs ?: number ;
84
+ shutdownTimeoutMs ?: number ;
85
+ concurrency ?: WorkerConcurrencyOptions ;
86
+ enabled ?: boolean ;
87
+ } ;
88
+ } ;
89
+
90
+ const workerCatalog = {
91
+ processQueueForWorkerQueue : {
92
+ schema : z . object ( {
93
+ queueKey : z . string ( ) ,
94
+ parentQueueKey : z . string ( ) ,
95
+ } ) ,
96
+ visibilityTimeoutMs : 30_000 ,
97
+ } ,
77
98
} ;
78
99
79
100
/**
@@ -83,6 +104,7 @@ export class MarQS {
83
104
private redis : Redis ;
84
105
public keys : MarQSKeyProducer ;
85
106
#rebalanceWorkers: Array < AsyncWorker > = [ ] ;
107
+ private worker : Worker < typeof workerCatalog > ;
86
108
87
109
constructor ( private readonly options : MarQSOptions ) {
88
110
this . redis = options . redis ;
@@ -91,6 +113,29 @@ export class MarQS {
91
113
92
114
this . #startRebalanceWorkers( ) ;
93
115
this . #registerCommands( ) ;
116
+
117
+ this . worker = new Worker ( {
118
+ name : "marqs-worker" ,
119
+ redisOptions : {
120
+ ...options . redis . options ,
121
+ keyPrefix : `${ options . redis . options . keyPrefix } :worker` ,
122
+ } ,
123
+ catalog : workerCatalog ,
124
+ concurrency : options . workerOptions ?. concurrency ,
125
+ pollIntervalMs : options . workerOptions ?. pollIntervalMs ?? 1000 ,
126
+ immediatePollIntervalMs : options . workerOptions ?. immediatePollIntervalMs ?? 100 ,
127
+ shutdownTimeoutMs : options . workerOptions ?. shutdownTimeoutMs ?? 10_000 ,
128
+ logger : new Logger ( "MarQSWorker" , "info" ) ,
129
+ jobs : {
130
+ processQueueForWorkerQueue : async ( job ) => {
131
+ await this . #processQueueForWorkerQueue( job . payload . queueKey , job . payload . parentQueueKey ) ;
132
+ } ,
133
+ } ,
134
+ } ) ;
135
+
136
+ if ( options . workerOptions ?. enabled ) {
137
+ this . worker . start ( ) ;
138
+ }
94
139
}
95
140
96
141
get name ( ) {
@@ -280,6 +325,21 @@ export class MarQS {
280
325
span . setAttribute ( "reserve_recursive_queue" , reserve . recursiveQueue ) ;
281
326
}
282
327
328
+ if ( env . type !== "DEVELOPMENT" && this . options . eagerDequeuingEnabled ) {
329
+ // This will move the message to the worker queue so it can be dequeued
330
+ await this . worker . enqueueOnce ( {
331
+ id : messageQueue , // dedupe by environment, queue, and concurrency key
332
+ job : "processQueueForWorkerQueue" ,
333
+ payload : {
334
+ queueKey : messageQueue ,
335
+ parentQueueKey : parentQueue ,
336
+ } ,
337
+ // Add a small delay to dedupe messages so at most one of these will processed,
338
+ // every 500ms per queue, concurrency key, and environment
339
+ availableAt : new Date ( Date . now ( ) + 500 ) , // 500ms from now
340
+ } ) ;
341
+ }
342
+
283
343
const result = await this . #callEnqueueMessage( messagePayload , reserve ) ;
284
344
285
345
if ( result ) {
@@ -870,6 +930,64 @@ export class MarQS {
870
930
) ;
871
931
}
872
932
933
+ async #processQueueForWorkerQueue( queueKey : string , parentQueueKey : string ) {
934
+ return this . #trace( "processQueueForWorkerQueue" , async ( span ) => {
935
+ span . setAttributes ( {
936
+ [ SemanticAttributes . QUEUE ] : queueKey ,
937
+ [ SemanticAttributes . PARENT_QUEUE ] : parentQueueKey ,
938
+ } ) ;
939
+
940
+ const maxCount = this . options . sharedWorkerQueueMaxMessageCount ?? 10 ;
941
+
942
+ const dequeuedMessages = await this . #callDequeueMessages( {
943
+ messageQueue : queueKey ,
944
+ parentQueue : parentQueueKey ,
945
+ maxCount,
946
+ } ) ;
947
+
948
+ if ( ! dequeuedMessages || dequeuedMessages . length === 0 ) {
949
+ return ;
950
+ }
951
+
952
+ await this . #trace(
953
+ "addToWorkerQueue" ,
954
+ async ( addToWorkerQueueSpan ) => {
955
+ const workerQueueKey = this . keys . sharedWorkerQueueKey ( ) ;
956
+
957
+ addToWorkerQueueSpan . setAttributes ( {
958
+ message_count : dequeuedMessages . length ,
959
+ [ SemanticAttributes . PARENT_QUEUE ] : workerQueueKey ,
960
+ } ) ;
961
+
962
+ await this . redis . rpush (
963
+ workerQueueKey ,
964
+ ...dequeuedMessages . map ( ( message ) => message . messageId )
965
+ ) ;
966
+ } ,
967
+ {
968
+ kind : SpanKind . INTERNAL ,
969
+ attributes : {
970
+ [ SEMATTRS_MESSAGING_OPERATION ] : "receive" ,
971
+ [ SEMATTRS_MESSAGING_SYSTEM ] : "marqs" ,
972
+ } ,
973
+ }
974
+ ) ;
975
+
976
+ // If we dequeued the max count, we need to enqueue another job to dequeue the next batch
977
+ if ( dequeuedMessages . length === maxCount ) {
978
+ await this . worker . enqueueOnce ( {
979
+ id : queueKey ,
980
+ job : "processQueueForWorkerQueue" ,
981
+ payload : {
982
+ queueKey,
983
+ parentQueueKey,
984
+ } ,
985
+ availableAt : new Date ( Date . now ( ) + 500 ) , // 500ms from now
986
+ } ) ;
987
+ }
988
+ } ) ;
989
+ }
990
+
873
991
public async acknowledgeMessage ( messageId : string , reason : string = "unknown" ) {
874
992
return this . #trace(
875
993
"acknowledgeMessage" ,
@@ -901,6 +1019,20 @@ export class MarQS {
901
1019
messageId,
902
1020
} ) ;
903
1021
1022
+ const sharedQueueKey = this . keys . sharedQueueKey ( ) ;
1023
+
1024
+ if ( this . options . eagerDequeuingEnabled && message . parentQueue === sharedQueueKey ) {
1025
+ await this . worker . enqueueOnce ( {
1026
+ id : message . queue ,
1027
+ job : "processQueueForWorkerQueue" ,
1028
+ payload : {
1029
+ queueKey : message . queue ,
1030
+ parentQueueKey : message . parentQueue ,
1031
+ } ,
1032
+ availableAt : new Date ( Date . now ( ) + 500 ) , // 500ms from now
1033
+ } ) ;
1034
+ }
1035
+
904
1036
await this . options . subscriber ?. messageAcked ( message ) ;
905
1037
} ,
906
1038
{
@@ -2482,5 +2614,17 @@ function getMarQSClient() {
2482
2614
subscriber : concurrencyTracker ,
2483
2615
sharedWorkerQueueConsumerIntervalMs : env . MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS ,
2484
2616
sharedWorkerQueueMaxMessageCount : env . MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT ,
2617
+ eagerDequeuingEnabled : env . MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED === "1" ,
2618
+ workerOptions : {
2619
+ enabled : env . MARQS_WORKER_ENABLED === "1" ,
2620
+ pollIntervalMs : env . MARQS_WORKER_POLL_INTERVAL_MS ,
2621
+ immediatePollIntervalMs : env . MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS ,
2622
+ shutdownTimeoutMs : env . MARQS_WORKER_SHUTDOWN_TIMEOUT_MS ,
2623
+ concurrency : {
2624
+ workers : env . MARQS_WORKER_CONCURRENCY_LIMIT ,
2625
+ tasksPerWorker : env . MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER ,
2626
+ limit : env . MARQS_WORKER_CONCURRENCY_LIMIT ,
2627
+ } ,
2628
+ } ,
2485
2629
} ) ;
2486
2630
}
0 commit comments