willry/rmq

一个Redis消息队列管理器,允许在出错时使用工作进程和重试次数

4.0.1 2022-04-23 19:36 UTC

This package is auto-updated.

Last update: 2024-09-24 00:25:34 UTC


README

一个Redis消息队列管理器,允许在出错时使用工作进程和重试次数

消息队列管理器允许使用两种类型的数据队列

  • 列表
  • 集合

列表在处理“输入”和“输出”项目时表现出更高的性能,但在搜索和删除时较慢

集合在处理“输入”和“输出”项目时表现略逊于列表,但在搜索和删除时更快

如何使用?

通过composer安装

composer require willry/rmq

在demo目录中可以查看list和set的使用示例

包含以下文件

  • 消费者:consumer.php
  • 发布者:publisher.php
  • 操作:manipulate.php

性能

可以在队列中放置超过300,000个项目,同时使用少量CPU和多个工作进程

队列使用

需要创建一个负责处理的类,并实现\WillRy\RMQ\Worker接口

<?php

class WorkerTest implements \WillRy\RMQ\Worker
{

    public function handle(array $data = [])
    {
        try {
            /** Erro fake para simular o mecanismo de retentativa */
            if(rand() % 2 === 0) throw new \Exception("Erro");

            print("Success: {$data['id']}" . PHP_EOL);
        } catch (\Exception $e) {
            print("Retrying: {$data["id"]}" . PHP_EOL);
            throw $e;
        }

    }

    public function error($data)
    {
        print("Error: {$data["id"]}" . PHP_EOL);
    }
}

使用列表作为队列

发布

<?php

use WillRy\RMQ\QueueList;

//composer autoload
require __DIR__ . "/../../vendor/autoload.php";

$rmq = new QueueList("list", "redis", 6379);

for ($i = 0; $i < 300; $i++) {
    $rmq->publish([
        "id" => $i,
        "payload" => [
            "id" => $i,
            "name" => "Fulano",
            "descricao" => "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Morbi in justo nisl. Praesent pharetra ex vel nisl sagittis ullamcorper. Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur vestibulum, ipsum a vestibulum faucibus, lorem lorem semper turpis, vitae tristique est felis a urna. Cras gravida diam ac hendrerit venenatis. Vestibulum purus erat, maximus quis massa vitae, egestas euismod enim. Ut nec pulvinar nulla. Donec quis urna scelerisque, lacinia ante accumsan, fermentum eros. Aliquam sodales pulvinar quam non vehicula. Praesent odio libero, euismod at justo sed, feugiat ullamcorper orci. Cras id risus non nunc pharetra venenatis nec a leo. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae;Nunc mollis tellus odio, vel placerat nibh vehicula at. Praesent eu venenatis quam, sed tempor est. Praesent elit lectus, viverra vitae eros ac, semper posuere turpis. Proin porttitor sem nec urna consequat tempus vel sit amet magna. Aenean blandit, arcu eget accumsan porttitor, turpis mi elementum nunc, quis egestas dui risus eget magna. Sed sollicitudin mauris at dolor rhoncus, non fermentum tellus consequat. Vivamus dignissim vel quam eget pretium. Etiam vel magna aliquam, gravida erat eget, maximus orci. Pellentesque ac tempor nisl. Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas. Vestibulum nibh tortor, auctor a pellentesque nec, rutrum ac arcu."
        ]
    ]);
}

消费

<?php

use WillRy\RMQ\QueueList;

//composer autoload
require __DIR__ . "/../../vendor/autoload.php";

require __DIR__ . "/../WorkerTest.php";

$rmq = new QueueList("list", "redis", 6379);

$worker = new WorkerTest();

/** Queue work */
$rmq->consume($worker, 1, true, 3);

使用有序集合作为队列

发布

<?php

use WillRy\RMQ\QueueSet;

//composer autoload
require __DIR__ . "/../../vendor/autoload.php";

$rmq = new QueueSet("set", "redis", 6379);

for ($i = 0; $i < 300; $i++) {
    $rmq->publish([
        "id" => $i,
        "payload" => [
            "id" => $i,
            "name" => "Fulano",
            "descricao" => "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Morbi in justo nisl. Praesent pharetra ex vel nisl sagittis ullamcorper. Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur vestibulum, ipsum a vestibulum faucibus, lorem lorem semper turpis, vitae tristique est felis a urna. Cras gravida diam ac hendrerit venenatis. Vestibulum purus erat, maximus quis massa vitae, egestas euismod enim. Ut nec pulvinar nulla. Donec quis urna scelerisque, lacinia ante accumsan, fermentum eros. Aliquam sodales pulvinar quam non vehicula. Praesent odio libero, euismod at justo sed, feugiat ullamcorper orci. Cras id risus non nunc pharetra venenatis nec a leo. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae;Nunc mollis tellus odio, vel placerat nibh vehicula at. Praesent eu venenatis quam, sed tempor est. Praesent elit lectus, viverra vitae eros ac, semper posuere turpis. Proin porttitor sem nec urna consequat tempus vel sit amet magna. Aenean blandit, arcu eget accumsan porttitor, turpis mi elementum nunc, quis egestas dui risus eget magna. Sed sollicitudin mauris at dolor rhoncus, non fermentum tellus consequat. Vivamus dignissim vel quam eget pretium. Etiam vel magna aliquam, gravida erat eget, maximus orci. Pellentesque ac tempor nisl. Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas. Vestibulum nibh tortor, auctor a pellentesque nec, rutrum ac arcu.",
            "teste" => [1, 2, 3]
        ]
    ]);
}

消费

<?php

use WillRy\RMQ\QueueSet;

//composer autoload
require __DIR__ . "/../../vendor/autoload.php";

require __DIR__ . "/../WorkerTest.php";

$rmq = new QueueSet("set", "redis", 6379);

$worker = new WorkerTest();

/** Queue work */

$rmq->consume($worker, 1, true, 3);