ikilobyte / pulsar-client-php
Apache Pulsar 的 PHP 原生客户端库
v1.3.3
2024-07-22 13:20 UTC
Requires
- php: >=7.0
- ext-curl: *
- google/crc32: ^0.1.0
- protobuf-php/protobuf: ^0.1.3
Requires (Dev)
- symfony/var-dumper: ^3.4
README
内容
关于
英语 | 中文
这是一个用 php 实现的 Apache Pulsar 客户端库,参考 PulsarApi.proto,并支持 Swoole 协程
特性
- 支持 URL(
pulsar://
、pulsar+ssl://
、http://
、https://
) - 多主题消费者
- TLS 连接
- 自动重连(仅消费者)
- 消息批处理
- 消息属性
- 使用
zstd
、zlib
压缩 - 使用
jwt
、basic
身份验证
要求
- PHP >=7.0(支持 PHP8)
- ZLib 扩展(如果需要使用
zlib
压缩) - Zstd 扩展(如果需要使用
zstd
压缩) - Swoole 扩展(如果需要在 Swoole 中使用)
- 在 Swoole 中使用只需要开启
SWOOLE_HOOK_SOCKETS、SWOOLE_HOOK_STREAM_FUNCTION
或SWOOLE_HOOK_ALL
- 在 Swoole 中使用只需要开启
提示
如果出现以下错误,请安装扩展
gmp
或bcmath
Negative integers are only supported with GMP or BC (64bit) intextensions.
安装
composer require ikilobyte/pulsar-client-php
生产者
<?php use Pulsar\Authentication\Basic; use Pulsar\Authentication\Jwt; use Pulsar\Compression\Compression; use Pulsar\Producer; use Pulsar\ProducerOptions; use Pulsar\MessageOptions; require_once __DIR__ . '/vendor/autoload.php'; $options = new ProducerOptions(); // If permission authentication is available // use JWT authentication $options->setAuthentication(new Jwt('token')); // use Basic authentication //$options->setAuthentication(new Basic('user','password')); $options->setConnectTimeout(3); $options->setTopic('persistent://public/default/demo'); $options->setCompression(Compression::ZLIB); $producer = new Producer('pulsar://:6650', $options); // or use pulsar proxy address //$producer = new Producer('https://:8080', $options); $producer->connect(); for ($i = 0; $i < 10; $i++) { $messageID = $producer->send(sprintf('hello %d',$i)); $messageID = $producer->send(sprintf('hello properties %d',$i),[ MessageOptions::PROPERTIES => [ 'key' => 'value', 'ms' => microtime(true), ], ]); echo 'messageID ' . $messageID . "\n"; } // Sending delayed messages for ($i = 0; $i < 10; $i++) { $producer->send(sprintf('hello-delay %d',$i),[ MessageOptions::DELAY_SECONDS => $i * 5, // Seconds ]); } // Send Batch message // The underlying protocol will automatically package these messages into a message and send it to pulsar $messages = []; for ($i = 0;$i < 10;$i++) { $messages[] = json_encode([ 'id' => $i, 'now' => date('Y-m-d H:i:s') ]); } $messageID = $producer->send($messages); echo "batch message id ${messageID}\n"; // close $producer->close();
保持连接(推荐)
- 需要
Swoole
扩展 - 如果是常驻内存应用程序,建议开启。
- 将保持连接,无需反复建立连接
- 调用
close
方法关闭连接 - 请参阅 示例
$options->setKeepalive(true);
消息去重
- 消息去重是 Pulsar 提供的功能,基于生产者名称和序列号 ID
- 同一生产者的名称需要固定且唯一,通常通过业务纬度区分,每个消息的序列号 ID 是唯一的且自增。
- 参考 Pulsar 文档
$options = new ProducerOptions(); $options->setProducerName('name'); $producer = new Producer('pulsar://:6650', $options); $producer->send('body',[ \Pulsar\MessageOptions::SEQUENCE_ID => 123456, ]);
消费者
<?php use Pulsar\Authentication\Jwt; use Pulsar\Authentication\Basic; use Pulsar\Consumer; use Pulsar\ConsumerOptions; use Pulsar\SubscriptionType; use Pulsar\Proto\CommandSubscribe\InitialPosition; require_once __DIR__ . '/vendor/autoload.php'; $options = new ConsumerOptions(); // If permission authentication is available // use JWT authentication $options->setAuthentication(new Jwt('token')); // use Basic authentication //$options->setAuthentication(new Basic('user','password')); $options->setConnectTimeout(3); $options->setTopic('persistent://public/default/demo'); $options->setSubscription('logic'); $options->setSubscriptionType(SubscriptionType::Shared); // Initial position at which to set cursor when subscribing to a topic at first time. // default use InitialPosition::Latest() // $options->setSubscriptionInitialPosition(InitialPosition::Earliest()); // Configure how many seconds Nack's messages are redelivered, the default is 1 minute $options->setNackRedeliveryDelay(20); $consumer = new Consumer('pulsar://:6650', $options); // or use pulsar proxy address //$consumer = new Consumer('https://:8080', $options); $consumer->connect(); while (true) { $message = $consumer->receive(); // get properties var_export($message->getProperties()); echo sprintf('Got message 【%s】messageID[%s] topic[%s] publishTime[%s] redeliveryCount[%d]', $message->getPayload(), $message->getMessageId(), $message->getTopic(), $message->getPublishTime(), $message->getRedeliveryCount() ) . "\n"; // ... // Remember to confirm that the message is complete after processing $consumer->ack($message); // When processing fails, you can also execute the Nack // The message will be re-delivered after the specified time // $consumer->nack($message); } $consumer->close();
接收批量消息
- 只有当生产者批量发送消息时,才能接收批量消息。
$messages = $consumer->batchReceive(); foreach ($messages as $message) { // ... // Ack $consumer->ack($message); }
订阅多个主题
$options->setTopics([ 'persistent://public/default/demo-1', 'persistent://public/default/demo-2', 'persistent://public/default/demo-3', //.... ]);
死信主题
// Assuming that the subject matter is: <topicname>-<subscriptionname>-DLQ $options->setDeadLetterPolicy(6); // Custom topic name $options->setDeadLetterPolicy(6,'persistent://public/default/demo-dead'); // Custom subscription name $options->setDeadLetterPolicy(6,'persistent://public/default/demo-dead','sub-name');
重连(仅支持消费者)
// start reconnect $options->setReconnectPolicy(true); // Reconnect interval(seconds) $options->setReconnectPolicy(true,3); // Maximum number of reconnections $options->setReconnectPolicy(true,3,100);
不循环接收并平滑退出
$running = true; // kill -15 $PID pcntl_signal(SIGTERM,function() use (&$running){ $running = false; }); while ($running) { try { $message = $consumer->receive(false); // ... } catch (\Pulsar\Exception\MessageNotFound $e) { if ($e ->getCode() != \Pulsar\Exception\MessageNotFound::Ignore) { die($e->getMessage()); } echo "Message Not Found\n"; continue; } catch (Throwable $e) { echo $e->getMessage() . "\n"; throw $e; } finally { pcntl_signal_dispatch(); } }
TLS
-
请参考官方 文档 进行证书配置
-
示例
$tls = new \Pulsar\TLSOptions('./cert.pem','./cert.key.pem'); // Establishing a TLS connection without a certificate //$tls = new \Pulsar\TLSOptions('',''); // CA Cert $tls->setTrustCertsFilePath('./ca.cart.pem'); // optional $tls->setAllowInsecureConnection(false); $tls->setValidateHostname(true); $options->setTLS($tls); $consumer = new \Pulsar\Consumer('pulsar+ssl://:6651',$options); //$producer = new \Pulsar\Producer('pulsar+ssl://:6651',$options); // or https $consumer = new \Pulsar\Consumer('https://:8081',$options); //$producer = new \Pulsar\Producer('https://:8081',$options);
模式
-
目前仅支持
INT8
、INT16
、INT32
、INT64
、DOUBLE
、STRING
、JSON
,以下代码使用JSON Schema
作为示例 -
model.php
<?php class Person { public $id; public $name; public $age; // ... }
- 生产者语句模式
<?php $define = '{"type":"record","name":"Person","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"age","type":"int"}]}'; $schema = new \Pulsar\Schema\SchemaJson($define, [ 'key' => 'value', ]); // ... some code $producerOptions->setSchema($schema); $producer = new \Pulsar\Producer('xx',$options); $producer->connect(); $person = new Person(); $person->id = 1; $person->name = 'Tony'; $person->age = 18; // directly send Person Object No need to manually convert to json string $id = $producer->send($person);
- 消费者语句模式
<?php $define = '{"type":"record","name":"Person","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"age","type":"int"}]}'; $schema = new \Pulsar\Schema\SchemaJson($define, [ 'key' => 'value', ]); // ... some code $consumerOptions->setSchema($schema); $consumer = new \Pulsar\Consumer('pulsar://xxx',$consumerOptions); $consumer->connect(); while (true) { $message = $consumer->receive(); $person = new Person(); $message->getSchemaValue($person); echo sprintf( 'payload %s id %d name %s age %d', $message->getPayload(), $person->id, $person->name, $person->age ) . "\n"; // .. some code }
读取器
<?php use Pulsar\Message; use Pulsar\Reader; use Pulsar\ReaderOptions; require_once __DIR__ . '/../vendor/autoload.php'; $options = new ReaderOptions(); // If permission authentication is available // Only JWT authentication is currently supported // $options->setAuthentication(new Jwt('token')); $options->setConnectTimeout(3); $options->setTopic('persistent://public/default/demo'); // support partition topic // Read the latest message $options->setStartMessageID(Message::latestMessageIdData()); // From the earliest message // $options->setStartMessageID(Message::earliestMessageIdData()); // Start reading from a message // $options->setStartMessageID(Message::deserialize('621:103:0')); $reader = new Reader('pulsar://:6650', $options); $reader->connect(); while (true) { $message = $reader->next(); echo sprintf('Got message 【%s】messageID[%s] topic[%s] publishTime[%s]', $message->getPayload(), $message->getMessageId(), $message->getTopic(), $message->getPublishTime() ) . "\n"; } $reader->close();
选项
- 生产者选项
- setTopic()
- setAuthentication()
- setConnectTimeout()
- setProducerName()
- setCompression()
- setSchema()
- setKeepalive()
- 消费者选项
- setTopic()
- setTopics()
- setAuthentication()
- setConnectTimeout()
- setConsumerName()
- setSubscription()
- setSubscriptionType()
- setNackRedeliveryDelay()
- setReceiveQueueSize()
- setDeadLetterPolicy()
- setSubscriptionInitialPosition()
- setReconnectPolicy()
- setSchema()
- 读取器选项
- setTopic()
- setAuthentication()
- setConnectTimeout()
- setReaderName()
- setStartMessageID()
- setReceiveQueueSize()
- 消息选项
- DELAY_SECONDS
- SEQUENCE_ID
- PROPERTIES
- TLSOption (v1.3.0)
- __construct(string $certFilePath, string $keyFilePath)
- setTrustCertsFilePath()
- setValidateHostname()
- setAllowInsecureConnection()
MessageNotFound ErrCode (v1.2.1)
MessageNotFound::Ignore
MessageNotFound::CommandParseFail
许可
MIT 许可证