From 74339e7cab25a862a3f52b6a1132801d31c10282 Mon Sep 17 00:00:00 2001 From: Ryan Jefferson Date: Thu, 8 Oct 2020 10:58:02 -0500 Subject: [PATCH 1/8] 37233 - Support multiple bindings for a queue --- .../Messenger/Bridge/Amqp/CHANGELOG.md | 1 + .../Amqp/Tests/Transport/ConnectionTest.php | 32 +++++++++++++++++ .../Bridge/Amqp/Transport/Connection.php | 36 ++++++++++++++++--- 3 files changed, 65 insertions(+), 4 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md index 81c0100991936..849f106537948 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md @@ -4,6 +4,7 @@ CHANGELOG 5.2.0 ----- + * Introduced support for multiple bindings on the same queue. * Add option to confirm message delivery * DSN now support AMQPS out-of-the-box. diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index 36fde1250587c..2563efd83af2f 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -398,6 +398,38 @@ public function testBindingArguments() $connection->publish('body'); } + public function testMultipleBindings() + { + $amqpConnection = $this->createMock(\AMQPConnection::class); + $amqpChannel = $this->createMock(\AMQPChannel::class); + $amqpExchange = $this->createMock(\AMQPExchange::class); + $amqpQueue = $this->createMock(\AMQPQueue::class); + + $factory = $this->createMock(AmqpFactory::class); + $factory->method('createConnection')->willReturn($amqpConnection); + $factory->method('createChannel')->willReturn($amqpChannel); + $factory->method('createExchange')->willReturn($amqpExchange); + $factory->method('createQueue')->willReturn($amqpQueue); + + $amqpExchange->expects($this->once())->method('declareExchange'); + $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]); + $amqpQueue->expects($this->once())->method('declareQueue'); + $amqpQueue->expects($this->exactly(2))->method('bind')->withConsecutive( + [self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all', 'header-property' => 'change']], + [self::DEFAULT_EXCHANGE_NAME, 'binding_key0', ['x-match' => 'all', 'header-property' => 'remove']] + ); + + $dsn = 'amqp://localhost?exchange[type]=headers'. + '&queues[queue0][bindings][one][arguments][x-match]=all'. + '&queues[queue0][bindings][one][arguments][header-property]=change'. + '&queues[queue0][bindings][two][arguments][x-match]=all'. + '&queues[queue0][bindings][two][arguments][header-property]=remove'. + '&queues[queue0][bindings][two][key]=binding_key0'; + + $connection = Connection::fromDsn($dsn, [], $factory); + $connection->publish('body'); + } + public function testItCanDisableTheSetup() { $factory = new TestAmqpFactory( diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 64a550167b17c..86911d42db211 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -60,9 +60,18 @@ class Connection ]; private const AVAILABLE_QUEUE_OPTIONS = [ + 'bindings', + 'flags', + 'arguments', + ]; + + private const ORIGINAL_BINDING_KEYS = [ 'binding_keys', 'binding_arguments', - 'flags', + ]; + + private const AVAILABLE_BINDINGS_OPTIONS = [ + 'key', 'arguments', ]; @@ -131,8 +140,11 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar * * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional. * * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional. * * queues[name]: An array of queues, keyed by the name - * * binding_keys: The binding keys (if any) to bind to this queue - * * binding_arguments: Arguments to be used while binding the queue. + * * binding_keys: The binding keys (if any) to bind to this queue (Usage is deprecated. See 'bindings') + * * binding_arguments: Arguments to be used while binding the queue. (Usage is deprecated. See 'bindings') + * * bindings[name]: An array of bindings for this queue, keyed by the name + * * key: The binding key (if any) to bind to this queue + * * arguments: An array of arguments to be used while binding the queue. * * flags: Queue flags (Default: AMQP_DURABLE) * * arguments: Extra arguments * * exchange: @@ -237,8 +249,18 @@ private static function validateOptions(array $options): void continue; } - if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) { + if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS, self::ORIGINAL_BINDING_KEYS))) { trigger_deprecation('symfony/messenger', '5.1', 'Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated.', implode('", "', $invalidQueueOptions)); + } elseif (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) { + trigger_deprecation('symfony/messenger', '5.2', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "bindings" option should be used rather than "binding_keys" and "binding_arguments".', implode('", "', $invalidQueueOptions)); + } + + if (\is_array($queue['bindings'] ?? false)) { + foreach ($queue['bindings'] as $title => $individualBinding) { + if (0 < \count($invalidBindingsOptions = array_diff(array_keys($individualBinding), self::AVAILABLE_BINDINGS_OPTIONS))) { + throw new \InvalidArgumentException(sprintf('Invalid bindings option(s) "%s" passed to the AMQP Messenger transport in "%s". Each "bindings" option only accepts "key" and "arguments"', implode('", "', $invalidBindingsOptions), $title)); + } + } } } } @@ -461,6 +483,12 @@ private function setupExchangeAndQueues(): void foreach ($this->queuesOptions as $queueName => $queueConfig) { $this->queue($queueName)->declareQueue(); + foreach ($queueConfig['bindings'] ?? [] as $binding) { + $this->queue($queueName)->bind($this->exchangeOptions['name'], $binding['key'] ?? null, $binding['arguments'] ?? []); + } + if (isset($queueConfig['bindings']) && empty($queueConfig['binding_keys'])) { + continue; + } foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) { $this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []); } From b5e2f0ee51768f5d21c131e84ff1a3de55aba3e0 Mon Sep 17 00:00:00 2001 From: Ryan Jefferson Date: Fri, 9 Oct 2020 15:21:14 -0500 Subject: [PATCH 2/8] Tweak to exception message when invalid options for bindings are provided --- .../Component/Messenger/Bridge/Amqp/Transport/Connection.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 86911d42db211..29ea4d47f71e2 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -256,9 +256,9 @@ private static function validateOptions(array $options): void } if (\is_array($queue['bindings'] ?? false)) { - foreach ($queue['bindings'] as $title => $individualBinding) { + foreach ($queue['bindings'] as $individualBinding) { if (0 < \count($invalidBindingsOptions = array_diff(array_keys($individualBinding), self::AVAILABLE_BINDINGS_OPTIONS))) { - throw new \InvalidArgumentException(sprintf('Invalid bindings option(s) "%s" passed to the AMQP Messenger transport in "%s". Each "bindings" option only accepts "key" and "arguments"', implode('", "', $invalidBindingsOptions), $title)); + throw new \InvalidArgumentException(sprintf("Valid options for each 'bindings' are: %s", implode(', ', self::AVAILABLE_BINDINGS_OPTIONS))); } } } From 4b4dd0566ee15bcea08fedcc84cb9f25a7629e74 Mon Sep 17 00:00:00 2001 From: Oleg Namaka Date: Fri, 6 Jan 2023 15:07:13 -0600 Subject: [PATCH 3/8] Update the branch to target symfony 6.3 --- .../Component/Messenger/Bridge/Amqp/Transport/Connection.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 0f1b0c55ff051..1e0ecf399be66 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -256,12 +256,12 @@ private static function validateOptions(array $options): void if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS, self::ORIGINAL_BINDING_KEYS))) { throw new LogicException(sprintf('Invalid queue option(s) "%s" passed to the AMQP Messenger transport.', implode('", "', $invalidQueueOptions))); } elseif (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) { - trigger_deprecation('symfony/messenger', '5.2', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "bindings" option should be used rather than "binding_keys" and "binding_arguments".', implode('", "', $invalidQueueOptions)); + trigger_deprecation('symfony/messenger', '6.3', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "bindings" option should be used rather than "binding_keys" and "binding_arguments".', implode('", "', $invalidQueueOptions)); } if (\is_array($queue['bindings'] ?? false)) { foreach ($queue['bindings'] as $individualBinding) { - if (0 < \count($invalidBindingsOptions = array_diff(array_keys($individualBinding), self::AVAILABLE_BINDINGS_OPTIONS))) { + if (0 < \count(array_diff(array_keys($individualBinding), self::AVAILABLE_BINDINGS_OPTIONS))) { throw new \InvalidArgumentException(sprintf("Valid options for each 'bindings' are: %s", implode(', ', self::AVAILABLE_BINDINGS_OPTIONS))); } } From b204b07cd16f04cab5a51313376a4e41d6916983 Mon Sep 17 00:00:00 2001 From: Oleg Namaka Date: Sun, 8 Jan 2023 11:13:38 -0600 Subject: [PATCH 4/8] Fix the failing test, remove unused variables, add missing type declarations --- .../Amqp/Tests/Transport/ConnectionTest.php | 85 ++++++++++--------- 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index f6f1695b4824f..20969517265b1 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -149,8 +149,8 @@ public function testExceptionIfInvalidExchangeOptionIsPassed() public function testSetsParametersOnTheQueueAndExchange() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), $amqpQueue = $this->createMock(\AMQPQueue::class), $amqpExchange = $this->createMock(\AMQPExchange::class) ); @@ -232,8 +232,8 @@ public function testItUsesANormalConnectionByDefault() $factory = new TestAmqpFactory( $amqpConnection = $this->createMock(\AMQPConnection::class), $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), - $amqpExchange = $this->createMock(\AMQPExchange::class) + $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPExchange::class) ); // makes sure the channel looks connected, so it's not re-created @@ -249,8 +249,8 @@ public function testItAllowsToUseAPersistentConnection() $factory = new TestAmqpFactory( $amqpConnection = $this->createMock(\AMQPConnection::class), $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), - $amqpExchange = $this->createMock(\AMQPExchange::class) + $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPExchange::class) ); // makes sure the channel looks connected, so it's not re-created @@ -264,8 +264,8 @@ public function testItAllowsToUseAPersistentConnection() public function testItSetupsTheConnectionWithDefaults() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), $amqpQueue = $this->createMock(\AMQPQueue::class), $amqpExchange = $this->createMock(\AMQPExchange::class) ); @@ -389,6 +389,8 @@ public function testMultipleBindings() $amqpChannel = $this->createMock(\AMQPChannel::class); $amqpExchange = $this->createMock(\AMQPExchange::class); $amqpQueue = $this->createMock(\AMQPQueue::class); + $timestampTime = time(); + $amqpStamp = new AmqpStamp(null, \AMQP_NOPARAM, ['timestamp' => $timestampTime]); $factory = $this->createMock(AmqpFactory::class); $factory->method('createConnection')->willReturn($amqpConnection); @@ -397,7 +399,8 @@ public function testMultipleBindings() $factory->method('createQueue')->willReturn($amqpQueue); $amqpExchange->expects($this->once())->method('declareExchange'); - $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]); + $amqpExchange->expects($this->once())->method('publish') + ->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => $timestampTime]); $amqpQueue->expects($this->once())->method('declareQueue'); $amqpQueue->expects($this->exactly(2))->method('bind')->withConsecutive( [self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all', 'header-property' => 'change']], @@ -412,7 +415,7 @@ public function testMultipleBindings() '&queues[queue0][bindings][two][key]=binding_key0'; $connection = Connection::fromDsn($dsn, [], $factory); - $connection->publish('body'); + $connection->publish('body', [], 0, $amqpStamp); } public function testItCanDisableTheSetup() @@ -441,8 +444,8 @@ public function testItCanDisableTheSetup() public function testItSetupQueuesOnce() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), $amqpQueue = $this->createMock(\AMQPQueue::class), $amqpExchange = $this->createMock(\AMQPExchange::class) ); @@ -560,9 +563,9 @@ public function testNoCredentialLeakageWhenConnectionFails() $this->expectExceptionMessage('Could not connect to the AMQP server. Please verify the provided DSN.'); $factory = new TestAmqpFactory( $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), - $amqpExchange = $this->createMock(\AMQPExchange::class) + $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPExchange::class) ); $amqpConnection->method('connect')->willThrowException( @@ -579,10 +582,10 @@ public function testNoCaCertOnSslConnectionFromDsn() $this->expectExceptionMessage('No CA certificate has been provided. Set "amqp.cacert" in your php.ini or pass the "cacert" parameter in the DSN to use SSL. Alternatively, you can use amqp:// to use without SSL.'); $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), - $amqpExchange = $this->createMock(\AMQPExchange::class) + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPExchange::class) ); $oldCaCertValue = ini_set('amqp.cacert', ''); @@ -627,9 +630,9 @@ public function testAmqpStampDeliveryModeIsUsed() public function testItCanPublishWithTheDefaultRoutingKey() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), $amqpExchange = $this->createMock(\AMQPExchange::class) ); @@ -642,9 +645,9 @@ public function testItCanPublishWithTheDefaultRoutingKey() public function testItCanPublishWithASuppliedRoutingKey() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), $amqpExchange = $this->createMock(\AMQPExchange::class) ); @@ -697,9 +700,9 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument public function testItCanPublishWithCustomFlagsAndAttributes() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), $amqpExchange = $this->createMock(\AMQPExchange::class) ); @@ -717,10 +720,10 @@ public function testItCanPublishWithCustomFlagsAndAttributes() public function testItPublishMessagesWithoutWaitingForConfirmation() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPConnection::class), $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), - $amqpExchange = $this->createMock(\AMQPExchange::class) + $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPExchange::class) ); $amqpChannel->expects($this->never())->method('waitForConfirm')->with(0.5); @@ -732,10 +735,10 @@ public function testItPublishMessagesWithoutWaitingForConfirmation() public function testSetChannelToConfirmMessage() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPConnection::class), $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), - $amqpExchange = $this->createMock(\AMQPExchange::class) + $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPExchange::class) ); $amqpChannel->expects($this->once())->method('confirmSelect'); @@ -747,10 +750,10 @@ public function testSetChannelToConfirmMessage() public function testItCanPublishAndWaitForConfirmation() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPConnection::class), $amqpChannel = $this->createMock(\AMQPChannel::class), - $amqpQueue = $this->createMock(\AMQPQueue::class), - $amqpExchange = $this->createMock(\AMQPExchange::class) + $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPExchange::class) ); $amqpChannel->expects($this->once())->method('waitForConfirm')->with(0.5); @@ -814,10 +817,10 @@ private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, stri class TestAmqpFactory extends AmqpFactory { - private $connection; - private $channel; - private $queue; - private $exchange; + private \AMQPConnection $connection; + private \AMQPChannel $channel; + private \AMQPQueue $queue; + private \AMQPExchange $exchange; public function __construct(\AMQPConnection $connection, \AMQPChannel $channel, \AMQPQueue $queue, \AMQPExchange $exchange) { From 4c1e3da152cf2e7dabe4676c857d4c4d89ef5e4b Mon Sep 17 00:00:00 2001 From: Oleg Namaka Date: Sun, 8 Jan 2023 11:20:24 -0600 Subject: [PATCH 5/8] Remove unused variables --- .../Bridge/Amqp/Tests/Transport/ConnectionTest.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index 20969517265b1..e22be0dcebe1e 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -421,8 +421,8 @@ public function testMultipleBindings() public function testItCanDisableTheSetup() { $factory = new TestAmqpFactory( - $amqpConnection = $this->createMock(\AMQPConnection::class), - $amqpChannel = $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), $amqpQueue = $this->createMock(\AMQPQueue::class), $amqpExchange = $this->createMock(\AMQPExchange::class) ); @@ -469,7 +469,7 @@ public function testAutoSetupWithDelayDeclaresExchangeQueuesAndDelay() $factory->method('createChannel')->willReturn($amqpChannel); $factory->method('createQueue')->will($this->onConsecutiveCalls( $amqpQueue = $this->createMock(\AMQPQueue::class), - $delayQueue = $this->createMock(\AMQPQueue::class) + $this->createMock(\AMQPQueue::class) )); $factory->method('createExchange')->will($this->onConsecutiveCalls( $amqpExchange = $this->createMock(\AMQPExchange::class), From 768b1cab85c54d992434deec83b6ecc04df0671f Mon Sep 17 00:00:00 2001 From: Oleg Namaka Date: Thu, 12 Jan 2023 16:55:32 -0600 Subject: [PATCH 6/8] Improve handling of new and deprecated options, add more test coverage --- .../Amqp/Tests/Transport/ConnectionTest.php | 214 +++++++++++------- .../Bridge/Amqp/Transport/Connection.php | 19 +- 2 files changed, 144 insertions(+), 89 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index e22be0dcebe1e..5eb526aef983e 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -17,6 +17,7 @@ use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp; use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection; use Symfony\Component\Messenger\Exception\InvalidArgumentException; +use Symfony\Component\Messenger\Exception\LogicException; /** * @requires extension amqp @@ -38,8 +39,8 @@ public function testItCanBeConstructedWithDefaults() { $this->assertEquals( new Connection([ - 'host' => 'localhost', - 'port' => 5672, + 'host' => 'localhost', + 'port' => 5672, 'vhost' => '/', ], [ 'name' => self::DEFAULT_EXCHANGE_NAME, @@ -54,16 +55,16 @@ public function testItCanBeConstructedWithAnAmqpsDsn() { $this->assertEquals( new Connection([ - 'host' => 'localhost', - 'port' => 5671, - 'vhost' => '/', + 'host' => 'localhost', + 'port' => 5671, + 'vhost' => '/', 'cacert' => '/etc/ssl/certs', ], [ 'name' => self::DEFAULT_EXCHANGE_NAME, ], [ self::DEFAULT_EXCHANGE_NAME => [], ]), - Connection::fromDsn('amqps://localhost?'. + Connection::fromDsn('amqps://localhost?' . 'cacert=/etc/ssl/certs') ); } @@ -72,8 +73,8 @@ public function testItGetsParametersFromTheDsn() { $this->assertEquals( new Connection([ - 'host' => 'host', - 'port' => 5672, + 'host' => 'host', + 'port' => 5672, 'vhost' => '/', ], [ 'name' => 'custom', @@ -88,10 +89,10 @@ public function testOverrideOptionsViaQueryParameters() { $this->assertEquals( new Connection([ - 'host' => 'localhost', - 'port' => 1234, - 'vhost' => 'vhost', - 'login' => 'guest', + 'host' => 'localhost', + 'port' => 1234, + 'vhost' => 'vhost', + 'login' => 'guest', 'password' => 'password', ], [ 'name' => 'exchangeName', @@ -106,9 +107,9 @@ public function testOptionsAreTakenIntoAccountAndOverwrittenByDsn() { $this->assertEquals( new Connection([ - 'host' => 'localhost', - 'port' => 5672, - 'vhost' => '/', + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', 'persistent' => 'true', ], [ 'name' => 'exchangeName', @@ -117,7 +118,7 @@ public function testOptionsAreTakenIntoAccountAndOverwrittenByDsn() ]), Connection::fromDsn('amqp://localhost/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [ 'persistent' => 'true', - 'exchange' => ['name' => 'toBeOverwritten'], + 'exchange' => ['name' => 'toBeOverwritten'], ]) ); } @@ -157,31 +158,30 @@ public function testSetsParametersOnTheQueueAndExchange() $amqpQueue->expects($this->once())->method('setArguments')->with([ 'x-dead-letter-exchange' => 'dead-exchange', - 'x-delay' => 100, - 'x-expires' => 150, - 'x-max-length' => 200, - 'x-max-length-bytes' => 300, - 'x-max-priority' => 4, - 'x-message-ttl' => 100, + 'x-delay' => 100, + 'x-expires' => 150, + 'x-max-length' => 200, + 'x-max-length-bytes' => 300, + 'x-max-priority' => 4, + 'x-message-ttl' => 100, ]); $amqpExchange->expects($this->once())->method('setArguments')->with([ 'alternate-exchange' => 'alternate', ]); - $dsn = 'amqp://localhost/%2f/messages?'. - 'queues[messages][arguments][x-dead-letter-exchange]=dead-exchange&'. - 'queues[messages][arguments][x-message-ttl]=100&'. - 'queues[messages][arguments][x-delay]=100&'. - 'queues[messages][arguments][x-expires]=150&' - ; + $dsn = 'amqp://localhost/%2f/messages?' . + 'queues[messages][arguments][x-dead-letter-exchange]=dead-exchange&' . + 'queues[messages][arguments][x-message-ttl]=100&' . + 'queues[messages][arguments][x-delay]=100&' . + 'queues[messages][arguments][x-expires]=150&'; $connection = Connection::fromDsn($dsn, [ - 'queues' => [ + 'queues' => [ 'messages' => [ 'arguments' => [ - 'x-max-length' => '200', + 'x-max-length' => '200', 'x-max-length-bytes' => '300', - 'x-max-priority' => '4', + 'x-max-priority' => '4', ], ], ], @@ -199,12 +199,12 @@ public function invalidQueueArgumentsDataProvider(): iterable $baseDsn = 'amqp://localhost/%2f/messages'; return [ - [$baseDsn.'?queues[messages][arguments][x-delay]=not-a-number', []], - [$baseDsn.'?queues[messages][arguments][x-expires]=not-a-number', []], - [$baseDsn.'?queues[messages][arguments][x-max-length]=not-a-number', []], - [$baseDsn.'?queues[messages][arguments][x-max-length-bytes]=not-a-number', []], - [$baseDsn.'?queues[messages][arguments][x-max-priority]=not-a-number', []], - [$baseDsn.'?queues[messages][arguments][x-message-ttl]=not-a-number', []], + [$baseDsn . '?queues[messages][arguments][x-delay]=not-a-number', []], + [$baseDsn . '?queues[messages][arguments][x-expires]=not-a-number', []], + [$baseDsn . '?queues[messages][arguments][x-max-length]=not-a-number', []], + [$baseDsn . '?queues[messages][arguments][x-max-length-bytes]=not-a-number', []], + [$baseDsn . '?queues[messages][arguments][x-max-priority]=not-a-number', []], + [$baseDsn . '?queues[messages][arguments][x-message-ttl]=not-a-number', []], // Ensure the exception is thrown when the arguments are passed via the array options [$baseDsn, ['queues' => ['messages' => ['arguments' => ['x-delay' => 'not-a-number']]]]], @@ -271,7 +271,10 @@ public function testItSetupsTheConnectionWithDefaults() ); $amqpExchange->expects($this->once())->method('declareExchange'); - $amqpExchange->expects($this->once())->method('publish')->with('body', null, \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); + $amqpExchange->expects($this->once())->method('publish')->with('body', + null, + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $amqpQueue->expects($this->once())->method('declareQueue'); $amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null); @@ -294,7 +297,10 @@ public function testItSetupsTheConnection() $factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1)); $amqpExchange->expects($this->once())->method('declareExchange'); - $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); + $amqpExchange->expects($this->once())->method('publish')->with('body', + 'routing_key', + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $amqpQueue0->expects($this->once())->method('declareQueue'); $amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive( [self::DEFAULT_EXCHANGE_NAME, 'binding_key0'], @@ -306,11 +312,11 @@ public function testItSetupsTheConnection() [self::DEFAULT_EXCHANGE_NAME, 'binding_key3'] ); - $dsn = 'amqp://localhost?'. - 'exchange[default_publish_routing_key]=routing_key&'. - 'queues[queue0][binding_keys][0]=binding_key0&'. - 'queues[queue0][binding_keys][1]=binding_key1&'. - 'queues[queue1][binding_keys][0]=binding_key2&'. + $dsn = 'amqp://localhost?' . + 'exchange[default_publish_routing_key]=routing_key&' . + 'queues[queue0][binding_keys][0]=binding_key0&' . + 'queues[queue0][binding_keys][1]=binding_key1&' . + 'queues[queue1][binding_keys][0]=binding_key2&' . 'queues[queue1][binding_keys][1]=binding_key3'; $connection = Connection::fromDsn($dsn, [], $factory); @@ -332,7 +338,10 @@ public function testItSetupsTheTTLConnection() $factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1)); $amqpExchange->expects($this->once())->method('declareExchange'); - $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); + $amqpExchange->expects($this->once())->method('publish')->with('body', + 'routing_key', + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $amqpQueue0->expects($this->once())->method('declareQueue'); $amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive( [self::DEFAULT_EXCHANGE_NAME, 'binding_key0'], @@ -344,12 +353,12 @@ public function testItSetupsTheTTLConnection() [self::DEFAULT_EXCHANGE_NAME, 'binding_key3'] ); - $dsn = 'amqps://localhost?'. - 'cacert=/etc/ssl/certs&'. - 'exchange[default_publish_routing_key]=routing_key&'. - 'queues[queue0][binding_keys][0]=binding_key0&'. - 'queues[queue0][binding_keys][1]=binding_key1&'. - 'queues[queue1][binding_keys][0]=binding_key2&'. + $dsn = 'amqps://localhost?' . + 'cacert=/etc/ssl/certs&' . + 'exchange[default_publish_routing_key]=routing_key&' . + 'queues[queue0][binding_keys][0]=binding_key0&' . + 'queues[queue0][binding_keys][1]=binding_key1&' . + 'queues[queue1][binding_keys][0]=binding_key2&' . 'queues[queue1][binding_keys][1]=binding_key3'; $connection = Connection::fromDsn($dsn, [], $factory); @@ -370,13 +379,16 @@ public function testBindingArguments() $factory->method('createQueue')->willReturn($amqpQueue); $amqpExchange->expects($this->once())->method('declareExchange'); - $amqpExchange->expects($this->once())->method('publish')->with('body', null, \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); + $amqpExchange->expects($this->once())->method('publish')->with('body', + null, + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $amqpQueue->expects($this->once())->method('declareQueue'); $amqpQueue->expects($this->exactly(1))->method('bind')->withConsecutive( [self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all']] ); - $dsn = 'amqp://localhost?exchange[type]=headers'. + $dsn = 'amqp://localhost?exchange[type]=headers' . '&queues[queue0][binding_arguments][x-match]=all'; $connection = Connection::fromDsn($dsn, [], $factory); @@ -407,17 +419,29 @@ public function testMultipleBindings() [self::DEFAULT_EXCHANGE_NAME, 'binding_key0', ['x-match' => 'all', 'header-property' => 'remove']] ); - $dsn = 'amqp://localhost?exchange[type]=headers'. - '&queues[queue0][bindings][one][arguments][x-match]=all'. - '&queues[queue0][bindings][one][arguments][header-property]=change'. - '&queues[queue0][bindings][two][arguments][x-match]=all'. - '&queues[queue0][bindings][two][arguments][header-property]=remove'. + $dsn = 'amqp://localhost?exchange[type]=headers' . + '&queues[queue0][bindings][one][arguments][x-match]=all' . + '&queues[queue0][bindings][one][arguments][header-property]=change' . + '&queues[queue0][bindings][two][arguments][x-match]=all' . + '&queues[queue0][bindings][two][arguments][header-property]=remove' . '&queues[queue0][bindings][two][key]=binding_key0'; $connection = Connection::fromDsn($dsn, [], $factory); $connection->publish('body', [], 0, $amqpStamp); } + public function testMultipleBindingsAndDeprecatedBindings() + { + $factory = $this->createMock(AmqpFactory::class); + $dsn = 'amqp://localhost?exchange[type]=headers' . + '&queues[queue0][bindings][one][arguments][x-match]=all' . + '&queues[queue0][binding_keys][one]' . + '&queues[queue0][binding_arguments][one][header-property]=change'; + + $this->expectException(LogicException::class); + Connection::fromDsn($dsn, [], $factory); + } + public function testItCanDisableTheSetup() { $factory = new TestAmqpFactory( @@ -495,11 +519,13 @@ public function testItDelaysTheMessage() $delayExchange->expects($this->once()) ->method('publish') ->with('{}', 'delay_messages__5000_delay', \AMQP_NOPARAM, [ - 'headers' => ['x-some-headers' => 'foo'], + 'headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2, - 'timestamp' => time(), + 'timestamp' => time(), ]); - $connection = $this->createDelayOrRetryConnection($delayExchange, self::DEFAULT_EXCHANGE_NAME, 'delay_messages__5000_delay'); + $connection = $this->createDelayOrRetryConnection($delayExchange, + self::DEFAULT_EXCHANGE_NAME, + 'delay_messages__5000_delay'); $connection->publish('{}', ['x-some-headers' => 'foo'], 5000); } @@ -544,16 +570,19 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs() $delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000_delay'); $delayQueue->expects($this->once())->method('setArguments')->with([ - 'x-message-ttl' => 120000, - 'x-expires' => 120000 + 10000, - 'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME, + 'x-message-ttl' => 120000, + 'x-expires' => 120000 + 10000, + 'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME, 'x-dead-letter-routing-key' => '', ]); $delayQueue->expects($this->once())->method('declareQueue'); $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000_delay'); - $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000_delay', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); + $delayExchange->expects($this->once())->method('publish')->with('{}', + 'delay_messages__120000_delay', + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $connection->publish('{}', [], 120000); } @@ -606,10 +635,16 @@ public function testAmqpStampHeadersAreUsed() $amqpExchange = $this->createMock(\AMQPExchange::class) ); - $amqpExchange->expects($this->once())->method('publish')->with('body', null, \AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2, 'timestamp' => time()]); + $amqpExchange->expects($this->once())->method('publish')->with('body', + null, + \AMQP_NOPARAM, + ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2, 'timestamp' => time()]); $connection = Connection::fromDsn('amqp://localhost', [], $factory); - $connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, \AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']])); + $connection->publish('body', + ['Foo' => 'X'], + 0, + new AmqpStamp(null, \AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']])); } public function testAmqpStampDeliveryModeIsUsed() @@ -621,7 +656,10 @@ public function testAmqpStampDeliveryModeIsUsed() $amqpExchange = $this->createMock(\AMQPExchange::class) ); - $amqpExchange->expects($this->once())->method('publish')->with('body', null, \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 1, 'timestamp' => time()]); + $amqpExchange->expects($this->once())->method('publish')->with('body', + null, + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 1, 'timestamp' => time()]); $connection = Connection::fromDsn('amqp://localhost', [], $factory); $connection->publish('body', [], 0, new AmqpStamp(null, \AMQP_NOPARAM, ['delivery_mode' => 1])); @@ -630,7 +668,7 @@ public function testAmqpStampDeliveryModeIsUsed() public function testItCanPublishWithTheDefaultRoutingKey() { $factory = new TestAmqpFactory( - $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPConnection::class), $this->createMock(\AMQPChannel::class), $this->createMock(\AMQPQueue::class), $amqpExchange = $this->createMock(\AMQPExchange::class) @@ -638,7 +676,9 @@ public function testItCanPublishWithTheDefaultRoutingKey() $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key'); - $connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=routing_key', [], $factory); + $connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=routing_key', + [], + $factory); $connection->publish('body'); } @@ -653,7 +693,9 @@ public function testItCanPublishWithASuppliedRoutingKey() $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key'); - $connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=default_routing_key', [], $factory); + $connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=default_routing_key', + [], + $factory); $connection->publish('body', [], 0, new AmqpStamp('routing_key')); } @@ -684,16 +726,19 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument $delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000_delay'); $delayQueue->expects($this->once())->method('setArguments')->with([ - 'x-message-ttl' => 120000, - 'x-expires' => 120000 + 10000, - 'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME, + 'x-message-ttl' => 120000, + 'x-expires' => 120000 + 10000, + 'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME, 'x-dead-letter-routing-key' => 'routing_key', ]); $delayQueue->expects($this->once())->method('declareQueue'); $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000_delay'); - $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000_delay', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); + $delayExchange->expects($this->once())->method('publish')->with('{}', + 'delay_messages_routing_key_120000_delay', + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $connection->publish('{}', [], 120000, new AmqpStamp('routing_key')); } @@ -714,7 +759,10 @@ public function testItCanPublishWithCustomFlagsAndAttributes() ); $connection = Connection::fromDsn('amqp://localhost', [], $factory); - $connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', \AMQP_IMMEDIATE, ['delivery_mode' => 2])); + $connection->publish('body', + ['type' => DummyMessage::class], + 0, + new AmqpStamp('routing_key', \AMQP_IMMEDIATE, ['delivery_mode' => 2])); } public function testItPublishMessagesWithoutWaitingForConfirmation() @@ -766,8 +814,8 @@ public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn() { $this->assertEquals( new Connection([ - 'host' => 'localhost', - 'port' => 5672, + 'host' => 'localhost', + 'port' => 5672, 'vhost' => '/', ], [ 'name' => self::DEFAULT_EXCHANGE_NAME, @@ -776,8 +824,8 @@ public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn() ]), Connection::fromDsn('amqp://', [ 'cacert' => 'foobar', - 'cert' => 'foobar', - 'key' => 'foobar', + 'cert' => 'foobar', + 'key' => 'foobar', 'verify' => false, ]) ); @@ -802,9 +850,9 @@ private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, stri $delayQueue->expects($this->once())->method('setName')->with($delayQueueName); $delayQueue->expects($this->once())->method('setArguments')->with([ - 'x-message-ttl' => 5000, - 'x-expires' => 5000 + 10000, - 'x-dead-letter-exchange' => $deadLetterExchangeName, + 'x-message-ttl' => 5000, + 'x-expires' => 5000 + 10000, + 'x-dead-letter-exchange' => $deadLetterExchangeName, 'x-dead-letter-routing-key' => '', ]); diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 1e0ecf399be66..4007d939a1cc1 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -65,12 +65,15 @@ class Connection ]; private const AVAILABLE_QUEUE_OPTIONS = [ - 'bindings', 'flags', 'arguments', ]; - private const ORIGINAL_BINDING_KEYS = [ + private const NEW_QUEUE_OPTIONS = [ + 'bindings', + ]; + + private const DEPRECATED_BINDING_KEYS = [ 'binding_keys', 'binding_arguments', ]; @@ -252,17 +255,21 @@ private static function validateOptions(array $options): void if (!\is_array($queue)) { continue; } + if (0 < \count($invalidQueueOptions1 = array_intersect(array_keys($queue), self::DEPRECATED_BINDING_KEYS))) { + trigger_deprecation('symfony/messenger', '6.3', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "%s" option(s) should be used rather than "%s".', implode('", "', $invalidQueueOptions1), implode('", ', self::NEW_QUEUE_OPTIONS), implode('", ', self::DEPRECATED_BINDING_KEYS)); + if (0 < \count($invalidQueueOptions2 = array_intersect(array_keys($queue), self::NEW_QUEUE_OPTIONS))) { + throw new LogicException(sprintf('New "%s" and deprecated "%s" option(s) passed to the AMQP Messenger transport', implode('", "', $invalidQueueOptions2), implode('", "', $invalidQueueOptions1))); + } + } - if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS, self::ORIGINAL_BINDING_KEYS))) { + if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS, self::NEW_QUEUE_OPTIONS, self::DEPRECATED_BINDING_KEYS))) { throw new LogicException(sprintf('Invalid queue option(s) "%s" passed to the AMQP Messenger transport.', implode('", "', $invalidQueueOptions))); - } elseif (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) { - trigger_deprecation('symfony/messenger', '6.3', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "bindings" option should be used rather than "binding_keys" and "binding_arguments".', implode('", "', $invalidQueueOptions)); } if (\is_array($queue['bindings'] ?? false)) { foreach ($queue['bindings'] as $individualBinding) { if (0 < \count(array_diff(array_keys($individualBinding), self::AVAILABLE_BINDINGS_OPTIONS))) { - throw new \InvalidArgumentException(sprintf("Valid options for each 'bindings' are: %s", implode(', ', self::AVAILABLE_BINDINGS_OPTIONS))); + throw new LogicException(sprintf("Valid options for each 'bindings' are: %s", implode(', ', self::AVAILABLE_BINDINGS_OPTIONS))); } } } From 2275b223e1b37cbf3ceb543eb5b09afe3dcea3f3 Mon Sep 17 00:00:00 2001 From: Oleg Namaka Date: Thu, 12 Jan 2023 18:16:27 -0600 Subject: [PATCH 7/8] Temporarily remove deprecation logic --- .../Messenger/Bridge/Amqp/Transport/Connection.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 4007d939a1cc1..580f764110c78 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -255,10 +255,10 @@ private static function validateOptions(array $options): void if (!\is_array($queue)) { continue; } - if (0 < \count($invalidQueueOptions1 = array_intersect(array_keys($queue), self::DEPRECATED_BINDING_KEYS))) { - trigger_deprecation('symfony/messenger', '6.3', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "%s" option(s) should be used rather than "%s".', implode('", "', $invalidQueueOptions1), implode('", ', self::NEW_QUEUE_OPTIONS), implode('", ', self::DEPRECATED_BINDING_KEYS)); - if (0 < \count($invalidQueueOptions2 = array_intersect(array_keys($queue), self::NEW_QUEUE_OPTIONS))) { - throw new LogicException(sprintf('New "%s" and deprecated "%s" option(s) passed to the AMQP Messenger transport', implode('", "', $invalidQueueOptions2), implode('", "', $invalidQueueOptions1))); + if (0 < \count($deprecatedQueueOptions = array_intersect(array_keys($queue), self::DEPRECATED_BINDING_KEYS))) { + //trigger_deprecation('symfony/messenger', '6.3', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "%s" option(s) should be used rather than "%s".', implode('", "', $deprecatedQueueOptions), implode('", ', self::NEW_QUEUE_OPTIONS), implode('", ', self::DEPRECATED_BINDING_KEYS)); + if (0 < \count($newQueueOptions = array_intersect(array_keys($queue), self::NEW_QUEUE_OPTIONS))) { + throw new LogicException(sprintf('New "%s" and deprecated "%s" option(s) passed to the AMQP Messenger transport', implode('", "', $newQueueOptions), implode('", "', $deprecatedQueueOptions))); } } From 35d6478d20d9a5e62a3b99fd444ce885ef269cad Mon Sep 17 00:00:00 2001 From: Oleg Namaka Date: Fri, 13 Jan 2023 08:48:22 -0600 Subject: [PATCH 8/8] Restore deprecation triggering --- .../Component/Messenger/Bridge/Amqp/Transport/Connection.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 580f764110c78..b07e419866224 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -256,7 +256,7 @@ private static function validateOptions(array $options): void continue; } if (0 < \count($deprecatedQueueOptions = array_intersect(array_keys($queue), self::DEPRECATED_BINDING_KEYS))) { - //trigger_deprecation('symfony/messenger', '6.3', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "%s" option(s) should be used rather than "%s".', implode('", "', $deprecatedQueueOptions), implode('", ', self::NEW_QUEUE_OPTIONS), implode('", ', self::DEPRECATED_BINDING_KEYS)); + trigger_deprecation('symfony/messenger', '6.3', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "%s" option(s) should be used rather than "%s".', implode('", "', $deprecatedQueueOptions), implode('", ', self::NEW_QUEUE_OPTIONS), implode('", ', self::DEPRECATED_BINDING_KEYS)); if (0 < \count($newQueueOptions = array_intersect(array_keys($queue), self::NEW_QUEUE_OPTIONS))) { throw new LogicException(sprintf('New "%s" and deprecated "%s" option(s) passed to the AMQP Messenger transport', implode('", "', $newQueueOptions), implode('", "', $deprecatedQueueOptions))); }