<?php declare(strict_types=1);
namespace Shopware\Elasticsearch\Admin;
use Doctrine\DBAL\Connection;
use Elasticsearch\Client;
use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
use Shopware\Core\Framework\Event\ProgressAdvancedEvent;
use Shopware\Core\Framework\Event\ProgressFinishedEvent;
use Shopware\Core\Framework\Event\ProgressStartedEvent;
use Shopware\Core\Framework\Log\Package;
use Shopware\Core\Framework\Uuid\Uuid;
use Shopware\Elasticsearch\Admin\Indexer\AbstractAdminIndexer;
use Shopware\Elasticsearch\Exception\ElasticsearchIndexingException;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
/**
* @internal
*
* @final
*/
#[Package('system-settings')]
class AdminSearchRegistry implements MessageHandlerInterface, EventSubscriberInterface
{
/**
* @var array<string, mixed>
*/
private array $indexer;
private Connection $connection;
private MessageBusInterface $queue;
private EventDispatcherInterface $dispatcher;
private Client $client;
private AdminElasticsearchHelper $adminEsHelper;
/**
* @var array<mixed>
*/
private array $config;
/**
* @var array<mixed>
*/
private array $mapping;
/**
* @param AbstractAdminIndexer[] $indexer
* @param array<mixed> $config
* @param array<mixed> $mapping
*/
public function __construct(
$indexer,
Connection $connection,
MessageBusInterface $queue,
EventDispatcherInterface $dispatcher,
Client $client,
AdminElasticsearchHelper $adminEsHelper,
array $config,
array $mapping
) {
$this->indexer = $indexer instanceof \Traversable ? iterator_to_array($indexer) : $indexer;
$this->connection = $connection;
$this->queue = $queue;
$this->dispatcher = $dispatcher;
$this->client = $client;
$this->adminEsHelper = $adminEsHelper;
$this->mapping = $mapping;
if (isset($config['settings']['index'])) {
if (\array_key_exists('number_of_shards', $config['settings']['index']) && $config['settings']['index']['number_of_shards'] === null) {
unset($config['settings']['index']['number_of_shards']);
}
if (\array_key_exists('number_of_replicas', $config['settings']['index']) && $config['settings']['index']['number_of_replicas'] === null) {
unset($config['settings']['index']['number_of_replicas']);
}
}
$this->config = $config;
}
public function __invoke(AdminSearchIndexingMessage $message): void
{
$indexer = $this->getIndexer($message->getEntity());
$documents = $indexer->fetch($message->getIds());
$this->push($indexer, $message->getIndices(), $documents, $message->getIds());
}
public static function getSubscribedEvents(): array
{
return [
EntityWrittenContainerEvent::class => [
['refresh', -1000],
],
];
}
/**
* @return iterable<class-string>
*/
public static function getHandledMessages(): iterable
{
return [
AdminSearchIndexingMessage::class,
];
}
public function iterate(AdminIndexingBehavior $indexingBehavior): void
{
if ($this->adminEsHelper->getEnabled() === false) {
return;
}
/** @var array<string> $entities */
$entities = array_keys($this->indexer);
if ($indexingBehavior->getOnlyEntities()) {
$entities = array_intersect($entities, $indexingBehavior->getOnlyEntities());
} elseif ($indexingBehavior->getSkipEntities()) {
$entities = array_diff($entities, $indexingBehavior->getSkipEntities());
}
$indices = $this->createIndices($entities);
foreach ($entities as $entityName) {
$indexer = $this->getIndexer($entityName);
$iterator = $indexer->getIterator();
$this->dispatcher->dispatch(new ProgressStartedEvent($indexer->getName(), $iterator->fetchCount()));
while ($ids = $iterator->fetch()) {
// we provide no queue when the data is sent by the admin
if ($indexingBehavior->getNoQueue() === true) {
$this->__invoke(new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices, $ids));
} else {
$this->queue->dispatch(new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices, $ids));
}
$this->dispatcher->dispatch(new ProgressAdvancedEvent(\count($ids)));
}
$this->dispatcher->dispatch(new ProgressFinishedEvent($indexer->getName()));
}
$this->swapAlias($indices);
}
public function refresh(EntityWrittenContainerEvent $event): void
{
if ($this->adminEsHelper->getEnabled() === false || !$this->isIndexedEntityWritten($event)) {
return;
}
if ($this->adminEsHelper->getRefreshIndices()) {
$this->refreshIndices();
}
/** @var array<string, string> $indices */
$indices = $this->connection->fetchAllKeyValue('SELECT `alias`, `index` FROM admin_elasticsearch_index_task');
if (empty($indices)) {
return;
}
foreach ($this->indexer as $indexer) {
$ids = $event->getPrimaryKeys($indexer->getEntity());
if (empty($ids)) {
continue;
}
$documents = $indexer->fetch($ids);
$this->push($indexer, $indices, $documents, $ids);
}
}
/**
* @return AbstractAdminIndexer[]
*/
public function getIndexers(): iterable
{
return $this->indexer;
}
public function getIndexer(string $name): AbstractAdminIndexer
{
$indexer = $this->indexer[$name] ?? null;
if ($indexer) {
return $indexer;
}
throw new ElasticsearchIndexingException([\sprintf('Indexer for name %s not found', $name)]);
}
private function isIndexedEntityWritten(EntityWrittenContainerEvent $event): bool
{
foreach ($this->indexer as $indexer) {
$ids = $event->getPrimaryKeys($indexer->getEntity());
if (!empty($ids)) {
return true;
}
}
return false;
}
/**
* @param array<string, string> $indices
* @param array<string, array<string|int, string>> $data
* @param array<string> $ids
*/
private function push(AbstractAdminIndexer $indexer, array $indices, array $data, array $ids): void
{
$alias = $this->adminEsHelper->getIndex($indexer->getName());
if (!isset($indices[$alias])) {
return;
}
$toRemove = array_filter($ids, static function (string $id) use ($data): bool {
return !isset($data[$id]);
});
$documents = [];
foreach ($data as $id => $document) {
$documents[] = ['index' => ['_id' => $id]];
$documents[] = \array_replace(
['entityName' => $indexer->getEntity(), 'parameters' => [], 'textBoosted' => '', 'text' => ''],
$document
);
}
foreach ($toRemove as $id) {
$documents[] = ['delete' => ['_id' => $id]];
}
$arguments = [
'index' => $indices[$alias],
'body' => $documents,
];
$result = $this->client->bulk($arguments);
if (\is_array($result) && !empty($result['errors'])) {
$errors = $this->parseErrors($result);
throw new ElasticsearchIndexingException($errors);
}
}
/**
* @param array<string> $entities
*
* @throws \Doctrine\DBAL\Exception
*
* @return array<string, string>
*/
private function createIndices(array $entities): array
{
$indexTasks = [];
$indices = [];
foreach ($entities as $entityName) {
$indexer = $this->getIndexer($entityName);
$alias = $this->adminEsHelper->getIndex($indexer->getName());
$index = $alias . '_' . time();
if ($this->indexExists($index)) {
continue;
}
$indices[$alias] = $index;
$this->create($indexer, $index, $alias);
$iterator = $indexer->getIterator();
$indexTasks[] = [
'id' => Uuid::randomBytes(),
'`entity`' => $indexer->getEntity(),
'`index`' => $index,
'`alias`' => $alias,
'`doc_count`' => $iterator->fetchCount(),
];
}
$this->connection->executeStatement(
'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
['entities' => $entities],
['entities' => Connection::PARAM_STR_ARRAY]
);
foreach ($indexTasks as $task) {
$this->connection->insert('admin_elasticsearch_index_task', $task);
}
return $indices;
}
private function refreshIndices(): void
{
$entities = [];
$indexTasks = [];
foreach ($this->indexer as $indexer) {
$alias = $this->adminEsHelper->getIndex($indexer->getName());
if ($this->aliasExists($alias)) {
continue;
}
$index = $alias . '_' . time();
$this->create($indexer, $index, $alias);
$entities[] = $indexer->getEntity();
$iterator = $indexer->getIterator();
$indexTasks[] = [
'id' => Uuid::randomBytes(),
'`entity`' => $indexer->getEntity(),
'`index`' => $index,
'`alias`' => $alias,
'`doc_count`' => $iterator->fetchCount(),
];
}
$this->connection->executeStatement(
'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
['entities' => $entities],
['entities' => Connection::PARAM_STR_ARRAY]
);
foreach ($indexTasks as $task) {
$this->connection->insert('admin_elasticsearch_index_task', $task);
}
}
private function create(AbstractAdminIndexer $indexer, string $index, string $alias): void
{
$mapping = $indexer->mapping([
'properties' => [
'id' => ['type' => 'keyword'],
'textBoosted' => ['type' => 'text'],
'text' => ['type' => 'text'],
'entityName' => ['type' => 'keyword'],
'parameters' => ['type' => 'keyword'],
],
]);
$mapping = array_merge_recursive($mapping, $this->mapping);
$body = array_merge(
$this->config,
['mappings' => $mapping]
);
$this->client->indices()->create([
'index' => $index,
'body' => $body,
]);
$this->createAliasIfNotExisting($index, $alias);
}
private function indexExists(string $name): bool
{
return $this->client->indices()->exists(['index' => $name]);
}
private function aliasExists(string $alias): bool
{
return $this->client->indices()->existsAlias(['name' => $alias]);
}
/**
* @param array<string, array<array<string, mixed>>> $result
*
* @return array<array{reason: string}|string>
*/
private function parseErrors(array $result): array
{
$errors = [];
foreach ($result['items'] as $item) {
$item = $item['index'] ?? $item['delete'];
if (\in_array($item['status'], [200, 201], true)) {
continue;
}
$errors[] = [
'index' => $item['_index'],
'id' => $item['_id'],
'type' => $item['error']['type'] ?? $item['_type'],
'reason' => $item['error']['reason'] ?? $item['result'],
];
}
return $errors;
}
private function createAliasIfNotExisting(string $index, string $alias): void
{
$exist = $this->client->indices()->existsAlias(['name' => $alias]);
if ($exist) {
return;
}
$this->putAlias($index, $alias);
}
/**
* @param array<string, string> $indices
*/
private function swapAlias($indices): void
{
foreach ($indices as $alias => $index) {
$exist = $this->client->indices()->existsAlias(['name' => $alias]);
if (!$exist) {
$this->putAlias($index, $alias);
return;
}
$current = $this->client->indices()->getAlias(['name' => $alias]);
if (!isset($current[$index])) {
$this->putAlias($index, $alias);
}
unset($current[$index]);
$current = array_keys($current);
foreach ($current as $value) {
$this->client->indices()->delete(['index' => $value]);
}
}
}
private function putAlias(string $index, string $alias): void
{
$this->client->indices()->refresh([
'index' => $index,
]);
$this->client->indices()->putAlias(['index' => $index, 'name' => $alias]);
}
}