From 7a99fba556c183194bdf64e7dc4aed5026a3f55a Mon Sep 17 00:00:00 2001 From: luissantosjs Date: Wed, 20 Aug 2025 12:29:45 +0100 Subject: [PATCH 1/2] feat: enhance RabbitMQ controller with improved connection management and shutdown procedures --- .../event/rabbitmq/rabbitmq.controller.ts | 367 +++++++++++++----- 1 file changed, 263 insertions(+), 104 deletions(-) diff --git a/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts b/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts index be73b157b..a6d1e565a 100644 --- a/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts +++ b/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts @@ -14,6 +14,9 @@ export class RabbitmqController extends EventController implements EventControll private maxReconnectAttempts = 10; private reconnectDelay = 5000; // 5 seconds private isReconnecting = false; + private reconnectTimer: NodeJS.Timeout | null = null; + private connectionStatus: 'connected' | 'disconnected' | 'connecting' | 'reconnecting' = 'disconnected'; + private isShuttingDown = false; constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) { super(prismaRepository, waMonitor, configService.get('RABBITMQ')?.ENABLED, 'rabbitmq'); @@ -27,7 +30,91 @@ export class RabbitmqController extends EventController implements EventControll await this.connect(); } + public async shutdown(): Promise { + this.logger.info('Shutting down RabbitMQ controller...'); + this.isShuttingDown = true; + + // Clear any pending reconnect timer + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + + // Close channel and connection gracefully + await this.closeConnection(); + this.logger.info('RabbitMQ controller shutdown complete'); + } + + private async closeConnection(): Promise { + try { + if (this.amqpChannel) { + await new Promise((resolve) => { + this.amqpChannel?.close((err) => { + if (err) { + this.logger.warn(`Error closing channel: ${err.message}`); + } + resolve(); + }); + }); + this.amqpChannel = null; + } + + if (this.amqpConnection) { + await new Promise((resolve) => { + this.amqpConnection?.close((err) => { + if (err) { + this.logger.warn(`Error closing connection: ${err.message}`); + } + resolve(); + }); + }); + this.amqpConnection = null; + } + } catch (error) { + this.logger.error({ + local: 'RabbitmqController.closeConnection', + message: 'Error during connection cleanup', + error: error.message || error, + }); + } + } + + public getConnectionStatus(): string { + return this.connectionStatus; + } + + public isConnected(): boolean { + return this.connectionStatus === 'connected' && this.amqpChannel !== null && this.amqpConnection !== null; + } + + public async forceReconnect(): Promise { + this.logger.info('Force reconnect requested'); + + // Reset reconnect attempts for forced reconnect + this.reconnectAttempts = 0; + + // Close existing connections + await this.closeConnection(); + + // Clear any pending reconnect + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + + this.isReconnecting = false; + + // Attempt immediate reconnection + await this.connect(); + } + private async connect(): Promise { + if (this.isShuttingDown) { + return; + } + + this.connectionStatus = this.reconnectAttempts > 0 ? 'reconnecting' : 'connecting'; + return new Promise((resolve, reject) => { const uri = configService.get('RABBITMQ').URI; const frameMax = configService.get('RABBITMQ').FRAME_MAX; @@ -47,6 +134,7 @@ export class RabbitmqController extends EventController implements EventControll amqp.connect(connectionOptions, (error, connection) => { if (error) { + this.connectionStatus = 'disconnected'; this.logger.error({ local: 'RabbitmqController.connect', message: 'Failed to connect to RabbitMQ', @@ -63,16 +151,25 @@ export class RabbitmqController extends EventController implements EventControll message: 'RabbitMQ connection error', error: err.message || err, }); - this.handleConnectionLoss(); + this.handleConnectionLoss('connection_error', err); }); connection.on('close', () => { this.logger.warn('RabbitMQ connection closed'); - this.handleConnectionLoss(); + this.handleConnectionLoss('connection_closed'); + }); + + connection.on('blocked', (reason) => { + this.logger.warn(`RabbitMQ connection blocked: ${reason}`); + }); + + connection.on('unblocked', () => { + this.logger.info('RabbitMQ connection unblocked'); }); connection.createChannel((channelError, channel) => { if (channelError) { + this.connectionStatus = 'disconnected'; this.logger.error({ local: 'RabbitmqController.createChannel', message: 'Failed to create RabbitMQ channel', @@ -89,12 +186,21 @@ export class RabbitmqController extends EventController implements EventControll message: 'RabbitMQ channel error', error: err.message || err, }); - this.handleConnectionLoss(); + this.handleConnectionLoss('channel_error', err); }); channel.on('close', () => { this.logger.warn('RabbitMQ channel closed'); - this.handleConnectionLoss(); + this.handleConnectionLoss('channel_closed'); + }); + + channel.on('return', (msg) => { + this.logger.warn('RabbitMQ message returned' + JSON.stringify({ + exchange: msg.fields.exchange, + routingKey: msg.fields.routingKey, + replyCode: msg.fields.replyCode, + replyText: msg.fields.replyText, + })); }); const exchangeName = rabbitmqExchangeName; @@ -102,25 +208,37 @@ export class RabbitmqController extends EventController implements EventControll channel.assertExchange(exchangeName, 'topic', { durable: true, autoDelete: false, - }); - - this.amqpConnection = connection; - this.amqpChannel = channel; - this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection - this.isReconnecting = false; + }, (exchangeError) => { + if (exchangeError) { + this.connectionStatus = 'disconnected'; + this.logger.error({ + local: 'RabbitmqController.assertExchange', + message: 'Failed to assert exchange', + error: exchangeError.message || exchangeError, + }); + reject(exchangeError); + return; + } - this.logger.info('AMQP initialized successfully'); + this.amqpConnection = connection; + this.amqpChannel = channel; + this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection + this.isReconnecting = false; + this.connectionStatus = 'connected'; - resolve(); + this.logger.info('AMQP initialized successfully'); + resolve(); + }); }); }); }) .then(() => { if (configService.get('RABBITMQ')?.GLOBAL_ENABLED) { - this.initGlobalQueues(); + return this.initGlobalQueues(); } }) .catch((error) => { + this.connectionStatus = 'disconnected'; this.logger.error({ local: 'RabbitmqController.init', message: 'Failed to initialize AMQP', @@ -131,21 +249,30 @@ export class RabbitmqController extends EventController implements EventControll }); } - private handleConnectionLoss(): void { - if (this.isReconnecting) { - return; // Already attempting to reconnect + private handleConnectionLoss(reason?: string, error?: any): void { + if (this.isReconnecting || this.isShuttingDown) { + return; // Already attempting to reconnect or shutting down } + this.logger.warn(`Connection lost due to: ${reason || 'unknown reason'}` + JSON.stringify(error ? { error: error.message || error } : {})); + + this.connectionStatus = 'disconnected'; this.amqpChannel = null; this.amqpConnection = null; + this.scheduleReconnect(); } private scheduleReconnect(): void { + if (this.isShuttingDown) { + return; + } + if (this.reconnectAttempts >= this.maxReconnectAttempts) { this.logger.error( `Maximum reconnect attempts (${this.maxReconnectAttempts}) reached. Stopping reconnection attempts.`, ); + this.connectionStatus = 'disconnected'; return; } @@ -162,7 +289,11 @@ export class RabbitmqController extends EventController implements EventControll `Scheduling RabbitMQ reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`, ); - setTimeout(async () => { + this.reconnectTimer = setTimeout(async () => { + if (this.isShuttingDown) { + return; + } + try { this.logger.info( `Attempting to reconnect to RabbitMQ (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`, @@ -177,6 +308,8 @@ export class RabbitmqController extends EventController implements EventControll }); this.isReconnecting = false; this.scheduleReconnect(); + } finally { + this.reconnectTimer = null; } }, delay); } @@ -190,9 +323,9 @@ export class RabbitmqController extends EventController implements EventControll } private async ensureConnection(): Promise { - if (!this.amqpChannel) { + if (!this.amqpChannel || !this.isConnected()) { this.logger.warn('AMQP channel is not available, attempting to reconnect...'); - if (!this.isReconnecting) { + if (!this.isReconnecting && !this.isShuttingDown) { this.scheduleReconnect(); } return false; @@ -200,6 +333,25 @@ export class RabbitmqController extends EventController implements EventControll return true; } + public async waitForConnection(timeoutMs: number = 30000): Promise { + const startTime = Date.now(); + + while (Date.now() - startTime < timeoutMs) { + if (this.isConnected()) { + return true; + } + + if (this.isShuttingDown) { + return false; + } + + // Wait 100ms before checking again + await new Promise(resolve => setTimeout(resolve, 100)); + } + + return false; + } + public async emit({ instanceName, origin, @@ -246,106 +398,113 @@ export class RabbitmqController extends EventController implements EventControll if (instanceRabbitmq?.enabled && this.amqpChannel) { if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) { const exchangeName = instanceName ?? rabbitmqExchangeName; - - let retry = 0; - - while (retry < 3) { - try { - await this.amqpChannel.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); - - const eventName = event.replace(/_/g, '.').toLowerCase(); - - const queueName = `${instanceName}.${eventName}`; - - await this.amqpChannel.assertQueue(queueName, { - durable: true, - autoDelete: false, - arguments: { - 'x-queue-type': 'quorum', - }, - }); - - await this.amqpChannel.bindQueue(queueName, exchangeName, eventName); - - await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); - - if (logEnabled) { - const logData = { - local: `${origin}.sendData-RabbitMQ`, - ...message, - }; - - this.logger.log(logData); - } - - break; - } catch (error) { - this.logger.error({ - local: 'RabbitmqController.emit', - message: `Error publishing local RabbitMQ message (attempt ${retry + 1}/3)`, - error: error.message || error, - }); - retry++; - if (retry >= 3) { - this.handleConnectionLoss(); - } - } - } + await this.publishMessage(exchangeName, event, message, instanceName, origin, logEnabled, 'local'); } } if (rabbitmqGlobal && rabbitmqEvents[we] && this.amqpChannel) { const exchangeName = rabbitmqExchangeName; + await this.publishMessage(exchangeName, event, message, instanceName, origin, logEnabled, 'global'); + } + } - let retry = 0; + private async publishMessage( + exchangeName: string, + event: string, + message: any, + instanceName: string, + origin: string, + logEnabled: boolean, + type: 'local' | 'global' + ): Promise { + let retry = 0; + const maxRetries = 3; + + while (retry < maxRetries) { + try { + if (!(await this.ensureConnection())) { + throw new Error('No AMQP connection available'); + } - while (retry < 3) { - try { - await this.amqpChannel.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); + await this.amqpChannel.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); - const queueName = prefixKey + let queueName: string; + let routingKey: string; + + if (type === 'local') { + const eventName = event.replace(/_/g, '.').toLowerCase(); + queueName = `${instanceName}.${eventName}`; + routingKey = eventName; + } else { + const prefixKey = configService.get('RABBITMQ').PREFIX_KEY; + queueName = prefixKey ? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}` : event.replace(/_/g, '.').toLowerCase(); + routingKey = event; + } - await this.amqpChannel.assertQueue(queueName, { - durable: true, - autoDelete: false, - arguments: { - 'x-queue-type': 'quorum', - }, - }); + await this.amqpChannel.assertQueue(queueName, { + durable: true, + autoDelete: false, + arguments: { + 'x-queue-type': 'quorum', + }, + }); - await this.amqpChannel.bindQueue(queueName, exchangeName, event); + await this.amqpChannel.bindQueue(queueName, exchangeName, routingKey); + + const published = await new Promise((resolve) => { + const success = this.amqpChannel.publish( + exchangeName, + routingKey, + Buffer.from(JSON.stringify(message)), + { persistent: true }, + (err) => { + if (err) { + resolve(false); + } else { + resolve(true); + } + } + ); + + if (!success) { + resolve(false); + } + }); - await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); + if (!published) { + throw new Error('Failed to publish message - channel write buffer full'); + } - if (logEnabled) { - const logData = { - local: `${origin}.sendData-RabbitMQ-Global`, - ...message, - }; + if (logEnabled) { + const logData = { + local: `${origin}.sendData-RabbitMQ${type === 'global' ? '-Global' : ''}`, + ...message, + }; - this.logger.log(logData); - } + this.logger.log(logData); + } - break; - } catch (error) { - this.logger.error({ - local: 'RabbitmqController.emit', - message: `Error publishing global RabbitMQ message (attempt ${retry + 1}/3)`, - error: error.message || error, - }); - retry++; - if (retry >= 3) { - this.handleConnectionLoss(); - } + break; // Success, exit retry loop + } catch (error) { + this.logger.error({ + local: 'RabbitmqController.publishMessage', + message: `Error publishing ${type} RabbitMQ message (attempt ${retry + 1}/${maxRetries})`, + error: error.message || error, + }); + retry++; + + if (retry >= maxRetries) { + this.handleConnectionLoss('publish_error', error); + throw error; } + + // Wait before retry + await new Promise(resolve => setTimeout(resolve, 1000 * retry)); } } } @@ -401,9 +560,9 @@ export class RabbitmqController extends EventController implements EventControll message: `Failed to initialize global queue for event ${event}`, error: error.message || error, }); - this.handleConnectionLoss(); + this.handleConnectionLoss('queue_init_error', error); break; } } } -} +} \ No newline at end of file From 4681576cfca1a6042dc615ba724ce59ba4a3579d Mon Sep 17 00:00:00 2001 From: luissantosjs Date: Fri, 22 Aug 2025 22:49:40 +0100 Subject: [PATCH 2/2] up --- .../event/rabbitmq/rabbitmq.controller.ts | 365 +++++------------- 1 file changed, 103 insertions(+), 262 deletions(-) diff --git a/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts b/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts index a6d1e565a..24468ac87 100644 --- a/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts +++ b/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts @@ -14,9 +14,6 @@ export class RabbitmqController extends EventController implements EventControll private maxReconnectAttempts = 10; private reconnectDelay = 5000; // 5 seconds private isReconnecting = false; - private reconnectTimer: NodeJS.Timeout | null = null; - private connectionStatus: 'connected' | 'disconnected' | 'connecting' | 'reconnecting' = 'disconnected'; - private isShuttingDown = false; constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) { super(prismaRepository, waMonitor, configService.get('RABBITMQ')?.ENABLED, 'rabbitmq'); @@ -30,91 +27,7 @@ export class RabbitmqController extends EventController implements EventControll await this.connect(); } - public async shutdown(): Promise { - this.logger.info('Shutting down RabbitMQ controller...'); - this.isShuttingDown = true; - - // Clear any pending reconnect timer - if (this.reconnectTimer) { - clearTimeout(this.reconnectTimer); - this.reconnectTimer = null; - } - - // Close channel and connection gracefully - await this.closeConnection(); - this.logger.info('RabbitMQ controller shutdown complete'); - } - - private async closeConnection(): Promise { - try { - if (this.amqpChannel) { - await new Promise((resolve) => { - this.amqpChannel?.close((err) => { - if (err) { - this.logger.warn(`Error closing channel: ${err.message}`); - } - resolve(); - }); - }); - this.amqpChannel = null; - } - - if (this.amqpConnection) { - await new Promise((resolve) => { - this.amqpConnection?.close((err) => { - if (err) { - this.logger.warn(`Error closing connection: ${err.message}`); - } - resolve(); - }); - }); - this.amqpConnection = null; - } - } catch (error) { - this.logger.error({ - local: 'RabbitmqController.closeConnection', - message: 'Error during connection cleanup', - error: error.message || error, - }); - } - } - - public getConnectionStatus(): string { - return this.connectionStatus; - } - - public isConnected(): boolean { - return this.connectionStatus === 'connected' && this.amqpChannel !== null && this.amqpConnection !== null; - } - - public async forceReconnect(): Promise { - this.logger.info('Force reconnect requested'); - - // Reset reconnect attempts for forced reconnect - this.reconnectAttempts = 0; - - // Close existing connections - await this.closeConnection(); - - // Clear any pending reconnect - if (this.reconnectTimer) { - clearTimeout(this.reconnectTimer); - this.reconnectTimer = null; - } - - this.isReconnecting = false; - - // Attempt immediate reconnection - await this.connect(); - } - private async connect(): Promise { - if (this.isShuttingDown) { - return; - } - - this.connectionStatus = this.reconnectAttempts > 0 ? 'reconnecting' : 'connecting'; - return new Promise((resolve, reject) => { const uri = configService.get('RABBITMQ').URI; const frameMax = configService.get('RABBITMQ').FRAME_MAX; @@ -134,7 +47,6 @@ export class RabbitmqController extends EventController implements EventControll amqp.connect(connectionOptions, (error, connection) => { if (error) { - this.connectionStatus = 'disconnected'; this.logger.error({ local: 'RabbitmqController.connect', message: 'Failed to connect to RabbitMQ', @@ -151,25 +63,16 @@ export class RabbitmqController extends EventController implements EventControll message: 'RabbitMQ connection error', error: err.message || err, }); - this.handleConnectionLoss('connection_error', err); + this.handleConnectionLoss(); }); connection.on('close', () => { this.logger.warn('RabbitMQ connection closed'); - this.handleConnectionLoss('connection_closed'); - }); - - connection.on('blocked', (reason) => { - this.logger.warn(`RabbitMQ connection blocked: ${reason}`); - }); - - connection.on('unblocked', () => { - this.logger.info('RabbitMQ connection unblocked'); + this.handleConnectionLoss(); }); connection.createChannel((channelError, channel) => { if (channelError) { - this.connectionStatus = 'disconnected'; this.logger.error({ local: 'RabbitmqController.createChannel', message: 'Failed to create RabbitMQ channel', @@ -186,21 +89,12 @@ export class RabbitmqController extends EventController implements EventControll message: 'RabbitMQ channel error', error: err.message || err, }); - this.handleConnectionLoss('channel_error', err); + this.handleConnectionLoss(); }); channel.on('close', () => { this.logger.warn('RabbitMQ channel closed'); - this.handleConnectionLoss('channel_closed'); - }); - - channel.on('return', (msg) => { - this.logger.warn('RabbitMQ message returned' + JSON.stringify({ - exchange: msg.fields.exchange, - routingKey: msg.fields.routingKey, - replyCode: msg.fields.replyCode, - replyText: msg.fields.replyText, - })); + this.handleConnectionLoss(); }); const exchangeName = rabbitmqExchangeName; @@ -208,37 +102,25 @@ export class RabbitmqController extends EventController implements EventControll channel.assertExchange(exchangeName, 'topic', { durable: true, autoDelete: false, - }, (exchangeError) => { - if (exchangeError) { - this.connectionStatus = 'disconnected'; - this.logger.error({ - local: 'RabbitmqController.assertExchange', - message: 'Failed to assert exchange', - error: exchangeError.message || exchangeError, - }); - reject(exchangeError); - return; - } + }); - this.amqpConnection = connection; - this.amqpChannel = channel; - this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection - this.isReconnecting = false; - this.connectionStatus = 'connected'; + this.amqpConnection = connection; + this.amqpChannel = channel; + this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection + this.isReconnecting = false; - this.logger.info('AMQP initialized successfully'); - resolve(); - }); + this.logger.info('AMQP initialized successfully'); + + resolve(); }); }); }) .then(() => { if (configService.get('RABBITMQ')?.GLOBAL_ENABLED) { - return this.initGlobalQueues(); + this.initGlobalQueues(); } }) .catch((error) => { - this.connectionStatus = 'disconnected'; this.logger.error({ local: 'RabbitmqController.init', message: 'Failed to initialize AMQP', @@ -249,30 +131,21 @@ export class RabbitmqController extends EventController implements EventControll }); } - private handleConnectionLoss(reason?: string, error?: any): void { - if (this.isReconnecting || this.isShuttingDown) { - return; // Already attempting to reconnect or shutting down + private handleConnectionLoss(): void { + if (this.isReconnecting) { + return; // Already attempting to reconnect } - this.logger.warn(`Connection lost due to: ${reason || 'unknown reason'}` + JSON.stringify(error ? { error: error.message || error } : {})); - - this.connectionStatus = 'disconnected'; this.amqpChannel = null; this.amqpConnection = null; - this.scheduleReconnect(); } private scheduleReconnect(): void { - if (this.isShuttingDown) { - return; - } - if (this.reconnectAttempts >= this.maxReconnectAttempts) { this.logger.error( `Maximum reconnect attempts (${this.maxReconnectAttempts}) reached. Stopping reconnection attempts.`, ); - this.connectionStatus = 'disconnected'; return; } @@ -289,11 +162,7 @@ export class RabbitmqController extends EventController implements EventControll `Scheduling RabbitMQ reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`, ); - this.reconnectTimer = setTimeout(async () => { - if (this.isShuttingDown) { - return; - } - + setTimeout(async () => { try { this.logger.info( `Attempting to reconnect to RabbitMQ (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`, @@ -308,8 +177,6 @@ export class RabbitmqController extends EventController implements EventControll }); this.isReconnecting = false; this.scheduleReconnect(); - } finally { - this.reconnectTimer = null; } }, delay); } @@ -323,9 +190,9 @@ export class RabbitmqController extends EventController implements EventControll } private async ensureConnection(): Promise { - if (!this.amqpChannel || !this.isConnected()) { + if (!this.amqpChannel) { this.logger.warn('AMQP channel is not available, attempting to reconnect...'); - if (!this.isReconnecting && !this.isShuttingDown) { + if (!this.isReconnecting) { this.scheduleReconnect(); } return false; @@ -333,25 +200,6 @@ export class RabbitmqController extends EventController implements EventControll return true; } - public async waitForConnection(timeoutMs: number = 30000): Promise { - const startTime = Date.now(); - - while (Date.now() - startTime < timeoutMs) { - if (this.isConnected()) { - return true; - } - - if (this.isShuttingDown) { - return false; - } - - // Wait 100ms before checking again - await new Promise(resolve => setTimeout(resolve, 100)); - } - - return false; - } - public async emit({ instanceName, origin, @@ -398,113 +246,106 @@ export class RabbitmqController extends EventController implements EventControll if (instanceRabbitmq?.enabled && this.amqpChannel) { if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) { const exchangeName = instanceName ?? rabbitmqExchangeName; - await this.publishMessage(exchangeName, event, message, instanceName, origin, logEnabled, 'local'); + + let retry = 0; + + while (retry < 3) { + try { + await this.amqpChannel.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); + + const eventName = event.replace(/_/g, '.').toLowerCase(); + + const queueName = `${instanceName}.${eventName}`; + + await this.amqpChannel.assertQueue(queueName, { + durable: true, + autoDelete: false, + arguments: { + 'x-queue-type': 'quorum', + }, + }); + + await this.amqpChannel.bindQueue(queueName, exchangeName, eventName); + + await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); + + if (logEnabled) { + const logData = { + local: `${origin}.sendData-RabbitMQ`, + ...message, + }; + + this.logger.log(logData); + } + + break; + } catch (error) { + this.logger.error({ + local: 'RabbitmqController.emit', + message: `Error publishing local RabbitMQ message (attempt ${retry + 1}/3)`, + error: error.message || error, + }); + retry++; + if (retry >= 3) { + this.handleConnectionLoss(); + } + } + } } } if (rabbitmqGlobal && rabbitmqEvents[we] && this.amqpChannel) { const exchangeName = rabbitmqExchangeName; - await this.publishMessage(exchangeName, event, message, instanceName, origin, logEnabled, 'global'); - } - } - private async publishMessage( - exchangeName: string, - event: string, - message: any, - instanceName: string, - origin: string, - logEnabled: boolean, - type: 'local' | 'global' - ): Promise { - let retry = 0; - const maxRetries = 3; - - while (retry < maxRetries) { - try { - if (!(await this.ensureConnection())) { - throw new Error('No AMQP connection available'); - } + let retry = 0; - await this.amqpChannel.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); - - let queueName: string; - let routingKey: string; + while (retry < 3) { + try { + await this.amqpChannel.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); - if (type === 'local') { - const eventName = event.replace(/_/g, '.').toLowerCase(); - queueName = `${instanceName}.${eventName}`; - routingKey = eventName; - } else { - const prefixKey = configService.get('RABBITMQ').PREFIX_KEY; - queueName = prefixKey + const queueName = prefixKey ? `${prefixKey}.${event.replace(/_/g, '.').toLowerCase()}` : event.replace(/_/g, '.').toLowerCase(); - routingKey = event; - } - await this.amqpChannel.assertQueue(queueName, { - durable: true, - autoDelete: false, - arguments: { - 'x-queue-type': 'quorum', - }, - }); + await this.amqpChannel.assertQueue(queueName, { + durable: true, + autoDelete: false, + arguments: { + 'x-queue-type': 'quorum', + }, + }); - await this.amqpChannel.bindQueue(queueName, exchangeName, routingKey); - - const published = await new Promise((resolve) => { - const success = this.amqpChannel.publish( - exchangeName, - routingKey, - Buffer.from(JSON.stringify(message)), - { persistent: true }, - (err) => { - if (err) { - resolve(false); - } else { - resolve(true); - } - } - ); - - if (!success) { - resolve(false); - } - }); + await this.amqpChannel.bindQueue(queueName, exchangeName, event); - if (!published) { - throw new Error('Failed to publish message - channel write buffer full'); - } + await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); - if (logEnabled) { - const logData = { - local: `${origin}.sendData-RabbitMQ${type === 'global' ? '-Global' : ''}`, - ...message, - }; + if (logEnabled) { + const logData = { + local: `${origin}.sendData-RabbitMQ-Global`, + ...message, + }; - this.logger.log(logData); - } + this.logger.log(logData); + } - break; // Success, exit retry loop - } catch (error) { - this.logger.error({ - local: 'RabbitmqController.publishMessage', - message: `Error publishing ${type} RabbitMQ message (attempt ${retry + 1}/${maxRetries})`, - error: error.message || error, - }); - retry++; - - if (retry >= maxRetries) { - this.handleConnectionLoss('publish_error', error); - throw error; + break; + } catch (error) { + this.logger.error({ + local: 'RabbitmqController.emit', + message: `Error publishing global RabbitMQ message (attempt ${retry + 1}/3)`, + error: error.message || error, + }); + retry++; + if (retry >= 3) { + this.handleConnectionLoss(); + } } - - // Wait before retry - await new Promise(resolve => setTimeout(resolve, 1000 * retry)); } } } @@ -560,7 +401,7 @@ export class RabbitmqController extends EventController implements EventControll message: `Failed to initialize global queue for event ${event}`, error: error.message || error, }); - this.handleConnectionLoss('queue_init_error', error); + this.handleConnectionLoss(); break; } }