marwanalsoltany/amqp-agent

针对90%的使用场景,优雅地封装了著名的php-amqplib。

v2.2.1 2022-05-23 11:09 UTC

This package is auto-updated.

Last update: 2024-09-23 16:36:55 UTC


README

针对90%的使用场景,优雅地封装了著名的php-amqplib。

PHP Version Latest Version on Packagist License Maintenance Documentation Total Downloads Scrutinizer Build Status Scrutinizer Code Coverage Scrutinizer Code Quality Travis Build Status StyleCI Code Style

目录

安装
关于AMQP Agent
API
文档
配置
示例
链接
许可
变更日志

Tweet

主要功能

  1. 框架无关,易于集成到任何代码库
  2. 一个直观且经过测试的API,自带对发布者消费者RPC端点的支持
  3. 包含大量辅助函数,让您无需深入了解即可快速启动
  4. 高度灵活性,可根据您的确切需求进行自定义
  5. 积极维护,文档完善,专注于语法糖

安装

现在尝试AMQP Agent

使用Packagist的Composer

composer require marwanalsoltany/amqp-agent

使用GitHub仓库的Composer(不稳定版)

将此配置复制到您的composer.json

"repositories": {
    "amqp-agent-repo": {
        "type": "vcs",
        "url": "https://github.com/MarwanAlsoltany/amqp-agent.git"
    }
},
"require": {
    "marwanalsoltany/amqp-agent": "dev-dev"
},
"minimum-stability": "dev"

运行

composer update

#ff6347 注意: AMQP Agent从版本v1.1.1开始默认支持PHP 7.1,如果您在旧版本中使用了php7.1-compatibility分支,请更新您的composer.json!

关于AMQP Agent

AMQP Agent试图简化在PHP项目中实现消息代理的过程。它消除了构建和配置对象或创建与RabbitMQ服务器(通过php-amqplib)通信所需的类所需的全部开销,并公开了一个经过测试的、完全可配置且灵活的API,几乎适用于任何项目。

php-amqplib库非常出色,运行良好。唯一的问题是,它非常基础,用于项目时,如果不重新创建自己的包装类,几乎不可能不写出面条代码。此外,它附带的大量函数、方法和配置(参数)使得实现合理的API变得非常困难。AMQP Agent通过尽可能多地实现抽象,同时不失去对工作者的控制,并重新引入与消息代理相关的术语来解决此问题,对于新手来说,只需处理发布者和消费者即可。

根据这个口号,AMQP Agent通过公开一些巧妙实现的流畅接口,使其与RabbitMQ一起工作变得尽可能有趣和优雅;它适合现代PHP开发,易于使用;功能强大,可以在任何工作点覆盖最小的怪癖。使用AMQP Agent,您只需几行代码就可以开始发布和消费消息!

AMQP Agent不覆盖php-amqplib的任何内容,也不更改与其函数相关的术语。它仅简化了它;消除了函数名称的噪音,并在某些地方进行了扩展。它还添加了一些功能,如工作者命令、动态通道等待和辅助方法。

AMQP Agent还为您的小型物联网项目提供了一个强大的基于事件的RPC客户端和RPC服务器。

使用AMQP代理的工作可以像这样简单

// Publisher
$publisher = new Publisher();
$publisher->work($messages);

// Consumer
$consumer = new Consumer();
$consumer->work($callback);

// RPC Client
$rpcClient = new ClientEndpoint();
$rpcClient->connect();
$response = $rpcClient->request($request);
$rpcClient->disconnect();

// RPC Server
$rpcServer = new ServerEndpoint();
$rpcServer->connect();
$request = $rpcServer->respond($callback);
$rpcServer->disconnect();

API

AMQP代理公开了一系列可以直接使用和可以扩展的抽象类。这两种类变体还有一个辅助子分类。

AMQP代理类

另请参阅:AbstractWorkerSingletonPublisherSingletonConsumerSingletonAbstractWorkerInterfacePublisherInterfaceConsumerInterfaceWorkerFacilitationInterfaceWorkerMutationTraitWorkerCommandTraitAbstractEndpointInterfaceClientEndpointInterfaceServerEndpointInterfaceEventTraitArrayProxyTraitClassProxyTraitAbstractParameters

参考文献

  • *C 具体: 此类是一个具体类,可以直接实例化。
  • *A 抽象: 此类是一个抽象类,不能直接实例化。
  • *H 辅助: 此类是一个辅助类。可以自由使用第三方替代品。
  • *R 推荐: 在使用AMQP代理时(最佳实践),建议使用此类。
  • *S 单例: 此类提供单例版本,通过在类名后添加“Singleton”后缀获得,可以通过*Singleton::getInstance()检索,例如Publisher -> PublisherSingleton

#ff6347 注意: 单例被认为是反模式,应尽可能避免使用,尽管存在一些用例。只有当你知道你在做什么时才使用单例。

配置

如果你只想快速发布和消费消息,一切都已经准备和配置好了,AMQP代理附带了遵循最佳实践的测试配置。你只需在你的文件中导入Publisher类和/或Consumer类,然后在实例上覆盖你想要覆盖的参数(例如RabbitMQ凭证)即可。

如果您想根据具体需求调整和微调AMQP代理的配置,需要做一点工作。您需要提供一个配置文件(见:maks-amqp-agent-config.php,并注意注释)。您不必提供所有内容,只需简单地写下您想要覆盖的参数即可,AMQP代理足够智能,可以自动补充缺失的部分。这些参数也可以在之后通过公共赋值符号或每次方法调用进行覆盖。

#1e90ff 事实: AMQP代理在配置文件和方法调用传递的参数数组中使用与php-amqplib相同的参数名称。

以下是一个配置文件的示例

<?php return [
    // Global
    'connectionOptions' => [
        'host'     => 'your-rabbitmq-server.com',
        'port'     => 5672,
        'user'     => 'your-username',
        'password' => 'your-password',
        'vhost'    => '/'
    ],
    'queueOptions' => [
        'queue'   => 'your.queue.name',
        'durable' => true,
        'nowait'  => false
    ],
    // Publisher
    'exchangeOptions' => [
        'exchange' => 'your.exchange.name',
        'type'     => 'direct'
    ],
    'bindOptions' => [
        'queue'    => 'your.queue.name',
        'exchange' => 'your.exchange.name'
    ],
    'messageOptions' => [
        'properties' => [
            'content_type'     => 'application/json',
            'content_encoding' => 'UTF-8',
            'delivery_mode'    => 2
        ]
    ],
    'publishOptions' => [
        'exchange'    => 'your.exchange.name',
        'routing_key' => 'your.route.name'
    ],
    // Consumer
    'qosOptions' => [
        'prefetch_count' => 25
    ],
    'waitOptions' => [
        'timeout' => 3600
    ],
    'consumeOptions' => [
        'queue'        => 'your.queue.name',
        'consumer_tag' => 'your.consumer.name',
        'callback'     => 'YourNamespace\YourClass::yourCallback'
    ]
    // RPC Endpoints
    'rpcQueueName' => 'your.rpc.queue.name'
];

#ff6347 注意: 数组第一级键名(以Options结尾)是AMQP代理特有的。

示例

在开始示例之前,我们需要澄清一些事情。首先值得一提的是,使用AMQP代理有多种获取工作进程的方式,有简单的方式、推荐的方式和更高级的方式。在获取到工作进程后,它就像粘土一样,您可以按照自己的意愿塑造它。这种模块化设计优雅地满足您的需求,推动代码库的可扩展性,并让每个人都感到满意。

获取工作进程的方式

  1. 最简单的方式是直接实例化工作进程,即使用new关键字。这种方式需要通过构造函数、方法调用或公共属性赋值来传递参数。
  2. 更高级的方式是获取单例工作进程,即PublisherSingleton::getInstance()。这种方式需要通过getInstance()方法、方法调用或公共属性赋值来传递参数。
  3. 更高级但推荐的方式是使用Client类的实例。这种方式也使代码更易于阅读,因为参数是从传递的配置中获取的。
// Instantiating Demo

use MAKS\AmqpAgent\Client;
use MAKS\AmqpAgent\Config;
use MAKS\AmqpAgent\Worker\Publisher;
use MAKS\AmqpAgent\Worker\PublisherSingleton;
use MAKS\AmqpAgent\Worker\Consumer;
use MAKS\AmqpAgent\Worker\ConsumerSingleton;
use MAKS\AmqpAgent\RPC\ClientEndpoint;
use MAKS\AmqpAgent\RPC\ServerEndpoint;

$publisher1 = new Publisher(/* parameters can be passed here */);
$publisher2 = PublisherSingleton::getInstance(/* parameters can be passed here */);

$consumer1 = new Consumer(/* parameters can be passed here */);
$consumer2 = ConsumerSingleton::getInstance(/* parameters can be passed here */);

$rpcClientA = new ClientEndpoint(/* parameters can be passed here */);
$rpcServerA = new ServerEndpoint(/* parameters can be passed here */);

// the parameters from this Config object will be passed to the workers.
$config = new Config('path/to/your/config-file.php');
$client = new Client($config); // path can also be passed directly to Client

$publisher3 = $client->getPublisher(); // or $client->get('publisher');
$consumer3 = $client->getConsumer(); // or $client->get('consumer');

$rpcClientB = $client->getClientEndpoint(); // or $client->get('client.endpoint');
$rpcServerB = $client->getServerEndpoint(); // or $client->get('server.endpoint');

// Use $client->gettable() to get an array of all available services.

以下是发布者的示例

  1. 变体I:在工作者构造函数中传递参数。
// Publisher Demo 1

$messages = [
    'This is an example message. ID [1].',
    'This is an example message. ID [2].',
    'This is an example message. ID [3].'
];


$publisher = new Publisher(
    [
        // connectionOptions
        'host' => 'localhost',
        'user' => 'guest',
        'password' => 'guest'
    ],
    [
        // channelOptions
    ],
    [
        // queueOptions
        'queue' => 'test.messages.queue'
    ],
    [
        // exchangeOptions
        'exchange' => 'test.messages.exchange'
    ],
    [
        // bindOptions
        'queue' => 'test.messages.queue',
        'exchange' => 'test.messages.exchange'
    ],
    [
        // messageOptions
        'properties' => [
            'content_type' => 'text/plain',
        ]
    ],
    [
        // publishOptions
        'exchange' => 'test.messages.exchange'
    ]
);

// Variant I (1)
$publisher->connect()->queue()->exchange()->bind();
foreach ($messages as $message) {
    $publisher->publish($message);
}
$publisher->disconnect();

// Variant I (2)
$publisher->prepare();
foreach ($messages as $message) {
    $publisher->publish($message);
}
$publisher->disconnect();

// Variant I (3)
$publisher->work($messages);
  1. 变体II:按方法调用覆盖参数。
// Publisher Demo 2

$messages = [
    'This is an example message. ID [1].',
    'This is an example message. ID [2].',
    'This is an example message. ID [3].'
];


$publisher = new Publisher();

// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$publisher->connectionOptions = [
    'host' => 'localhost',
    'user' => 'guest',
    'password' => 'guest'
];
$publisher->connect();
$publisher->queue([
    'queue' => 'test.messages.queue'
]);
$publisher->exchange([
    'exchange' => 'test.messages.exchange'
]);
$publisher->bind([
    'queue' => 'test.messages.queue',
    'exchange' => 'test.messages.exchange'
]);
foreach ($messages as $message) {
    $publisher->publish(
        [
            'body' => $message,
            'properties' => [
                'content_type' => 'text/plain',
            ]
        ],
        [
            'exchange' => 'test.messages.exchange'
        ]
    );
}
$publisher->disconnect();

以下是消费者的示例

  1. 变体I:在工作者构造函数中传递参数。
// Consumer Demo 1

$consumer = new Consumer(
    [
        // connectionOptions
        'host' => 'localhost',
        'user' => 'guest',
        'password' => 'guest'
    ],
    [
        // channelOptions
    ],
    [
        // queueOptions
        'queue' => 'test.messages.queue'
    ],
    [
        // qosOptions
        'exchange' => 'test.messages.exchange'
    ],
    [
        // waitOptions
    ],
    [
        // consumeOptions
        'queue' => 'test.messages.queue',
        'callback' => 'YourNamespace\YourClass::yourCallback',
    ],
    [
        // publishOptions
        'exchange' => 'test.messages.exchange'
    ]
);

// Variant I (1)
$consumer->connect();
$consumer->queue();
$consumer->qos();
$consumer->consume();
$consumer->wait();
$consumer->disconnect();

// Variant I (2)
$consumer->prepare()->consume()->wait()->disconnect();

// Variant I (3)
$consumer->work('YourNamespace\YourClass::yourCallback');
  1. 变体II:按方法调用覆盖参数。
// Consumer Demo 2

$variable = 'This variable is needed in your callback. It will be the second, the first is always the message!';


$consumer = new Consumer();

// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$consumer->connectionOptions = [
    'host' => 'localhost',
    'user' => 'guest',
    'password' => 'guest'
];
$consumer->connect();
$consumer->queue([
    'queue' => 'test.messages.queue'
]);
$consumer->qos([
    'prefetch_count' => 10
]);
$consumer->consume(
    [
        'YourNamespace\YourClass',
        'yourCallback'
    ],
    [
        $variable
    ],
    [
        'queue' => 'test.messages.queue'
    ]
);
$consumer->wait();
$consumer->disconnect();

以下是RPC客户端的示例

  1. 变体I:在客户端构造函数中传递参数。
// RPC Client Demo 1

$rpcClient = new ClientEndpoint(
    // connectionOptions
    [
        'host' => 'localhost',
        'user' => 'guest',
        'password' => 'guest'
    ],
    // queueName
    'your.rpc.queue.name'
);
$rpcClient->connect();
$response = $rpcClient->request('{"command":"some-command","parameter":"some-parameter"}');
$rpcClient->disconnect();
  1. 变体II:按方法调用覆盖参数。
// RPC Client Demo 2

$rpcClient = new ClientEndpoint();
$rpcClient->connect(
    // connectionOptions
    [
        'host' => 'localhost',
        'user' => 'guest',
        'password' => 'guest'
    ],
    // queueName
    'your.rpc.queue.name'
);
$response = $rpcClient->request(
    '{"command":"some-command","parameter":"some-parameter"}',
    'your.rpc.queue.name'
);
$rpcClient->disconnect();

以下是RPC服务器的示例

  1. 变体I:在服务器构造函数中传递参数。
// RPC Server Demo 1

$rpcServer = new ServerEndpoint(
    // connectionOptions
    [
        'host' => 'localhost',
        'user' => 'guest',
        'password' => 'guest'
    ],
    // queueName
    'your.rpc.queue.name'
);
$rpcServer->connect();
$request = $rpcServer->respond('YourNamespace\YourClass::yourCallback');
$rpcServer->disconnect();
  1. 变体II:按方法调用覆盖参数。
// RPC Server Demo 2

$rpcServer = new ServerEndpoint();
$rpcServer->connect(
    // connectionOptions
    [
        'host' => 'localhost',
        'user' => 'guest',
        'password' => 'guest'
    ],
    // queueName
    'your.rpc.queue.name'
);
$request = $rpcServer->respond(
    'YourNamespace\YourClass::yourCallback',
    'your.rpc.queue.name'
);
$rpcServer->disconnect();

#1e90ff 事实: 在提供参数时,只需提供您需要的参数。AMQP代理足够智能,可以补充缺失的部分。

#32cd32 建议: 如果您在提供所需参数的配置文件后,在Client类实例上使用get($className),则可以简化上面示例中编写的重型构造函数。

#ff6347 注意: 有关方法的完整解释,请参阅AMQP Agent 文档。有关参数的完整解释,请参阅RabbitMQ 文档php-amqplib

高级示例

在这些示例中,您将看到如何在现实场景中与 AMQP Agent 一起工作。

  • 发布者示例:您将看到如何将具有优先级的消息发布到队列。使用 workers-commands 启动额外的消费者(子进程/线程)以实现冗余,并在消费者失败时发布通道关闭命令,在消费者完成工作后关闭消费者的通道。
// Advanced Publisher Demo

use MAKS\AmqpAgent\Client;
use MAKS\AmqpAgent\Config;
use MAKS\AmqpAgent\Worker\Publisher;
use MAKS\AmqpAgent\Helper\Serializer;

// Preparing some data to work with.
$data = [];
for ($i = 0; $i < 10000; $i++) {
    $data[] = [
        'id' => $i,
        'importance' => $i % 3 == 0 ? 'high' : 'low', // Tag 1/3 of the messages with high importance.
        'text' => 'Test message with ID ' . $i
    ];
}

// Instantiating a config object.
// Note that not passing a config file path falls back to the default config.
// Starting from v1.2.2, you can use has(), get(), set() methods to modify config values.
$config = new Config();

// Instantiating a client.
$client = new Client($config);

// Retrieving a serializer from the client.
/** @var \MAKS\AmqpAgent\Helper\Serializer */
$serializer = $client->get('serializer');

// Retrieving a publisher from the client.
/** @var \MAKS\AmqpAgent\Worker\Publisher */
$publisher = $client->get('publisher');

// Connecting to RabbitMQ server using the default config.
// host: localhost, port: 5672, username: guest, password: guest.
$publisher->connect();

// Declaring high and low importance messages queue.
// Note that this queue is lazy and accept priority messages.
$publisher->queue([
    'queue' => 'high.and.low.importance.queue',
    'arguments' => $publisher->arguments([
        'x-max-priority' => 2,
        'x-queue-mode' => 'lazy'
    ])
]);

// Declaring a direct exchange to publish messages to.
$publisher->exchange([
    'exchange' => 'high.and.low.importance.exchange',
    'type' => 'direct'
]);

// Binding the queue with the exchange.
$publisher->bind([
    'queue' => 'high.and.low.importance.queue',
    'exchange' => 'high.and.low.importance.exchange'
]);

// Publishing messages according to their priority.
foreach ($data as $item) {
    $payload = $serializer->serialize($item, 'JSON');
    if ($item['importance'] == 'high') {
        $publisher->publish(
            [
                'body' => $payload,
                'properties' => [
                    'priority' => 2
                ],
            ],
            [
                'exchange' => 'high.and.low.importance.exchange'
            ]
        );
        continue;
    }
    $publisher->publish(
        $payload, // Not providing priority will fall back to 0
        [
            'exchange' => 'high.and.low.importance.exchange'
        ]
    );
}

// Starting a new consumer after messages with high importance are consumed.
// Pay attention to the priority, this message will be placed just after
// high importance messages but before low importance messages.
$publisher->publish(
    [
        'body' => $serializer->serialize(
            Publisher::makeCommand('start', 'consumer'),
            'JSON'
        ),
        'properties' => [
            'priority' => 1
        ],
    ],
    [
        'exchange' => 'high.and.low.importance.exchange'
    ]
);

// Since we have two consumers now, one from the original worker
// and the other gets started later in the callback. We have
// to publish two channel-closing commands to stop the consumers.
// These will be added at the end after low importance messages.
$iterator = 2;
do {
    $publisher->publish(
        [
            'body' => $serializer->serialize(
                Publisher::makeCommand('close', 'channel'),
                'JSON'
            ),
            'properties' => [
                'priority' => 0
            ],
        ],
        [
            'exchange' => 'high.and.low.importance.exchange'
        ]
    );
    $iterator--;
} while ($iterator != 0);

// Close the connection with RabbitMQ server.
$publisher->disconnect();
  • 消费者示例:您将看到如何消费消息。读取 workers-commands 以启动额外的消费者(子进程/线程)并关闭消费者的通道。
// Advanced Consumer Demo

use MAKS\AmqpAgent\Client;
use MAKS\AmqpAgent\Config;
use MAKS\AmqpAgent\Worker\Consumer;
use MAKS\AmqpAgent\Helper\Serializer;
use MAKS\AmqpAgent\Helper\Logger;

$config = new Config();
$client = new Client($config);

// Retrieving a logger from the client.
// And setting its write directory and filename.
/** @var \MAKS\AmqpAgent\Helper\Logger */
$logger = $client->get('logger');
$logger->setDirectory(__DIR__);
$logger->setFilename('high-and-low-importance-messages');

// Retrieving a serializer from the client.
/** @var \MAKS\AmqpAgent\Helper\Serializer */
$serializer = $client->get('serializer');

// Retrieving a consumer from the client.
/** @var \MAKS\AmqpAgent\Worker\Consumer */
$consumer = $client->get('consumer');

$consumer->connect();

// Declaring high and low importance messages queue for the consumer.
// The declaration here must match the one on the publisher. This step
// can also be omitted if you're sure that the queue exists on the server.
$consumer->queue([
    'queue' => 'high.and.low.importance.queue',
    'arguments' => $consumer->arguments([
        'x-max-priority' => 2,
        'x-queue-mode' => 'lazy'
    ])
]);

// Overwriting the default quality of service.
$consumer->qos([
    'prefetch_count' => 1,
]);

// The callback is defined here for demonstration purposes
// Normally you should separate this in its own class.
$callback = function($message, &$client, $callback) {
    $data = $client->getSerializer()->unserialize($message->body, 'JSON');

    if (Consumer::isCommand($data)) {
        Consumer::ack($message);
        if (Consumer::hasCommand($data, 'close', 'channel')) {
            // Giving time for acknowledgements to take effect,
            // because the channel will be closed shortly
            sleep(5);
            // Close the channel using the delivery info of the message.
            Consumer::shutdown($message);
        } elseif (Consumer::hasCommand($data, 'start', 'consumer')) {
            $consumer = $client->getConsumer();
            // Getting a new channel on the same connection.
            $channel = $consumer->getNewChannel();
            $consumer->queue(
                [
                    'queue' => 'high.and.low.importance.queue',
                    'arguments' => $consumer->arguments([
                        'x-max-priority' => 2,
                        'x-queue-mode' => 'lazy'
                    ])
                ],
                $channel
            );
            $consumer->qos(
                [
                    'prefetch_count' => 1,
                ],
                $channel
            );
            $consumer->consume(
                $callback,
                [
                    &$client,
                    $callback
                ],
                [
                    'queue' => 'high.and.low.importance.queue',
                    'consumer_tag' => 'callback.consumer-' . uniqid()
                ],
                $channel
            );
        }
        return;
    }

    $client->getLogger()->write("({$data['importance']}) - {$data['text']}");
    // Sleep for 50ms to mimic some processing.
    usleep(50000);

    // The final step is acknowledgment so that no data is lost.
    Consumer::ack($message);
};

$consumer->consume(
    $callback,
    [
        &$client, // Is used to refetch the consumer, serializer, and logger.
        $callback // This gets passed to the consumer that get started by the callback.
    ],
    [
        'queue' => 'high.and.low.importance.queue'
    ]
);

// Here we have to wait using waitForAll() method
// because we have consumers that start dynamically.
$consumer->waitForAll();

// Close the connection with RabbitMQ server.
$consumer->disconnect();
  • RPC 客户端示例:您将看到如何向 RPC 服务器发送请求,并通过使用它提供的功能来扩展端点的功能。
// Advanced RPC Client Demo

use MAKS\AmqpAgent\Client;
use MAKS\AmqpAgent\Config;
use MAKS\AmqpAgent\RPC\ClientEndpoint;

$config = new Config();
$client = new Client($config);

// Retrieving an RPC client endpoint from the client.
/** @var \MAKS\AmqpAgent\RPC\ClientEndpoint */
$rpcClient = $client->getClientEndpoint();

// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcClient->on() and $rpcClient->getEvents() methods for more info.
$rpcClient
    ->on('connection.after.open', function ($connection, $rpcClient, $eventName) {
        printf('%s has emitted [%s] event and is now connected!', get_class($rpcClient), $eventName);
        if ($connection instanceof AMQPStreamConnection) {
            printf('  The connection has currently %d channel(s).', count($connection->channels) - 1);
        }
    })->on('request.before.send', function ($request, $rpcClient, $eventName) {
        printf('%s has emitted [%s] event and is about to send a request!', get_class($rpcClient), $eventName);
        if ($request instanceof AMQPMessage) {
            $request->set('content_type', 'application/json')
            printf('  The request content_type header has been set to: %s', $request->get('content_type'));
        }
    });

// Optionally, you can ping the RabbitMQ server to see if a connection can be established.
$roundtrip = $rpcClient->ping();

$rpcClient->connect();
$response = $rpcClient->request('{"command":"some-command","parameter":"some-parameter"}');
$rpcClient->disconnect();
  • RPC 服务器示例:您将看到如何响应对 RPC 客户端的请求,并通过使用它提供的功能来扩展端点的功能。
// Advanced RPC Server Demo

use MAKS\AmqpAgent\Client;
use MAKS\AmqpAgent\Config;
use MAKS\AmqpAgent\RPC\ServerEndpoint;

$config = new Config();
$client = new Client($config);

// Retrieving an RPC server from the client.
/** @var \MAKS\AmqpAgent\RPC\ServerEndpoint */
$rpcServer = $client->getServerEndpoint();

// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcServer->on() and $rpcServer->getEvents() methods for more info.
$rpcServer
    ->on('request.on.get', function ($request, $rpcServer, $eventName) {
        printf('%s has emitted [%s] event and has just got a request!', get_class($rpcServer), $eventName);
        if ($request instanceof AMQPMessage) {
            printf('  The request has the following body: %s', $request->body;
        }
    });

$rpcServer->connect();
$request = $rpcServer->respond('YourNamespace\YourClass::yourCallback');
$rpcServer->disconnect();

#1e90ff 事实: 如果您将所有参数更改放在配置文件中并通过它传递给客户端而不是默认值,则可以将发布者/消费者高级示例中的代码变得更加简单。

#32cd32 建议: AMQP Agent 代码库有良好的文档,请参阅此链接查看所有类和方法。

链接

许可

由于 php-amqplib 许可证,AMQP Agent 是一个开源软件包,根据 GNU LGPL v2.1 许可。
版权所有(c)2020 Marwan Al-Soltany。保留所有权利。