Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -1264,6 +1264,10 @@ function ($a) {
->prototype('variable')
->end()
->end()
->scalarNode('failure_transport')
->defaultNull()
->info('Transport name to send failed messages to (after all retries have failed).')
->end()
->arrayNode('retry_strategy')
->addDefaultsIfNotSet()
->beforeNormalization()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1793,7 +1793,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
->replaceArgument(2, $config['serializer']['symfony_serializer']['context']);
$container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']);
}

$senderAliases = [];
$transportRetryReferences = [];
foreach ($config['transports'] as $name => $transport) {
Expand All @@ -1802,7 +1802,11 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
$transportDefinition = (new Definition(TransportInterface::class))
->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)])
->addTag('messenger.receiver', ['alias' => $name])
->addTag('messenger.receiver', [
'alias' => $name,
'failure_transport' => $transport['failure_transport'] ?? null
]
)
;
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
$senderAliases[$name] = $transportId;
Expand Down Expand Up @@ -1863,24 +1867,59 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
$container->getDefinition('messenger.retry_strategy_locator')
->replaceArgument(0, $transportRetryReferences);

$failureTransports = [];
$failureTransportsByTransportName = [];

$failureTransportsServiceLocatorId = 'messenger.failure_transports.locator';
$failureTransportsByTransportNameServiceLocatorId = 'messenger.failure_transports_by_transport_name.locator';

if ($config['failure_transport']) {
if (!isset($senderReferences[$config['failure_transport']])) {
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
}

$failureTransports[$config['failure_transport']] = $senderReferences[$config['failure_transport']];
$container->setAlias('messenger.failure_transports.default_transport', $config['failure_transport']);
}

$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
->replaceArgument(0, $senderReferences[$config['failure_transport']]);
foreach ($config['transports'] as $name => $transport) {
if ($transport['failure_transport']) {
if (!isset($config['transports'][$transport['failure_transport']])) {
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $transport['failure_transport']));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be checking for !isset($senderReferences[$transport['failure_transport']]) here instead? That would be consistent with the above code. It's more confusing, but there is the edge case that the failure_transport is not a registered transport, but just a "sender service id"


$failureTransportsByTransportName[$name] = $senderReferences[$transport['failure_transport']];
$failureTransports[$transport['failure_transport']] = $senderReferences[$transport['failure_transport']];
}
}

if (\count($failureTransports) > 0) {
$failureTransportsServiceLocator = ServiceLocatorTagPass::register($container, $failureTransports, $failureTransportsServiceLocatorId);
$container->getDefinition($failureTransportsServiceLocatorId)
->replaceArgument(0, $failureTransports);

$globalFailureReceiver = $config['failure_transport'] ?? null;
$container->getDefinition('console.command.messenger_failed_messages_retry')
->replaceArgument(0, $config['failure_transport']);
->replaceArgument(0, $globalFailureReceiver)
->replaceArgument(1, $senderReferences[$config['failure_transport']] ?? null)
->replaceArgument(5, $container->getDefinition($failureTransportsServiceLocator));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same problem here (ish) as below - we should pass a Reference (which $failureTransportsServiceLocator is), not a Definition.

$container->getDefinition('console.command.messenger_failed_messages_show')
->replaceArgument(0, $config['failure_transport']);
->replaceArgument(0, $globalFailureReceiver)
->replaceArgument(1, $container->getDefinition($failureTransportsServiceLocatorId));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these should be references, right? We don't normally set a Definition directly on an argument. And should it use $failureTransportsServiceLocatorId or $failureTransportsServiceLocator - I'm getting lost (this stuff confuses me) in which is which and what should be used.

$container->getDefinition('console.command.messenger_failed_messages_remove')
->replaceArgument(0, $config['failure_transport']);
->replaceArgument(0, $globalFailureReceiver)
->replaceArgument(1, $container->getDefinition($failureTransportsServiceLocatorId));

$failureTransportsByTransportNameServiceLocator = ServiceLocatorTagPass::register($container, $failureTransportsByTransportName, $failureTransportsByTransportNameServiceLocatorId);
$container->getDefinition($failureTransportsByTransportNameServiceLocatorId)
->replaceArgument(0, $failureTransportsByTransportName);
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
->replaceArgument(0, $failureTransportsByTransportNameServiceLocator);
} else {
$container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
$container->removeDefinition('console.command.messenger_failed_messages_retry');
$container->removeDefinition('console.command.messenger_failed_messages_show');
$container->removeDefinition('console.command.messenger_failed_messages_remove');
$container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could revert this change to lessen the diff

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,20 +170,21 @@
service('messenger.routable_message_bus'),
service('event_dispatcher'),
service('logger'),
abstract_arg('Receivers'),
])
->tag('console.command', ['command' => 'messenger:failed:retry'])

->set('console.command.messenger_failed_messages_show', FailedMessagesShowCommand::class)
->args([
abstract_arg('Receiver name'),
abstract_arg('Receiver'),
abstract_arg('Receivers'),
])
->tag('console.command', ['command' => 'messenger:failed:show'])

->set('console.command.messenger_failed_messages_remove', FailedMessagesRemoveCommand::class)
->args([
abstract_arg('Receiver name'),
abstract_arg('Receiver'),
abstract_arg('Receivers'),
])
->tag('console.command', ['command' => 'messenger:failed:remove'])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Component\DependencyInjection\Loader\Configurator;

use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransportFactory;
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory;
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransportFactory;
Expand Down Expand Up @@ -130,6 +131,18 @@

->set('messenger.transport.beanstalkd.factory', BeanstalkdTransportFactory::class)

// failed transports
->set('messenger.failure_transports.locator', ServiceLocator::class)
->args([
abstract_arg('failed transports map by name'),
])
->tag('container.service_locator')
->set('messenger.failure_transports_by_transport_name.locator', ServiceLocator::class)
->args([
abstract_arg('failed transports map by transport name'),
])
->tag('container.service_locator')

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm correct in FrameworkExtension, these won't be needed.

// retry
->set('messenger.retry_strategy_locator')
->args([
Expand Down Expand Up @@ -158,7 +171,7 @@

->set('messenger.failure.send_failed_message_to_failure_transport_listener', SendFailedMessageToFailureTransportListener::class)
->args([
abstract_arg('failure transport'),
abstract_arg('failure transports'),
service('logger')->ignoreOnInvalid(),
])
->tag('kernel.event_subscriber')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@
<xsd:attribute name="name" type="xsd:string" />
<xsd:attribute name="serializer" type="xsd:string" />
<xsd:attribute name="dsn" type="xsd:string" />
<xsd:attribute name="failure-transport" type="xsd:string" />
</xsd:complexType>

<xsd:complexType name="messenger_retry_strategy">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

$container->loadFromExtension('framework', [
'messenger' => [
'transports' => [
'transport_1' => [
'dsn' => 'null://',
'failure_transport' => 'failure_transport_1'
],
'transport_2' => 'null://',
'transport_3' => [
'dsn' => 'null://',
'failure_transport' => 'failure_transport_3'
],
'failure_transport_1' => 'null://',
'failure_transport_3' => 'null://'
],
],
]);
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

$container->loadFromExtension('framework', [
'messenger' => [
'failure_transport' => 'failure_transport_global',
'transports' => [
'transport_1' => [
'dsn' => 'null://',
'failure_transport' => 'failure_transport_1'
],
'transport_2' => 'null://',
'transport_3' => [
'dsn' => 'null://',
'failure_transport' => 'failure_transport_3'
],
'failure_transport_global' => 'null://',
'failure_transport_1' => 'null://',
'failure_transport_3' => 'null://',
],
],
]);
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger>
<framework:transport name="transport_1" dsn="null://" failure-transport="failure_transport_1" />
<framework:transport name="transport_2" dsn="null://" />
<framework:transport name="transport_3" dsn="null://" failure-transport="failure_transport_3" />
<framework:transport name="failure_transport_1" dsn="null://" />
<framework:transport name="failure_transport_3" dsn="null://" />
</framework:messenger>
</framework:config>
</container>
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger failure-transport="failure_transport_global">
<framework:transport name="transport_1" dsn="null://" failure-transport="failure_transport_1" />
<framework:transport name="transport_2" dsn="null://" />
<framework:transport name="transport_3" dsn="null://" failure-transport="failure_transport_3" />
<framework:transport name="failure_transport_global" dsn="null://" />
<framework:transport name="failure_transport_1" dsn="null://" />
<framework:transport name="failure_transport_3" dsn="null://" />
</framework:messenger>
</framework:config>
</container>
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
framework:
messenger:
transports:
transport_1:
dsn: 'null://'
failure_transport: failure_transport_1
transport_2: 'null://'
transport_3:
dsn: 'null://'
failure_transport: failure_transport_3
failure_transport_1: 'null://'
failure_transport_3: 'null://'
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
framework:
messenger:
failure_transport: failure_transport_global
transports:
transport_1:
dsn: 'null://'
failure_transport: failure_transport_1
transport_2: 'null://'
transport_3:
dsn: 'null://'
failure_transport: failure_transport_3
failure_transport_global: 'null://'
failure_transport_1: 'null://'
failure_transport_3: 'null://'
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@
use Symfony\Component\Validator\DependencyInjection\AddConstraintValidatorsPass;
use Symfony\Component\Validator\Mapping\Loader\PropertyInfoLoader;
use Symfony\Component\Workflow;
use Symfony\Component\Workflow\Metadata\InMemoryMetadataStore;
use Symfony\Component\Workflow\WorkflowEvents;
use Symfony\Contracts\Cache\CacheInterface;
use Symfony\Contracts\Cache\TagAwareCacheInterface;
use Symfony\Component\Workflow\Metadata\InMemoryMetadataStore;

abstract class FrameworkExtensionTest extends TestCase
{
Expand Down Expand Up @@ -648,6 +648,48 @@ public function testMessenger()
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
}

public function testMessengerMultipleFailureTransports()
{
$container = $this->createContainerFromFile('messenger_multiple_failure_transports');

// transport 2 exists but does not appear in the mapping
$expectedFailureTransports = [
'failure_transport_1' => new Reference('messenger.transport.failure_transport_1'),
'failure_transport_3' => new Reference('messenger.transport.failure_transport_3'),
];
$failureTransportsLocator = $container->getDefinition('messenger.failure_transports.locator');
$this->assertEquals($expectedFailureTransports, $failureTransportsLocator->getArgument(0));

$expectedFailureTransportsByTransportName = [
'transport_1' => new Reference('messenger.transport.failure_transport_1'),
'transport_3' => new Reference('messenger.transport.failure_transport_3'),
];

$failureTransportsByTransportNameLocator = $container->getDefinition('messenger.failure_transports_by_transport_name.locator');
$this->assertEquals($expectedFailureTransportsByTransportName, $failureTransportsByTransportNameLocator->getArgument(0));
}

public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport()
{
$container = $this->createContainerFromFile('messenger_multiple_failure_transports_global');

$expectedFailureTransports = [
'failure_transport_global' => new Reference('messenger.transport.failure_transport_global'),
'failure_transport_1' => new Reference('messenger.transport.failure_transport_1'),
'failure_transport_3' => new Reference('messenger.transport.failure_transport_3'),
];
$failureTransportsLocator = $container->getDefinition('messenger.failure_transports.locator');
$this->assertEquals($expectedFailureTransports, $failureTransportsLocator->getArgument(0));

$expectedFailureTransportsByTransportName = [
'transport_1' => new Reference('messenger.transport.failure_transport_1'),
'transport_3' => new Reference('messenger.transport.failure_transport_3'),
];

$failureTransportsByTransportNameLocator = $container->getDefinition('messenger.failure_transports_by_transport_name.locator');
$this->assertEquals($expectedFailureTransportsByTransportName, $failureTransportsByTransportNameLocator->getArgument(0));
}

public function testMessengerTransports()
{
$container = $this->createContainerFromFile('messenger_transports');
Expand Down Expand Up @@ -694,7 +736,13 @@ public function testMessengerTransports()
$this->assertSame(3, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(2));
$this->assertSame(100, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(3));

$this->assertEquals(new Reference('messenger.transport.failed'), $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')->getArgument(0));
$failureTransportsLocator = $container->getDefinition('messenger.failure_transports.locator');
$expectedFailureTransports = [
'failed' => new Reference('messenger.transport.failed'),
];
$this->assertEquals($expectedFailureTransports, $failureTransportsLocator->getArgument(0));
$failureTransportsByTransportNameLocator = $container->getDefinition('messenger.failure_transports_by_transport_name.locator');
$this->assertEquals([], $failureTransportsByTransportNameLocator->getArgument(0));
}

public function testMessengerRouting()
Expand Down
Loading