jurry/amqp-symfony-bundle

RabbitMQ Symfony Bundle

1.1 2020-10-30 08:21 UTC

This package is auto-updated.

Last update: 2024-09-12 15:00:25 UTC


README

描述

它的目的是启动工作者(消费者)并发送“同步”和“异步”请求到其他队列或交换机。

安装

composer require jurry/amqp-symfony-bundle

使用

使用composer安装此包后,将创建一个新的配置文件 config/packages/jurry_rabbitmq.yaml。让我们看看

amqp_handler:
#    You can define connection in both ways: as array or as a string
#    connection: 'tcp://guest:guest@172.18.0.01:5672'
    connection:
        host: 172.18.0.1
        port: 5672
        user: guest
        password: guest
    queues_properties:
        sync_queue:
            name: stores_sync
            message_ttl: 10000 # 10 seconds
        async_queue:
            name: stores_async
            message_ttl: 10000 # 10 seconds
  • 如您所见,您有一个现成的yaml配置文件,您可以根据需要开始编辑这些值。
  • 根属性描述服务名称,而第二个属性 connection 描述与RabbitMQ代理服务器的连接字符串。

注意:连接属性可以有两种定义方式

  1. 连接字符串示例:“tcp://guest:guest@172.18.0.01:5672”
  2. 数组定义
  • 第三个也是最后一个属性是 queues_properties,它描述了与队列相关的配置,一旦工作者(消费者)启动就应该定义一次。

  • 一旦您根据自己的需求调整了配置,您可以通过运行以下命令来启动工作者(消费者)

./bin/console sync_worker # to start Sync consumer
./bin/console async_worker # to start Async consumer
  • 这两个命令将为您创建两个队列,第一个是“{name}_sync”,第二个是“{name}_async”

工作者(消费者)

工作者(消费者)正在寻找您注册的服务来启动并执行其中请求的方法,因此当另一个应用程序请求您的信息时,它们应在消息体中发送以下数据

{
  "route": "https://:8000/path/to/your/route", # base route should be set as ENV variable
  "method": "GET|POST|DELETE|PUT|...etc",
  "body": "Request body",
  "query": "Query parameters",
  "headers": "Request headers"
}

示例:假设服务A请求服务B的随机数

服务A会将此消息发送到服务B

{
  "route": "https://:8000/api/number",
  "method": "POST",
  "body": {"min": 1, "max": 10}
}

服务B将接收到消息,然后尝试消费它,因此,它应该有“POST /api/number”的路由来返回响应

class NumberController {
    public function generate(Request $request)
    {
        return rand($request->get('min'), $request->get('max'));
    }
}

您必须将base_uri设置为环境变量,只需在您的 .env 文件中设置这些值即可

JURRY_BASE_API_URI=https://:8000/api/
JURRY_HTTP_CLIENT_TIMEOUT=10

请求发送者

为了向另一个队列发送请求,您可以将 RequestSender 服务注入到任何您想要的类中,然后

use Jurry\RabbitMQ\Handler\RequestSender;

class FooBar {
    /** @var \Jurry\RabbitMQ\Handler\RequestSender*/
    private $requestSender;
    
    public function __construct(RequestSender $requestSender)
    {
        $this->requestSender = $requestSender;
    }
    
    public function getRandomNumberFromAnotherApp(int $min = 1, int $max = 10): int
    {
        return $this->requestSender
            ->setQueueName('external_queue_name') // This is the queue name which another app listens to
            ->setRoute('number/generate') // The path to the requested route, without providing the full link
            ->setMethod('post') // Http request method
            ->setBody(['min' => $min, 'max' => $max]) // Request body
            ->sendSync();
    }
}

变更日志

  1. 现在,此包通过向请求的路由发送http请求而不是直接调用服务类,来发起http请求。这使得使用控制器更容易、更完整。这将有助于使用控制器中实现的有效性验证、中间件和其他功能。

  2. 添加了新的环境变量

JURRY_BASE_API_URI=https://:8000/api/
JURRY_HTTP_CLIENT_TIMEOUT=10

这些值将在服务内部启动http请求时使用