userqq / mysql-binlog

MySQL/MariaDB binlog 监听器


README

此包使您能够在不增加数据库额外负载的情况下(如触发器)响应数据库中的更改,并且比类似解决方案运行得更快。它模拟副本并使用 MySQL/MariaDB 的 binlog 来跟踪数据库中的更改。当数据发生变化时,数据库将事件写入 binlog,该事件可以转换为 JSON。此 JSON 可以发送到任何目的地,例如 Apache Kafka、Kinesis、RabbitMQ 或直接发送到其他数据库,如 MongoDB、ClickHouse、Elastic,以确保快速高效的数据处理。使用此包,无论您的数据库有多大,都可以在几分钟或几秒钟的延迟内拥有数据库的副本。

这对于想要在其他系统中保留数据副本以实现更可靠的存储或使用其他工具处理数据的公司非常有用。它还可以用于将数据发送到消息队列,从而更容易实时处理和分析数据。因此,公司可以利用消息队列系统的优势,如负载均衡、可伸缩性和容错性,同时在其他系统中保持数据副本的可靠性。

安装

此包可以作为 Composer 依赖项安装。

composer require userqq/mysql-binlog 

配置

数据库配置

为了在不进行额外操作和不向 information_schema 发起过多请求的情况下解析 binlog 中的数据,数据库必须进行适当配置。这允许重新读取数据类型和列名而不进行额外请求,因此即使没有数据库,也可以重新读取 binlog 中存储的数据,同时仍然提供有关数据的所有详细信息。

此库需要 binlog_row_metadata 选项,该选项在 MySQL >= 8.0.1MariaDB >= 10.5.0 中可用。必须将 binlog_row_metadata 选项设置为 FULL,以确保 binlog 中包含所有必要元数据以进行正确解析。

只需将以下示例配置添加到数据库配置中,然后重新启动数据库并修剪旧的二进制日志

FLUSH BINARY LOGS; 
PURGE BINARY LOGS BEFORE NOW();

这将刷新当前二进制日志到磁盘,然后删除当前时间之前的所有二进制日志。

通常可以在 /etc/mysql 目录中找到配置。

[mysqld]
server-id           = 1
log_bin             = /var/log/mysql/mysql-bin.log
expire_logs_days    = 10
max_binlog_size     = 100M
binlog-format       = row
log_slave_updates   = on
binlog_row_image    = full
binlog_row_metadata = full
binlog_do_db        = mydatabase
net_read_timeout    = 3600
net_write_timeout   = 3600

binlog 读取器配置

为了简化配置,库公开了 UserQQ\MySQL\Binlog\Config

use UserQQ\MySQL\Binlog\Config;

$config = (new Config())
    ->withUser('root')
    ->withPassword('toor');

以下是一份可用的配置选项列表

withUser(string $user): static                   // Database user with replication privileges
withPassword(string $password): static           // Database user's password
withHost(string $host): static                   // Database host
withPort(int $port): static                      // Database host
withBinlogFile(?string $binlogFile): static      // Binlog file to start from
withBinlogPosition(?int $binlogPosition): static // Position in binlog file to start from

有关更多详细信息,请参阅 Config.php

运行应用程序

然后只需创建一个 UserQQ\MySQL\Binlog\EventsIterator 实例并遍历它。

<?php declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php';

use UserQQ\MySQL\Binlog\Config;
use UserQQ\MySQL\Binlog\EventsIterator;

$config = (new Config())
    ->withUser('root')
    ->withPassword('toor');

$eventsIterator = new EventsIterator($config);

foreach ($eventsIterator as $position => $event) {
    echo json_encode($position) . PHP_EOL;
    echo json_encode($event, JSON_PRETTY_PRINT) . PHP_EOL;
    echo PHP_EOL;
}

在您的数据库上运行插入、更新和删除操作,看看它如何响应!

请参阅 examples 文件夹以获取更多示例。