cleancode / simple-bus-on-steroids
Requires
- php: >=7.0.0
- doctrine/doctrine-bundle: ~1.6
- doctrine/orm: ~2.5
- ramsey/uuid: ~3.5|~2.0
- sensio/framework-extra-bundle: ~3.0
- simple-bus/doctrine-orm-bridge: ~4.0
- simple-bus/jms-serializer-bundle-bridge: ~2.0
- simple-bus/symfony-bridge: ~4.1
- symfony/monolog-bundle: ~3.0|~2.8
This package is auto-updated.
Last update: 2024-08-29 04:09:15 UTC
README
描述
为异步simplebus扩展。提供多个功能以保护你的异步工作。
将 cleancode/simple-bus-on-steroids 添加到你的 composer.json
它提供什么
- 事件订阅者存在事务
- 由于每个事件可能有多个订阅者,每个订阅者都在自己的事务中
处理
。
如果其中一个订阅者失败,其他订阅者不会被中断并正常完成 - 每个事件都被
装饰
为- 事件 ID(这是事件的唯一标识符)
- 关联 ID(这是所有相关事件的 ID)
- 父 ID(这是触发此事件的事件的 ID)
- 发生时间
- 可选的描述,这可能用于创建事件源实体
- 实体发布的每个事件都保存与实体相同的同一事务中。
这确保了事件永远不会丢失 - 事件通过所谓的
异步 rabbit 模式
推送到队列,这意味着异步工作器从数据库中取出它们并将它们推送
到 rabbitmq。 - 如果事件失败,它将在特定的时间(可配置)内
重新入队
- 如果事件失败超过 x 次(可配置),它将被放入死信队列(可配置位置)
- 每个事件订阅者只能
成功处理
事件一次。
这意味着,如果一个订阅者失败而另一个成功完成,当消息重新入队时,成功的订阅者将不再处理该消息 - 有可能
"重启"
异步生产者,因此所有事件将再次发送。这可以通过从数据库中删除已发布事件的详细信息来完成。
这可以用于重新创建 Read Model 例如 - 少量配置即可开始使用
- 有可能为事件命名,与类名无关。
因此,当您更改类名时,数据库中已有的事件不会受到影响 - 有可能为订阅者命名,与类名无关。关于哪个订阅者处理了哪个事件的信息存储在数据库中。因此,当您更改订阅者的类名时,它不会产生任何影响
安装
在...
如果您已安装 LongRunningBundle,则需要将其删除或更改配置
long_running:
simple_bus_rabbit_mq: true
doctrine_orm: false
doctrine_dbal: false
monolog: true
swift_mailer: true
-
更改 rabbitmq 配置。您需要在您的 rabbitmq 实例中安装 delayed-message 插件
old_sound_rabbit_mq: producers: asynchronous_events: connection: default exchange_options: { name: 'asynchronous_events', type: x-delayed-message, arguments: {x-delayed-type: ['S', "topic"]} } consumers: asynchronous_events_consumer: connection: default exchange_options: { name: 'asynchronous_events', type: x-delayed-message, arguments: {x-delayed-type: ['S', "topic"]} } queue_options: { name: 'queue_asynchronous_events', routing_keys: ['all'] } callback: simple_bus.on_steorids.rabbit_mq_bundle_bridge.events_consumer qos_options: prefetch_count: 5
-
创建数据库结构
a) using doctrine migrations. Set in config.yml doctrine: orm: entity_managers: default: auto_mapping: true mappings: SimpleBusOnSteroidsBundle: type: yml dir: "Resources/Doctrine" prefix: CleanCode is_bundle: true b) using direct SQLs CREATE TABLE sb_event_store (event_meta_data_event_id VARCHAR(255) NOT NULL, event_data_event_name VARCHAR(255) NOT NULL, event_data_payload LONGTEXT NOT NULL COMMENT '(DC2Type:json_array)', event_meta_data_parent_id VARCHAR(255) DEFAULT NULL, event_meta_data_correlation_id VARCHAR(255) NOT NULL, event_meta_data_occurred_on DATETIME NOT NULL, event_meta_data_description VARCHAR(255) NOT NULL, PRIMARY KEY(event_meta_data_event_id)) CREATE TABLE sb_subscriber_handled_event (subscriber_name VARCHAR(255) NOT NULL, event_id VARCHAR(255) NOT NULL, PRIMARY KEY(subscriber_name, event_id)) CREATE TABLE sb_last_published_event (event_id VARCHAR(255) NOT NULL, PRIMARY KEY(event_id))
-
更改订阅者标签为
XML <tag name="asynchronous_steroids_event_subscriber" subscribes_to="Events\PersonWasCreated" subscriber_name="person_was_created_subscriber"/> Annotation @DI\Tag("asynchronous_steroids_event_subscriber", attributes={"subscribes_to" = "AppBundle\Entity\PersonWasCreated", "subscriber_name" = "person_was_created_sub"})
-
启动异步发布者
bin/console simplebus:async-producer -vvv
代码
来自实体
use SimpleBus\Message\Recorder\ContainsRecordedMessages
class Person implements ContainsRecordedMessages
{
}
如果您的类实现了 ContainsRecordedMessages
,事件将通过 recordedMessages()
方法自动检索并保存到数据库中。
直接通过事件存储
/** @var \CleanCode\SimpleBusOnSteroids\Middleware\EventStore\EventStore $eventStore*/
$eventStore = $container->get('simple_bus_event_store')
$eventStore->save([new SomeEvent()]);
如果您不在实体中存储事件,则需要自己使用事件存储来保存事件。
事件存储包含一个方法 save,该方法期望一个事件数组。
配置
simple_bus_on_steroids:
requeue_max_times: 3
requeue_time: 3
requeue_multiply_by: 3
dead_letter_exchange_name: asynchronous_events
dead_letter_queue_name: dead_letter
how_many_to_retrieve_at_once: 5
send_messages_every_seconds: 1.2
requeue_exchange_name: ""
requeue_routing_key: ""
requeue_max_times - Max tries of requeue before message will go to the dead letter queue (default: 3)
requeue_time - Amount of seconds before message will be handled after fail (default: 3)
requeue_multiply_by - How many times multiply requeue time for each time message which fail (default: 3)
dead_letter_exchange_name - Name of the exchange where broken message will be published (default: asynchronous_events)\
dead_letter_queue_name - Name of the queue where broken messages will be published (default: dead_letter)
how_many_to_retrieve_at_once - How many message should be retrieved at once to be published (default: 5)
send_messages_every_seconds - Break between publishing in seconds (default: 1.2)
requeue_exchange_name - Requeued message will be published to passed exchange name. If not passed it will be taken directly from message
requeue_routing_key - This routing key will be added to requeued message. If not passed it will be taken directly from message
requeue_exchange_name
和 requeue_routing_key
可用于,当您不希望重新入队的消息在其他交换中被其他应用程序再次处理时。
扩展
使用自己的名称保存事件
默认情况下,存储在数据库中的事件是按类名保存的(使用 CleanCode\SimpleBusOnSteroids\EventNameMapper\ClassNameEventNameMapper
)。这可能在您 想要更改
事件命名空间或类名时成为问题。
因为您数据库中已经存储了旧的类名。
为了处理这种情况
,您可能需要编写自己的事件映射器。
您可以通过在DI容器中扩展CleanCode\SimpleBusOnSteroids\EventNameMapper
并使用idsimple_bus_event_mapper
来实现。
示例
class EventNameMapper implements CleanCode\SimpleBusOnSteroids\EventNameMapper
{
const EVENT_NAME_MAPPER =
[
PersonWasRegistered::class => 'person_was_registered'
];
public function eventNameFrom($event) : string
{
if (array_key_exists(get_class($event), self::EVENT_NAME_MAPPER)) {
return self::EVENT_NAME_MAPPER[get_class($event)];
}
return '';
}
public function isMapped(string $eventName) : bool
{
return true;
}
public function classNameFrom(string $eventName) : string
{
$className = array_search($eventName, self::EVENT_NAME_MAPPER);
if ($className === false) {
return '';
}
return $className;
}
}
使用自定义名称保存订阅者信息
Simple Bus On Steroids通过将事件ID存储在订阅者名称中,以了解哪个事件已被处理。
默认情况下,它通过简单地保存事件类名来实现,但当您想要更改订阅者类名时,这可能会导致问题。
您可以通过添加标签属性"subscriber_name" = "person_was_created_subscriber"
来解决此问题。
XML
<tag name="asynchronous_steroids_event_subscriber" subscribes_to="Events\PersonWasCreated" subscriber_name="person_was_created_subscriber"/>
Annotation
@DI\Tag("asynchronous_steroids_event_subscriber", attributes={"subscribes_to" = "AppBundle\Entity\PersonWasCreated", "subscriber_name" = "person_was_created_sub"})