b2pweb/bdf-prime-events

用于监听MySQL复制协议事件的Prime库

v2.0.1 2024-02-28 14:27 UTC

This package is auto-updated.

Last update: 2024-09-28 15:49:43 UTC


README

build codecov Packagist Version Total Downloads Type Coverage

Prime扩展,用于监听Prime实体上的MySQL复制事件,以跟踪插入、更新和删除操作。

安装

使用composer安装

composer require b2pweb/bdf-prime-events

Symfony上的配置

config/bundles.php 中注册

<?php

return [
    // ...
    Bdf\PrimeEvents\Bundle\PrimeEventsBundle::class => ['all' => true],
    Bdf\PrimeBundle\PrimeBundle::class => ['all' => true],
];

config/packages/prime_events.yaml 中配置索引

prime_events:
  # Configure replication connection parameter here, by connection name
  my_connection:
    user: other_user # Define a custom username/password which as REPLICATION CLIENT and REPLICATION SLAVE permissions
    password: other_pass
    logPositionFile: '%kernel.project_dir%/var/events' # The file for store the last consumed event, to allow restart consumer without loose events 

在应用上启用自动配置,以让Symfony容器配置监听器

services:
  _defaults:
    autowire: true
    autoconfigure: true

  App\Entity\Listener\:
    resource: './src/Entity/Listener'

配置MySQL

有关在MySQL服务器上启用复制协议的设置,请参阅 krowinski/php-mysql-replication

使用方法

Prime实体用于事件,请参阅 创建您的映射器 以定义一个实体。

简单用法/无Symfony

简单地创建一个 EntityEventsConsumer,定义监听器,并运行消费者

use Bdf\PrimeEvents\EntityEventsConsumer;
use MySQLReplication\Config\ConfigBuilder;

$consumer = new EntityEventsConsumer(
    $prime, // The ServiceLocator instance
    __DIR__.'/mysql_last_event.log', // File for store the last consumed event, to allow restart without loosing events
    function (ConfigBuilder $config) {
        $config
            // Define custom connection configuration
            // Note: by default, the connection user and password is used
            // So it's not required to redefine it if the user has the replication permissions
            ->withUser('replication_user')
            ->withPassword('replication_pass')
            // Define the slave id. define this value is required if you want to run multiple
            // consumers on the same database
            ->withSlaveId(123) 
        ;
    }
);

// Configure listener for MyEntity
$consumer->forEntity(MyEntity::class)
    ->inserted(function (MyEntity $entity) { /* $entity has been inserted */})
    ->updated(function (MyEntity $before, MyEntity $now) { /* The entity has been updated. $before is its value before the update, and $now the current value */ })
    ->deleted(function (MyEntity $entity) { /* $entity has been deleted */})
;

// Other entities may be configure...

// Consume all events
// Note: consume() will only consume 1 event
while ($running) {
    $consumer->consume();
}

// Stop the consumer and save the last consumed events
$consumer->stop();

使用Symfony的用法

如果实现 EntityEventsListenerInterface,则Symfony将自动配置监听器

use Bdf\PrimeEvents\Factory\EntityEventsListenerInterface;

/**
 * @implements EntityEventsListenerInterface<MyEntity>
 */
class MyEntityListeners implements EntityEventsListenerInterface
{
    /**
     * {@inheritdoc}
     */
    public function entityClass() : string
    {
        return MyEntity::class;
    }

    /**
     * {@inheritdoc} 
     * @param MyEntity $entity
     */
    public function onInsert($entity) : void
    {
        // $entity has been inserted
    }
    
    /**
     * {@inheritdoc} 
     * @param MyEntity $oldEntity
     * @param MyEntity $newEntity
     */
    public function onUpdate($oldEntity, $newEntity) : void
    {
        // The entity has been updated.
        // $before is its value before the update, and $now the current value
    }
    
    /**
     * {@inheritdoc} 
     * @param MyEntity $entity
     */
    public function onDelete($entity) : void
    {
        // $entity has been deleted
    }  
}

要消费事件,只需运行 prime:events:consume 命令

bin/console prime:events:consume my_connection --limit 10000 --memory 500m