-
Notifications
You must be signed in to change notification settings - Fork 4k
feat: enhance RabbitMQ controller with improved connection management… #1855
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
feat: enhance RabbitMQ controller with improved connection management… #1855
Conversation
… and shutdown procedures
Reviewer's GuideThis PR enhances the RabbitMQ controller by introducing explicit connection state tracking, graceful shutdown, improved reconnection management, extended AMQP event handling, and refactors message publishing into a unified, retry-capable helper. Sequence diagram for RabbitmqController shutdown and connection cleanupsequenceDiagram
participant App
participant RabbitmqController
participant AMQPConnection
participant AMQPChannel
App->>RabbitmqController: shutdown()
RabbitmqController->>RabbitmqController: Set isShuttingDown = true
RabbitmqController->>RabbitmqController: Clear reconnectTimer
RabbitmqController->>AMQPChannel: close()
AMQPChannel-->>RabbitmqController: Channel closed
RabbitmqController->>AMQPConnection: close()
AMQPConnection-->>RabbitmqController: Connection closed
RabbitmqController->>App: Shutdown complete
Sequence diagram for improved reconnection and connection state trackingsequenceDiagram
participant RabbitmqController
participant AMQPConnection
participant AMQPChannel
RabbitmqController->>RabbitmqController: connect()
alt isShuttingDown
RabbitmqController-->>RabbitmqController: Abort connection
else
RabbitmqController->>AMQPConnection: connect()
AMQPConnection-->>RabbitmqController: on error/close
RabbitmqController->>RabbitmqController: handleConnectionLoss(reason, error)
RabbitmqController->>RabbitmqController: scheduleReconnect()
RabbitmqController->>RabbitmqController: set reconnectTimer
RabbitmqController->>AMQPConnection: connect() (on timer)
AMQPConnection-->>RabbitmqController: on success
RabbitmqController->>AMQPChannel: createChannel()
AMQPChannel-->>RabbitmqController: on success
RabbitmqController->>RabbitmqController: connectionStatus = 'connected'
end
Sequence diagram for unified message publishing with retriessequenceDiagram
participant RabbitmqController
participant AMQPChannel
loop up to 3 retries
RabbitmqController->>RabbitmqController: ensureConnection()
alt Connection available
RabbitmqController->>AMQPChannel: assertExchange()
RabbitmqController->>AMQPChannel: assertQueue()
RabbitmqController->>AMQPChannel: bindQueue()
RabbitmqController->>AMQPChannel: publish()
AMQPChannel-->>RabbitmqController: publish result
alt publish failed
RabbitmqController->>RabbitmqController: handleConnectionLoss('publish_error', error)
end
else Connection not available
RabbitmqController->>RabbitmqController: handleConnectionLoss('publish_error', error)
end
end
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:116` </location>
<code_context>
+ return;
+ }
+
+ this.connectionStatus = this.reconnectAttempts > 0 ? 'reconnecting' : 'connecting';
+
return new Promise<void>((resolve, reject) => {
</code_context>
<issue_to_address>
Connection status is set before async connect logic, which may lead to race conditions.
This approach can cause incorrect status if multiple connections are initiated simultaneously. Recommend updating connectionStatus only after the connection outcome, or preventing concurrent connect attempts.
</issue_to_address>
### Comment 2
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:51` </location>
<code_context>
+ private async closeConnection(): Promise<void> {
+ try {
+ if (this.amqpChannel) {
+ await new Promise<void>((resolve) => {
+ this.amqpChannel?.close((err) => {
+ if (err) {
+ this.logger.warn(`Error closing channel: ${err.message}`);
+ }
+ resolve();
+ });
+ });
</code_context>
<issue_to_address>
Callback-based close methods are wrapped in Promises but do not handle errors robustly.
Currently, errors during channel closure are only logged, not propagated. Please reject the Promise on error so upstream code can handle failures, or clearly document this behavior.
Suggested implementation:
```typescript
await new Promise<void>((resolve, reject) => {
this.amqpChannel?.close((err) => {
if (err) {
this.logger.warn(`Error closing channel: ${err.message}`);
reject(err);
return;
}
resolve();
});
});
```
```typescript
await new Promise<void>((resolve, reject) => {
this.amqpConnection?.close((err) => {
if (err) {
this.logger.warn(`Error closing connection: ${err.message}`);
reject(err);
return;
}
```
</issue_to_address>
### Comment 3
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:90` </location>
<code_context>
+ return this.connectionStatus === 'connected' && this.amqpChannel !== null && this.amqpConnection !== null;
+ }
+
+ public async forceReconnect(): Promise<void> {
+ this.logger.info('Force reconnect requested');
+
</code_context>
<issue_to_address>
forceReconnect resets reconnectAttempts and calls connect, but may conflict with ongoing reconnection logic.
Consider adding checks or canceling any active reconnection attempts within forceReconnect to prevent race conditions with isReconnecting and reconnectTimer.
</issue_to_address>
### Comment 4
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:211` </location>
<code_context>
- this.amqpChannel = channel;
- this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection
- this.isReconnecting = false;
+ }, (exchangeError) => {
+ if (exchangeError) {
+ this.connectionStatus = 'disconnected';
</code_context>
<issue_to_address>
Callback error handling for assertExchange may not propagate errors to the connection logic.
Ensure that, if assertExchange fails, all resources are properly cleaned up or the connection is closed to avoid leaving the system in a partially initialized state.
</issue_to_address>
### Comment 5
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:292` </location>
<code_context>
);
- setTimeout(async () => {
+ this.reconnectTimer = setTimeout(async () => {
+ if (this.isShuttingDown) {
+ return;
</code_context>
<issue_to_address>
Reconnect timer is cleared in finally block, but may be overwritten by concurrent calls.
Concurrent calls to scheduleReconnect can overwrite reconnectTimer, causing lost references and possible memory leaks. Use a safer approach to manage timer instances.
</issue_to_address>
### Comment 6
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:459` </location>
<code_context>
- await this.amqpChannel.bindQueue(queueName, exchangeName, event);
+ await this.amqpChannel.bindQueue(queueName, exchangeName, routingKey);
+
+ const published = await new Promise<boolean>((resolve) => {
+ const success = this.amqpChannel.publish(
+ exchangeName,
</code_context>
<issue_to_address>
publishMessage uses a callback for publish, but may not handle all error scenarios.
If the channel closes or the connection drops, the callback may not trigger, causing the promise to hang. Add error handling or a timeout to ensure the promise resolves in all scenarios.
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
return; | ||
} | ||
|
||
this.connectionStatus = this.reconnectAttempts > 0 ? 'reconnecting' : 'connecting'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): Connection status is set before async connect logic, which may lead to race conditions.
This approach can cause incorrect status if multiple connections are initiated simultaneously. Recommend updating connectionStatus only after the connection outcome, or preventing concurrent connect attempts.
await new Promise<void>((resolve) => { | ||
this.amqpChannel?.close((err) => { | ||
if (err) { | ||
this.logger.warn(`Error closing channel: ${err.message}`); | ||
} | ||
resolve(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (bug_risk): Callback-based close methods are wrapped in Promises but do not handle errors robustly.
Currently, errors during channel closure are only logged, not propagated. Please reject the Promise on error so upstream code can handle failures, or clearly document this behavior.
Suggested implementation:
await new Promise<void>((resolve, reject) => {
this.amqpChannel?.close((err) => {
if (err) {
this.logger.warn(`Error closing channel: ${err.message}`);
reject(err);
return;
}
resolve();
});
});
await new Promise<void>((resolve, reject) => {
this.amqpConnection?.close((err) => {
if (err) {
this.logger.warn(`Error closing connection: ${err.message}`);
reject(err);
return;
}
return this.connectionStatus === 'connected' && this.amqpChannel !== null && this.amqpConnection !== null; | ||
} | ||
|
||
public async forceReconnect(): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): forceReconnect resets reconnectAttempts and calls connect, but may conflict with ongoing reconnection logic.
Consider adding checks or canceling any active reconnection attempts within forceReconnect to prevent race conditions with isReconnecting and reconnectTimer.
this.amqpChannel = channel; | ||
this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection | ||
this.isReconnecting = false; | ||
}, (exchangeError) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (bug_risk): Callback error handling for assertExchange may not propagate errors to the connection logic.
Ensure that, if assertExchange fails, all resources are properly cleaned up or the connection is closed to avoid leaving the system in a partially initialized state.
@@ -162,7 +289,11 @@ export class RabbitmqController extends EventController implements EventControll | |||
`Scheduling RabbitMQ reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`, | |||
); | |||
|
|||
setTimeout(async () => { | |||
this.reconnectTimer = setTimeout(async () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): Reconnect timer is cleared in finally block, but may be overwritten by concurrent calls.
Concurrent calls to scheduleReconnect can overwrite reconnectTimer, causing lost references and possible memory leaks. Use a safer approach to manage timer instances.
await this.amqpChannel.bindQueue(queueName, exchangeName, event); | ||
await this.amqpChannel.bindQueue(queueName, exchangeName, routingKey); | ||
|
||
const published = await new Promise<boolean>((resolve) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue: publishMessage uses a callback for publish, but may not handle all error scenarios.
If the channel closes or the connection drops, the callback may not trigger, causing the promise to hang. Add error handling or a timeout to ensure the promise resolves in all scenarios.
if (instanceRabbitmq?.enabled && this.amqpChannel) { | ||
if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) { | ||
const exchangeName = instanceName ?? rabbitmqExchangeName; | ||
|
||
let retry = 0; | ||
|
||
while (retry < 3) { | ||
try { | ||
await this.amqpChannel.assertExchange(exchangeName, 'topic', { | ||
durable: true, | ||
autoDelete: false, | ||
}); | ||
|
||
const eventName = event.replace(/_/g, '.').toLowerCase(); | ||
|
||
const queueName = `${instanceName}.${eventName}`; | ||
|
||
await this.amqpChannel.assertQueue(queueName, { | ||
durable: true, | ||
autoDelete: false, | ||
arguments: { | ||
'x-queue-type': 'quorum', | ||
}, | ||
}); | ||
|
||
await this.amqpChannel.bindQueue(queueName, exchangeName, eventName); | ||
|
||
await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); | ||
|
||
if (logEnabled) { | ||
const logData = { | ||
local: `${origin}.sendData-RabbitMQ`, | ||
...message, | ||
}; | ||
|
||
this.logger.log(logData); | ||
} | ||
|
||
break; | ||
} catch (error) { | ||
this.logger.error({ | ||
local: 'RabbitmqController.emit', | ||
message: `Error publishing local RabbitMQ message (attempt ${retry + 1}/3)`, | ||
error: error.message || error, | ||
}); | ||
retry++; | ||
if (retry >= 3) { | ||
this.handleConnectionLoss(); | ||
} | ||
} | ||
} | ||
await this.publishMessage(exchangeName, event, message, instanceName, origin, logEnabled, 'local'); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (code-quality): Merge nested if conditions (merge-nested-ifs
)
if (instanceRabbitmq?.enabled && this.amqpChannel) { | |
if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) { | |
const exchangeName = instanceName ?? rabbitmqExchangeName; | |
let retry = 0; | |
while (retry < 3) { | |
try { | |
await this.amqpChannel.assertExchange(exchangeName, 'topic', { | |
durable: true, | |
autoDelete: false, | |
}); | |
const eventName = event.replace(/_/g, '.').toLowerCase(); | |
const queueName = `${instanceName}.${eventName}`; | |
await this.amqpChannel.assertQueue(queueName, { | |
durable: true, | |
autoDelete: false, | |
arguments: { | |
'x-queue-type': 'quorum', | |
}, | |
}); | |
await this.amqpChannel.bindQueue(queueName, exchangeName, eventName); | |
await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); | |
if (logEnabled) { | |
const logData = { | |
local: `${origin}.sendData-RabbitMQ`, | |
...message, | |
}; | |
this.logger.log(logData); | |
} | |
break; | |
} catch (error) { | |
this.logger.error({ | |
local: 'RabbitmqController.emit', | |
message: `Error publishing local RabbitMQ message (attempt ${retry + 1}/3)`, | |
error: error.message || error, | |
}); | |
retry++; | |
if (retry >= 3) { | |
this.handleConnectionLoss(); | |
} | |
} | |
} | |
await this.publishMessage(exchangeName, event, message, instanceName, origin, logEnabled, 'local'); | |
} | |
} | |
if (instanceRabbitmq?.enabled && this.amqpChannel && (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we))) { | |
const exchangeName = instanceName ?? rabbitmqExchangeName; | |
await this.publishMessage(exchangeName, event, message, instanceName, origin, logEnabled, 'local'); | |
} | |
Explanation
Reading deeply nested conditional code is confusing, since you have to keep track of whichconditions relate to which levels. We therefore strive to reduce nesting where
possible, and the situation where two
if
conditions can be combined usingand
is an easy win.
Requsted and rejected or delete or all community dosc closed go |
… and shutdown procedures
Summary by Sourcery
Enhance the RabbitMQ controller by adding lifecycle management, connection status tracking, and refactored publishing logic
New Features:
Enhancements: