b2pweb / bdf-prime-events
用于监听MySQL复制协议事件的Prime库
v2.0.1
2024-02-28 14:27 UTC
Requires
- php: ~7.4 | ~8.0.0 | ~8.1.0 | ~8.2.0 | ~8.3.0
- b2pweb/bdf-prime: ~1.1|~2.0
- moln/php-mysql-replication: ~1.2
- psr/simple-cache: ^1.0@stable | ^3.0@stable
Requires (Dev)
- ext-pcntl: *
- b2pweb/bdf-prime-bundle: ~1.0
- phpunit/phpunit: ~9.0
- squizlabs/php_codesniffer: ~3.0
- symfony/console: ~5.0|~6.0
- symfony/framework-bundle: ~5.0|~6.0
- symfony/yaml: ~5.0|~6.0
- vimeo/psalm: ~5.0
README
Prime扩展,用于监听Prime实体上的MySQL复制事件,以跟踪插入、更新和删除操作。
安装
使用composer安装
composer require b2pweb/bdf-prime-events
Symfony上的配置
在 config/bundles.php
中注册
<?php return [ // ... Bdf\PrimeEvents\Bundle\PrimeEventsBundle::class => ['all' => true], Bdf\PrimeBundle\PrimeBundle::class => ['all' => true], ];
在 config/packages/prime_events.yaml
中配置索引
prime_events: # Configure replication connection parameter here, by connection name my_connection: user: other_user # Define a custom username/password which as REPLICATION CLIENT and REPLICATION SLAVE permissions password: other_pass logPositionFile: '%kernel.project_dir%/var/events' # The file for store the last consumed event, to allow restart consumer without loose events
在应用上启用自动配置,以让Symfony容器配置监听器
services: _defaults: autowire: true autoconfigure: true App\Entity\Listener\: resource: './src/Entity/Listener'
配置MySQL
有关在MySQL服务器上启用复制协议的设置,请参阅 krowinski/php-mysql-replication。
使用方法
Prime实体用于事件,请参阅 创建您的映射器 以定义一个实体。
简单用法/无Symfony
简单地创建一个 EntityEventsConsumer
,定义监听器,并运行消费者
use Bdf\PrimeEvents\EntityEventsConsumer; use MySQLReplication\Config\ConfigBuilder; $consumer = new EntityEventsConsumer( $prime, // The ServiceLocator instance __DIR__.'/mysql_last_event.log', // File for store the last consumed event, to allow restart without loosing events function (ConfigBuilder $config) { $config // Define custom connection configuration // Note: by default, the connection user and password is used // So it's not required to redefine it if the user has the replication permissions ->withUser('replication_user') ->withPassword('replication_pass') // Define the slave id. define this value is required if you want to run multiple // consumers on the same database ->withSlaveId(123) ; } ); // Configure listener for MyEntity $consumer->forEntity(MyEntity::class) ->inserted(function (MyEntity $entity) { /* $entity has been inserted */}) ->updated(function (MyEntity $before, MyEntity $now) { /* The entity has been updated. $before is its value before the update, and $now the current value */ }) ->deleted(function (MyEntity $entity) { /* $entity has been deleted */}) ; // Other entities may be configure... // Consume all events // Note: consume() will only consume 1 event while ($running) { $consumer->consume(); } // Stop the consumer and save the last consumed events $consumer->stop();
使用Symfony的用法
如果实现 EntityEventsListenerInterface
,则Symfony将自动配置监听器
use Bdf\PrimeEvents\Factory\EntityEventsListenerInterface; /** * @implements EntityEventsListenerInterface<MyEntity> */ class MyEntityListeners implements EntityEventsListenerInterface { /** * {@inheritdoc} */ public function entityClass() : string { return MyEntity::class; } /** * {@inheritdoc} * @param MyEntity $entity */ public function onInsert($entity) : void { // $entity has been inserted } /** * {@inheritdoc} * @param MyEntity $oldEntity * @param MyEntity $newEntity */ public function onUpdate($oldEntity, $newEntity) : void { // The entity has been updated. // $before is its value before the update, and $now the current value } /** * {@inheritdoc} * @param MyEntity $entity */ public function onDelete($entity) : void { // $entity has been deleted } }
要消费事件,只需运行 prime:events:consume
命令
bin/console prime:events:consume my_connection --limit 10000 --memory 500m