eonx-com/search

Search functionality powered by Elasticsearch

v3.0.3 2020-06-04 06:23 UTC

README

The goal of this search package is to provide common functionality that lets applications create and maintain Elasticsearch search indices.

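Applications need to provide implementations of SearchHandlerInterface (for simple search indices, which the package will make sure are created) and of TransformableSearchHandlerInterface (for indices that react to changes to entities in an external system and transform those entities into index documents).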

Example implementations of both interfaces are provided.

Overview

This package provides several components that make implementing search straightforward.

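  • Index management commands that allow reindexing without disrupting existing indices.
  • A command option to reindex only when an index's mappings have changed (not yet implemented, PYMT-1690).
  • A Lumen service provider that automatically discovers any service tagged search_handler and uses the discovered handlers to create and manage indices in Elasticsearch.
  • A Lumen bridge that provides a listener which responds to any Doctrine entity change through EasyEntityChange.
  • Search handler interfaces that offer several implementation options, depending on the application's needs.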

Theory of operation

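The primary and default implementation in this package sets up a listener that responds to any entity change in Doctrine and, based on which handlers are interested in that specific change, dispatches jobs to reindex the affected entities.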

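Each application search handler defines a set of ChangeSubscription DTOs describing the entities, and the relevant properties on them, whose changes should trigger a reindex of the handler's documents.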
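To make the matching idea concrete, here is a minimal sketch that reduces each subscription to a class name plus a list of watched properties. The array shapes and the `matchingHandlers()` function are hypothetical illustrations, not this package's API; the real ChangeSubscription DTO also carries an optional transform callback.

```php
<?php

/**
 * Hypothetical model of subscription matching: a handler is interested in a change when it
 * subscribes to the changed class and at least one of the changed properties is watched.
 *
 * @param array<string, array<int, array{class: string, properties: string[]}>> $subscriptions
 *        Subscriptions keyed by handler key.
 * @param string $class The class of the changed entity.
 * @param string[] $changedProperties The properties that changed on the entity.
 *
 * @return string[] Handler keys that should receive the change.
 */
function matchingHandlers(array $subscriptions, string $class, array $changedProperties): array
{
    $matches = [];

    foreach ($subscriptions as $handlerKey => $handlerSubscriptions) {
        foreach ($handlerSubscriptions as $subscription) {
            if ($subscription['class'] !== $class) {
                continue;
            }

            // Intersect the watched properties with the properties that actually changed.
            if (\count(\array_intersect($subscription['properties'], $changedProperties)) > 0) {
                $matches[] = $handlerKey;
                // One matching subscription is enough for this handler.
                break;
            }
        }
    }

    return $matches;
}

$subscriptions = [
    'transactions' => [
        ['class' => 'Metadata', 'properties' => ['key', 'value']],
        ['class' => 'User', 'properties' => ['email']],
    ],
    'users' => [
        ['class' => 'User', 'properties' => ['email', 'name']],
    ],
];

// A change to a user's email concerns both handlers: ['transactions', 'users'].
$handlers = matchingHandlers($subscriptions, 'User', ['email']);
```

A change to a property nobody watches (for example a user's name, from the point of view of the transactions handler) produces no work for that handler at all.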

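The package batches search updates into multiple jobs and passes ObjectForChange DTOs, describing the changed objects, to the application's search handlers. Each change is either an update (ObjectForUpdate) or a delete (ObjectForDelete). The search handler then returns a DocumentAction DTO describing what should happen to the corresponding Elasticsearch document.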

Lifecycle - index management

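When an application is first created or deployed, its indices must exist before the application writes to Elasticsearch. Elasticsearch creates indices eagerly (an index is created automatically on the first write to it, with dynamically inferred mappings), which is not the behaviour we want, so the migration/search setup process must run before the application accepts requests.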
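As an illustration of what creating an index up front involves, the sketch below builds a date-suffixed index name and a request body for Elasticsearch's create index API (`PUT /<index>` with `settings` and `mappings`), reusing the settings and mappings from the example handler further down. Both function names are hypothetical; this is not how the package's `search:index:create` command is implemented, only a picture of the kind of request it has to make.

```php
<?php

/**
 * Hypothetical helper: builds a date-suffixed index name such as transactions_20200102121314.
 */
function datedIndexName(string $indexName, \DateTimeInterface $now): string
{
    return $indexName . '_' . $now->format('YmdHis');
}

/**
 * Hypothetical helper: builds the JSON body for Elasticsearch's create index API
 * (PUT /<index>) from a handler's settings and mappings.
 *
 * @param array<string, mixed> $settings
 * @param array<string, mixed> $mappings
 */
function createIndexBody(array $settings, array $mappings): string
{
    return \json_encode(['settings' => $settings, 'mappings' => $mappings], \JSON_THROW_ON_ERROR);
}

$name = datedIndexName('transactions', new \DateTimeImmutable('2020-01-02 12:13:14'));
$body = createIndexBody(
    ['number_of_replicas' => 1, 'number_of_shards' => 1],
    ['doc' => ['dynamic' => 'strict', 'properties' => ['createdAt' => ['type' => 'date']]]]
);

// $name is "transactions_20200102121314"; $body would be sent as PUT /transactions_20200102121314.
```

Sending this request before the application starts writing means the index exists with the intended mappings instead of whatever Elasticsearch would infer on the first write.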

The following is an example process for a hypothetical index named transactions:

# This command will create initial indices that are suffixed with the current date, and add an alias
# for each one that is suffixed with _new. No aliases exist at the root at this time.
#
# The system creates a `transactions_20200102121314` and a `transactions_new` alias that points to
# the date suffixed index.
$ ./artisan search:index:create

# This command fills the _new aliases with all document data for any search handlers that implement
# the TransformableSearchHandlerInterface. This command has options for synchronously filling or
# creating jobs to fill with workers.
#
# The system fills all data from the `getFillIterable` method on the TransactionSearchHandler. The
# index is still not live at this point.
$ ./artisan search:index:fill

# This command will atomically swap any root (live) aliases for any indices suffixed with _new that
# have had data populated. After this command is run, the application has been switched to the new
# indexes.
#
# The system sees that `transactions_new` (which points to `transactions_20200102121314`) has data 
# in it, and atomically swaps `transactions` (which currently points to 
# `transactions_20191212121212`) to now point to `transactions_20200102121314`. All index changes 
# occur at the same time and if any fail to swap, they all fail.
$ ./artisan search:index:live

# This command cleans up any old aliases/indices that are no longer required.
#
# The `transactions_20191212121212` index is removed.
$ ./artisan search:index:clean
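The atomic swap performed by `search:index:live` maps naturally onto Elasticsearch's `_aliases` API, which applies a list of `remove`/`add` actions as a single atomic operation. The sketch below only builds such a request body; `aliasSwapBody()` and its input shape are hypothetical, and the package may issue its requests differently.

```php
<?php

/**
 * Hypothetical helper: builds a POST /_aliases body that moves each live alias from its
 * current index (if any) to the new date-suffixed index.
 *
 * @param array<string, array{old: ?string, new: string}> $swaps Keyed by live alias name.
 *
 * @return array{actions: array<int, array<string, array{index: string, alias: string}>>}
 */
function aliasSwapBody(array $swaps): array
{
    $actions = [];

    foreach ($swaps as $alias => $indices) {
        // On a first deployment there is no live index yet, so there is nothing to remove.
        if ($indices['old'] !== null) {
            $actions[] = ['remove' => ['index' => $indices['old'], 'alias' => $alias]];
        }

        $actions[] = ['add' => ['index' => $indices['new'], 'alias' => $alias]];
    }

    // All actions go out in one request, so Elasticsearch applies them together or not at all.
    return ['actions' => $actions];
}

$body = aliasSwapBody([
    'transactions' => ['old' => 'transactions_20191212121212', 'new' => 'transactions_20200102121314'],
]);
```

Putting every alias of every handler into the same `actions` list is what gives the "if any fail to swap, they all fail" behaviour described in the command comments.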

Lifecycle - responding to Doctrine changes

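This package listens for EntityChange events from the EasyEntityChange package. The EntityUpdateWorker converts these events into ObjectForChange DTOs and then checks them for any intersection with the search handlers' subscriptions.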

Once intersections are found, the work is batched and dispatched as jobs for workers to process as needed.
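The batching step can be pictured as grouping the matched changes by handler and chunking each group. This is a hypothetical sketch: the batch size, the array shapes and `batchForJobs()` are assumptions for illustration, not the package's EntityUpdateWorker.

```php
<?php

/**
 * Hypothetical sketch: groups matched changes by handler key and splits each group into
 * fixed-size batches, one batch per queued job.
 *
 * @param array<int, array{handler: string, id: string}> $matches
 *
 * @return array<string, array<int, array<int, string>>> Batches of ids keyed by handler key.
 */
function batchForJobs(array $matches, int $batchSize): array
{
    $grouped = [];
    foreach ($matches as $match) {
        $grouped[$match['handler']][] = $match['id'];
    }

    $batches = [];
    foreach ($grouped as $handlerKey => $ids) {
        // array_chunk keeps the final, possibly smaller, batch.
        $batches[$handlerKey] = \array_chunk($ids, $batchSize);
    }

    return $batches;
}

$batches = batchForJobs([
    ['handler' => 'transactions', 'id' => 't1'],
    ['handler' => 'transactions', 'id' => 't2'],
    ['handler' => 'transactions', 'id' => 't3'],
    ['handler' => 'users', 'id' => 'u1'],
], 2);
```

Keeping each job small bounds the memory and time a single worker spends on a burst of entity changes.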

Example search handler

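The following example is verbose: it includes code that would normally live in an abstract search handler, in order to show what is expected of a TransformableSearchHandlerInterface implementation.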

<?php
declare(strict_types=1);

namespace App\Services\Search;

// Entity namespaces follow App\Database\Entities\Transaction; adjust Metadata/User to your application.
use App\Database\Entities\Metadata;
use App\Database\Entities\Transaction;
use App\Database\Entities\User;
use EonX\EasyEntityChange\DataTransferObjects\ChangedEntity;
use LoyaltyCorp\Search\Bridge\Doctrine\DoctrineSearchHandler;
use LoyaltyCorp\Search\DataTransferObjects\DocumentAction;
use LoyaltyCorp\Search\DataTransferObjects\DocumentDelete;
use LoyaltyCorp\Search\DataTransferObjects\DocumentUpdate;
use LoyaltyCorp\Search\DataTransferObjects\Handlers\ChangeSubscription;
use LoyaltyCorp\Search\DataTransferObjects\Handlers\ObjectForChange;
use LoyaltyCorp\Search\DataTransferObjects\Handlers\ObjectForDelete;
use LoyaltyCorp\Search\DataTransferObjects\Handlers\ObjectForUpdate;

/**
 * This handler represents a single index in Elasticsearch, and reacts to a single primary
 * entity for building those indices.
 * 
 * The extends annotation below tells phpstan that we're creating a handler for dealing
 * with Transactions only, and enables additional checks to ensure code correctness. For
 * more details, check out PhpStan Generics.
 * 
 * @extends DoctrineSearchHandler<Transaction>
 */
class TransactionHandler extends DoctrineSearchHandler 
{
    /**
     * This method is used to define the Elasticsearch mappings. By convention, our indices should
     * be defined with dynamic->strict wherever they can be, to avoid issues with mistakes in the
     * transform method or the mappings being out of sync.
     *
     * {@inheritdoc}
     */
    public static function getMappings(): array
    {
        return [
            'doc' => [
                'dynamic' => 'strict',
                'properties' => [
                    'createdAt' => [
                        'type' => 'date',
                    ],
                    // Additional mappings as required
                ],
            ],
        ];
    }

    /**
     * Depending on the requirements of the application and if the Elasticsearch system is clustered
     * these settings may need to be modified, but for a default implementation with a single ES node
     * the defaults of 1 for both settings are preferred.
     *
     * {@inheritdoc}
     */
    public static function getSettings(): array
    {
        return [
            'number_of_replicas' => 1,
            'number_of_shards' => 1,
        ];
    }

    /**
     * The handler key is used internally by the search package to keep track of which handler needs
     * what data; the key must be unique across the application.
     *
     * {@inheritdoc}
     */
    public function getHandlerKey(): string
    {
        return 'transactions';
    }

    /**
     * This is the index name that will be used by the search package. Indexes will be created with
     * a date suffix and aliased to the "real index" name during the search setup or reindexing
     * process.
     *
     * {@inheritdoc}
     */
    public function getIndexName(): string
    {
        return 'transactions';
    }

    /**
     * This method returns the subscriptions for any objects that this search handler is interested
     * in - and optionally a transformation callback that will turn the ChangedEntity that it
     * receives into an iterable of ObjectForUpdate DTOs for batch processing.
     *
     * {@inheritdoc}
     */
    public function getSubscriptions(): iterable
    {
        yield from parent::getSubscriptions();
        
        // React to transaction metadata changes.
        yield new ChangeSubscription(
            Metadata::class,
            ['key', 'value'],
            fn (ChangedEntity $change) => $this->loadTransactionsFromMetadata($change)
        );

        // React to changes to the user's email address
        yield new ChangeSubscription(
            User::class,
            ['email'],
            fn (ChangedEntity $change) => $this->loadTransactionsFromUser($change)
        );
    }

    /**
     * Loads related transactions from a metadata change.
     *
     * @phpstan-return iterable<ObjectForUpdate<Transaction>>
     *
     * @param \EonX\EasyEntityChange\DataTransferObjects\ChangedEntity $change
     *
     * @return \LoyaltyCorp\Search\DataTransferObjects\Handlers\ObjectForChange[]
     */
    public function loadTransactionsFromMetadata(ChangedEntity $change): iterable
    {
        if ($change->getClass() !== Metadata::class ||
            \is_string($change->getIds()['metadataId'] ?? null) === false) {
            return [];
        }

        $repository = $this->getEntityManager()->getRepository(Transaction::class);

        return $repository->getSearchTransactionsForMetadataUpdate($change->getIds()['metadataId']);
    }

    /**
     * Loads related transactions from a user.
     *
     * @phpstan-return iterable<ObjectForUpdate<Transaction>>
     *
     * @param \EonX\EasyEntityChange\DataTransferObjects\ChangedEntity $change
     *
     * @return \LoyaltyCorp\Search\DataTransferObjects\Handlers\ObjectForChange[]
     */
    public function loadTransactionsFromUser(ChangedEntity $change): iterable
    {
        if ($change->getClass() !== User::class ||
            \is_string($change->getIds()['userId'] ?? null) === false) {
            return [];
        }

        $repository = $this->getEntityManager()->getRepository(Transaction::class);

        return $repository->getSearchTransactionsForUserUpdate($change->getIds()['userId']);
    }

    /**
     * This method takes an ObjectForChange and returns a DocumentAction.
     *
     * Its primary purpose is to decide whether a document should be deleted or updated.
     *
     * {@inheritdoc}
     */
    public function transform(ObjectForChange $change): ?DocumentAction
    {
        // We didn't get a $change that makes sense for this transform method.
        if ($change->getClass() !== Transaction::class) {
            return null;
        }

        // If we got an ObjectForDelete and we have the searchId metadata,
        // issue a delete action to search. A deleted entity may no longer be
        // loaded, so this runs before the object itself is inspected.
        if ($change instanceof ObjectForDelete === true &&
            \is_string($change->getMetadata()['searchId'] ?? null) === true) {
            return new DocumentDelete($change->getMetadata()['searchId']);
        }

        // If we didn't get an update carrying a loaded Transaction, we don't
        // know what the system wants, so do nothing.
        if ($change instanceof ObjectForUpdate === false ||
            ($change->getObject() instanceof Transaction) === false) {
            return null;
        }

        /**
         * PHPStorm isn't capable of recognising that this is a Transaction even though
         * we check it above. This is just for IDE compatibility.
         *
         * @var \App\Database\Entities\Transaction $transaction
         */
        $transaction = $change->getObject();

        // An object without an external id cannot be transformed.
        if (\is_string($transaction->getExternalId()) === false) {
            return null;
        }

        return new DocumentUpdate(
            $transaction->getExternalId(),
            [
                'id' => $transaction->getId(),
                // Keys must match the strict mapping in getMappings(), which defines createdAt.
                'createdAt' => $transaction->getCreatedAt(),
                // ...
            ]
        );
    }
}
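Because the example mapping uses `'dynamic' => 'strict'`, Elasticsearch rejects any document containing a field that is not declared in `properties`. A small check like the hypothetical `unmappedFields()` below (not part of the package) can catch a transform/mapping mismatch in a unit test before it reaches Elasticsearch. It assumes `id` is mapped as a keyword field, which the example elides, and it only compares top-level keys; nested `object` properties would need a recursive version.

```php
<?php

/**
 * Hypothetical test helper: returns the top-level document fields that are missing from the
 * mapping's properties, which a strict mapping would reject at index time.
 *
 * @param array<string, mixed> $mappingProperties
 * @param array<string, mixed> $document
 *
 * @return string[]
 */
function unmappedFields(array $mappingProperties, array $document): array
{
    return \array_values(\array_diff(\array_keys($document), \array_keys($mappingProperties)));
}

$properties = ['createdAt' => ['type' => 'date'], 'id' => ['type' => 'keyword']];

// 'created_at' does not match the mapped 'createdAt' field, so it is reported.
$missing = unmappedFields($properties, ['id' => 'abc', 'created_at' => '2020-01-02T12:13:14+00:00']);
```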

Example entity repository

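Besides the search handler, a few methods need to be implemented on the entity repository. This package provides a SearchRepositoryTrait to do the heavy lifting, but you still need to implement the interface and a few methods that proxy to the trait.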

<?php
declare(strict_types=1);

namespace App\Database\Repositories;

use App\Database\Entities\Transaction;
use Doctrine\ORM\AbstractQuery;
use LoyaltyCorp\Search\Bridge\Doctrine\Interfaces\FillableRepositoryInterface;
use LoyaltyCorp\Search\Bridge\Doctrine\SearchRepositoryTrait;
use LoyaltyCorp\Search\DataTransferObjects\Handlers\ObjectForUpdate;

/**
 * @implements FillableRepositoryInterface<Transaction>
 */
class TransactionRepository extends Repository implements FillableRepositoryInterface
{
    use SearchRepositoryTrait;

    /**
     * {@inheritdoc}
     *
     * @throws \Doctrine\ORM\ORMException
     */
    public function getFillIterable(): iterable
    {
        return $this->doGetFillIterable(
            $this->createQueryBuilder('e'),
            $this->entityManager->getClassMetadata(Transaction::class),
            Transaction::class
        );
    }

    /**
     * Returns an iterable of transactions that relate to a metadata record.
     *
     * @phpstan-return iterable<ObjectForUpdate<Transaction>>
     *
     * @param string $metadataId
     *
     * @return \LoyaltyCorp\Search\DataTransferObjects\Handlers\ObjectForUpdate[]
     *
     * @throws \Doctrine\ORM\ORMException
     */
    public function getSearchTransactionsForMetadataUpdate(string $metadataId): iterable
    {
        $builder = $this->createQueryBuilder('t');
        $builder->select('t.transactionId');

        $builder->where(':metadata MEMBER OF t.metadata');
        $builder->setParameter('metadata', $metadataId);

        // iterate() wraps each hydrated row in an array, so the scalar row is always at index 0.
        foreach ($builder->getQuery()->iterate([], AbstractQuery::HYDRATE_SCALAR) as $result) {
            yield new ObjectForUpdate(
                Transaction::class,
                ['transactionId' => $result[0]['transactionId']]
            );
        }
    }

    /**
     * Returns an iterable of transactions that relate to a user.
     *
     * @phpstan-return iterable<ObjectForUpdate<Transaction>>
     *
     * @param string $userId
     *
     * @return \LoyaltyCorp\Search\DataTransferObjects\Handlers\ObjectForUpdate[]
     *
     * @throws \Doctrine\ORM\ORMException
     */
    public function getSearchTransactionsForUserUpdate(string $userId): iterable
    {
        $builder = $this->createQueryBuilder('t');
        $builder->select('t.transactionId');

        $builder->where('IDENTITY(t.user) = :user');
        $builder->setParameter('user', $userId);

        // iterate() wraps each hydrated row in an array, so the scalar row is always at index 0.
        foreach ($builder->getQuery()->iterate([], AbstractQuery::HYDRATE_SCALAR) as $result) {
            yield new ObjectForUpdate(
                Transaction::class,
                ['transactionId' => $result[0]['transactionId']]
            );
        }
    }

    /**
     * {@inheritdoc}
     *
     * @throws \Doctrine\ORM\Mapping\MappingException
     */
    public function prefillSearch(iterable $changes): void
    {
        $this->doPrefillSearch(
            $this->createQueryBuilder('e'),
            $this->entityManager->getClassMetadata(Transaction::class),
            Transaction::class,
            $changes
        );
    }
}
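In Doctrine ORM 2.x, `Query::iterate()` wraps each hydrated row in an array, so every scalar row arrives as `[0 => ['transactionId' => ...]]` and the value is read from index `0` on every iteration. The standalone sketch below mimics that row shape with plain arrays (no Doctrine required) to show the lazy generator pattern used in the repository methods; `toUpdateDescriptors()` and the descriptor shape are hypothetical stand-ins for `ObjectForUpdate`.

```php
<?php

/**
 * Hypothetical sketch: lazily converts rows shaped like Doctrine's iterate() output
 * ([0 => ['transactionId' => ...]]) into update descriptors, one row at a time.
 *
 * @param iterable<int, array<int, array<string, string>>> $rows
 *
 * @return \Generator<int, array{class: string, ids: array<string, string>}>
 */
function toUpdateDescriptors(iterable $rows): \Generator
{
    foreach ($rows as $row) {
        // Each iterated row holds the hydrated result at index 0, never at a running index.
        yield ['class' => 'Transaction', 'ids' => ['transactionId' => $row[0]['transactionId']]];
    }
}

$rows = [
    [0 => ['transactionId' => 'a1']],
    [0 => ['transactionId' => 'b2']],
];

$descriptors = \iterator_to_array(toUpdateDescriptors($rows), false);
```

Because the generator pulls one row at a time, large result sets can be streamed into search jobs without hydrating every matching transaction up front.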