duodashuang/php-queue

PHP消息队列客户端

v1.2.0 2018-07-09 09:02 UTC

This package is auto-updated.

Last update: 2024-08-27 22:39:33 UTC


README

一个PHP消息队列客户端,支持RabbitMQ、Kafka和Redis。

要求

安装

  • composer

      composer require dudashuang/php-queue

基本用法

  • examples

  • 创建驱动

    • redis

      <?php
      require __DIR__ . '/vendor/autoload.php';
      
      $connector  = new Lily\Connectors\RedisConnector([
          'scheme'             => 'tcp',
          'host'               => '127.0.0.1',
          'port'               => 6379,
          'read_write_timeout' => 0,
      ]);
      $driver = new Lily\Drivers\Redis($connector);
      
    • kafka

      <?php
      require __DIR__ . '/vendor/autoload.php';
      
      $connector  = new Lily\Connectors\KafkaConnector([
          'brokers' => [
              ['host' => 'localhost', 'port' => 9092],
          ],
      ]);
      $driver = new Lily\Drivers\Kafka($connector);
      
    • rabbitmq

      <?php
      require __DIR__ . '/vendor/autoload.php';
      
      $connector  = new Lily\Connectors\RabbitMQConnector([
          'host'     => 'localhost',
          'port'     => 5672,
          'username' => 'guest',
          'password' => 'guest',
      ]);
      $driver = new Lily\Drivers\RabbitMQ($connector);
      
  • 创建应用

    <?php
    require __DIR__ . '/vendor/autoload.php';
        
    $application = new Lily\Application($driver, [
        'deafult_queue' => 'default-queue',
        'failed_queue'  => 'failed-queue',
    ]);
    
  • 分发任务

    • 默认队列

      for ($i=0; $i<10; $i++) {
          $application->dispatch(new TestJob('hello', new LilyTest\TestModel(1, 2)));
      }
      
    • 其他队列

      $application->dispatch((new TestJob(...))->set_queue($queue_name));
      
  • 分发事件

    $application->dispatch(new TestEvent(...));
    
  • 创建消费者

    $application−>consume($queue_name);
    
  • 创建监听器

    $application->listen('LilyTest\Listeners\SendListener', ['TestEvent', 'TestEvent1']);
    

待办事项

  • 添加RocketMQ驱动

  • 添加延迟队列

附加

您可以使用supervisor来管理消费者

  • 安装

      sudo apt-get install supervisor
  • supervisor配置

      sudo vim /etc/supervisor/conf.d/guopika.config
      [program:guopika-worker]
      process_name=%(program_name)s_%(process_num)02d
      command=php /path_to_listen_queue/cli_func
      autostart=true
      autorestart=true
      user=www-data
      numprocs=4
      redirect_stderr=true
      stdout_logfile=/path/worker.log