b2pweb/bdf-queue

Bdf队列组件

v1.5.3 2024-09-26 15:56 UTC

This package is auto-updated.

Last update: 2024-09-26 16:00:01 UTC


README

本包提供了消息代理抽象的2层。

  • 连接层
  • 目标层

build codecov Packagist Version Total Downloads Type Coverage

支持

使用说明

生产消息

首先,创建一个新的目标管理器实例。

<?php

use Bdf\Queue\Connection\Factory\ResolverConnectionDriverFactory;
use Bdf\Queue\Connection\Pheanstalk\PheanstalkConnection;
use Bdf\Queue\Destination\ConfigurationDestinationFactory;
use Bdf\Queue\Destination\DestinationManager;
use Bdf\Queue\Destination\DestinationFactory;
use Bdf\Queue\Serializer\JsonSerializer;

// Declare connections
$driverFactory = new ResolverConnectionDriverFactory([
    'foo' => [
        'driver' => 'pheanstalk',
        'host' => 'localhost',
        'port' => '11300',
        'additionalOption' => 'value',
    ]
    // OR use DSN 'foo' => 'pheanstalk://:11300?additionalOption=value'
]);

// Declare drivers
$driverFactory->addDriverResolver('pheanstalk', function($config) {
    //echo $config['connection'] displays "foo"
    return new PheanstalkConnection($config['connection'], new JsonSerializer());
});

// Declare destination
// You can also declare your custom destination that defined type of transport (queue, multi queues, topic, ...),
// the connection to use, and the name of the queue(s) / topic(s) to use.
// This example will use the queue driver of the "foo" connection defined above. And send / consume message on the queue named "default".
$destinationFactory = new DestinationFactory(
    $driverFactory,
    ['my_destination' => 'queue://foo/default']
);

// To send a message to multiple destinations, you can use "aggregate" destination type.
// You can use a wildcard to send to all destinations that match the pattern.
// In this example, 'user' destination will be sent to the "foo" and "bar" queues, and to all topics that match the pattern "*.user"
$destinationFactory = new DestinationFactory(
    $driverFactory,
    [
        'foo' => 'queue://test/foo',
        'bar' => 'queue://test/bar',
        'a.user' => 'topic://a/user',
        'b.user' => 'topic://b/user',
        'user' => 'aggregate://foo,bar,*.user',
    ]
);

// Create the manager
$manager = new DestinationManager($driverFactory, $destinationFactory);

将一个基本消息推送到队列中。消费应定义处理器来处理消息。

<?php

use Bdf\Queue\Message\Message;

$message = Message::create('Hello world');
$message->setDestination('my_destination');
// or use a lower level setting the connection and queue.
$message = Message::create('Hello world', 'queue');
$message->setConnection('foo');

/** @var Bdf\Queue\Destination\DestinationManager $manager */
$manager->send($message);

适用于需要延迟处理过程的单体应用程序。将消息作业推送到队列。消费者将评估作业字符串并运行处理器。在这种情况下,生产者和接收者共享相同的模型。

<?php
$message = \Bdf\Queue\Message\Message::createFromJob(Mailer::class.'@send', ['body' => 'my content']);
$message->setDestination('my_destination');

/** @var Bdf\Queue\Destination\DestinationManager $manager */
$manager->send($message);

dsn目标可用的类型

Bdf\Queue\Destination\DsnDestinationFactory 提供默认的目标类型

您可以声明自己的类型

<?php

use Bdf\Dsn\DsnRequest;
use Bdf\Queue\Connection\ConnectionDriverInterface;
use Bdf\Queue\Connection\Factory\ResolverConnectionDriverFactory;

/** @var ResolverConnectionDriverFactory $driverFactory */

$destinationFactory = new Bdf\Queue\Destination\DsnDestinationFactory($driverFactory);
$destinationFactory->register('my_own_type', function(ConnectionDriverInterface $connection, DsnRequest $dsn) {
    // ...
});

// use dsn as "my_own_type://connection/queue_or_topic_name?option="

消费消息

消费者层提供许多消息处理工具。默认的消息接收对象堆栈是

消费者 (ConsumerInterface) -> 接收者 (ReceiverInterface) -> 处理器 (ProcessorInterface) -> 处理器 (callable)

  • consumer 具有从队列/主题中读取消息的策略,它还管理优雅的关闭。
  • receivers 是与信封交互的中间件堆栈。
  • processor 解决处理器参数。您可以将业务逻辑插入此处并移除处理器层。默认情况下,处理器向处理器注入2个参数:消息数据和信封。
  • handler 管理业务逻辑。处理器允许无接口模式。

消费简单消息的示例

<?php

use Bdf\Queue\Consumer\Receiver\ProcessorReceiver;
use Bdf\Queue\Destination\DestinationManager;
use Bdf\Queue\Processor\CallbackProcessor;
use Bdf\Queue\Processor\MapProcessorResolver;

// Create your processor and declare in a map:
$myProcessor = new CallbackProcessor(function($data) {
    echo $data;
});
$processorResolver = new MapProcessorResolver(['foo' => $myProcessor]);

/** @var DestinationManager $manager */
$manager->create('queue://foo')->consumer(new ProcessorReceiver($processorResolver))->consume(0);

消费作业消息

<?php

use Bdf\Instantiator\Instantiator;
use Bdf\Queue\Consumer\Receiver\ProcessorReceiver;
use Bdf\Queue\Destination\DestinationManager;
use Bdf\Queue\Processor\JobHintProcessorResolver;

/** @var Instantiator $instantiator */

// The job should be provided from message to get the processor
$processorResolver = new JobHintProcessorResolver($instantiator);

/** @var DestinationManager $manager */
$manager->create('queue://foo')->consumer(new ProcessorReceiver($processorResolver))->consume(0);

创建处理器

<?php

/** @var Bdf\Queue\Destination\DestinationManager $manager */

class MyHandler
{
    public function handle($data, \Bdf\Queue\Message\EnvelopeInterface $envelope)
    {
        echo $data; // Display 'foo'
        
        // Ack the message. Default behavior. The ack is sent before the call by the consumer.
        $envelope->acknowledge();
        
        // Reject the message. It will be no more available. The message is rejected if and exception is thrown.
        $envelope->reject();
        
        // Reject the message and send it back to the queue
        $envelope->reject(true);
    }
}

$message = \Bdf\Queue\Message\Message::createFromJob(MyHandler::class, 'foo', 'queue');
$manager->send($message);

使用语法 "Class@method" 确定可调用对象(默认方法为 "handle")或使用接收者构建器在特定目标上注册处理器

<?php

use Bdf\Queue\Consumer\Receiver\Builder\ReceiverBuilder;
use Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoader;
use Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoaderInterface;
use Psr\Container\ContainerInterface;

/** @var ContainerInterface $container */
/** @var Bdf\Queue\Destination\DestinationManager $manager */

$container->set(ReceiverLoaderInterface::class, function (ContainerInterface $container) {
    return new ReceiverLoader(
        $container,
        [
            'destination_name or connection_name' => function(ReceiverBuilder $builder) {
                /** @var \Bdf\Queue\Processor\ProcessorInterface $myProcessor */
                /** @var \Bdf\Queue\Consumer\ReceiverInterface $myReceiver */

                // Register your unique handler for the destination or connection. 
                // all message will be handled by this handler.
                $builder->handler(MyHandler::class);
                
                // Or register your unique processor
                $builder->processor($myProcessor);
                
                // Or register the job bearer resolver as processor. The procesor will resolve the job
                // from the Message::$job attribute value.
                $builder->jobProcessor();
                
                // Or register your own processor or handler by queue in case you consume a connection.
                // By default the key of the map is the queue name. You can provide your own key provider 
                // with the second parameter.
                $builder->mapProcessor([
                    'queue1' => $myProcessor,
                    'queue2' => MyHandler::class,
                ]);
                
                // Or register your final own receiver
                $builder->outlet($myReceiver);
                
                // Or register your own receiver in the stack
                $builder->add($myReceiver);
                
                // You can add more defined middlewares here
                // $builder->retry(2);
            }
        ]
    );
});

$receiver = $container->get(ReceiverLoaderInterface::class)->load('destination_name or connection_name')->build();

$manager->create('queue://foo')->consumer($receiver)->consume(0);

在控制台运行消费者

$ example/consumer.php "connection name OR destination name"
创建接收器扩展

消费者使用接收器堆栈来扩展消息接收。请参阅接口 Bdf\Queue\Consumer\ReceiverInterface 和特质 Bdf\Queue\Consumer\DelegateHelper

<?php
class MyExtension implements \Bdf\Queue\Consumer\ReceiverInterface
{
    use \Bdf\Queue\Consumer\DelegateHelper;
    
    private $options;

    /**
     * MyExtension constructor.
     */
    public function __construct(\Bdf\Queue\Consumer\ReceiverInterface $delegate, array $options)
    {
        $this->delegate = $delegate;
        $this->options = $options;
    }
    
    /**
     * {@inheritdoc}
     */
    public function receive($message, \Bdf\Queue\Consumer\ConsumerInterface $consumer): void
    {
        // Do something when receiving message
        if ($message->queue() === 'foo') {
            return;        
        }

        // Call the next receiver
        $this->delegate->receive($message, $consumer);
    }
}

您可以使用 Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoader::add() 在堆栈中注册您的接收器

<?php
$options = ['foo' => 'bar'];

/** @var \Bdf\Queue\Consumer\Receiver\Builder\ReceiverBuilder $builder */
$builder->add(MyExtension::class, [$options]);

自定义字符串负载

Bdf\Queue\Serializer\SerializerInterface 管理发送到消息代理的负载内容。默认情况下,元数据被添加到json中,如下所示

  • PHP类型:帮助消费者反序列化复杂实体。
  • 消息信息:重试尝试次数,发送日期等。

一个基本的负载看起来像

{
  "name": "Foo",
  "data": "Hello World",
  "date": "2019-12-23T16:02:03+01:00"
}

您可以使用自己的序列化接口实现来自定义负载。

尝试hello world示例(在example/config/connections.php中配置消息代理)

$ example/producer.php foo '{"name":"Foo", "data":"Hello World"}' --raw
$ example/consumer.php foo

RPC客户端

<?php

use Bdf\Queue\Message\InteractEnvelopeInterface;
use Bdf\Queue\Message\Message;

class RpcReplyHandler
{
    public function doSomethingUseful(int $number, InteractEnvelopeInterface $envelope)
    {
        // Send bask: 1 x 2 to client
        $envelope->reply($number * 2);

        // Or retry in 10sec
        $envelope->retry(10);
    }
}

$message = Message::createFromJob(RpcReplyHandler::class.'@doSomethingUseful', 1, 'queue');
$message->setConnection('foo');

/** @var Bdf\Queue\Destination\DestinationManager $manager */
$promise = $manager->send($message);

// Consume the foo connection

// Receive data from the reply queue. If the header "replyTo" is not set, 
// the response will be sent to "queue_reply"
echo $promise->await(500)->data(); // Display 2

连接的附加选项

注意

消息的附加选项

序列化

基准测试

简单作业/闭包作业

分析

  • 为了最佳执行时间,无论大小如何,请使用默认的 Serializer
  • 为了更小的尺寸,无论时间如何,请使用 BdfSerializerCompressedSerializer
  • 为了最佳折衷,请使用 SerializerCompressedSerializer
    • 始终小于纯 BdfSerializer(JSON或二进制)
    • unserialize 上更快,在 serialize 上略慢
    • 比压缩的bdf大约快两倍,但在简单任务上的体积仅大约 40%

许可协议

在MIT许可协议的条款下分发。