freyo/laravel-queue-rocketmq

Queue adapter for the Alibaba MQ (Apache RocketMQ) SDK

1.0.1 2020-10-20 08:29 UTC

This package is auto-updated.

Last update: 2024-09-20 17:23:23 UTC


README

AlibabaMQ(Apache RocketMQ) Driver for Laravel Queue


Installation

composer require freyo/laravel-queue-rocketmq

Configuration

Laravel 5.5+ uses package auto-discovery, so you do not need to register the ServiceProvider manually.

  1. config/app.php (only needed when package auto-discovery is unavailable, i.e. Laravel below 5.5):
'providers' => [
  // ...
  Freyo\LaravelQueueRocketMQ\LaravelQueueRocketMQServiceProvider::class,
]
  2. .env:
# Laravel 5.7+ reads QUEUE_CONNECTION instead of QUEUE_DRIVER
QUEUE_DRIVER=rocketmq

ROCKETMQ_ACCESS_KEY=your-access-key
ROCKETMQ_ACCESS_ID=your-access-id

ROCKETMQ_ENDPOINT=http://***.mqrest.***.aliyuncs.com
ROCKETMQ_INSTANCE_ID=MQ_INST_***_***
ROCKETMQ_GROUP_ID=GID_***

ROCKETMQ_QUEUE=topic_name # default topic name

ROCKETMQ_USE_MESSAGE_TAG=false # set to true to use message tag
ROCKETMQ_WAIT_SECONDS=0
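
For orientation, these variables presumably feed the rocketmq connection in config/queue.php. The fragment below is a sketch under that assumption: it pairs each variable with the connection key of the same name used in the config/queue.php example later in this README. The exact mapping and the fallback values are not taken from the package source, so check the published config before relying on it.

```php
<?php

// config/queue.php (excerpt). Assumed mapping, not copied from the package.
return [
    'connections' => [
        'rocketmq' => [
            'driver'          => 'rocketmq',
            'access_key'      => env('ROCKETMQ_ACCESS_KEY'),
            'access_id'       => env('ROCKETMQ_ACCESS_ID'),
            'endpoint'        => env('ROCKETMQ_ENDPOINT'),
            'instance_id'     => env('ROCKETMQ_INSTANCE_ID'),
            'group_id'        => env('ROCKETMQ_GROUP_ID'),
            'queue'           => env('ROCKETMQ_QUEUE'),
            // Fallbacks mirror the sample .env values above (assumption).
            'use_message_tag' => env('ROCKETMQ_USE_MESSAGE_TAG', false),
            'wait_seconds'    => env('ROCKETMQ_WAIT_SECONDS', 0),
        ],
    ],
];
```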

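Usage

Once configured, you can use the Laravel queue API. If you already use another queue driver, nothing else needs to change. If you are not familiar with the queue API, see the official Laravel documentation: https://laravel.net.cn/docs/queues

Examples

Dispatching jobs

The default connection name is rocketmq.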

// Without message tag (ROCKETMQ_USE_MESSAGE_TAG=false)
Job::dispatch()->onConnection('connection-name')->onQueue('TopicTestMQ');
// or dispatch((new Job())->onConnection('connection-name')->onQueue('TopicTestMQ'))

// With message tag (ROCKETMQ_USE_MESSAGE_TAG=true)
Job::dispatch()->onConnection('connection-name')->onQueue('TagA');
// or dispatch((new Job())->onConnection('connection-name')->onQueue('TagA'))

Multiple queues

Configure config/queue.php

'connections' => [
    //...
    'new-connection-name' => [
        'driver' => 'rocketmq',
        'access_key' => 'your-access-key',
        'access_id'  => 'your-access-id',
        'endpoint' => 'http://***.mqrest.***.aliyuncs.com',
        'instance_id' => 'MQ_INST_***_***',
        'group_id' => 'GID_***',
        'queue' => 'your-default-topic-name',
        'use_message_tag' => false,
        'wait_seconds' => 0,
        'plain' => [
            'enable' => false,
            'job' => 'App\Jobs\RocketMQPlainJobHandler@handle',
        ],
    ],
    //...
];

Processing jobs

php artisan queue:work {connection-name} --queue={queue-name}

Plain mode

Configure .env

ROCKETMQ_PLAIN_ENABLE=true
ROCKETMQ_PLAIN_JOB=App\Jobs\RocketMQPlainJobHandler@handle

Create a job that implements the PlainPayload interface. The getPayload method must return a string.

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Freyo\LaravelQueueRocketMQ\Queue\Contracts\PlainPayload;

class RocketMQPlainJob implements ShouldQueue, PlainPayload
{
    use Dispatchable, InteractsWithQueue, Queueable;
    
    protected $payload;

    /**
     * Create a new job instance.
     *
     * @param string $payload Raw message body; getPayload() returns it unchanged.
     */
    public function __construct($payload)
    {
        $this->payload = $payload;
    }
    
    /**
     * Get the plain payload of the job.
     *
     * @return string
     */
    public function getPayload()
    {
        return $this->payload;
    }
}
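
Because getPayload must return a string, structured data has to be serialized before it is passed to the job. The following is a minimal sketch, assuming JSON is an acceptable message format for your consumers. The helper names encodePlainPayload and decodePlainPayload are hypothetical and not part of this package.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: serialize structured data into the string that a
 * PlainPayload job would return from getPayload().
 */
function encodePlainPayload(array $data): string
{
    // JSON_THROW_ON_ERROR (PHP 7.3+) raises JsonException on failure
    // instead of silently returning false.
    return json_encode($data, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);
}

/**
 * Hypothetical helper: restore the array inside a plain job handler.
 */
function decodePlainPayload(string $payload): array
{
    $data = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);

    // Valid JSON can still be a scalar; reject anything that is not an array.
    if (! is_array($data)) {
        throw new InvalidArgumentException('Payload must decode to an array.');
    }

    return $data;
}

$payload = encodePlainPayload(['order_id' => 42, 'status' => 'paid']);
echo $payload, PHP_EOL; // prints {"order_id":42,"status":"paid"}
```

With helpers like these, the job could be created as new RocketMQPlainJob(encodePlainPayload([...])), and the handler could call decodePlainPayload($payload) before processing.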

Create a plain job handler

<?php

namespace App\Jobs;

use Illuminate\Queue\Jobs\Job;

class RocketMQPlainJobHandler
{
    /**
     * Execute the job.
     * 
     * @param \Illuminate\Queue\Jobs\Job $job
     * @param string $payload
     * 
     * @return void
     */
    public function handle(Job $job, $payload)
    {
        // processing your payload...
        var_dump($payload);
        
        // release back to the queue manually when failed.
        // $job->release();
        
        // delete message when processed.
        if (! $job->isDeletedOrReleased()) {
            $job->delete();
        }        
    }
}
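
The handler above leaves the release call commented out. One way to wire release and delete together is sketched below, assuming the job object offers attempts(), release(), delete() and isDeletedOrReleased() as \Illuminate\Queue\Jobs\Job does. The function name handlePlainPayload, the attempt limit, and the delay are hypothetical, and whether this driver honors the release delay is not confirmed here.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: run $work on the payload, delete the message on
 * success, and release it for another attempt on failure.
 *
 * $job is expected to behave like \Illuminate\Queue\Jobs\Job.
 */
function handlePlainPayload(
    $job,
    string $payload,
    callable $work,
    int $maxAttempts = 3,
    int $delay = 10
): void {
    try {
        $work($payload);
    } catch (Throwable $e) {
        if ($job->attempts() < $maxAttempts) {
            // Hand the message back to the queue for a later retry.
            $job->release($delay);

            return;
        }

        // Out of attempts: rethrow and leave the decision to the caller.
        throw $e;
    }

    // Processing succeeded; remove the message unless $work already did.
    if (! $job->isDeletedOrReleased()) {
        $job->delete();
    }
}
```

Inside RocketMQPlainJobHandler::handle, this could be called as handlePlainPayload($job, $payload, $callback), where $callback holds the actual processing logic.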

License

The MIT License (MIT). See the license file for more information.
