b2pweb / bdf-queue
Bdf队列组件
Requires
- php: ~7.4 || ~8.0.0 || ~8.1.0 || ~8.2.0 || ~8.3.0
- ext-pcntl: *
- b2pweb/bdf-dsn: ~1.0
- b2pweb/bdf-instantiator: ~1.0
- symfony/console: ~5.4|~6.0|~7.0
- symfony/messenger: ~5.4|~6.0|~7.0
Requires (Dev)
- b2pweb/bdf-serializer: ~1.0
- doctrine/dbal: ~2.5|~3.0
- enqueue/enqueue: ~0.9
- enqueue/fs: ~0.9
- friendsofphp/php-cs-fixer: ^3.0
- jeremeamia/superclosure: ~2.1
- league/container: ~3.0
- monolog/monolog: ~2.0
- pda/pheanstalk: ^3.1@dev
- php-amqplib/php-amqplib: ~3.0
- phpbench/phpbench: ~0.0|~1.0
- phpunit/phpunit: ~9.6
- predis/predis: ~1.1.0
- ramsey/uuid: ~3.0|~4.0
- symfony/error-handler: ~5.4|~6.0|~7.0
- symfony/phpunit-bridge: ~5.4|~6.0|~7.0
- symfony/var-dumper: ~5.4|~6.0|~7.0
- vimeo/psalm: ~5.22
Suggests
- ext-redis: Required to use the Redis queue driver
- aws/aws-sdk-php: Required to use the SQS queue driver (~3.0).
- enqueue/enqueue: Required to use the Enqueue driver (~0.9)
- ext/rdkafka: Required to use the Kafka queue driver (~4.0|~5.0|~6.0)
- iron-io/iron_mq: Required to use the iron queue driver (~2.0).
- kwn/php-rdkafka-stubs: Required to use the Kafka queue driver (~2.0)
- pda/pheanstalk: Required to use the beanstalk queue driver (~3.0).
- php-amqplib/php-amqplib: Required to use the RabbitMQ queue driver (~2.6|~3.0)
- symfony/var-dumper: VarDumper could be used for displaying failed message (~5.4|~6.0|~7.0)
README
本包提供了消息代理抽象的2层。
- 连接层
- 目标层
支持
使用说明
生产消息
首先,创建一个新的目标管理器实例。
<?php use Bdf\Queue\Connection\Factory\ResolverConnectionDriverFactory; use Bdf\Queue\Connection\Pheanstalk\PheanstalkConnection; use Bdf\Queue\Destination\ConfigurationDestinationFactory; use Bdf\Queue\Destination\DestinationManager; use Bdf\Queue\Destination\DestinationFactory; use Bdf\Queue\Serializer\JsonSerializer; // Declare connections $driverFactory = new ResolverConnectionDriverFactory([ 'foo' => [ 'driver' => 'pheanstalk', 'host' => 'localhost', 'port' => '11300', 'additionalOption' => 'value', ] // OR use DSN 'foo' => 'pheanstalk://:11300?additionalOption=value' ]); // Declare drivers $driverFactory->addDriverResolver('pheanstalk', function($config) { //echo $config['connection'] displays "foo" return new PheanstalkConnection($config['connection'], new JsonSerializer()); }); // Declare destination // You can also declare your custom destination that defined type of transport (queue, multi queues, topic, ...), // the connection to use, and the name of the queue(s) / topic(s) to use. // This example will use the queue driver of the "foo" connection defined above. And send / consume message on the queue named "default". $destinationFactory = new DestinationFactory( $driverFactory, ['my_destination' => 'queue://foo/default'] ); // To send a message to multiple destinations, you can use "aggregate" destination type. // You can use a wildcard to send to all destinations that match the pattern. // In this example, 'user' destination will be sent to the "foo" and "bar" queues, and to all topics that match the pattern "*.user" $destinationFactory = new DestinationFactory( $driverFactory, [ 'foo' => 'queue://test/foo', 'bar' => 'queue://test/bar', 'a.user' => 'topic://a/user', 'b.user' => 'topic://b/user', 'user' => 'aggregate://foo,bar,*.user', ] ); // Create the manager $manager = new DestinationManager($driverFactory, $destinationFactory);
将一个基本消息推送到队列中。消费应定义处理器来处理消息。
<?php use Bdf\Queue\Message\Message; $message = Message::create('Hello world'); $message->setDestination('my_destination'); // or use a lower level setting the connection and queue. $message = Message::create('Hello world', 'queue'); $message->setConnection('foo'); /** @var Bdf\Queue\Destination\DestinationManager $manager */ $manager->send($message);
适用于需要延迟处理过程的单体应用程序。将消息作业推送到队列。消费者将评估作业字符串并运行处理器。在这种情况下,生产者和接收者共享相同的模型。
<?php $message = \Bdf\Queue\Message\Message::createFromJob(Mailer::class.'@send', ['body' => 'my content']); $message->setDestination('my_destination'); /** @var Bdf\Queue\Destination\DestinationManager $manager */ $manager->send($message);
dsn目标可用的类型
类 Bdf\Queue\Destination\DsnDestinationFactory
提供默认的目标类型
您可以声明自己的类型
<?php use Bdf\Dsn\DsnRequest; use Bdf\Queue\Connection\ConnectionDriverInterface; use Bdf\Queue\Connection\Factory\ResolverConnectionDriverFactory; /** @var ResolverConnectionDriverFactory $driverFactory */ $destinationFactory = new Bdf\Queue\Destination\DsnDestinationFactory($driverFactory); $destinationFactory->register('my_own_type', function(ConnectionDriverInterface $connection, DsnRequest $dsn) { // ... }); // use dsn as "my_own_type://connection/queue_or_topic_name?option="
消费消息
消费者层提供许多消息处理工具。默认的消息接收对象堆栈是
消费者 (ConsumerInterface) -> 接收者 (ReceiverInterface) -> 处理器 (ProcessorInterface) -> 处理器 (callable)
consumer
具有从队列/主题中读取消息的策略,它还管理优雅的关闭。receivers
是与信封交互的中间件堆栈。processor
解决处理器参数。您可以将业务逻辑插入此处并移除处理器层。默认情况下,处理器向处理器注入2个参数:消息数据和信封。handler
管理业务逻辑。处理器允许无接口模式。
消费简单消息的示例
<?php use Bdf\Queue\Consumer\Receiver\ProcessorReceiver; use Bdf\Queue\Destination\DestinationManager; use Bdf\Queue\Processor\CallbackProcessor; use Bdf\Queue\Processor\MapProcessorResolver; // Create your processor and declare in a map: $myProcessor = new CallbackProcessor(function($data) { echo $data; }); $processorResolver = new MapProcessorResolver(['foo' => $myProcessor]); /** @var DestinationManager $manager */ $manager->create('queue://foo')->consumer(new ProcessorReceiver($processorResolver))->consume(0);
消费作业消息
<?php use Bdf\Instantiator\Instantiator; use Bdf\Queue\Consumer\Receiver\ProcessorReceiver; use Bdf\Queue\Destination\DestinationManager; use Bdf\Queue\Processor\JobHintProcessorResolver; /** @var Instantiator $instantiator */ // The job should be provided from message to get the processor $processorResolver = new JobHintProcessorResolver($instantiator); /** @var DestinationManager $manager */ $manager->create('queue://foo')->consumer(new ProcessorReceiver($processorResolver))->consume(0);
创建处理器
<?php /** @var Bdf\Queue\Destination\DestinationManager $manager */ class MyHandler { public function handle($data, \Bdf\Queue\Message\EnvelopeInterface $envelope) { echo $data; // Display 'foo' // Ack the message. Default behavior. The ack is sent before the call by the consumer. $envelope->acknowledge(); // Reject the message. It will be no more available. The message is rejected if and exception is thrown. $envelope->reject(); // Reject the message and send it back to the queue $envelope->reject(true); } } $message = \Bdf\Queue\Message\Message::createFromJob(MyHandler::class, 'foo', 'queue'); $manager->send($message);
使用语法 "Class@method"
确定可调用对象(默认方法为 "handle")或使用接收者构建器在特定目标上注册处理器
<?php use Bdf\Queue\Consumer\Receiver\Builder\ReceiverBuilder; use Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoader; use Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoaderInterface; use Psr\Container\ContainerInterface; /** @var ContainerInterface $container */ /** @var Bdf\Queue\Destination\DestinationManager $manager */ $container->set(ReceiverLoaderInterface::class, function (ContainerInterface $container) { return new ReceiverLoader( $container, [ 'destination_name or connection_name' => function(ReceiverBuilder $builder) { /** @var \Bdf\Queue\Processor\ProcessorInterface $myProcessor */ /** @var \Bdf\Queue\Consumer\ReceiverInterface $myReceiver */ // Register your unique handler for the destination or connection. // all message will be handled by this handler. $builder->handler(MyHandler::class); // Or register your unique processor $builder->processor($myProcessor); // Or register the job bearer resolver as processor. The procesor will resolve the job // from the Message::$job attribute value. $builder->jobProcessor(); // Or register your own processor or handler by queue in case you consume a connection. // By default the key of the map is the queue name. You can provide your own key provider // with the second parameter. $builder->mapProcessor([ 'queue1' => $myProcessor, 'queue2' => MyHandler::class, ]); // Or register your final own receiver $builder->outlet($myReceiver); // Or register your own receiver in the stack $builder->add($myReceiver); // You can add more defined middlewares here // $builder->retry(2); } ] ); }); $receiver = $container->get(ReceiverLoaderInterface::class)->load('destination_name or connection_name')->build(); $manager->create('queue://foo')->consumer($receiver)->consume(0);
在控制台运行消费者
$ example/consumer.php "connection name OR destination name"
创建接收器扩展
消费者使用接收器堆栈来扩展消息接收。请参阅接口 Bdf\Queue\Consumer\ReceiverInterface
和特质 Bdf\Queue\Consumer\DelegateHelper
。
<?php class MyExtension implements \Bdf\Queue\Consumer\ReceiverInterface { use \Bdf\Queue\Consumer\DelegateHelper; private $options; /** * MyExtension constructor. */ public function __construct(\Bdf\Queue\Consumer\ReceiverInterface $delegate, array $options) { $this->delegate = $delegate; $this->options = $options; } /** * {@inheritdoc} */ public function receive($message, \Bdf\Queue\Consumer\ConsumerInterface $consumer): void { // Do something when receiving message if ($message->queue() === 'foo') { return; } // Call the next receiver $this->delegate->receive($message, $consumer); } }
您可以使用 Bdf\Queue\Consumer\Receiver\Builder\ReceiverLoader::add()
在堆栈中注册您的接收器
<?php $options = ['foo' => 'bar']; /** @var \Bdf\Queue\Consumer\Receiver\Builder\ReceiverBuilder $builder */ $builder->add(MyExtension::class, [$options]);
自定义字符串负载
类 Bdf\Queue\Serializer\SerializerInterface
管理发送到消息代理的负载内容。默认情况下,元数据被添加到json中,如下所示
- PHP类型:帮助消费者反序列化复杂实体。
- 消息信息:重试尝试次数,发送日期等。
一个基本的负载看起来像
{ "name": "Foo", "data": "Hello World", "date": "2019-12-23T16:02:03+01:00" }
您可以使用自己的序列化接口实现来自定义负载。
尝试hello world示例(在example/config/connections.php
中配置消息代理)
$ example/producer.php foo '{"name":"Foo", "data":"Hello World"}' --raw
$ example/consumer.php foo
RPC客户端
<?php use Bdf\Queue\Message\InteractEnvelopeInterface; use Bdf\Queue\Message\Message; class RpcReplyHandler { public function doSomethingUseful(int $number, InteractEnvelopeInterface $envelope) { // Send bask: 1 x 2 to client $envelope->reply($number * 2); // Or retry in 10sec $envelope->retry(10); } } $message = Message::createFromJob(RpcReplyHandler::class.'@doSomethingUseful', 1, 'queue'); $message->setConnection('foo'); /** @var Bdf\Queue\Destination\DestinationManager $manager */ $promise = $manager->send($message); // Consume the foo connection // Receive data from the reply queue. If the header "replyTo" is not set, // the response will be sent to "queue_reply" echo $promise->await(500)->data(); // Display 2
连接的附加选项
注意
- 有效DSN的格式:{driver}+{vendor}://{user}:{password}@{host}:{port}/{queue}?{option}=value
- 有关更多Kafka选项,请参阅 https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
消息的附加选项
序列化
基准测试
简单作业/闭包作业
分析
- 为了最佳执行时间,无论大小如何,请使用默认的
Serializer
- 为了更小的尺寸,无论时间如何,请使用
BdfSerializer
与CompressedSerializer
- 为了最佳折衷,请使用
Serializer
与CompressedSerializer
- 始终小于纯
BdfSerializer
(JSON或二进制) - 在 unserialize 上更快,在 serialize 上略慢
- 比压缩的bdf大约快两倍,但在简单任务上的体积仅大约 40%
- 始终小于纯
许可协议
在MIT许可协议的条款下分发。