martinhej/robocloud

Robocloud是AWS SDK for PHP的扩展,用于与Kinesis和DynamoDB一起使用

v0.4 2018-05-30 14:30 UTC

This package is not auto-updated.

Last update: 2024-09-29 05:21:10 UTC


README

警告

请注意,此代码高度实验性,预计在未来将进行更改,从而破坏向后兼容性。

同样,robocloud和robotalk的概念也是实验性的,距离完整还远。

尽管如此,它已经可以用于简单的事情,如迷宫解决机器人的合作或针对单个或多个机器人执行特定指令集。所以,不要犹豫,用它来为你的实验性机器人乐趣项目使用!!! :)

功能建立在AWS Kinesis流服务之上:https://aws.amazon.com/kinesis/。如果你不熟悉Kinesis,首先了解更多关于它的信息。是的,计划在将来将robocloud从Kinesis解耦。但如前所述,目前该项目高度实验性,处于原型设计阶段。

高级概述

想法是提供一个基础设施,其中两个或更多系统可以在所有方向上通信,以便它们不仅可以交换数据,还可以交换问题解决指令集,从而实现相互学习。

Robotalk

Robotalk将是用于执行此类通信的“语言”。

消息和模式

每个消息由两个模式文件定义。第一个是通用消息模式,它定义了所有消息都通用的基本消息结构。第二个定义了根据消息“目的”而变化的“数据”属性结构。

目前,消息模式库是此项目的一部分,位于“schema”目录下。请注意,消息“目的”属性定义了实际模式文件所在的目录结构。

Robocloud

Robocloud将是技术上启用Robotalk使用的平台。

提供的功能

Robocloud提供从AWS Kinesis流中推送和读取特定消息的功能。它遵循Kinesis消息生产者和消费者的概念。它执行了大部分繁重的工作,以利用Kinesis流,并添加了在从流中消费时验证和处理消息的可能性。

生产者示例

use robocloud\Event\KinesisProducerErrorConsoleLogger;
use robocloud\Kinesis\Client\Producer;
use robocloud\KinesisClientFactory;
use robocloud\Message\Message;
use robocloud\Message\MessageFactory;
use robocloud\Message\MessageSchemaValidator;
use Symfony\Component\Cache\Simple\FilesystemCache;
use Symfony\Component\EventDispatcher\EventDispatcher;

// Define the Kinesis stream name.
$stream_name = 'robocloud';

// Get an event dispatcher instance.
$event_dispatcher = new EventDispatcher();

// Add message schema validator.
$event_dispatcher->addSubscriber(new MessageSchemaValidator('schema/stream/robocloud/message'));

// Add error processors. A few simple ones that passivly log errors
// are available in robocloud/Event. To provide more robust error
// processing like requeuing failed messages you need to provide
// your own.
$event_dispatcher->addSubscriber(new KinesisProducerErrorConsoleLogger());

// Now get the message facory that will create and validate messages
// for you.
$message_factory = new MessageFactory(Message::class, $event_dispatcher);

// Use the message factory to set the message data.
$message_factory->setMessageData([
    'version' => 'v_0_1',
    'roboId' => 'lost',
    'purpose' => 'buddy.find',
    'data' => [
        'reason' => 'line_follower.line.lost',
    ],
]);

// Create the actual message that will be sent to Kinesis.
// This will throw exception if message data validation
// fails or if schema files could not be found.
$message = $message_factory->createMessage();

// Create instance of the Kinesis client factory.
$kinesis_factory = new KinesisClientFactory('2013-12-02', 'eu-west-1');

// Provide cache that will be used to cache shard info.
$cache = new FilesystemCache();

// Create the Producer instance.
$producer = new Producer(
    $kinesis_factory->getKinesisClient('AKIAJG2QTSBDKBFNACDA', 'Pg2c2AzMfY/5koj6b0IO3GgOvgF/m5nUDayjBOh/'),
    $stream_name,
    $message_factory,
    $event_dispatcher,
    $cache
);

// Add the message and push it to the stream.
$producer->add($message);
var_dump(array_map(function($result) {
    return (string) $result;
}, $producer->pushAll()));

消费者示例

// Define the Kinesis stream name.
use robocloud\Event\KinesisConsumerErrorConsoleLogger;
use robocloud\Kinesis\Client\Consumer;
use robocloud\Kinesis\Client\ConsumerRecovery;
use robocloud\KinesisClientFactory;
use robocloud\Message\Message;
use robocloud\Message\MessageFactory;
use robocloud\Message\MessageSchemaValidator;
use robocloud\MessageProcessing\Backend\KeepInMemoryBackend;
use robocloud\MessageProcessing\Filter\KeepAllFilter;
use robocloud\MessageProcessing\Processor\DefaultProcessor;
use robocloud\MessageProcessing\Transformer\KeepOriginalTransformer;
use Symfony\Component\Cache\Simple\FilesystemCache;
use Symfony\Component\EventDispatcher\EventDispatcher;

// Define the Kinesis stream name.
$stream_name = 'robocloud';

// Create event dispatcher instance.
$event_dispatcher = new EventDispatcher();

// Add message schema validator.
$event_dispatcher->addSubscriber(new MessageSchemaValidator('schema/stream/robocloud/message'));

// Add error handler(s).
$event_dispatcher->addSubscriber(new KinesisConsumerErrorConsoleLogger());

// Create filter instance that will be used to filter out only those messages
// that you are interested in.
$filter = new KeepAllFilter();
// The transformer layer is responsible for extracting and processing the
// message data into a form that is expected by your backend.
$keep_original_transformer = new KeepOriginalTransformer();
// Finally provide your backend that will finish the message processing.
$keep_in_memory_backend = new KeepInMemoryBackend();

// Add the message processor as the subscriber that will be used
// during consuming to process the messages.
$event_dispatcher->addSubscriber(new DefaultProcessor($filter, $keep_original_transformer, $keep_in_memory_backend));

// Get the message factory that will be used for creating the message objects
// from the data pulled from Kinesis.
$message_factory = new MessageFactory(Message::class, $event_dispatcher);

// Create instance of the Kinesis client factory.
$kinesis_factory = new KinesisClientFactory('2013-12-02', 'eu-west-1');

// Provide cache that will be used to cache shard info.
$cache = new FilesystemCache();

// Provide the recovery object used to store last read position.
$consumer_recovery = new ConsumerRecovery($stream_name, 'Shard-000001', '/tmp/consumer_recovery.rec');

// Instantiate the consumer and consume messages from Kinesis stream.
$consumer = new Consumer(
    $kinesis_factory->getKinesisClient('AKIAINK5P33X2KBK2RAQ', 'EuUdvE7WW0SKaEpGWMWHvN5M+gIjGaoLAVTYzzhV'),
    $message_factory,
    $event_dispatcher,
    $cache,
    $consumer_recovery
);

// One process
$consumer->consume(0);

// Print the messages to see what we pulled from the stream.
var_dump($keep_in_memory_backend->flush());

// Note that this example is very trivial not providing any real functionality.
// To get better idea on how to use message processor see other filter,
// transformer and backend classes in the MessageProcessing namespace.