vendor/enqueue/enqueue-bundle/DependencyInjection/EnqueueExtension.php line 111

Open in your IDE?
  1. <?php
  2. namespace Enqueue\Bundle\DependencyInjection;
  3. use Enqueue\AsyncCommand\DependencyInjection\AsyncCommandExtension;
  4. use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncEventDispatcherExtension;
  5. use Enqueue\Bundle\Consumption\Extension\DoctrineClearIdentityMapExtension;
  6. use Enqueue\Bundle\Consumption\Extension\DoctrineClosedEntityManagerExtension;
  7. use Enqueue\Bundle\Consumption\Extension\DoctrinePingConnectionExtension;
  8. use Enqueue\Bundle\Consumption\Extension\ResetServicesExtension;
  9. use Enqueue\Bundle\Profiler\MessageQueueCollector;
  10. use Enqueue\Client\CommandSubscriberInterface;
  11. use Enqueue\Client\TopicSubscriberInterface;
  12. use Enqueue\Consumption\Extension\ReplyExtension;
  13. use Enqueue\Consumption\Extension\SignalExtension;
  14. use Enqueue\JobQueue\Job;
  15. use Enqueue\Monitoring\Symfony\DependencyInjection\MonitoringFactory;
  16. use Enqueue\Symfony\Client\DependencyInjection\ClientFactory;
  17. use Enqueue\Symfony\DependencyInjection\TransportFactory;
  18. use Enqueue\Symfony\DiUtils;
  19. use Interop\Queue\Context;
  20. use Symfony\Component\Config\FileLocator;
  21. use Symfony\Component\Config\Resource\FileResource;
  22. use Symfony\Component\DependencyInjection\ContainerBuilder;
  23. use Symfony\Component\DependencyInjection\Extension\PrependExtensionInterface;
  24. use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
  25. use Symfony\Component\DependencyInjection\Reference;
  26. use Symfony\Component\HttpKernel\DependencyInjection\Extension;
  27. final class EnqueueExtension extends Extension implements PrependExtensionInterface
  28. {
  29.     public function load(array $configsContainerBuilder $container): void
  30.     {
  31.         $config $this->processConfiguration($this->getConfiguration($configs$container), $configs);
  32.         $loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
  33.         $loader->load('services.yml');
  34.         // find default configuration
  35.         $defaultName null;
  36.         foreach ($config as $name => $modules) {
  37.             // set first as default
  38.             if (null === $defaultName) {
  39.                 $defaultName $name;
  40.             }
  41.             // or with name 'default'
  42.             if (DiUtils::DEFAULT_CONFIG === $name) {
  43.                 $defaultName $name;
  44.             }
  45.         }
  46.         $transportNames = [];
  47.         $clientNames = [];
  48.         foreach ($config as $name => $modules) {
  49.             // transport & consumption
  50.             $transportNames[] = $name;
  51.             $transportFactory = (new TransportFactory($name$defaultName === $name));
  52.             $transportFactory->buildConnectionFactory($container$modules['transport']);
  53.             $transportFactory->buildContext($container, []);
  54.             $transportFactory->buildQueueConsumer($container$modules['consumption']);
  55.             $transportFactory->buildRpcClient($container, []);
  56.             // client
  57.             if (isset($modules['client'])) {
  58.                 $clientNames[] = $name;
  59.                 $clientConfig $modules['client'];
  60.                 // todo
  61.                 $clientConfig['transport'] = $modules['transport'];
  62.                 $clientConfig['consumption'] = $modules['consumption'];
  63.                 $clientFactory = new ClientFactory($name$defaultName === $name);
  64.                 $clientFactory->build($container$clientConfig);
  65.                 $clientFactory->createDriver($container$modules['transport']);
  66.                 $clientFactory->createFlushSpoolProducerListener($container);
  67.             }
  68.             // monitoring
  69.             if (isset($modules['monitoring'])) {
  70.                 $monitoringFactory = new MonitoringFactory($name);
  71.                 $monitoringFactory->buildStorage($container$modules['monitoring']);
  72.                 $monitoringFactory->buildConsumerExtension($container$modules['monitoring']);
  73.                 if (isset($modules['client'])) {
  74.                     $monitoringFactory->buildClientExtension($container$modules['monitoring']);
  75.                 }
  76.             }
  77.             // job-queue
  78.             if (false == empty($modules['job']['enabled'])) {
  79.                 if (false === isset($modules['client'])) {
  80.                     throw new \LogicException('Client is required for job-queue.');
  81.                 }
  82.                 if ($name !== $defaultName) {
  83.                     throw new \LogicException('Job-queue supports only default configuration.');
  84.                 }
  85.                 $loader->load('job.yml');
  86.             }
  87.             // async events
  88.             if (false == empty($modules['async_events']['enabled'])) {
  89.                 if ($name !== $defaultName) {
  90.                     throw new \LogicException('Async events supports only default configuration.');
  91.                 }
  92.                 $extension = new AsyncEventDispatcherExtension();
  93.                 $extension->load([[
  94.                     'context_service' => Context::class,
  95.                 ]], $container);
  96.             }
  97.         }
  98.         $defaultClient null;
  99.         if (in_array($defaultName$clientNamestrue)) {
  100.             $defaultClient $defaultName;
  101.         }
  102.         $container->setParameter('enqueue.transports'$transportNames);
  103.         $container->setParameter('enqueue.clients'$clientNames);
  104.         $container->setParameter('enqueue.default_transport'$defaultName);
  105.         if ($defaultClient) {
  106.             $container->setParameter('enqueue.default_client'$defaultClient);
  107.         }
  108.         if ($defaultClient) {
  109.             $this->setupAutowiringForDefaultClientsProcessors($container$defaultClient);
  110.         }
  111.         $this->loadMessageQueueCollector($config$container);
  112.         $this->loadAsyncCommands($config$container);
  113.         // extensions
  114.         $this->loadDoctrinePingConnectionExtension($config$container);
  115.         $this->loadDoctrineClearIdentityMapExtension($config$container);
  116.         $this->loadDoctrineOdmClearIdentityMapExtension($config$container);
  117.         $this->loadDoctrineClosedEntityManagerExtension($config$container);
  118.         $this->loadResetServicesExtension($config$container);
  119.         $this->loadSignalExtension($config$container);
  120.         $this->loadReplyExtension($config$container);
  121.     }
  122.     public function getConfiguration(array $configContainerBuilder $container): Configuration
  123.     {
  124.         $rc = new \ReflectionClass(Configuration::class);
  125.         $container->addResource(new FileResource($rc->getFileName()));
  126.         return new Configuration($container->getParameter('kernel.debug'));
  127.     }
  128.     public function prepend(ContainerBuilder $container): void
  129.     {
  130.         $this->registerJobQueueDoctrineEntityMapping($container);
  131.     }
  132.     private function registerJobQueueDoctrineEntityMapping(ContainerBuilder $container)
  133.     {
  134.         if (!class_exists(Job::class)) {
  135.             return;
  136.         }
  137.         $bundles $container->getParameter('kernel.bundles');
  138.         if (!isset($bundles['DoctrineBundle'])) {
  139.             return;
  140.         }
  141.         $config $container->getExtensionConfig('enqueue');
  142.         if (!empty($config)) {
  143.             $processedConfig $this->processConfiguration(new Configuration(false), $config);
  144.             foreach ($processedConfig as $name => $modules) {
  145.                 if (isset($modules['job']) && false === $modules['job']['default_mapping']) {
  146.                     return;
  147.                 }
  148.             }
  149.         }
  150.         foreach ($container->getExtensionConfig('doctrine') as $config) {
  151.             // do not register mappings if dbal not configured.
  152.             if (!empty($config['dbal'])) {
  153.                 $rc = new \ReflectionClass(Job::class);
  154.                 $jobQueueRootDir dirname($rc->getFileName());
  155.                 $container->prependExtensionConfig('doctrine', [
  156.                     'orm' => [
  157.                         'mappings' => [
  158.                             'enqueue_job_queue' => [
  159.                                 'is_bundle' => false,
  160.                                 'type' => 'xml',
  161.                                 'dir' => $jobQueueRootDir.'/Doctrine/mapping',
  162.                                 'prefix' => 'Enqueue\JobQueue\Doctrine\Entity',
  163.                             ],
  164.                         ],
  165.                     ],
  166.                 ]);
  167.                 break;
  168.             }
  169.         }
  170.     }
  171.     private function setupAutowiringForDefaultClientsProcessors(ContainerBuilder $containerstring $defaultClient)
  172.     {
  173.         $container->registerForAutoconfiguration(TopicSubscriberInterface::class)
  174.             ->setPublic(true)
  175.             ->addTag('enqueue.topic_subscriber', ['client' => $defaultClient])
  176.         ;
  177.         $container->registerForAutoconfiguration(CommandSubscriberInterface::class)
  178.             ->setPublic(true)
  179.             ->addTag('enqueue.command_subscriber', ['client' => $defaultClient])
  180.         ;
  181.     }
  182.     private function loadDoctrinePingConnectionExtension(array $configContainerBuilder $container): void
  183.     {
  184.         $configNames = [];
  185.         foreach ($config as $name => $modules) {
  186.             if ($modules['extensions']['doctrine_ping_connection_extension']) {
  187.                 $configNames[] = $name;
  188.             }
  189.         }
  190.         if ([] === $configNames) {
  191.             return;
  192.         }
  193.         $extension $container->register('enqueue.consumption.doctrine_ping_connection_extension'DoctrinePingConnectionExtension::class)
  194.             ->addArgument(new Reference('doctrine'))
  195.         ;
  196.         foreach ($configNames as $name) {
  197.             $extension->addTag('enqueue.consumption_extension', ['client' => $name]);
  198.             $extension->addTag('enqueue.transport.consumption_extension', ['transport' => $name]);
  199.         }
  200.     }
  201.     private function loadDoctrineClearIdentityMapExtension(array $configContainerBuilder $container): void
  202.     {
  203.         $configNames = [];
  204.         foreach ($config as $name => $modules) {
  205.             if ($modules['extensions']['doctrine_clear_identity_map_extension']) {
  206.                 $configNames[] = $name;
  207.             }
  208.         }
  209.         if ([] === $configNames) {
  210.             return;
  211.         }
  212.         $extension $container->register('enqueue.consumption.doctrine_clear_identity_map_extension'DoctrineClearIdentityMapExtension::class)
  213.             ->addArgument(new Reference('doctrine'))
  214.         ;
  215.         foreach ($configNames as $name) {
  216.             $extension->addTag('enqueue.consumption_extension', ['client' => $name]);
  217.             $extension->addTag('enqueue.transport.consumption_extension', ['transport' => $name]);
  218.         }
  219.     }
  220.     private function loadDoctrineOdmClearIdentityMapExtension(array $configContainerBuilder $container): void
  221.     {
  222.         $configNames = [];
  223.         foreach ($config as $name => $modules) {
  224.             if ($modules['extensions']['doctrine_odm_clear_identity_map_extension']) {
  225.                 $configNames[] = $name;
  226.             }
  227.         }
  228.         if ([] === $configNames) {
  229.             return;
  230.         }
  231.         $extension $container->register('enqueue.consumption.doctrine_odm_clear_identity_map_extension'DoctrineClearIdentityMapExtension::class)
  232.             ->addArgument(new Reference('doctrine_mongodb'))
  233.         ;
  234.         foreach ($configNames as $name) {
  235.             $extension->addTag('enqueue.consumption_extension', ['client' => $name]);
  236.             $extension->addTag('enqueue.transport.consumption_extension', ['transport' => $name]);
  237.         }
  238.     }
  239.     private function loadDoctrineClosedEntityManagerExtension(array $configContainerBuilder $container)
  240.     {
  241.         $configNames = [];
  242.         foreach ($config as $name => $modules) {
  243.             if ($modules['extensions']['doctrine_closed_entity_manager_extension']) {
  244.                 $configNames[] = $name;
  245.             }
  246.         }
  247.         if ([] === $configNames) {
  248.             return;
  249.         }
  250.         $extension $container->register('enqueue.consumption.doctrine_closed_entity_manager_extension'DoctrineClosedEntityManagerExtension::class)
  251.             ->addArgument(new Reference('doctrine'));
  252.         foreach ($configNames as $name) {
  253.             $extension->addTag('enqueue.consumption_extension', ['client' => $name]);
  254.             $extension->addTag('enqueue.transport.consumption_extension', ['transport' => $name]);
  255.         }
  256.     }
  257.     private function loadResetServicesExtension(array $configContainerBuilder $container)
  258.     {
  259.         $configNames = [];
  260.         foreach ($config as $name => $modules) {
  261.             if ($modules['extensions']['reset_services_extension']) {
  262.                 $configNames[] = $name;
  263.             }
  264.         }
  265.         if ([] === $configNames) {
  266.             return;
  267.         }
  268.         $extension $container->register('enqueue.consumption.reset_services_extension'ResetServicesExtension::class)
  269.             ->addArgument(new Reference('services_resetter'));
  270.         foreach ($configNames as $name) {
  271.             $extension->addTag('enqueue.consumption_extension', ['client' => $name]);
  272.             $extension->addTag('enqueue.transport.consumption_extension', ['transport' => $name]);
  273.         }
  274.     }
  275.     private function loadSignalExtension(array $configContainerBuilder $container): void
  276.     {
  277.         $configNames = [];
  278.         foreach ($config as $name => $modules) {
  279.             if ($modules['extensions']['signal_extension']) {
  280.                 $configNames[] = $name;
  281.             }
  282.         }
  283.         if ([] === $configNames) {
  284.             return;
  285.         }
  286.         $extension $container->register('enqueue.consumption.signal_extension'SignalExtension::class);
  287.         foreach ($configNames as $name) {
  288.             $extension->addTag('enqueue.consumption_extension', ['client' => $name]);
  289.             $extension->addTag('enqueue.transport.consumption_extension', ['transport' => $name]);
  290.         }
  291.     }
  292.     private function loadReplyExtension(array $configContainerBuilder $container): void
  293.     {
  294.         $configNames = [];
  295.         foreach ($config as $name => $modules) {
  296.             if ($modules['extensions']['reply_extension']) {
  297.                 $configNames[] = $name;
  298.             }
  299.         }
  300.         if ([] === $configNames) {
  301.             return;
  302.         }
  303.         $extension $container->register('enqueue.consumption.reply_extension'ReplyExtension::class);
  304.         foreach ($configNames as $name) {
  305.             $extension->addTag('enqueue.consumption_extension', ['client' => $name]);
  306.             $extension->addTag('enqueue.transport.consumption_extension', ['transport' => $name]);
  307.         }
  308.     }
  309.     private function loadAsyncCommands(array $configContainerBuilder $container): void
  310.     {
  311.         $configs = [];
  312.         foreach ($config as $name => $modules) {
  313.             if (false === empty($modules['async_commands']['enabled'])) {
  314.                 $configs[] = [
  315.                     'name' => $name,
  316.                     'timeout' => $modules['async_commands']['timeout'],
  317.                     'command_name' => $modules['async_commands']['command_name'],
  318.                     'queue_name' => $modules['async_commands']['queue_name'],
  319.                 ];
  320.             }
  321.         }
  322.         if (false == $configs) {
  323.             return;
  324.         }
  325.         if (false == class_exists(AsyncCommandExtension::class)) {
  326.             throw new \LogicException('The "enqueue/async-command" package has to be installed.');
  327.         }
  328.         $extension = new AsyncCommandExtension();
  329.         $extension->load(['clients' => $configs], $container);
  330.     }
  331.     private function loadMessageQueueCollector(array $configContainerBuilder $container)
  332.     {
  333.         $configNames = [];
  334.         foreach ($config as $name => $modules) {
  335.             if (isset($modules['client'])) {
  336.                 $configNames[] = $name;
  337.             }
  338.         }
  339.         if (false == $configNames) {
  340.             return;
  341.         }
  342.         $service $container->register('enqueue.profiler.message_queue_collector'MessageQueueCollector::class);
  343.         $service->addTag('data_collector', [
  344.             'template' => '@Enqueue/Profiler/panel.html.twig',
  345.             'id' => 'enqueue.message_queue',
  346.         ]);
  347.         foreach ($configNames as $configName) {
  348.             $service->addMethodCall('addProducer', [$configNameDiUtils::create('client'$configName)->reference('producer')]);
  349.         }
  350.     }
  351. }