简单、基于beanstalkd的ESB框架。


README

简单、基于beanstalkd的ESB框架。

Build Status

简介

Webgriffe ESB 是一个 PHP 框架,旨在加速企业服务总线 (ESB) 的开发。

它使用 Beanstalkd 作为队列引擎,以及 Elasticsearch 作为作业持久化层。它建立在流行的开源 PHP 库之上,例如

架构与核心概念

集成不同的系统是一个数据流的问题。使用 Webgriffe ESB,每个数据流都单向流动,从一个系统通过 Beanstalkd 管道 流向另一个系统。每个管道都必须有一个 生产者,它创建 作业,以及一个 工作者 来处理它们。因此,数据通过管道从生产者流向工作者。

使用 Webgriffe ESB,您只需实现生产者和工作者即可集成不同的系统。框架会处理其余的部分。

Webgriffe ESB 被设计为使用单个二进制文件,该文件用作整个应用程序的主要入口点;所有生产者和工作者都由该单个 PHP 二进制文件启动和执行。这是由于 Amp 并发框架的功劳。

要求

  • PHP 7.2、7.3 或 7.4
  • Beanstalk
  • Elasticsearch 7.*

安装

使用 Composer 安装此包

composer require webgriffe/esb

配置

将示例配置文件复制到您的 ESB 根目录中

cp vendor/webgriffe/esb/esb.yml.sample ./esb.yml

《esb.yml》文件是您的 ESB 应用程序的主要配置,其中您必须定义具有其工作者和生成者服务的流程。

services:
  _defaults:
    autowire: true                      # This is optional (see https://symfony.com.cn/doc/current/service_container/autowiring.html)

  My\Esb\Producer:                      # A producer service definition
    arguments: []

  My\Esb\Worker:                        # A worker service definition
    arguments: []


flows:
  sample_flow:                          # The flow "code" and will be the Beanstalkd tube name
    description: Sample Flow            # The flow description
    producer:
      service: My\Esb\Producer          # A producer service ID defined above
      batch_size: 1200                  # Jobs are produced in batches of 1200 jobs. Optional: default is 1000
    worker:
      service: My\Esb\Worker            # A worker service ID defined above
      instances: 1                      # The number of worker instances to spawn for this flow
      error_retry_delay: 0              # The number of seconds to wait before an errored job can be retried. The default is 0 (errored jobs can be retried immediately). Useful when "retrying later" might solve the problem.
      release_delay: 0                  # (deprecated) older name of the error_retry_delay parameter
      max_retry: 5                      # The number of maximum work retries for a job in this tube/flow before being buried
    dependencies:                       # This whole section can be omitted if the current flow has no dependencies
      flows: ['other_flow_1', 'other_flow_2']  # Optional: dependencies of this flow toward other flow(s)
      delay_after_idle_time: 1000       # Optional: delay that a worker with dependencies waits before working the first job received after the tube was empty
      initial_polling_interval: 1000    # Optional: initial polling delay that a worker waits when it has to wait for a dependency that is not idle
      maximum_polling_interval: 60000   # Optional: maximum polling delay that a worker waits when it has to wait for a dependency that is not idle
      polling_interval_multiplier: 2    # Optional: polling delay increase factor whenever a worker is waiting for a dependency that is not idle

  other_flow_1:
    # ...
    
  other_flow_2:
    # ...

在《services》部分,您必须使用 Symfony 依赖注入 组件的语法定义您的工作者和生成者服务。在《services》部分,您还可以找到两个服务 Webgriffe\Esb\Producer\CleanOldJobs 和 Webgriffe\Esb\Worker\CleanOldJobs,如果您想启用定期删除旧作业的流程,则应保留它们。

在《flows》部分,您必须定义您的 ESB 流程。每个流程都必须引用在《services》部分中定义的生成者和工作者服务。在《flows》部分,您还可以找到对《clean_old_jobs_flow》的定义:如果您想定期删除旧作业,则应保留它。这也是定义流程之间依赖关系的部分。有关详细信息,请参阅“依赖关系”部分。

您还必须在《parameters》部分下定义一些参数。有关所需参数的更多信息,请参阅《esb.yml.sample》文件。通常,最好将参数隔离到《parameters.yml》文件中,该文件可以按以下方式包含在《esb.yml》中:

# esb.yml
imports:
  - { resource: parameters.yml}

services:
  # ...

flows:
  # ...
# parameters.yml
parameters:
  beanstalkd: tcp://127.0.0.1:11300
  # Other parameters here ...

有关参数的完整列表以及有关 ESB 配置的更多信息,请参阅示例配置文件

依赖关系

可以在流程间指定依赖,这确保了只要配置的流程中有任何一个正在工作,一个流程就无法处理任何作业。这是通过使用dependencies配置部分来实现的:如果你想让流程A依赖于流程B,你需要在流程A的配置中指定dependencies部分来列出流程B。

# esb.yml
# ...

flows:
  flow_B:
    #...

  flow_A:
    #...
    dependencies: 
      flows: ['flow_B']

当指定了这样一个依赖关系,使得流程A依赖于流程B时,每当流程B正在处理某些作业或/和有排队作业时,流程A仍然会生成和排队新的作业,但不会处理它们。当流程B完成处理其最后作业并且其Beanstalk管子为空时,流程A开始处理其作业。如果在流程A工作时为流程B创建了新的作业,流程A将完成已经正在处理的作业(如果有多个工作者,则是作业集),然后停止直到所有依赖项空闲(空管子且没有正在处理的作业)。依赖项也可以是多个,这意味着流程A可以依赖于流程B和流程C(如果需要,还可以更多)。在这种情况下,流程A将等待直到所有依赖项都空闲。要声明多个依赖项,只需在dependencies.flows字段中列出所有依赖项。不尊重间接依赖。这意味着如果流程A依赖于流程B,而流程B反过来又依赖于流程C,流程C的作业将阻塞流程B,但不会停止流程A。流程A只会检查流程B。如果你想让流程A也检查流程C,只需明确指出流程A依赖于B和C之间的依赖关系。

当一个流程依赖于另一个流程时,例如流程A依赖于流程B,每当流程A的工作者从其队列中提取作业时,它将检查其依赖项以确保它们都是空闲的。如果找到一个不空闲的依赖项,流程A将开始轮询该依赖项以查看何时完成。如果需要,可以通过几个配置参数来控制轮询动作的时机(这些都是可选的)

  • initial_polling_interval(默认1000ms)是流程A第一次重新检查流程B状态前等待的毫秒数。
  • polling_interval_multiplier(默认:2),在定义了initial_polling_interval的第一个延迟之后,每个后续延迟是通过将前一个延迟乘以这个参数获得的。默认值2表示每次延迟加倍,所以如果第一个延迟是1000ms,第二个将是2000ms,然后是4000ms等。将此值设置为1将迫使系统继续使用初始延迟值而不改变。
  • maximum_polling_interval(默认:60000ms),由polling_interval_multiplier施加的指数增长受此值限制,这是最大可能的轮询间隔。
  • delay_after_idle_time(默认:1000ms),有时可能会发生流程A和流程B的生产者同时开始生成新作业的情况。如果流程A空闲,并且其生产者设法在流程B的生产者有时间生成其作业之前生成第一个作业,流程A的工作者可能在流程B的生产者生成其作业之前开始处理新的作业,这样就会违反依赖关系。为了解决这个问题,当一个带有某些依赖关系的流程已经等待一段时间以等待新作业的到来(以我们的示例中的流程A为例),在接收到新作业后,它将在指定的时间delay_after_idle_time之后恢复操作。这给了流程B的生产者足够的时间在其管子中发布其作业,这将确保当超时到期时,流程A将等待其依赖项。除非处理的是一个非常慢的生产者,否则通常不需要更改此参数,在这种情况下,可能需要增加它。

注意:对于每个依赖项,指数级轮询时间的增加都会重置:如果一个流程依赖于多个其他流程,每次一个依赖项进入空闲状态时,都会在检查下一个依赖项之前重置时间参数。

生产者

任何实现了 ProducerInterface 类的服务都可以作为生产者。然而,仅实现 ProducerInterface 是不够的:每个生产者还必须实现支持的其中一个 生产者类型 接口。这是因为框架必须知道何时调用每个生产者。目前支持的生产者类型包括

  • RepeatProducerInterface:这些生产者会每隔固定时间间隔被重复调用。
  • CrontabProducerInterface:这些生产者在它们的 crontab 表达式 匹配时被调用。
  • HttpRequestProducerInterface:这些生产者在 ESB 的 HTTP 服务器接收到相应的 HTTP 请求时被调用。

有关更多信息,请参阅源代码中的这些接口。ProducerInterfaceproduce 方法必须返回一个 Amp 的 迭代器,这允许您通过单个 produce 调用生成一个作业集合。此外,迭代器允许执行长时间运行的生产操作,这些操作异步执行。

此外,请记住,您绝对不应该在您的生产者中使用 I/O 阻塞函数调用。当您需要执行 I/O 操作时,请使用 AmpReactPHP 库。

有关示例,请参阅 tests/ 目录中的模拟生产者。

工作者

任何实现了 WorkerInterface 类的服务都可以作为工作者。每个工作者在它的流程的管道中有可用作业时立即被调用。

工作者的 work 方法必须返回一个 Amp 的 Promise,该 Promise 必须在作业成功处理时解析。否则,work 方法必须抛出异常。

当一个工作者成功处理一个作业时,ESB 框架会将其从管道中删除。相反,当一个工作者无法处理一个作业时,ESB 框架会将其保留在管道中,最多 max_retry 次。如果指定了非 0 的 error_retry_delay,则作业至少在指定的秒数内不会重试。如果超过最大重试次数,作业将被 埋藏,并记录一个关键事件。

与生产者一样,您绝对不应该在您的工作者中使用 I/O 阻塞函数调用。如果需要执行 I/O 操作,请使用 AmpReactPHP 库。

有关示例,请参阅 tests/ 目录中的模拟工作者。

初始化

WorkerInterfaceProducerInterface 都支持在启动阶段由 ESB 框架调用的 init 方法。

init 方法必须返回一个 Amp 的 Promise。这允许您异步执行初始化操作(例如,使用远程 WSDL URL 实例化 SOAP 客户端)。

单元测试

您还可以(并且应该)对您的工作者和生产者进行单元测试。因为工作者和生产者必须返回 Promise 和迭代器,所以您必须在测试中使用 Amp 循环。您还应该使用 amphp/phpunit-util 在测试之间重置循环状态。

单元测试示例

以下是一个生产者测试的示例,该测试验证生产者是否根据给定目录中的XML文件生成库存更新作业。

public function testShouldProduceMultipleJobsWithMultipleEntriesFile()
{
    filesystem(new BlockingDriver());
    vfsStream::setup();
    $this->importFile = vfsStream::url('root/stock.xml');
    $this->producer = new Stock($this->importFile);
    copy(__DIR__ . '/StockTestFixtures/multiple_entries.xml', $this->importFile);

    $this->jobs = [];
    Loop::run(
        function () use ($data) {
            $iterator = $this->producer->produce($data);
            while (yield $iterator->advance()) {
                $this->jobs[] = $iterator->getCurrent();
            }
        }
    );

    $this->assertCount(52, $this->jobs);
    $this->assertEquals(new Job(['sku' => 'SKU-1', 'qty' => 9519.000]), $this->jobs[0]);
    $this->assertEquals(new Job(['sku' => 'SKU-23', 'qty' => 299.000]), $this->jobs[12]);
    $this->assertEquals(new Job(['sku' => 'SKU-50', 'qty' => 2017.000]), $this->jobs[21]);
}

以下是对相关工作者单元测试的示例,该工作者从作业中获取要更新的SKU和数量,然后执行API调用以更新数量。

public function testWorksSimpleJob()
{
    $this->sessionId = random_int(1, 1000);
    $this->client = $this->prophesize(Client::class);
    $this->clientFactory = $this->prophesize(Factory::class);
    $this->clientFactory->create()->willReturn(new Success($this->client->reveal()));
    $this->worker = new Stock($this->clientFactory->reveal());

    $sku = 'SKU-1';
    $qty = 10;
    $this->client
        ->login()
        ->shouldBeCalled()
        ->willReturn(new Success($this->sessionId))
    ;
    $this->client
        ->call('cataloginventory_stock_item.update', [$sku, ['qty' => $qty, 'is_in_stock' => true]])
        ->shouldBeCalled()
        ->willReturn(new Success(true))
    ;
    $this->client->endSession()->shouldBeCalled()->willReturn(new Success());

    $job = new QueuedJob(1, ['sku' => $sku, 'qty' => $qty]);
    Loop::run(function () use ($job) {
        yield $this->worker->init();
        yield $this->worker->work($job);
    });
}

Web控制台

提供了一个Web控制台UI,允许检查管道和作业;还可以搜索作业并重新排队。目前,Web控制台仅在HTTP(不是HTTPS)下可用,必须使用以下参数进行配置

# esb.yml
parameters:
  # ...
  console_port: 8080                      # Web console port
  console_username: admin                 # Web console username
  console_password: password              # Web console password
  console_log_file: /tmp/esb_console.log  # Web console server log file

# ...

例如,根据上述配置,您可以使用用户名admin和密码password通过URL http://<ip_or_hostname>:8080/ 访问Web控制台。

Web控制台HTTP服务器必须设置在不同的端口上,而不是由HttpRequestProducerInterface生产者使用的端口(由http_server_port参数标识)。

在Web控制台仪表板中,您可以检查配置的流的状况

Web Console 1

通过单击一个流,您可以看到该流的作业的可搜索列表。列表是分页的,允许手动重新排队作业,如果需要的话

Web Console 2

单击单个作业或查看按钮,可以查看特定作业的更多详细信息,并强制将其放回队列

Web Console 3

部署

如前所述,所有工作者和生产者都由单个PHP二进制文件管理。此二进制文件位于vendor/bin/esb。要部署和运行您的ESB应用程序,您只需将应用程序作为任何其他PHP应用程序进行部署(例如,使用Deployer)并确保vendor/bin/esb始终运行(我们建议使用Supervisord来完成此任务)。

请注意,vendor/bin/esb二进制文件将操作记录到stdout,并使用error_log()函数报告错误。通过标准的PHP CLI配置,所有error_log()条目随后都重定向到stderr。这是通过MonologStreamHandlerErrorHandler处理器来完成的。此外,所有警告(或更高级别)事件都由NativeMailHander(使用logger_mail_tologger_mail_from参数配置)处理。

您还可以使用esb.yml配置文件添加自己的处理器。

贡献

要贡献,只需fork此存储库,进行更改,然后提出pull请求。

我们建议使用Docker。实际上,提供了一个docker-compose.yml文件。

只需将.env.dist文件复制到.env,并根据您的环境调整环境变量值。

例如,要运行整个测试套件(PHP Code Sniffer、PHPStan、PHPUnit等...),您可以简单地运行

docker-compose run php composer tests

或者要仅运行PHPUnit测试,您可以使用

docker-compose run php vendor/bin/phpunit

测试套件使用ESB_BEANSTALKD_URL环境变量来获取Beanstalkd实例的连接URL。此环境变量已在提供的docker-compose.yml文件中设置。

您还可以使用Docker在本地上运行ESB的一个实例。您必须在根目录中创建一个esb.yml配置文件,然后运行

docker-compose up

许可协议

本库采用MIT许可协议。请参阅LICENSE文件中的完整许可协议。

致谢

Webgriffe®开发。