byjg/message-queue-client

一种通用且可扩展的轻量级消息客户端,用于从亚马逊SQS、亚马逊SNS、谷歌PubSub、RabbitMQ、Kafka等消息服务器发布和消费消息。

4.9.0 2024-01-04 21:19 UTC

This package is auto-updated.

Last update: 2024-09-12 23:45:26 UTC


README

Build Status Opensource ByJG GitHub source GitHub license GitHub release

这是一个简单客户端,用于从消息队列服务器发布和消费消息。

功能

  • 低代码发布和消费消息
  • 消息、队列和连接器对象解耦
  • 易于实现新的连接器
┌─────────────────┐                  ┌────────────────────────┐
│                 │                  │  Envelope              │
│                 │                  │                        │
│                 │                  │                        │
│                 │                  │   ┌─────────────────┐  │
│                 │   publish()      │   │      Pipe       │  │
│                 ├─────────────────▶│   └─────────────────┘  │
│                 │                  │   ┌─────────────────┐  │
│                 │                  │   │     Message     │  │
│                 │                  │   └─────────────────┘  │
│                 │                  │                        │
│                 │                  └────────────────────────┘
│    Connector    │
│                 │
│                 │
│                 │       consume()     ┌─────────────────┐
│                 │◀────────────────────│      Pipe       │
│                 │                     └─────────────────┘
│                 │
│                 │
│                 │
└─────────────────┘

已实现的连接器

用法

发布

<?php
// Register the connector and associate with a scheme
ConnectorFactory::registerConnector(MockConnector::class);

// Create a connector
$connector = ConnectorFactory::create(new Uri("mock://local"));

// Create a queue
$pipe = new Pipe("test");
$pipe->withDeadLetter(new Pipe("dlq_test"));

// Create a message
$message = new Message("Hello World");

// Publish the message into the queue
$connector->publish(new Envelope($pipe, $message));

消费

<?php
// Register the connector and associate with a scheme
ConnectorFactory::registerConnector(MockConnector::class);

// Create a connector
$connector = ConnectorFactory::create(new Uri("mock://local"));

// Create a queue
$pipe = new Pipe("test");
$pipe->withDeadLetter(new Pipe("dlq_test"));

// Connect to the queue and wait to consume the message
$connector->consume(
    $pipe,                                 // Queue name
    function (Envelope $envelope) {         // Callback function to process the message
        echo "Process the message";
        echo $envelope->getMessage()->getBody();
        return Message::ACK;
    },
    function (Envelope $envelope, $ex) {    // Callback function to process the failed message
        echo "Process the failed message";
        echo $ex->getMessage();
        return Message::REQUEUE;
    }
);

消费方法将等待消息,并调用回调函数处理消息。如果没有消息在队列中,该方法将等待直到有消息到达。

如果您想退出消费方法,只需从回调函数返回Message::ACK | Message::EXIT

回调函数可能的返回值

  • Message::ACK - 确认消息并从队列中删除
  • Message::NACK - 不确认消息并从队列中删除。如果队列有死信队列,消息将被发送到死信队列。
  • Message::REQUEUE - 重新入队消息
  • Message::EXIT - 退出消费方法

消费者客户端

您可以使用ConsumerClientTrait简化消费方法。更多详细信息请参阅docs/consumer-client-trait.md

连接器

连接器是负责连接到消息队列服务器并发送/接收消息的类。

所有连接器都有以下接口

<?php
interface ConnectorInterface
{
    public function setUp(Uri $uri);

    public function getDriver();

    public function publish(Envelope $envelope);

    public function consume(Pipe $pipe, \Closure $onReceive, \Closure $onError, $identification = null);
}

不需要调用getDriver()方法,因为publish()consume()方法会自动调用它。只有在您需要直接访问连接时才使用getDriver()方法。

依赖关系

开源ByJG