martinhej / robocloud
Robocloud是AWS SDK for PHP的扩展,用于与Kinesis和DynamoDB一起使用
Requires
- php: ^7.1.3
- aws/aws-sdk-php: ^3.30
- symfony/cache: ^4.0
- symfony/event-dispatcher: ^4.0
Requires (Dev)
- phpunit/phpunit: ^7.2@dev
- symfony/phpunit-bridge: ^4.1@dev
This package is not auto-updated.
Last update: 2024-09-29 05:21:10 UTC
README
警告
请注意,此代码高度实验性,预计在未来将进行更改,从而破坏向后兼容性。
同样,robocloud和robotalk的概念也是实验性的,距离完整还远。
尽管如此,它已经可以用于简单的事情,如迷宫解决机器人的合作或针对单个或多个机器人执行特定指令集。所以,不要犹豫,用它来为你的实验性机器人乐趣项目使用!!! :)
功能建立在AWS Kinesis流服务之上:https://aws.amazon.com/kinesis/。如果你不熟悉Kinesis,首先了解更多关于它的信息。是的,计划在将来将robocloud从Kinesis解耦。但如前所述,目前该项目高度实验性,处于原型设计阶段。
高级概述
想法是提供一个基础设施,其中两个或更多系统可以在所有方向上通信,以便它们不仅可以交换数据,还可以交换问题解决指令集,从而实现相互学习。
Robotalk
Robotalk将是用于执行此类通信的“语言”。
消息和模式
每个消息由两个模式文件定义。第一个是通用消息模式,它定义了所有消息都通用的基本消息结构。第二个定义了根据消息“目的”而变化的“数据”属性结构。
目前,消息模式库是此项目的一部分,位于“schema”目录下。请注意,消息“目的”属性定义了实际模式文件所在的目录结构。
Robocloud
Robocloud将是技术上启用Robotalk使用的平台。
提供的功能
Robocloud提供从AWS Kinesis流中推送和读取特定消息的功能。它遵循Kinesis消息生产者和消费者的概念。它执行了大部分繁重的工作,以利用Kinesis流,并添加了在从流中消费时验证和处理消息的可能性。
生产者示例
use robocloud\Event\KinesisProducerErrorConsoleLogger; use robocloud\Kinesis\Client\Producer; use robocloud\KinesisClientFactory; use robocloud\Message\Message; use robocloud\Message\MessageFactory; use robocloud\Message\MessageSchemaValidator; use Symfony\Component\Cache\Simple\FilesystemCache; use Symfony\Component\EventDispatcher\EventDispatcher; // Define the Kinesis stream name. $stream_name = 'robocloud'; // Get an event dispatcher instance. $event_dispatcher = new EventDispatcher(); // Add message schema validator. $event_dispatcher->addSubscriber(new MessageSchemaValidator('schema/stream/robocloud/message')); // Add error processors. A few simple ones that passivly log errors // are available in robocloud/Event. To provide more robust error // processing like requeuing failed messages you need to provide // your own. $event_dispatcher->addSubscriber(new KinesisProducerErrorConsoleLogger()); // Now get the message facory that will create and validate messages // for you. $message_factory = new MessageFactory(Message::class, $event_dispatcher); // Use the message factory to set the message data. $message_factory->setMessageData([ 'version' => 'v_0_1', 'roboId' => 'lost', 'purpose' => 'buddy.find', 'data' => [ 'reason' => 'line_follower.line.lost', ], ]); // Create the actual message that will be sent to Kinesis. // This will throw exception if message data validation // fails or if schema files could not be found. $message = $message_factory->createMessage(); // Create instance of the Kinesis client factory. $kinesis_factory = new KinesisClientFactory('2013-12-02', 'eu-west-1'); // Provide cache that will be used to cache shard info. $cache = new FilesystemCache(); // Create the Producer instance. $producer = new Producer( $kinesis_factory->getKinesisClient('AKIAJG2QTSBDKBFNACDA', 'Pg2c2AzMfY/5koj6b0IO3GgOvgF/m5nUDayjBOh/'), $stream_name, $message_factory, $event_dispatcher, $cache ); // Add the message and push it to the stream. $producer->add($message); var_dump(array_map(function($result) { return (string) $result; }, $producer->pushAll()));
消费者示例
// Define the Kinesis stream name. use robocloud\Event\KinesisConsumerErrorConsoleLogger; use robocloud\Kinesis\Client\Consumer; use robocloud\Kinesis\Client\ConsumerRecovery; use robocloud\KinesisClientFactory; use robocloud\Message\Message; use robocloud\Message\MessageFactory; use robocloud\Message\MessageSchemaValidator; use robocloud\MessageProcessing\Backend\KeepInMemoryBackend; use robocloud\MessageProcessing\Filter\KeepAllFilter; use robocloud\MessageProcessing\Processor\DefaultProcessor; use robocloud\MessageProcessing\Transformer\KeepOriginalTransformer; use Symfony\Component\Cache\Simple\FilesystemCache; use Symfony\Component\EventDispatcher\EventDispatcher; // Define the Kinesis stream name. $stream_name = 'robocloud'; // Create event dispatcher instance. $event_dispatcher = new EventDispatcher(); // Add message schema validator. $event_dispatcher->addSubscriber(new MessageSchemaValidator('schema/stream/robocloud/message')); // Add error handler(s). $event_dispatcher->addSubscriber(new KinesisConsumerErrorConsoleLogger()); // Create filter instance that will be used to filter out only those messages // that you are interested in. $filter = new KeepAllFilter(); // The transformer layer is responsible for extracting and processing the // message data into a form that is expected by your backend. $keep_original_transformer = new KeepOriginalTransformer(); // Finally provide your backend that will finish the message processing. $keep_in_memory_backend = new KeepInMemoryBackend(); // Add the message processor as the subscriber that will be used // during consuming to process the messages. $event_dispatcher->addSubscriber(new DefaultProcessor($filter, $keep_original_transformer, $keep_in_memory_backend)); // Get the message factory that will be used for creating the message objects // from the data pulled from Kinesis. $message_factory = new MessageFactory(Message::class, $event_dispatcher); // Create instance of the Kinesis client factory. $kinesis_factory = new KinesisClientFactory('2013-12-02', 'eu-west-1'); // Provide cache that will be used to cache shard info. $cache = new FilesystemCache(); // Provide the recovery object used to store last read position. $consumer_recovery = new ConsumerRecovery($stream_name, 'Shard-000001', '/tmp/consumer_recovery.rec'); // Instantiate the consumer and consume messages from Kinesis stream. $consumer = new Consumer( $kinesis_factory->getKinesisClient('AKIAINK5P33X2KBK2RAQ', 'EuUdvE7WW0SKaEpGWMWHvN5M+gIjGaoLAVTYzzhV'), $message_factory, $event_dispatcher, $cache, $consumer_recovery ); // One process $consumer->consume(0); // Print the messages to see what we pulled from the stream. var_dump($keep_in_memory_backend->flush()); // Note that this example is very trivial not providing any real functionality. // To get better idea on how to use message processor see other filter, // transformer and backend classes in the MessageProcessing namespace.