ellinaut/elasticsearch-7-connector

针对 Elasticsearch 7.x 的抽象 PHP 连接器

1.0.1 2022-01-28 15:50 UTC

This package is auto-updated.

Last update: 2024-08-28 21:08:20 UTC


README

Ellinaut 提供

这个库是用来做什么的?

此库提供了一种可重用的结构和实现,用于与 PHP 开发的 Elasticsearch 相关的常见任务。目标是让每个应用程序都具有相同的结构,并在需要时避免重复编写简单的任务,例如创建索引或存储文档。

该库使用官方库 elasticsearch/elasticsearch 并添加了一些结构和功能。

要求

此库要求您使用 PHP 版本 7.2 或更高版本以及版本 7.x 的 Elasticsearch 服务器。

安装

将此库安装到您的应用程序中最简单的方法是 composer

composer require ellinaut/elasticsearch-7-connector

如何在您的应用程序中使用此库

此库的核心,以及您应用程序中每个调用的入口点,是类 Ellinaut\ElasticsearchConnector\ElasticsearchConnector

因此,您需要一个此类实例,它需要 Ellinaut\Elasticsearch\Connection\ConnectionFactoryInterfaceEllinaut\Elasticsearch\NameProvider\NameProviderInterfaceEllinaut\Elasticsearch\Connection\ResponseHandlerInterface(可选)的实例。

以下是一个示例,该示例连接到本地主机的默认端口,且不更改 PHP 和 Elasticsearch 之间的索引或管道名称

    use Ellinaut\ElasticsearchConnector\Connection\DsnConnectionFactory;
    use Ellinaut\ElasticsearchConnector\NameProvider\RawNameProvider;
    use Ellinaut\ElasticsearchConnector\ElasticsearchConnector;

    $elasticsearch = new ElasticsearchConnector(
        new DsnConnectionFactory('http://127.0.0.1:9200'),
        new RawNameProvider(),
        new RawNameProvider()
    );

如何管理连接

使用连接工厂(Ellinaut\ElasticsearchConnector\Connection\ConnectionFactoryInterface 的实例)创建连接。创建的连接是 Elasticsearch\Client 的实例,用于执行所有通过 ElasticsearchConnector 执行的操作。

最简单的方法是使用此库提供的 DsnConenctionFactory(如上所述),但如果您的配置更复杂,您也可以自行实现 ConnectionFactoryInterface

如何管理索引

ElasticsearchConnector 提供了一些管理索引的方法。每个方法都会导致对 Ellinaut\ElasticsearchConnector\Index\IndexManagerInterface 实例的一个或多个调用。每个索引都需要一个此类接口的实例,该实例必须由您的应用程序提供并注册到连接器。

为了简化您的实现,您可以使用 Ellinaut\ElasticsearchConnector\Index\IndexManagerTrait 特性。此特性要求您实现方法 getIndexDefinition,该方法必须提供 Elasticsearch 索引配置作为数组。特性使用此方法并提供所有由 IndexManagerInterface 所需的方法。

以下是一个自定义 IndexManager 的示例

    namespace App\IndexManager;
    
    use Ellinaut\ElasticsearchConnector\Index\IndexManagerInterface;
    use Ellinaut\ElasticsearchConnector\Index\IndexManagerTrait;

    class CustomIndexManager implements IndexManagerInterface {
        use IndexManagerTrait;
        
        /**
         * @return array
         */
        protected function getIndexDefinition() : array{
            return [
                'mappings' => [
                    'properties' => [
                        'test' => [
                            'type' => 'keyword',
                        ],
                    ],
                ],
            ];
        }
    }

要使用您的自定义索引管理器,您必须在连接器实例上注册它

    /** @var \Ellinaut\ElasticsearchConnector\ElasticsearchConnector $elasticsearch */
    $elasticsearch->addIndexManager('custom_index', new App\IndexManager\CustomIndexManager());

然后您可以通过这些连接器方法调用使用此索引

    /** @var \Ellinaut\ElasticsearchConnector\ElasticsearchConnector $elasticsearch */
    
    // Creates the index. Will throw an exception if the index already exists.
    $elasticsearch->createIndex('custom_index');
    
    // Creates the index only if it does not exist.
    $elasticsearch->createIndexIfNotExist('custom_index');
    
    // Deletes the index if it exists, then create the index new.
    $elasticsearch->recreateIndex('custom_index');
    
    // Migrate all documents from the index to a (new) migration index,
    // then recreate the old index and moves all the documents back.
    $elasticsearch->updateIndex('custom_index');
    
    // Deletes the index if it exists.
    $elasticsearch->deleteIndex('custom_index');

索引命名

到目前为止,文档中只使用了内部索引名称。如果您的应用程序使用多个环境,则可能有必要为每个环境使用不同的索引名称,尤其是在同一 Elasticsearch 服务器上托管时。

这可以通过使用索引名称提供者来实现,该提供者是 Ellinaut\ElasticsearchConnector\NameProvider\NameProviderInterface 的实例。此提供者将为所有 Elasticsearch 请求“装饰”内部索引名称,并且可以使用它来从(外部)Elasticsearch 索引名称获取原始(内部)索引名称。

内置提供者包括

  • Ellinaut\ElasticsearchConnector\NameProvider\RawNameProvider:用于在 PHP 和 Elasticsearch 中使用相同的名称
  • Ellinaut\ElasticsearchConnector\NameProvider\PrefixedNameProvider:用于外部索引名称的定制前缀
  • Ellinaut\ElasticsearchConnector\NameProvider\SuffixedNameProvider:用于外部索引名称的定制后缀
  • Ellinaut\ElasticsearchConnector\NameProvider\ChainedNameProvider:用于组合两个或多个提供者

如果您需要一个定制的命名策略,您也可以实现 NameProviderInterface 并提供您的自定义名称提供者。

名称提供者在构造函数中传递给连接器。

索引迁移

默认情况下,updateIndex 方法通过以下步骤工作

  1. 创建迁移索引(索引名称将为 "INDEX_NAME__migrating")
  2. 从旧索引获取旧文档
  3. 将旧文档存储到迁移索引中
  4. 将所有文档从旧索引移动到迁移索引
  5. 删除旧索引
  6. 创建与旧索引同名的全新索引
  7. 将所有文档从迁移索引移动到新索引
  8. 删除迁移索引

此过程可能对只添加新字段或更改分析器或字段配置的简单迁移很有用。如果您的迁移需要数据更改,您可以向连接器提供一个 Ellinaut\ElasticsearchConnector\Document\DocumentMigratorInterface 实例。

以下是您的文档迁移器可能的样子

    namespace App\Document;

    use Ellinaut\ElasticsearchConnector\Document\DocumentMigratorInterface;
    
    class CustomDocumentMigrator implements DocumentMigratorInterface {
    
        /**
         * @param array $previousSource
         * @return array
         */
        public function migrate(array $previousSource) : array{
            if(!array_key_exists('test',$previousSource)){
                $previousSource['test'] = 'Test';
            }
            
            return $previousSource;
        }
    }

它可以这样添加到连接器中

    /** @var \Ellinaut\ElasticsearchConnector\ElasticsearchConnector $elasticsearch */
    $elasticsearch->addDocumentMigrator('custom_index', new App\Document\CustomDocumentMigrator());

如果您已向连接器添加文档迁移器,则 updateIndex 的新过程将是

  1. 创建迁移索引(索引名称将为 "INDEX_NAME__migrating")
  2. 从旧索引获取旧文档
  3. 使用文档迁移器迁移旧文档
  4. 将迁移的文档存储到迁移索引中
  5. 删除旧索引
  6. 创建与旧索引同名的全新索引
  7. 将所有文档从迁移索引移动到新索引
  8. 删除迁移索引

如何管理管道

ElasticsearchConnector 提供了一些方法来管理管道。每个方法都将导致对 Ellinaut\ElasticsearchConnector\Index\PipelineManagerInterface 实例的一次或多次调用。每个管道都需要此接口的一个实例,该实例必须由您的应用程序提供并注册到连接器。

为了简化您的实现,您可以使用特质 Ellinaut\ElasticsearchConnector\Index\PipelineManagerTrait。此特质要求您实现 getPipelineDefinition 方法,必须提供 Elasticsearch 管道配置作为数组。特质使用此方法并提供 PipelineManagerInterface 所需的所有方法。

以下是一个自定义 PipelineManager 的示例

    namespace App\PipelineManager;
    
    use Ellinaut\ElasticsearchConnector\Index\PipelineManagerInterface;
    use Ellinaut\ElasticsearchConnector\Index\PipelineManagerTrait;

    class CustomPipelineManager implements PipelineManagerInterface {
        use PipelineManagerTrait;
        
        /**
         * @return array
         */
        protected function getPipelineDefinition() : array{
            return [
                'description' => 'Your custom pipeline which converts content from field "test" to lowercase.',
                'processors' => [
                    [
                        'lowercase' => [
                            'field' => 'test',
                        ],
                    ],
                ],
            ];
        }
    }

要使用您的自定义管道管理器,您必须将其注册到连接器实例上

    /** @var \Ellinaut\ElasticsearchConnector\ElasticsearchConnector $elasticsearch */
    $elasticsearch->addPipelineManager('custom_pipeline', new App\PipelineManager\CustomPipelineManager());

然后,您可以通过这些连接器方法调用使用此管道

    /** @var \Ellinaut\ElasticsearchConnector\ElasticsearchConnector $elasticsearch */
    
    // Creates all registered pipelines.
    $elasticsearch->createPipelines();
    
    // Creates only the given pipeline.
    $elasticsearch->createPipelines(['custom_pipeline']);
    
    // Deletes all registered pipelines.
    $elasticsearch->deletePipelines();
    
    // Deletes only the given pipeline.
    $elasticsearch->deletePipelines(['custom_pipeline']);

管道命名

与索引一样,管道名称在 PHP 和 Elasticsearch 之间也可能不同。名称提供者与索引相同。

如何管理文档

ElasticsearchConnector 提供了一些方法来管理文档。您应该使用 indexDocument 方法创建或更新文档。您应该使用 deleteDocument 方法通过 ID 删除文档。如果您不想使用批量请求,您也可以使用 indexDocumentImmediatelydeleteDocumentImmediately。您可以通过 retrieveDocumentretrieveDocumentSource 方法检索单个文档。

看看它在您的代码中是什么样子

    /** @var \Ellinaut\ElasticsearchConnector\ElasticsearchConnector $elasticsearch */
    
    // Index the document with ID "document_1" into the index "custom_index" and use pipeline "custom_pipeline".
    // Document is index via queue system which uses bulk requests internally. Recommended method to index documents.
    $elasticsearch->indexDocument('custom_index','document_1', ['test'=>'Test'],'custom_pipeline');
    
    // Index the document with ID "document_1" into the index "custom_index" and use pipeline "custom_pipeline".
    // Document is indexed immediately to elasticsearch.
    $elasticsearch->indexDocumentImmediately('custom_index','document_1', ['test'=>'Test'],'custom_pipeline');
    
    // Retrieves the full document by ID from elasticsearch.
    $elasticsearch->retrieveDocument('custom_index','document_1');
    
    // Retrieves only the content of key "_source" from the elasticsearch document.
    $elasticsearch->retrieveDocumentSource('custom_index','document_1');
    
    // Deletes the document with ID "document_1" from index "custom_index".
    // Document is deleted via queue system which uses bulk requests internally. Recommended method to delete documents.
    $elasticsearch->deleteDocument('custom_index','document_1');
    
    // Deletes the document with ID "document_1" from index "custom_index".
    // Document is deleted immediately from elasticsearch.
    $elasticsearch->deleteDocumentImmediately('custom_index','document_1');

队列系统

如果您使用推荐的索引或删除文档的方法,您的“命令”将内部存储在队列中。如果达到配置的maxQueueSize(通过ElasticsearchConnector的构造函数),或者下一个文档应使用不同的管道,则队列将自动执行。如果从未达到限制,您必须在php进程完成之前调用executeQueueImmediately。在symfony等框架中,这应该通过事件监听器来完成。在您的自定义应用程序中,此方法应在您的php脚本或应用程序周期结束时执行。该方法将执行所有队列中的命令,通过单个批量请求到elasticsearch,这将通过减少http请求来提高您的应用程序性能。

    /** @var \Ellinaut\ElasticsearchConnector\ElasticsearchConnector $elasticsearch */
    
    // Executes all queued index or delete commands with a single bulk request.
    $elasticsearch->executeQueueImmediately();

如何搜索文档

连接器为您提供了一个搜索结果的方法和一个计数结果而不获取所有结果的方法。这些方法使用来自Elasticsearch\Client的基本方法。区别在于连接器方法会处理内部索引名称,并将它们转换为外部索引名称,这些名称可用于elasticsearch请求。

    /** @var \Ellinaut\ElasticsearchConnector\ElasticsearchConnector $elasticsearch */
    
    // Executes a search request over all indices.
    $searchAllResult = $elasticsearch->executeSearch(
        [
            'body' => [
                'query' => [
                    'match_all' => (object)[] // fix for elasticsearch, because an empty array doesn't work in the api
                ] 
            ]
        ]
    );
    
    // Executes a search request only for the index "custom_index".
    $searchCustomResult = $elasticsearch->executeSearch(
        [
            'body' => [
                'query' => [
                    'match_all' => (object)[] // fix for elasticsearch, because an empty array doesn't work in the api
                ] 
            ]
        ],
        ['custom_index']
    );
    
    // Executes a count request over all indices.
    $numberOfAllResults = $elasticsearch->executeCount(
        [
            'body' => [
                'query' => [
                    'match_all' => (object)[] // fix for elasticsearch, because an empty array doesn't work in the api
                ] 
            ]
        ]
    );
    
    // Executes a count request only for the index "custom_index".
    $numberOfCustomResults = $elasticsearch->executeCount(
        [
            'body' => [
                'query' => [
                    'match_all' => (object)[] // fix for elasticsearch, because an empty array doesn't work in the api
                ] 
            ]
        ],
        ['custom_index']
    );

如何处理更复杂的情况

此库的目标是标准化和简化PHP和elasticsearch的常见操作。更复杂的情况需要由您的应用程序解决,但连接器可以帮助您。

连接器提供了一些可能对您的自定义场景有用的辅助方法

    /** @var \Ellinaut\ElasticsearchConnector\ElasticsearchConnector $elasticsearch */
    
    // Returns the configured instance of "Elasticsearch\Client" which is also used by the connector.
    $connection = $elasticsearch->getConnection();
    
    // Get the external index name for elasticsearch requests
    $externalIndexName = $elasticsearch->getExternalIndexName('custom_index');
    
    // Get the internal index name for internal mappings with the result of an elasticsearch request
    $internalIndexName = $elasticsearch->getInternalIndexName('external_custom_index');
    
    // Get the external pipeline name for elasticsearch requests
    $externalPipelineName = $elasticsearch->getExternalPipelineName('custom_pipeline');
    
    // Get the internal pipeline name for internal mappings with the result of an elasticsearch request
    $internalPipelineName = $elasticsearch->getInternalPipelineName('external_custom_pipeline');

错误处理/响应处理

有时您可能需要直接访问elasticsearch的响应。这将在自定义错误处理或调试时非常有用。您可以通过构造函数向连接器提供一个Ellinaut\ElasticsearchConnector\Connection\ResponseHandlerInterface实例。

自定义响应处理器可能如下所示

    namespace App\PipelineManager;
    
    use Ellinaut\ElasticsearchConnector\Connection\ResponseHandlerInterface;

    class CustomResponseHandler implements ResponseHandlerInterface {
    
        /**
         * @param string $method
         * @param array $response
         */
        public function handleResponse(string $method,array $response) : void{
            if($method === 'createIndex'){
                var_dump($response);
            }
        }
    }

响应处理器将在以下方法中调用

  • IndexManagerInterface::createIndex
  • IndexManagerInterface::updateIndex
  • IndexManagerTrait::moveDocuments
  • IndexManagerInterface::deleteIndex
  • PipelineManagerInterface::createPipeline
  • PipelineManagerInterface::deletePipeline
  • ElasticsearchConnector::indexDocumentImmediately
  • ElasticsearchConnector::deleteDocumentImmediately
  • ElasticsearchConnector::executeQueueImmediately

Ellinaut由NXI GmbH & Co. KGBVH Bootsvermietung Hamburg GmbH提供支持。