platform/src/Core/Framework/MessageQueue/Subscriber/MessageFailedHandler.php line 32

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\Subscriber;
  3. use Shopware\Core\Framework\Increment\Exception\IncrementGatewayNotFoundException;
  4. use Shopware\Core\Framework\Increment\IncrementGatewayRegistry;
  5. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  6. use Symfony\Component\Messenger\Envelope;
  7. use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
  8. use Symfony\Component\Messenger\Stamp\ReceivedStamp;
  9. class MessageFailedHandler implements EventSubscriberInterface
  10. {
  11.     private string $defaultTransportName;
  12.     private IncrementGatewayRegistry $gatewayRegistry;
  13.     public function __construct(IncrementGatewayRegistry $gatewayRegistrystring $defaultTransportName)
  14.     {
  15.         $this->defaultTransportName $defaultTransportName;
  16.         $this->gatewayRegistry $gatewayRegistry;
  17.     }
  18.     public static function getSubscribedEvents(): array
  19.     {
  20.         return [
  21.             // must have higher priority than SendFailedMessageToFailureTransportListener
  22.             WorkerMessageFailedEvent::class => ['onMessageFailed'99],
  23.         ];
  24.     }
  25.     public function onMessageFailed(WorkerMessageFailedEvent $event): void
  26.     {
  27.         if ($event->willRetry()) {
  28.             return;
  29.         }
  30.         $message $event->getEnvelope();
  31.         if (!$this->wasReceivedByDefaultTransport($message)) {
  32.             return;
  33.         }
  34.         $name \get_class($message->getMessage());
  35.         try {
  36.             $gateway $this->gatewayRegistry->get(IncrementGatewayRegistry::MESSAGE_QUEUE_POOL);
  37.         } catch (IncrementGatewayNotFoundException $exception) {
  38.             return;
  39.         }
  40.         $gateway->decrement('message_queue_stats'$name);
  41.     }
  42.     private function wasReceivedByDefaultTransport(Envelope $message): bool
  43.     {
  44.         foreach ($message->all(ReceivedStamp::class) as $stamp) {
  45.             if ($stamp instanceof ReceivedStamp && $stamp->getTransportName() === $this->defaultTransportName) {
  46.                 return true;
  47.             }
  48.         }
  49.         return false;
  50.     }
  51. }