platform/src/Core/Framework/DataAbstractionLayer/Indexing/EntityIndexerRegistry.php line 96

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\DataAbstractionLayer\Indexing;
  3. use Shopware\Core\Framework\Context;
  4. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
  5. use Shopware\Core\Framework\DataAbstractionLayer\Indexing\MessageQueue\IterateEntityIndexerMessage;
  6. use Shopware\Core\Framework\Event\ProgressAdvancedEvent;
  7. use Shopware\Core\Framework\Event\ProgressFinishedEvent;
  8. use Shopware\Core\Framework\Event\ProgressStartedEvent;
  9. use Shopware\Core\Framework\MessageQueue\Handler\AbstractMessageHandler;
  10. use Shopware\Core\Framework\Struct\ArrayStruct;
  11. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  12. use Symfony\Component\Messenger\MessageBusInterface;
  13. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  /**
   * Registry of all entity indexers.
   *
   * It is used in three ways, all visible in this class:
   *  - as an event subscriber: {@see refresh()} reacts to EntityWrittenContainerEvent
   *    and asks every indexer for an update message,
   *  - as a message handler: {@see handle()} processes EntityIndexingMessage and
   *    IterateEntityIndexerMessage instances,
   *  - as a service: {@see index()} and {@see sendIndexingMessage()} run or schedule
   *    a full iteration over the registered indexers.
   */
  class EntityIndexerRegistry extends AbstractMessageHandler implements EventSubscriberInterface
  {
      /**
       * Context extension key under which refresh() expects an ArrayStruct; its
       * content is forwarded to every message via setSkip().
       */
      public const EXTENSION_INDEXER_SKIP = 'indexer-skip';

      /**
       * @deprecated tag:v6.5.0 - `$context->addExtension(EntityIndexerRegistry::USE_INDEXING_QUEUE, ...)` will be ignored, use `context->addState(EntityIndexerRegistry::USE_INDEXING_QUEUE)` instead
       */
      public const USE_INDEXING_QUEUE = 'use-queue-indexing';

      /**
       * @deprecated tag:v6.5.0 - `$context->addExtension(EntityIndexerRegistry::DISABLE_INDEXING, ...)` will be ignored, use `context->addState(EntityIndexerRegistry::DISABLE_INDEXING)` instead
       */
      public const DISABLE_INDEXING = 'disable-indexing';

      /**
       * @var EntityIndexer[]
       */
      private iterable $indexer;

      private MessageBusInterface $messageBus;

      /**
       * Guard flag for refresh(): while it is true, further refresh() calls on
       * this instance return immediately.
       */
      private bool $working = false;

      private EventDispatcherInterface $dispatcher;

      /**
       * @param iterable $indexer the registered EntityIndexer instances
       */
      public function __construct(iterable $indexer, MessageBusInterface $messageBus, EventDispatcherInterface $dispatcher)
      {
          $this->indexer = $indexer;
          $this->messageBus = $messageBus;
          $this->dispatcher = $dispatcher;
      }

      /**
       * Subscribes refresh() to EntityWrittenContainerEvent with priority 1000.
       */
      public static function getSubscribedEvents(): array
      {
          return [
              EntityWrittenContainerEvent::class => [
                  ['refresh', 1000],
              ],
          ];
      }

      /**
       * Message classes that handle() knows how to process.
       */
      public static function getHandledMessages(): iterable
      {
          return [
              EntityIndexingMessage::class,
              IterateEntityIndexerMessage::class,
          ];
      }

      /**
       * Runs a full iteration over every registered indexer that is not listed in $skip.
       *
       * For each indexer a ProgressStartedEvent is dispatched, then iterate() is called
       * repeatedly (feeding back the offset of the previous message) until it returns a
       * falsy value, and finally a ProgressFinishedEvent is dispatched.
       *
       * @param bool  $useQueue when true every message is dispatched on the message bus
       *                        instead of being handled inline
       * @param array $skip     indexer names to leave out; also forwarded to each message
       */
      public function index(bool $useQueue, array $skip = []): void
      {
          foreach ($this->indexer as $indexer) {
              if (\in_array($indexer->getName(), $skip, true)) {
                  continue;
              }

              $offset = null;

              $this->dispatcher->dispatch(new ProgressStartedEvent($indexer->getName(), $indexer->getTotal()));

              while ($message = $indexer->iterate($offset)) {
                  $message->setIndexer($indexer->getName());
                  $message->setSkip($skip);

                  $this->sendOrHandle($message, $useQueue);

                  $offset = $message->getOffset();

                  try {
                      $count = \is_array($message->getData()) ? \count($message->getData()) : 1;
                      $this->dispatcher->dispatch(new ProgressAdvancedEvent($count));
                  } catch (\Exception $e) {
                      // Deliberately ignored: a failure while reporting progress must
                      // not abort the indexing run itself.
                  }
              }

              $this->dispatcher->dispatch(new ProgressFinishedEvent($indexer->getName()));
          }
      }

      /**
       * Event listener: asks every indexer for an update message for the written
       * entities and either queues or handles it.
       *
       * Does nothing when a refresh is already running on this instance or when
       * indexing is disabled on the event's context.
       */
      public function refresh(EntityWrittenContainerEvent $event): void
      {
          $context = $event->getContext();

          if ($this->working) {
              return;
          }

          $this->working = true;

          // try/finally guarantees the guard flag is released even when an indexer
          // or the message bus throws; otherwise every later refresh() on this
          // instance would silently return early.
          try {
              if ($this->disabled($context)) {
                  return;
              }

              $skips = [];
              if ($context->hasExtension(self::EXTENSION_INDEXER_SKIP)) {
                  /** @var ArrayStruct $skip */
                  $skip = $context->getExtension(self::EXTENSION_INDEXER_SKIP);
                  $skips = $skip->all();
              }

              $useQueue = $this->useQueue($context);

              foreach ($this->indexer as $indexer) {
                  $message = $indexer->update($event);

                  if (!$message) {
                      continue;
                  }

                  $message->setIndexer($indexer->getName());
                  $message->setSkip($skips);

                  $this->sendOrHandle($message, $useQueue);
              }
          } finally {
              $this->working = false;
          }
      }

      /**
       * Processes one of the messages listed in getHandledMessages().
       *
       * - EntityIndexingMessage: forwarded to the indexer named in the message;
       *   silently ignored when no such indexer is registered.
       * - IterateEntityIndexerMessage: runs one iteration step of the named indexer
       *   (the resulting indexing message is always queued) and, if there was a
       *   step, dispatches a follow-up IterateEntityIndexerMessage with the new offset.
       *
       * @param object $message any other message type is ignored
       */
      public function handle($message): void
      {
          if ($message instanceof EntityIndexingMessage) {
              $indexer = $this->getIndexer($message->getIndexer());

              if ($indexer !== null) {
                  $indexer->handle($message);
              }

              return;
          }

          if ($message instanceof IterateEntityIndexerMessage) {
              $next = $this->iterateIndexer($message->getIndexer(), $message->getOffset(), true, $message->getSkip());

              if (!$next) {
                  return;
              }

              $this->messageBus->dispatch(new IterateEntityIndexerMessage($message->getIndexer(), $next->getOffset(), $message->getSkip()));
          }
      }

      /**
       * Dispatches an IterateEntityIndexerMessage (starting at offset null) for each
       * requested indexer.
       *
       * @param array $indexer indexer names; when empty, all registered indexers are used
       * @param array $skip    indexer names to leave out; also passed along in each message
       */
      public function sendIndexingMessage(array $indexer = [], array $skip = []): void
      {
          if ($indexer === []) {
              foreach ($this->indexer as $loop) {
                  $indexer[] = $loop->getName();
              }
          }

          // Still empty: there are no registered indexers at all.
          if ($indexer === []) {
              return;
          }

          foreach ($indexer as $name) {
              if (\in_array($name, $skip, true)) {
                  continue;
              }

              $this->messageBus->dispatch(new IterateEntityIndexerMessage($name, null, $skip));
          }
      }

      /**
       * Tells whether an indexer with the given name is registered.
       */
      public function has(string $name): bool
      {
          return $this->getIndexer($name) !== null;
      }

      /**
       * Returns the first registered indexer whose getName() equals $name, or null.
       */
      public function getIndexer(string $name): ?EntityIndexer
      {
          foreach ($this->indexer as $indexer) {
              if ($indexer->getName() === $name) {
                  return $indexer;
              }
          }

          return null;
      }

      /**
       * True when the context carries USE_INDEXING_QUEUE as extension or as state.
       */
      private function useQueue(Context $context): bool
      {
          return $context->hasExtension(self::USE_INDEXING_QUEUE) || $context->hasState(self::USE_INDEXING_QUEUE);
      }

      /**
       * True when the context carries DISABLE_INDEXING as extension or as state.
       */
      private function disabled(Context $context): bool
      {
          return $context->hasExtension(self::DISABLE_INDEXING) || $context->hasState(self::DISABLE_INDEXING);
      }

      /**
       * Dispatches the message on the bus when queueing is requested (by the caller
       * or by the message's own forceQueue()); otherwise handles it inline.
       */
      private function sendOrHandle(EntityIndexingMessage $message, bool $useQueue): void
      {
          if ($useQueue || $message->forceQueue()) {
              $this->messageBus->dispatch($message);

              return;
          }

          $this->handle($message);
      }

      /**
       * Performs a single iteration step of the named indexer.
       *
       * @param mixed $offset cursor forwarded unchanged to the indexer's iterate();
       *                      index() and sendIndexingMessage() start with null
       *
       * @throws \RuntimeException when no indexer with the given name is registered
       *
       * @return EntityIndexingMessage|null the message that was queued/handled, or null
       *                                    when iterate() returned nothing
       */
      private function iterateIndexer(string $name, $offset, bool $useQueue, array $skip): ?EntityIndexingMessage
      {
          $indexer = $this->getIndexer($name);

          if (!$indexer instanceof EntityIndexer) {
              throw new \RuntimeException(\sprintf('Entity indexer with name %s not found', $name));
          }

          $message = $indexer->iterate($offset);

          if (!$message) {
              return null;
          }

          $message->setIndexer($indexer->getName());
          $message->setSkip($skip);

          $this->sendOrHandle($message, $useQueue);

          return $message;
      }
  }