diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md index 4c1ade4ea05d4..982f4a7c45fd5 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md @@ -17,6 +17,7 @@ CHANGELOG 5.2.0 ----- + * Introduced support for multiple bindings on the same queue. * Add option to confirm message delivery * DSN now support AMQPS out-of-the-box. diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index 0df199e046a17..5eb526aef983e 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -17,6 +17,7 @@ use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp; use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection; use Symfony\Component\Messenger\Exception\InvalidArgumentException; +use Symfony\Component\Messenger\Exception\LogicException; /** * @requires extension amqp @@ -38,8 +39,8 @@ public function testItCanBeConstructedWithDefaults() { $this->assertEquals( new Connection([ - 'host' => 'localhost', - 'port' => 5672, + 'host' => 'localhost', + 'port' => 5672, 'vhost' => '/', ], [ 'name' => self::DEFAULT_EXCHANGE_NAME, @@ -54,16 +55,16 @@ public function testItCanBeConstructedWithAnAmqpsDsn() { $this->assertEquals( new Connection([ - 'host' => 'localhost', - 'port' => 5671, - 'vhost' => '/', + 'host' => 'localhost', + 'port' => 5671, + 'vhost' => '/', 'cacert' => '/etc/ssl/certs', ], [ 'name' => self::DEFAULT_EXCHANGE_NAME, ], [ self::DEFAULT_EXCHANGE_NAME => [], ]), - Connection::fromDsn('amqps://localhost?'. + Connection::fromDsn('amqps://localhost?' . 'cacert=/etc/ssl/certs') ); } @@ -72,8 +73,8 @@ public function testItGetsParametersFromTheDsn() { $this->assertEquals( new Connection([ - 'host' => 'host', - 'port' => 5672, + 'host' => 'host', + 'port' => 5672, 'vhost' => '/', ], [ 'name' => 'custom', @@ -88,10 +89,10 @@ public function testOverrideOptionsViaQueryParameters() { $this->assertEquals( new Connection([ - 'host' => 'localhost', - 'port' => 1234, - 'vhost' => 'vhost', - 'login' => 'guest', + 'host' => 'localhost', + 'port' => 1234, + 'vhost' => 'vhost', + 'login' => 'guest', 'password' => 'password', ], [ 'name' => 'exchangeName', @@ -106,9 +107,9 @@ public function testOptionsAreTakenIntoAccountAndOverwrittenByDsn() { $this->assertEquals( new Connection([ - 'host' => 'localhost', - 'port' => 5672, - 'vhost' => '/', + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', 'persistent' => 'true', ], [ 'name' => 'exchangeName', @@ -117,7 +118,7 @@ public function testOptionsAreTakenIntoAccountAndOverwrittenByDsn() ]), Connection::fromDsn('amqp://localhost/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [ 'persistent' => 'true', - 'exchange' => ['name' => 'toBeOverwritten'], + 'exchange' => ['name' => 'toBeOverwritten'], ]) ); } @@ -149,39 +150,38 @@ public function testExceptionIfInvalidExchangeOptionIsPassed() public function testSetsParametersOnTheQueueAndExchange() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), $amqpQueue = $this->createMock(\AMQPQueue::class), $amqpExchange = $this->createMock(\AMQPExchange::class) ); $amqpQueue->expects($this->once())->method('setArguments')->with([ 'x-dead-letter-exchange' => 'dead-exchange', - 'x-delay' => 100, - 'x-expires' => 150, - 'x-max-length' => 200, - 'x-max-length-bytes' => 300, - 'x-max-priority' => 4, - 'x-message-ttl' => 100, + 'x-delay' => 100, + 'x-expires' => 150, + 'x-max-length' => 200, + 'x-max-length-bytes' => 300, + 'x-max-priority' => 4, + 'x-message-ttl' => 100, ]); $amqpExchange->expects($this->once())->method('setArguments')->with([ 'alternate-exchange' => 'alternate', ]); - $dsn = 'amqp://localhost/%2f/messages?'. - 'queues[messages][arguments][x-dead-letter-exchange]=dead-exchange&'. - 'queues[messages][arguments][x-message-ttl]=100&'. - 'queues[messages][arguments][x-delay]=100&'. - 'queues[messages][arguments][x-expires]=150&' - ; + $dsn = 'amqp://localhost/%2f/messages?' . + 'queues[messages][arguments][x-dead-letter-exchange]=dead-exchange&' . + 'queues[messages][arguments][x-message-ttl]=100&' . + 'queues[messages][arguments][x-delay]=100&' . + 'queues[messages][arguments][x-expires]=150&'; $connection = Connection::fromDsn($dsn, [ - 'queues' => [ + 'queues' => [ 'messages' => [ 'arguments' => [ - 'x-max-length' => '200', + 'x-max-length' => '200', 'x-max-length-bytes' => '300', - 'x-max-priority' => '4', + 'x-max-priority' => '4', ], ], ], @@ -199,12 +199,12 @@ public function invalidQueueArgumentsDataProvider(): iterable $baseDsn = 'amqp://localhost/%2f/messages'; return [ - [$baseDsn.'?queues[messages][arguments][x-delay]=not-a-number', []], - [$baseDsn.'?queues[messages][arguments][x-expires]=not-a-number', []], - [$baseDsn.'?queues[messages][arguments][x-max-length]=not-a-number', []], - [$baseDsn.'?queues[messages][arguments][x-max-length-bytes]=not-a-number', []], - [$baseDsn.'?queues[messages][arguments][x-max-priority]=not-a-number', []], - [$baseDsn.'?queues[messages][arguments][x-message-ttl]=not-a-number', []], + [$baseDsn . '?queues[messages][arguments][x-delay]=not-a-number', []], + [$baseDsn . '?queues[messages][arguments][x-expires]=not-a-number', []], + [$baseDsn . '?queues[messages][arguments][x-max-length]=not-a-number', []], + [$baseDsn . '?queues[messages][arguments][x-max-length-bytes]=not-a-number', []], + [$baseDsn . '?queues[messages][arguments][x-max-priority]=not-a-number', []], + [$baseDsn . '?queues[messages][arguments][x-message-ttl]=not-a-number', []], // Ensure the exception is thrown when the arguments are passed via the array options [$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-delay' => 'not-a-number']]]]], @@ -232,8 +232,8 @@ public function testItUsesANormalConnectionByDefault() $factory = new TestAmqpFactory( $amqpConnection = $this->createMock(\AMQPConnection::class), $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), - $amqpExchange = $this->createMock(\AMQPExchange::class) + $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPExchange::class) ); // makes sure the channel looks connected, so it's not re-created @@ -249,8 +249,8 @@ public function testItAllowsToUseAPersistentConnection() $factory = new TestAmqpFactory( $amqpConnection = $this->createMock(\AMQPConnection::class), $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), - $amqpExchange = $this->createMock(\AMQPExchange::class) + $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPExchange::class) ); // makes sure the channel looks connected, so it's not re-created @@ -264,14 +264,17 @@ public function testItAllowsToUseAPersistentConnection() public function testItSetupsTheConnectionWithDefaults() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), $amqpQueue = $this->createMock(\AMQPQueue::class), $amqpExchange = $this->createMock(\AMQPExchange::class) ); $amqpExchange->expects($this->once())->method('declareExchange'); - $amqpExchange->expects($this->once())->method('publish')->with('body', null, \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); + $amqpExchange->expects($this->once())->method('publish')->with('body', + null, + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $amqpQueue->expects($this->once())->method('declareQueue'); $amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null); @@ -294,7 +297,10 @@ public function testItSetupsTheConnection() $factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1)); $amqpExchange->expects($this->once())->method('declareExchange'); - $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); + $amqpExchange->expects($this->once())->method('publish')->with('body', + 'routing_key', + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $amqpQueue0->expects($this->once())->method('declareQueue'); $amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive( [self::DEFAULT_EXCHANGE_NAME, 'binding_key0'], @@ -306,11 +312,11 @@ public function testItSetupsTheConnection() [self::DEFAULT_EXCHANGE_NAME, 'binding_key3'] ); - $dsn = 'amqp://localhost?'. - 'exchange[default_publish_routing_key]=routing_key&'. - 'queues[queue0][binding_keys][0]=binding_key0&'. - 'queues[queue0][binding_keys][1]=binding_key1&'. - 'queues[queue1][binding_keys][0]=binding_key2&'. + $dsn = 'amqp://localhost?' . + 'exchange[default_publish_routing_key]=routing_key&' . + 'queues[queue0][binding_keys][0]=binding_key0&' . + 'queues[queue0][binding_keys][1]=binding_key1&' . + 'queues[queue1][binding_keys][0]=binding_key2&' . 'queues[queue1][binding_keys][1]=binding_key3'; $connection = Connection::fromDsn($dsn, [], $factory); @@ -332,7 +338,10 @@ public function testItSetupsTheTTLConnection() $factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1)); $amqpExchange->expects($this->once())->method('declareExchange'); - $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); + $amqpExchange->expects($this->once())->method('publish')->with('body', + 'routing_key', + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $amqpQueue0->expects($this->once())->method('declareQueue'); $amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive( [self::DEFAULT_EXCHANGE_NAME, 'binding_key0'], @@ -344,12 +353,12 @@ public function testItSetupsTheTTLConnection() [self::DEFAULT_EXCHANGE_NAME, 'binding_key3'] ); - $dsn = 'amqps://localhost?'. - 'cacert=/etc/ssl/certs&'. - 'exchange[default_publish_routing_key]=routing_key&'. - 'queues[queue0][binding_keys][0]=binding_key0&'. - 'queues[queue0][binding_keys][1]=binding_key1&'. - 'queues[queue1][binding_keys][0]=binding_key2&'. + $dsn = 'amqps://localhost?' . + 'cacert=/etc/ssl/certs&' . + 'exchange[default_publish_routing_key]=routing_key&' . + 'queues[queue0][binding_keys][0]=binding_key0&' . + 'queues[queue0][binding_keys][1]=binding_key1&' . + 'queues[queue1][binding_keys][0]=binding_key2&' . 'queues[queue1][binding_keys][1]=binding_key3'; $connection = Connection::fromDsn($dsn, [], $factory); @@ -370,24 +379,74 @@ public function testBindingArguments() $factory->method('createQueue')->willReturn($amqpQueue); $amqpExchange->expects($this->once())->method('declareExchange'); - $amqpExchange->expects($this->once())->method('publish')->with('body', null, \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); + $amqpExchange->expects($this->once())->method('publish')->with('body', + null, + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $amqpQueue->expects($this->once())->method('declareQueue'); $amqpQueue->expects($this->exactly(1))->method('bind')->withConsecutive( [self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all']] ); - $dsn = 'amqp://localhost?exchange[type]=headers'. + $dsn = 'amqp://localhost?exchange[type]=headers' . '&queues[queue0][binding_arguments][x-match]=all'; $connection = Connection::fromDsn($dsn, [], $factory); $connection->publish('body'); } + public function testMultipleBindings() + { + $amqpConnection = $this->createMock(\AMQPConnection::class); + $amqpChannel = $this->createMock(\AMQPChannel::class); + $amqpExchange = $this->createMock(\AMQPExchange::class); + $amqpQueue = $this->createMock(\AMQPQueue::class); + $timestampTime = time(); + $amqpStamp = new AmqpStamp(null, \AMQP_NOPARAM, ['timestamp' => $timestampTime]); + + $factory = $this->createMock(AmqpFactory::class); + $factory->method('createConnection')->willReturn($amqpConnection); + $factory->method('createChannel')->willReturn($amqpChannel); + $factory->method('createExchange')->willReturn($amqpExchange); + $factory->method('createQueue')->willReturn($amqpQueue); + + $amqpExchange->expects($this->once())->method('declareExchange'); + $amqpExchange->expects($this->once())->method('publish') + ->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => $timestampTime]); + $amqpQueue->expects($this->once())->method('declareQueue'); + $amqpQueue->expects($this->exactly(2))->method('bind')->withConsecutive( + [self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all', 'header-property' => 'change']], + [self::DEFAULT_EXCHANGE_NAME, 'binding_key0', ['x-match' => 'all', 'header-property' => 'remove']] + ); + + $dsn = 'amqp://localhost?exchange[type]=headers' . + '&queues[queue0][bindings][one][arguments][x-match]=all' . + '&queues[queue0][bindings][one][arguments][header-property]=change' . + '&queues[queue0][bindings][two][arguments][x-match]=all' . + '&queues[queue0][bindings][two][arguments][header-property]=remove' . + '&queues[queue0][bindings][two][key]=binding_key0'; + + $connection = Connection::fromDsn($dsn, [], $factory); + $connection->publish('body', [], 0, $amqpStamp); + } + + public function testMultipleBindingsAndDeprecatedBindings() + { + $factory = $this->createMock(AmqpFactory::class); + $dsn = 'amqp://localhost?exchange[type]=headers' . + '&queues[queue0][bindings][one][arguments][x-match]=all' . + '&queues[queue0][binding_keys][one]' . + '&queues[queue0][binding_arguments][one][header-property]=change'; + + $this->expectException(LogicException::class); + Connection::fromDsn($dsn, [], $factory); + } + public function testItCanDisableTheSetup() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), $amqpQueue = $this->createMock(\AMQPQueue::class), $amqpExchange = $this->createMock(\AMQPExchange::class) ); @@ -409,8 +468,8 @@ public function testItCanDisableTheSetup() public function testItSetupQueuesOnce() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), $amqpQueue = $this->createMock(\AMQPQueue::class), $amqpExchange = $this->createMock(\AMQPExchange::class) ); @@ -434,7 +493,7 @@ public function testAutoSetupWithDelayDeclaresExchangeQueuesAndDelay() $factory->method('createChannel')->willReturn($amqpChannel); $factory->method('createQueue')->will($this->onConsecutiveCalls( $amqpQueue = $this->createMock(\AMQPQueue::class), - $delayQueue = $this->createMock(\AMQPQueue::class) + $this->createMock(\AMQPQueue::class) )); $factory->method('createExchange')->will($this->onConsecutiveCalls( $amqpExchange = $this->createMock(\AMQPExchange::class), @@ -460,11 +519,13 @@ public function testItDelaysTheMessage() $delayExchange->expects($this->once()) ->method('publish') ->with('{}', 'delay_messages__5000_delay', \AMQP_NOPARAM, [ - 'headers' => ['x-some-headers' => 'foo'], + 'headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2, - 'timestamp' => time(), + 'timestamp' => time(), ]); - $connection = $this->createDelayOrRetryConnection($delayExchange, self::DEFAULT_EXCHANGE_NAME, 'delay_messages__5000_delay'); + $connection = $this->createDelayOrRetryConnection($delayExchange, + self::DEFAULT_EXCHANGE_NAME, + 'delay_messages__5000_delay'); $connection->publish('{}', ['x-some-headers' => 'foo'], 5000); } @@ -509,16 +570,19 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs() $delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000_delay'); $delayQueue->expects($this->once())->method('setArguments')->with([ - 'x-message-ttl' => 120000, - 'x-expires' => 120000 + 10000, - 'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME, + 'x-message-ttl' => 120000, + 'x-expires' => 120000 + 10000, + 'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME, 'x-dead-letter-routing-key' => '', ]); $delayQueue->expects($this->once())->method('declareQueue'); $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000_delay'); - $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000_delay', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); + $delayExchange->expects($this->once())->method('publish')->with('{}', + 'delay_messages__120000_delay', + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $connection->publish('{}', [], 120000); } @@ -528,9 +592,9 @@ public function testNoCredentialLeakageWhenConnectionFails() $this->expectExceptionMessage('Could not connect to the AMQP server. Please verify the provided DSN.'); $factory = new TestAmqpFactory( $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), - $amqpExchange = $this->createMock(\AMQPExchange::class) + $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPExchange::class) ); $amqpConnection->method('connect')->willThrowException( @@ -547,10 +611,10 @@ public function testNoCaCertOnSslConnectionFromDsn() $this->expectExceptionMessage('No CA certificate has been provided. Set "amqp.cacert" in your php.ini or pass the "cacert" parameter in the DSN to use SSL. Alternatively, you can use amqp:// to use without SSL.'); $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), - $amqpExchange = $this->createMock(\AMQPExchange::class) + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPExchange::class) ); $oldCaCertValue = ini_set('amqp.cacert', ''); @@ -571,10 +635,16 @@ public function testAmqpStampHeadersAreUsed() $amqpExchange = $this->createMock(\AMQPExchange::class) ); - $amqpExchange->expects($this->once())->method('publish')->with('body', null, \AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2, 'timestamp' => time()]); + $amqpExchange->expects($this->once())->method('publish')->with('body', + null, + \AMQP_NOPARAM, + ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2, 'timestamp' => time()]); $connection = Connection::fromDsn('amqp://localhost', [], $factory); - $connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, \AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']])); + $connection->publish('body', + ['Foo' => 'X'], + 0, + new AmqpStamp(null, \AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']])); } public function testAmqpStampDeliveryModeIsUsed() @@ -586,7 +656,10 @@ public function testAmqpStampDeliveryModeIsUsed() $amqpExchange = $this->createMock(\AMQPExchange::class) ); - $amqpExchange->expects($this->once())->method('publish')->with('body', null, \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 1, 'timestamp' => time()]); + $amqpExchange->expects($this->once())->method('publish')->with('body', + null, + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 1, 'timestamp' => time()]); $connection = Connection::fromDsn('amqp://localhost', [], $factory); $connection->publish('body', [], 0, new AmqpStamp(null, \AMQP_NOPARAM, ['delivery_mode' => 1])); @@ -595,30 +668,34 @@ public function testAmqpStampDeliveryModeIsUsed() public function testItCanPublishWithTheDefaultRoutingKey() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), $amqpExchange = $this->createMock(\AMQPExchange::class) ); $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key'); - $connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=routing_key', [], $factory); + $connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=routing_key', + [], + $factory); $connection->publish('body'); } public function testItCanPublishWithASuppliedRoutingKey() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), $amqpExchange = $this->createMock(\AMQPExchange::class) ); $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key'); - $connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=default_routing_key', [], $factory); + $connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=default_routing_key', + [], + $factory); $connection->publish('body', [], 0, new AmqpStamp('routing_key')); } @@ -649,25 +726,28 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument $delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000_delay'); $delayQueue->expects($this->once())->method('setArguments')->with([ - 'x-message-ttl' => 120000, - 'x-expires' => 120000 + 10000, - 'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME, + 'x-message-ttl' => 120000, + 'x-expires' => 120000 + 10000, + 'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME, 'x-dead-letter-routing-key' => 'routing_key', ]); $delayQueue->expects($this->once())->method('declareQueue'); $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000_delay'); - $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000_delay', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); + $delayExchange->expects($this->once())->method('publish')->with('{}', + 'delay_messages_routing_key_120000_delay', + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $connection->publish('{}', [], 120000, new AmqpStamp('routing_key')); } public function testItCanPublishWithCustomFlagsAndAttributes() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), $amqpExchange = $this->createMock(\AMQPExchange::class) ); @@ -679,16 +759,19 @@ public function testItCanPublishWithCustomFlagsAndAttributes() ); $connection = Connection::fromDsn('amqp://localhost', [], $factory); - $connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', \AMQP_IMMEDIATE, ['delivery_mode' => 2])); + $connection->publish('body', + ['type' => DummyMessage::class], + 0, + new AmqpStamp('routing_key', \AMQP_IMMEDIATE, ['delivery_mode' => 2])); } public function testItPublishMessagesWithoutWaitingForConfirmation() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPConnection::class), $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), - $amqpExchange = $this->createMock(\AMQPExchange::class) + $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPExchange::class) ); $amqpChannel->expects($this->never())->method('waitForConfirm')->with(0.5); @@ -700,10 +783,10 @@ public function testItPublishMessagesWithoutWaitingForConfirmation() public function testSetChannelToConfirmMessage() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPConnection::class), $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), - $amqpExchange = $this->createMock(\AMQPExchange::class) + $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPExchange::class) ); $amqpChannel->expects($this->once())->method('confirmSelect'); @@ -715,10 +798,10 @@ public function testSetChannelToConfirmMessage() public function testItCanPublishAndWaitForConfirmation() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPConnection::class), $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), - $amqpExchange = $this->createMock(\AMQPExchange::class) + $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPExchange::class) ); $amqpChannel->expects($this->once())->method('waitForConfirm')->with(0.5); @@ -731,8 +814,8 @@ public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn() { $this->assertEquals( new Connection([ - 'host' => 'localhost', - 'port' => 5672, + 'host' => 'localhost', + 'port' => 5672, 'vhost' => '/', ], [ 'name' => self::DEFAULT_EXCHANGE_NAME, @@ -741,8 +824,8 @@ public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn() ]), Connection::fromDsn('amqp://', [ 'cacert' => 'foobar', - 'cert' => 'foobar', - 'key' => 'foobar', + 'cert' => 'foobar', + 'key' => 'foobar', 'verify' => false, ]) ); @@ -767,9 +850,9 @@ private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, stri $delayQueue->expects($this->once())->method('setName')->with($delayQueueName); $delayQueue->expects($this->once())->method('setArguments')->with([ - 'x-message-ttl' => 5000, - 'x-expires' => 5000 + 10000, - 'x-dead-letter-exchange' => $deadLetterExchangeName, + 'x-message-ttl' => 5000, + 'x-expires' => 5000 + 10000, + 'x-dead-letter-exchange' => $deadLetterExchangeName, 'x-dead-letter-routing-key' => '', ]); @@ -782,10 +865,10 @@ private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, stri class TestAmqpFactory extends AmqpFactory { - private $connection; - private $channel; - private $queue; - private $exchange; + private \AMQPConnection $connection; + private \AMQPChannel $channel; + private \AMQPQueue $queue; + private \AMQPExchange $exchange; public function __construct(\AMQPConnection $connection, \AMQPChannel $channel, \AMQPQueue $queue, \AMQPExchange $exchange) { diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 353d56a2ddb6c..b07e419866224 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -65,9 +65,21 @@ class Connection ]; private const AVAILABLE_QUEUE_OPTIONS = [ + 'flags', + 'arguments', + ]; + + private const NEW_QUEUE_OPTIONS = [ + 'bindings', + ]; + + private const DEPRECATED_BINDING_KEYS = [ 'binding_keys', 'binding_arguments', - 'flags', + ]; + + private const AVAILABLE_BINDINGS_OPTIONS = [ + 'key', 'arguments', ]; @@ -129,8 +141,11 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar * * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional. * * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional. * * queues[name]: An array of queues, keyed by the name - * * binding_keys: The binding keys (if any) to bind to this queue - * * binding_arguments: Arguments to be used while binding the queue. + * * binding_keys: The binding keys (if any) to bind to this queue (Usage is deprecated. See 'bindings') + * * binding_arguments: Arguments to be used while binding the queue. (Usage is deprecated. See 'bindings') + * * bindings[name]: An array of bindings for this queue, keyed by the name + * * key: The binding key (if any) to bind to this queue + * * arguments: An array of arguments to be used while binding the queue. * * flags: Queue flags (Default: AMQP_DURABLE) * * arguments: Extra arguments * * exchange: @@ -240,10 +255,24 @@ private static function validateOptions(array $options): void if (!\is_array($queue)) { continue; } + if (0 < \count($deprecatedQueueOptions = array_intersect(array_keys($queue), self::DEPRECATED_BINDING_KEYS))) { + trigger_deprecation('symfony/messenger', '6.3', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "%s" option(s) should be used rather than "%s".', implode('", "', $deprecatedQueueOptions), implode('", ', self::NEW_QUEUE_OPTIONS), implode('", ', self::DEPRECATED_BINDING_KEYS)); + if (0 < \count($newQueueOptions = array_intersect(array_keys($queue), self::NEW_QUEUE_OPTIONS))) { + throw new LogicException(sprintf('New "%s" and deprecated "%s" option(s) passed to the AMQP Messenger transport', implode('", "', $newQueueOptions), implode('", "', $deprecatedQueueOptions))); + } + } - if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) { + if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS, self::NEW_QUEUE_OPTIONS, self::DEPRECATED_BINDING_KEYS))) { throw new LogicException(sprintf('Invalid queue option(s) "%s" passed to the AMQP Messenger transport.', implode('", "', $invalidQueueOptions))); } + + if (\is_array($queue['bindings'] ?? false)) { + foreach ($queue['bindings'] as $individualBinding) { + if (0 < \count(array_diff(array_keys($individualBinding), self::AVAILABLE_BINDINGS_OPTIONS))) { + throw new LogicException(sprintf("Valid options for each 'bindings' are: %s", implode(', ', self::AVAILABLE_BINDINGS_OPTIONS))); + } + } + } } } @@ -457,6 +486,12 @@ private function setupExchangeAndQueues(): void foreach ($this->queuesOptions as $queueName => $queueConfig) { $this->queue($queueName)->declareQueue(); + foreach ($queueConfig['bindings'] ?? [] as $binding) { + $this->queue($queueName)->bind($this->exchangeOptions['name'], $binding['key'] ?? null, $binding['arguments'] ?? []); + } + if (isset($queueConfig['bindings']) && empty($queueConfig['binding_keys'])) { + continue; + } foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) { $this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []); }