Skip to content

feat: enhance RabbitMQ controller with improved connection management… #1855

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from

Conversation

LuisSantosJS
Copy link

@LuisSantosJS LuisSantosJS commented Aug 20, 2025

… and shutdown procedures

Summary by Sourcery

Enhance the RabbitMQ controller by adding lifecycle management, connection status tracking, and refactored publishing logic

New Features:

  • Add shutdown() and closeConnection() methods for graceful controller termination
  • Expose getConnectionStatus(), isConnected(), forceReconnect(), and waitForConnection() for explicit connection control

Enhancements:

  • Introduce detailed connectionStatus states and integrate shutdown awareness into scheduled reconnection logic
  • Refactor emit logic into publishMessage() supporting both local and global publishing with durable setup, retries, and unified error handling
  • Add logging for connection events (blocked, unblocked, message returns) and detailed loss reasons

Copy link
Contributor

sourcery-ai bot commented Aug 20, 2025

Reviewer's Guide

This PR enhances the RabbitMQ controller by introducing explicit connection state tracking, graceful shutdown, improved reconnection management, extended AMQP event handling, and refactors message publishing into a unified, retry-capable helper.

Sequence diagram for RabbitmqController shutdown and connection cleanup

sequenceDiagram
  participant App
  participant RabbitmqController
  participant AMQPConnection
  participant AMQPChannel

  App->>RabbitmqController: shutdown()
  RabbitmqController->>RabbitmqController: Set isShuttingDown = true
  RabbitmqController->>RabbitmqController: Clear reconnectTimer
  RabbitmqController->>AMQPChannel: close()
  AMQPChannel-->>RabbitmqController: Channel closed
  RabbitmqController->>AMQPConnection: close()
  AMQPConnection-->>RabbitmqController: Connection closed
  RabbitmqController->>App: Shutdown complete
Loading

Sequence diagram for improved reconnection and connection state tracking

sequenceDiagram
  participant RabbitmqController
  participant AMQPConnection
  participant AMQPChannel

  RabbitmqController->>RabbitmqController: connect()
  alt isShuttingDown
    RabbitmqController-->>RabbitmqController: Abort connection
  else
    RabbitmqController->>AMQPConnection: connect()
    AMQPConnection-->>RabbitmqController: on error/close
    RabbitmqController->>RabbitmqController: handleConnectionLoss(reason, error)
    RabbitmqController->>RabbitmqController: scheduleReconnect()
    RabbitmqController->>RabbitmqController: set reconnectTimer
    RabbitmqController->>AMQPConnection: connect() (on timer)
    AMQPConnection-->>RabbitmqController: on success
    RabbitmqController->>AMQPChannel: createChannel()
    AMQPChannel-->>RabbitmqController: on success
    RabbitmqController->>RabbitmqController: connectionStatus = 'connected'
  end
Loading

Sequence diagram for unified message publishing with retries

sequenceDiagram
  participant RabbitmqController
  participant AMQPChannel

  loop up to 3 retries
    RabbitmqController->>RabbitmqController: ensureConnection()
    alt Connection available
      RabbitmqController->>AMQPChannel: assertExchange()
      RabbitmqController->>AMQPChannel: assertQueue()
      RabbitmqController->>AMQPChannel: bindQueue()
      RabbitmqController->>AMQPChannel: publish()
      AMQPChannel-->>RabbitmqController: publish result
      alt publish failed
        RabbitmqController->>RabbitmqController: handleConnectionLoss('publish_error', error)
      end
    else Connection not available
      RabbitmqController->>RabbitmqController: handleConnectionLoss('publish_error', error)
    end
  end
Loading

File-Level Changes

Change Details Files
Connection state tracking and graceful shutdown
  • Add connectionStatus, reconnectTimer and isShuttingDown fields
  • Implement shutdown() and closeConnection() for clean teardown
  • Guard connect/scheduleReconnect flows when shutting down
src/api/integrations/event/rabbitmq/rabbitmq.controller.ts
Enhanced reconnection logic
  • Extend handleConnectionLoss to accept reason and prevent duplicate attempts
  • Track reconnect attempts, delay and cancel via reconnectTimer
  • Add forceReconnect() to reset attempts and trigger immediate reconnect
src/api/integrations/event/rabbitmq/rabbitmq.controller.ts
Extended AMQP event listeners and status updates
  • Listen for connection blocked/unblocked events with logging
  • Update connectionStatus on connect, error, close and channel events
  • Log and handle channel/connection errors with specific reasons
src/api/integrations/event/rabbitmq/rabbitmq.controller.ts
Unified, retryable message publishing
  • Extract publishMessage() helper for local/global paths
  • Implement assertExchange/Queue/bind and publish with callback and persistent flag
  • Retry up to 3 times with delay, trigger handleConnectionLoss on final failure
src/api/integrations/event/rabbitmq/rabbitmq.controller.ts
Utility methods for connection health
  • Add getConnectionStatus() and isConnected() getters
  • Implement waitForConnection() polling until connected or timeout
src/api/integrations/event/rabbitmq/rabbitmq.controller.ts

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments

### Comment 1
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:116` </location>
<code_context>
+      return;
+    }
+
+    this.connectionStatus = this.reconnectAttempts > 0 ? 'reconnecting' : 'connecting';
+
     return new Promise<void>((resolve, reject) => {
</code_context>

<issue_to_address>
Connection status is set before async connect logic, which may lead to race conditions.

This approach can cause incorrect status if multiple connections are initiated simultaneously. Recommend updating connectionStatus only after the connection outcome, or preventing concurrent connect attempts.
</issue_to_address>

### Comment 2
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:51` </location>
<code_context>
+  private async closeConnection(): Promise<void> {
+    try {
+      if (this.amqpChannel) {
+        await new Promise<void>((resolve) => {
+          this.amqpChannel?.close((err) => {
+            if (err) {
+              this.logger.warn(`Error closing channel: ${err.message}`);
+            }
+            resolve();
+          });
+        });
</code_context>

<issue_to_address>
Callback-based close methods are wrapped in Promises but do not handle errors robustly.

Currently, errors during channel closure are only logged, not propagated. Please reject the Promise on error so upstream code can handle failures, or clearly document this behavior.

Suggested implementation:

```typescript
        await new Promise<void>((resolve, reject) => {
          this.amqpChannel?.close((err) => {
            if (err) {
              this.logger.warn(`Error closing channel: ${err.message}`);
              reject(err);
              return;
            }
            resolve();
          });
        });

```

```typescript
        await new Promise<void>((resolve, reject) => {
          this.amqpConnection?.close((err) => {
            if (err) {
              this.logger.warn(`Error closing connection: ${err.message}`);
              reject(err);
              return;
            }

```
</issue_to_address>

### Comment 3
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:90` </location>
<code_context>
+    return this.connectionStatus === 'connected' && this.amqpChannel !== null && this.amqpConnection !== null;
+  }
+
+  public async forceReconnect(): Promise<void> {
+    this.logger.info('Force reconnect requested');
+    
</code_context>

<issue_to_address>
forceReconnect resets reconnectAttempts and calls connect, but may conflict with ongoing reconnection logic.

Consider adding checks or canceling any active reconnection attempts within forceReconnect to prevent race conditions with isReconnecting and reconnectTimer.
</issue_to_address>

### Comment 4
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:211` </location>
<code_context>
-          this.amqpChannel = channel;
-          this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection
-          this.isReconnecting = false;
+          }, (exchangeError) => {
+            if (exchangeError) {
+              this.connectionStatus = 'disconnected';
</code_context>

<issue_to_address>
Callback error handling for assertExchange may not propagate errors to the connection logic.

Ensure that, if assertExchange fails, all resources are properly cleaned up or the connection is closed to avoid leaving the system in a partially initialized state.
</issue_to_address>

### Comment 5
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:292` </location>
<code_context>
     );

-    setTimeout(async () => {
+    this.reconnectTimer = setTimeout(async () => {
+      if (this.isShuttingDown) {
+        return;
</code_context>

<issue_to_address>
Reconnect timer is cleared in finally block, but may be overwritten by concurrent calls.

Concurrent calls to scheduleReconnect can overwrite reconnectTimer, causing lost references and possible memory leaks. Use a safer approach to manage timer instances.
</issue_to_address>

### Comment 6
<location> `src/api/integrations/event/rabbitmq/rabbitmq.controller.ts:459` </location>
<code_context>
-          await this.amqpChannel.bindQueue(queueName, exchangeName, event);
+        await this.amqpChannel.bindQueue(queueName, exchangeName, routingKey);
+
+        const published = await new Promise<boolean>((resolve) => {
+          const success = this.amqpChannel.publish(
+            exchangeName,
</code_context>

<issue_to_address>
publishMessage uses a callback for publish, but may not handle all error scenarios.

If the channel closes or the connection drops, the callback may not trigger, causing the promise to hang. Add error handling or a timeout to ensure the promise resolves in all scenarios.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

return;
}

this.connectionStatus = this.reconnectAttempts > 0 ? 'reconnecting' : 'connecting';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Connection status is set before async connect logic, which may lead to race conditions.

This approach can cause incorrect status if multiple connections are initiated simultaneously. Recommend updating connectionStatus only after the connection outcome, or preventing concurrent connect attempts.

Comment on lines 51 to 56
await new Promise<void>((resolve) => {
this.amqpChannel?.close((err) => {
if (err) {
this.logger.warn(`Error closing channel: ${err.message}`);
}
resolve();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Callback-based close methods are wrapped in Promises but do not handle errors robustly.

Currently, errors during channel closure are only logged, not propagated. Please reject the Promise on error so upstream code can handle failures, or clearly document this behavior.

Suggested implementation:

        await new Promise<void>((resolve, reject) => {
          this.amqpChannel?.close((err) => {
            if (err) {
              this.logger.warn(`Error closing channel: ${err.message}`);
              reject(err);
              return;
            }
            resolve();
          });
        });
        await new Promise<void>((resolve, reject) => {
          this.amqpConnection?.close((err) => {
            if (err) {
              this.logger.warn(`Error closing connection: ${err.message}`);
              reject(err);
              return;
            }

return this.connectionStatus === 'connected' && this.amqpChannel !== null && this.amqpConnection !== null;
}

public async forceReconnect(): Promise<void> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): forceReconnect resets reconnectAttempts and calls connect, but may conflict with ongoing reconnection logic.

Consider adding checks or canceling any active reconnection attempts within forceReconnect to prevent race conditions with isReconnecting and reconnectTimer.

this.amqpChannel = channel;
this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection
this.isReconnecting = false;
}, (exchangeError) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Callback error handling for assertExchange may not propagate errors to the connection logic.

Ensure that, if assertExchange fails, all resources are properly cleaned up or the connection is closed to avoid leaving the system in a partially initialized state.

@@ -162,7 +289,11 @@ export class RabbitmqController extends EventController implements EventControll
`Scheduling RabbitMQ reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`,
);

setTimeout(async () => {
this.reconnectTimer = setTimeout(async () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Reconnect timer is cleared in finally block, but may be overwritten by concurrent calls.

Concurrent calls to scheduleReconnect can overwrite reconnectTimer, causing lost references and possible memory leaks. Use a safer approach to manage timer instances.

await this.amqpChannel.bindQueue(queueName, exchangeName, event);
await this.amqpChannel.bindQueue(queueName, exchangeName, routingKey);

const published = await new Promise<boolean>((resolve) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: publishMessage uses a callback for publish, but may not handle all error scenarios.

If the channel closes or the connection drops, the callback may not trigger, causing the promise to hang. Add error handling or a timeout to ensure the promise resolves in all scenarios.

Comment on lines 398 to 403
if (instanceRabbitmq?.enabled && this.amqpChannel) {
if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) {
const exchangeName = instanceName ?? rabbitmqExchangeName;

let retry = 0;

while (retry < 3) {
try {
await this.amqpChannel.assertExchange(exchangeName, 'topic', {
durable: true,
autoDelete: false,
});

const eventName = event.replace(/_/g, '.').toLowerCase();

const queueName = `${instanceName}.${eventName}`;

await this.amqpChannel.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
},
});

await this.amqpChannel.bindQueue(queueName, exchangeName, eventName);

await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));

if (logEnabled) {
const logData = {
local: `${origin}.sendData-RabbitMQ`,
...message,
};

this.logger.log(logData);
}

break;
} catch (error) {
this.logger.error({
local: 'RabbitmqController.emit',
message: `Error publishing local RabbitMQ message (attempt ${retry + 1}/3)`,
error: error.message || error,
});
retry++;
if (retry >= 3) {
this.handleConnectionLoss();
}
}
}
await this.publishMessage(exchangeName, event, message, instanceName, origin, logEnabled, 'local');
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Merge nested if conditions (merge-nested-ifs)

Suggested change
if (instanceRabbitmq?.enabled && this.amqpChannel) {
if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) {
const exchangeName = instanceName ?? rabbitmqExchangeName;
let retry = 0;
while (retry < 3) {
try {
await this.amqpChannel.assertExchange(exchangeName, 'topic', {
durable: true,
autoDelete: false,
});
const eventName = event.replace(/_/g, '.').toLowerCase();
const queueName = `${instanceName}.${eventName}`;
await this.amqpChannel.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
},
});
await this.amqpChannel.bindQueue(queueName, exchangeName, eventName);
await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
if (logEnabled) {
const logData = {
local: `${origin}.sendData-RabbitMQ`,
...message,
};
this.logger.log(logData);
}
break;
} catch (error) {
this.logger.error({
local: 'RabbitmqController.emit',
message: `Error publishing local RabbitMQ message (attempt ${retry + 1}/3)`,
error: error.message || error,
});
retry++;
if (retry >= 3) {
this.handleConnectionLoss();
}
}
}
await this.publishMessage(exchangeName, event, message, instanceName, origin, logEnabled, 'local');
}
}
if (instanceRabbitmq?.enabled && this.amqpChannel && (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we))) {
const exchangeName = instanceName ?? rabbitmqExchangeName;
await this.publishMessage(exchangeName, event, message, instanceName, origin, logEnabled, 'local');
}


ExplanationReading deeply nested conditional code is confusing, since you have to keep track of which
conditions relate to which levels. We therefore strive to reduce nesting where
possible, and the situation where two if conditions can be combined using
and is an easy win.

@DavidsonGomes DavidsonGomes changed the base branch from main to develop August 22, 2025 01:17
@emadland10
Copy link

Requsted and rejected or delete or all community dosc closed go

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants