joopschilder/reactphp-amqp-stream

This package is abandoned and no longer maintained. The author suggests using the toalett/react-amqp-stream package instead.

Wraps an AMQP consumer so that it behaves like a stream, for use in a ReactPHP event loop

v0.1.0 2020-02-13 13:42 UTC

This package is auto-updated.

Last update: 2021-04-07 22:08:43 UTC


README

This package provides a way to handle AMQP messages as a stream within the ReactPHP EventLoop component.
It depends on another package of mine, reactphp-input-stream.

Usage

Basic usage

The following code can be found in examples/simple_queue_consumer.php.

<?php

use JoopSchilder\React\Stream\AMQP\Message;
use JoopSchilder\React\Stream\AMQP\NonBlockingAMQPInputBuilder;
use JoopSchilder\React\Stream\AMQP\ValueObject\Queue;
use JoopSchilder\React\Stream\NonBlockingInput\ReadableNonBlockingInputStream;
use React\EventLoop\Factory;

require_once __DIR__ . '/path/to/autoload.php';

// Define a queue:
$queue = new Queue('my_queue');

// The builder allows you to define an exchange or a custom connection.
// By default, an AMQPStreamConnection is used (guest:guest@localhost:5672).
$input = NonBlockingAMQPInputBuilder::create($queue)->build();

// Create the EventLoop and add the AMQP consumer input to it:
$loop = Factory::create();
$stream = new ReadableNonBlockingInputStream($input, $loop);

// By default, a message needs to be acknowledged.
$stream->on('data', fn(Message $message) => print('m'));
$stream->on('data', fn(Message $message) => $message->ack());

// Add a timer for demonstration purposes...
$loop->addPeriodicTimer(0.2, fn() => print('.'));

// Run it.
$loop->run();

Advanced usage

The following code can be found in examples/advanced_queue_consumer.php.

<?php

use JoopSchilder\React\Stream\AMQP\Message;
use JoopSchilder\React\Stream\AMQP\NonBlockingAMQPInputBuilder;
use JoopSchilder\React\Stream\AMQP\ValueObject\ConsumeArguments;
use JoopSchilder\React\Stream\AMQP\ValueObject\ConsumerTag;
use JoopSchilder\React\Stream\AMQP\ValueObject\Exchange;
use JoopSchilder\React\Stream\AMQP\ValueObject\Queue;
use JoopSchilder\React\Stream\NonBlockingInput\ReadableNonBlockingInputStream;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use React\EventLoop\Factory;

require_once __DIR__ . '/path/to/autoload.php';

$queue = Queue::create('my_queue')
	->setDurable(true)
	->setAutoDelete(false)
	->setPassive(false)
	->setNoWait(false)
	->setArguments([]);

$consumerTag = ConsumerTag::format('worker[pid=%d]', getmypid());

$exchange = Exchange::create('my_exchange', AMQPExchangeType::FANOUT)
	->setDurable(true)
	->setAutoDelete(false)
	->setPassive(false)
	->setNoWait(false);

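// With noAck enabled, the consumer does not acknowledge messages,
// so the 'data' handler further down does not call ack().
$arguments = ConsumeArguments::create()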
	->setArguments([])
	->setNoAck(true)
	->setExclusive(false)
	->setNoWait(false)
	->setNoLocal(false);

$input = NonBlockingAMQPInputBuilder::create($queue)
	->setConsumerTag($consumerTag)
	->setExchange($exchange)
	->setConsumeArguments($arguments)
	->build();

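// Create the EventLoop and add the AMQP consumer input to it:
$loop = Factory::create();
$stream = new ReadableNonBlockingInputStream($input, $loop);

// Print an 'm' for every message received.
$stream->on('data', fn(Message $message) => print('m'));

// Add a timer for demonstration purposes...
$loop->addPeriodicTimer(0.2, fn() => print('.'));

// Run it.
$loop->run();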