kaliop/queueingbundle-kinesis

Kaliop 队列组件 - Kinesis 插件

v0.2.1 2019-10-28 22:34 UTC

This package is auto-updated.

Last update: 2024-09-21 02:04:44 UTC


README

为 Kaliop 队列组件添加 AWS Kinesis 支持

参见:http://aws.amazon.com/kinesis/https://github.com/kaliop-uk/kueueingbundle

因为它比基本队列组件有更高的要求,所以它有一个自己的组件包

安装

  1. 使用 Composer 安装此组件包。

  2. 在您的内核类中启用 KaliopQueueingPluginsKinesisBundle 组件包,并在 registerBundles() 中启用 DoctrineCacheBundle。

  3. 如果不是开发环境,请清除所有缓存

使用方法

  1. 如果您没有 AWS 账户,请在 http://aws.amazon.com/ 注册一个账户。注意:Kinesis 没有免费层。定价信息请参阅:http://aws.amazon.com/kinesis/pricing/

  2. 使用 Web 界面创建 Kinesis 流:https://console.aws.amazon.com/kinesis/home

  3. 根据您的 AWS 账户设置配置

    • 编辑此组件包中的 parameters.yml
    • 将 kinesis_sample.yml 拷贝/包含到您的应用程序配置中,并根据需要编辑它
  4. 确保您能够列出流及其中的分片

     php app/console kaliop_queueing:managequeue list -bkinesis
     
     php app/console kaliop_queueing:managequeue info -bkinesis <stream>
    
  5. 向流中推送消息

     php app/console kaliop_queueing:queuemessage -bkinesis -r<shard-partition> <stream> <jsonpayload>
    
  6. 从流中接收消息

     php app/console kaliop_queueing:consumer -bkinesis -r<shard-id> <stream>
    

注意事项

  • Kinesis 将消息分组到流中,这些流被分成一个或多个分片。每条消息都推送到一个分片上,并必须从该分片上拉取。在 kalio-queueing 中,以下映射适用:

    • 流 => 队列
    • 分片 => 路由键
  • 默认情况下,Kinesis 在消息被消费时不会从其分片中删除消息。这意味着消费者必须保持对最后一个已消费消息的内部指针。使用一个服务来处理此值的本地存储 - 此组件包中提供的默认服务基于 Doctrine-Cache 组件,并必须配置为启用(参见上述第 7 点)。请注意,由于它使用缓存目录来存储数据,因此在任何缓存清除时,指针将重置,并将下载旧消息。要更改此行为,请参阅 parameters.yml 中的选项 kaliop_queueing_kinesis.default.missing_sequence_number_strategy

  • 当运行 kaliop_queueing:queuemessage 时,使用 -r 选项指定分片分区键是强制性的。注意:使用的值 不是 分片 id,而是经过散列后根据散列值选择分片

  • 当运行 kaliop_queueing:messageconsumer 时,使用 -r 选项指定分片 Id 是强制性的。如果您不知道分片 Id,请使用 kaliop_queueing:managequeue info <stream> 命令来查看它

License Latest Stable Version Total Downloads

Build Status Scrutinizer Code Quality SensioLabsInsight