zepgram / module-multi-threading
此模块是开发人员在短时间内处理大量数据集的强大工具。
Requires
- ext-pcntl: *
- ext-posix: *
- magento/framework: ^101.0.0|^102.0.0|^103.0.0
- magento/module-store: ^101
README
此模块是开发人员在短时间内处理大量数据集的强大工具。它允许您使用多个子进程并行处理大量数据集,提高性能并减少处理时间。
安装
composer require zepgram/module-multi-threading bin/magento module:enable Zepgram_MultiThreading bin/magento setup:upgrade
使用方法
这些类允许您使用多线程处理搜索条件、集合或数组。
ForkedSearchResultProcessor
use Zepgram\MultiThreading\Model\ForkedSearchResultProcessor; use Magento\Catalog\Api\ProductRepositoryInterface; use Magento\Framework\Api\SearchCriteriaBuilder; class MyAwesomeClass { /** @var ForkedSearchResultProcessor */ private $forkedSearchResultProcessor; /** @var ProductRepositoryInterface */ private $productRepository; public function __construct( ForkedSearchResultProcessor $forkedSearchResultProcessor, ProductRepositoryInterface $productRepository, SearchCriteriaBuilder $searchCriteriaBuilder ) { $this->forkedSearchResultProcessor = $forkedSearchResultProcessor; $this->productRepository = $productRepository; $this->searchCriteriaBuilder = $searchCriteriaBuilder; } $searchCriteria = $this->searchCriteriaBuilder->create(); $productRepository = $this->productRepository; $callback = function ($item) { $item->getData(); // do your business logic here }; $this->forkedSearchResultProcessor->process( $searchCriteria, $productRepository, $callback, $pageSize = 1000, $maxChildrenProcess = 10, $isIdempotent = true ); }
ForkedCollectionProcessor
use Zepgram\MultiThreading\Model\ForkedCollectionProcessor; use Magento\Catalog\Model\ResourceModel\Product\CollectionFactory; class MyAwesomeClass { /** @var ForkedCollectionProcessor */ private $forkedCollectionProcessor; public function __construct( ForkedCollectionProcessor $forkedCollectionProcessor, CollectionFactory $collectionFactory ) { $this->forkedCollectionProcessor = $forkedCollectionProcessor; $this->collectionFactory = $collectionFactory; } $collection = $this->collectionFactory->create(); $callback = function ($item) { $item->getData(); // do your business logic here }; $this->forkedCollectionProcessor->process( $searchCriteria, $productRepository, $callback, $pageSize = 1000, $maxChildrenProcess = 10, $isIdempotent = true ); }
ForkedArrayProcessor
此类允许您使用多线程处理数据数组。
use Zepgram\MultiThreading\Model\ForkedArrayProcessor; class MyAwesomeClass { /** @var ForkedArrayProcessor */ private $forkedArrayProcessor; public function __construct(ForkedArrayProcessor $forkedArrayProcessor) { $this->forkedArrayProcessor = $forkedArrayProcessor; } $array = [1,2,3,4,5,...]; $callback = function ($item) { echo $item; // do your business logic here }; $this->forkedArrayProcessor->process( $array, $callback, $pageSize = 2, $maxChildrenProcess = 2 ); }
ParallelStoreProcessor 或 ParallelWebsiteProcessor
use Zepgram\MultiThreading\Model\Dimension\ParallelStoreProcessor; use Magento\Catalog\Model\ResourceModel\Product\Collection; use Magento\Catalog\Model\ResourceModel\Product\CollectionFactory; class MyAwesomeClass { /** @var ParallelStoreProcessor */ private $parallelStoreProcessor; /** @var CollectionFactory */ private $collectionFactory; public function __construct( ParallelStoreProcessor $parallelStoreProcessor, CollectionFactory $collectionFactory ) { $this->parallelStoreProcessor = $parallelStoreProcessor; $this->collectionFactory = $collectionFactory; } $array = [1,2,3,4,5,...]; $callback = function (StoreInterface $store) { // retrieve data from database foreach stores (do not load the collection !) $collection = $this->collectionFactory->create(); $collection->addFieldToFilter('type_id', 'simple') ->addFieldToSelect(['sku', 'description', 'created_at']) ->setStoreId($store->getId()) ->addStoreFilter($store->getId()) ->distinct(true); // handle pagination system to avoid memory leak $currentPage = 1; $pageSize = 1000; $collection->setPageSize($pageSize); $totalPages = $collection->getLastPageNumber(); while ($currentPage <= $totalPages) { $collection->clear(); $collection->setCurPage($currentPage); foreach ($collection->getItems() as $product) { // do your business logic here } $currentPage++; } }; // your collection will be processed foreach store by a dedicated child process $this->parallelStoreProcessor->process( $callback, $maxChildrenProcess = null, $onlyActiveStores = true, $withDefaultStore = false ); }
bin/magento thread:processor 命令
此命令允许使用 Process Symfony 组件在专用线程中无限期运行命令。
bin/magento thread:processor <command_name> [--timeout=<timeout>] [--iterations=<iterations>] [--environment=<environment>] [--progress]
选项
timeout
: 定义进程超时时间(秒,默认:300)iterations
: 定义迭代次数(默认:0)environment
: 设置环境变量,以逗号分隔progress
: 在执行命令时显示进度条
工作原理
thread:processor
命令创建一个专用子进程来执行现有的命令行。子进程运行用户指定的命令,而父进程监控子进程并可以相应地采取行动。您可以使用专用子进程的 foreach 执行定义迭代次数,并多次执行相同的命令。
ForkedSearchResultProcessor
、ForkedCollectionProcessor
和 ForkedArrayProcessor
类使用类似的方法来处理搜索条件或集合。处理过程被分成几个页面,并为每个页面创建一个子进程来运行用户指定的回调函数,处理该页面的每个项目。
ParallelStoreProcessor
和 ParallelWebsiteProcessor
类旨在使并行处理商店列表或网站列表更加容易。要使用这两个类之一,您需要提供回调函数,该函数将针对列表中的每个商店或网站调用。回调函数应接受一个参数,该参数将是一个单独的商店或网站对象。
每个商店或网站都将通过单独的进程传递给回调函数,从而实现更快的处理时间。子进程的数量不能超过商店或网站的数量:例如,如果您有10家商店,则可以并行创建的最大子进程数是10。
以下是参数的分解
-
$collection
/$searchCriteria
/$array
:第一个参数是数据源,对于 ForkedSearchResultProcessor 是Magento\Framework\Api\SearchCriteriaInterface
,对于 ForkedCollectionProcessor 是Magento\Framework\Data\Collection
,对于 ForkedArrayProcessor 是array
。 -
$callback
:此参数是一个可调用的函数,它将在集合的每个项目上执行。这是一个回调函数,它接收来自集合的当前项目以进行处理。此函数应包含应在每个项目上执行的业务逻辑。 -
$pageSize
:此参数用于设置每页的项目数量。它用于对集合进行分页,以便可以分块处理。 -
$maxChildrenProcess
:此参数用于设置可以同时运行的子进程的最大数量。这用于控制多线程进程将使用的线程数。如果设置为1,根据定义,将不会有并行化,父进程将在创建另一个之前等待子进程完成。 -
$isIdempotent
:此参数默认设置为true
,当$maxChildrenProcess
大于1时,可用于ForkedSearchResultProcessor
或ForkedCollectionProcessor
。在使用ForkedSearchResult
和ForkedCollectionProcessor
从数据库中检索数据时,您可能会更改查询的值:通过修改查询的列上的项目,您将改变初始集合查询的性质,并且最后,查询中的OFFSET限制将无效,因为本地分页系统期望只有单个进程处理分页。为了避免这种情况,请将$isIdempotent
设置为false
。
例如:在您的集合查询中,您请求所有禁用
的产品,在您的回调方法中,您将它们启用
并保存在数据库中,然后在这种情况下,您正在修改请求的列,您的查询不是幂等的。
内存限制
此模块允许绕过内存限制的限制,因为内存限制在每次创建子进程时都会重置。这意味着即使内存限制设置为低值,此模块仍然可以处理大量数据而不会耗尽内存。然而,重要的是要记住,这也意味着整体资源使用率将更高,因此重要的是要监控系统并相应地调整参数。
限制
此模块使用pcntl_fork()
函数,该函数在Windows上不可用。
结论
此模块提供了一种有用的工具,用于以多线程方式运行命令或处理集合和搜索条件,从而成为提高性能和减少执行时间的绝佳解决方案。该模块易于安装和使用,并提供控制子进程数量、超时和环境变量的选项。
免责声明
Magento 2的多线程模块按原样提供,不提供任何保证或担保。虽然已测试此模块并认为其功能正常,但请注意,PHP中的多线程使用可能很复杂,并可能产生意外的后果。因此,使用此模块的用户有责任在将模块部署到生产环境之前在开发环境中彻底测试它。我拒绝对此模块使用可能导致的任何问题或损害承担责任。能力越大,责任越大,请明智使用。