mattiabasone/avro-serde-php

一个使用 confluent schema registry 进行 Avro 记录序列化和反序列化的库

1.1.0 2024-08-12 15:43 UTC

This package is auto-updated.

Last update: 2024-09-12 16:12:31 UTC


README

php-confluent-serde Actions Status Maintainability Test Coverage Latest Stable Version Total Downloads License

动机

在序列化和反序列化消息使用 Avro 序列化格式时,特别是当与 Confluent 平台集成时,您需要确保模式以不会影响下游消费者的方式演进。

因此,Confluent 开发了 Schema Registry,它负责根据可配置的兼容性策略验证给定的模式演进。

不幸的是,Confluent 没有为 PHP 提供官方的 Avro SerDe 包。这个库旨在为 PHP 提供一个 Avro SerDe 库,该库实现了 Confluent 的 wire format 并集成了 FlixTech 的 Schema Registry Client

安装

这个库使用 composer 包管理器 进行 PHP。

composer require 'flix-tech/avro-serde-php:^1.6'

快速入门

注意

您应该始终使用缓存的 schema registry 客户端,否则您将为每个序列化或反序列化的消息发出 HTTP 请求。

1. 创建缓存的 Schema Registry 客户端

有关更多详细信息,请参阅 Schema Registry 客户端缓存文档

<?php

use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use GuzzleHttp\Client;

$schemaRegistryClient = new CachedRegistry(
    new PromisingRegistry(
        new Client(['base_uri' => 'registry.example.com'])
    ),
    new AvroObjectCacheAdapter()
);

2. 构建 RecordSerializer 实例

RecordSerializer 是您与这个库交互的主要方式。它提供了用于 SerDe 操作的 encodeRecorddecodeMessage 方法。

<?php

use FlixTech\AvroSerializer\Objects\RecordSerializer;

/** @var \FlixTech\SchemaRegistryApi\Registry $schemaRegistry */
$recordSerializer = new RecordSerializer(
    $schemaRegistry,
    [
        // If you want to auto-register missing schemas set this to true
        RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => false,
        // If you want to auto-register missing subjects set this to true
        RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => false,
    ]
);

3. 编码记录

这是一个如何使用 RecordSerializer 在 Confluent Avro wire format 中编码消息的简单示例。

<?php

/** @var \FlixTech\AvroSerializer\Objects\RecordSerializer $recordSerializer */
$subject = 'my-topic-value';
$avroSchema = AvroSchema::parse('{"type": "string"}');
$record = 'Test message';

$encodedBinaryAvro = $recordSerializer->encodeRecord($subject, $avroSchema, $record);
// Send this over the wire...

4. 解码消息

这是一个如何使用 RecordSerializer 解码消息的简单示例。

<?php

/** @var \FlixTech\AvroSerializer\Objects\RecordSerializer $recordSerializer */
/** @var string $encodedBinaryAvro */
$record = $recordSerializer->decodeMessage($encodedBinaryAvro);

echo $record; // 'Test message'

Schema Resolvers

Schema Resolvers 负责知道哪种 Avro 模式属于哪种类型的记录。如果您想将 Avro 模式分别保存在单独的文件中,这特别有用。Schema Resolvers 允许您将此库范围之外的任何模式管理概念集成进来。

Schema Resolvers 接受任何类型的 $record 并尝试为它解析一个匹配的 AvroSchema 实例。

FileResolver

在即使是中等复杂的应用程序中,您也想在 VCS 中管理您的模式,最可能的是作为 .avsc 文件。这些文件包含描述 Avro 模式的 JSON。

解析器接受一个 $baseDir,您想在其中管理文件,以及一个 callable 弯曲器,这是一个简单函数,它接受记录作为第一个参数,以及一个表示弯曲是否针对键模式($isKey)的布尔参数。

<?php

namespace MyNamespace;

use FlixTech\AvroSerializer\Objects\SchemaResolvers\FileResolver;
use function get_class;use function is_object;
use function str_replace;

class MyRecord {}

$record = new MyRecord();

$baseDir = __DIR__ . '/files';

$inflector = static function ($record, bool $isKey) {
    $ext = $isKey ? '.key.avsc' : '.avsc';
    $fileName = is_object($record)
        ? str_replace('\\', '.', get_class($record))
        : 'default';
    
    return $fileName . $ext;
};


echo $inflector($record, false); // MyNamespace.MyRecord.avsc
echo $inflector($record, true); // MyNamespace.MyRecord.key.avsc

$resolver = new FileResolver($baseDir, $inflector);

$resolver->valueSchemaFor($record); // This will load from $baseDir . '/' . MyNamespace.MyRecord.avsc
$resolver->keySchemaFor($record); // This will load from $baseDir . '/' . MyNamespace.MyRecord.key.avsc

CallableResolver

这是一个最简单但也最灵活的解析器。它只接受两个 callables,分别负责获取值或键模式。键模式解析器是可选的。

<?php

use FlixTech\AvroSerializer\Objects\SchemaResolvers\CallableResolver;
use PHPUnit\Framework\Assert;
use function Widmogrod\Functional\constt;

$valueSchemaJson = '
{
  "type": "record",
  "name": "user",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
';
$valueSchema = AvroSchema::parse($valueSchemaJson);

$resolver = new CallableResolver(
    constt(
        AvroSchema::parse($valueSchemaJson)
    )
);

$record = [ 'foo' => 'bar' ];

$schema = $resolver->valueSchemaFor($record);

Assert::assertEquals($schema, $valueSchema);

DefinitionInterfaceResolver

此库还提供了一个 HasSchemaDefinitionInterface 接口,该接口公开了两个静态方法

  • HasSchemaDefinitionInterface::valueSchemaJson 返回值的模式定义的 JSON 字符串
  • HasSchemaDefinitionInterface::keySchemaJson 返回 NULL 或键的模式定义的 JSON 字符串。

DefinitionInterfaceResolver 检查给定的记录是否实现该接口(如果没有,将抛出 InvalidArgumentException)并通过静态方法解决模式。

<?php

namespace MyNamespace;

use FlixTech\AvroSerializer\Objects\HasSchemaDefinitionInterface;
use FlixTech\AvroSerializer\Objects\SchemaResolvers\DefinitionInterfaceResolver;

class MyRecord implements HasSchemaDefinitionInterface {
    public static function valueSchemaJson() : string
    {
        return '
               {
                 "type": "record",
                 "name": "user",
                 "fields": [
                   {"name": "name", "type": "string"},
                   {"name": "age", "type": "int"}
                 ]
               }
               ';
    }
    
    public static function keySchemaJson() : ?string
    {
        return '{"type": "string"}';
    }
}

$record = new MyRecord();

$resolver = new DefinitionInterfaceResolver();

$resolver->valueSchemaFor($record); // Will resolve from $record::valueSchemaJson();
$resolver->keySchemaFor($record); // Will resolve from $record::keySchemaJson();

链式解析器

链式解析器是组合多个解析器的有用工具。第一个能够解决模式解析的解析器将获胜。如果链中的任何解析器都无法确定模式,将抛出 InvalidArgumentException

<?php

namespace MyNamespace;

use FlixTech\AvroSerializer\Objects\SchemaResolvers\ChainResolver;

$record = ['foo' => 'bar'];

/** @var \FlixTech\AvroSerializer\Objects\SchemaResolvers\FileResolver $fileResolver */
/** @var \FlixTech\AvroSerializer\Objects\SchemaResolvers\CallableResolver $callableResolver */

$resolver = new ChainResolver($fileResolver, $callableResolver);
// or new ChainResolver(...[$fileResolver, $callableResolver]);

$resolver->valueSchemaFor($record); // Will resolve $fileResolver, then $callableResolver
$resolver->keySchemaFor($record); // Will resolve $fileResolver, then $callableResolver

Symfony 序列化器集成

此库提供了与 Symfony 序列化器组件 的集成。

<?php

use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\AvroSerDeEncoder;
use FlixTech\AvroSerializer\Objects\DefaultRecordSerializerFactory;
use PHPUnit\Framework\Assert;
use Symfony\Component\Serializer\Normalizer\GetSetMethodNormalizer;
use Symfony\Component\Serializer\Serializer;

class User
{
    /** @var string */
    private $name;

    /** @var int */
    private $age;

    public function __construct(string $name, int $age)
    {
        $this->name = $name;
        $this->age = $age;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getAge(): int
    {
        return $this->age;
    }

    public function setAge(int $age): void
    {
        $this->age = $age;
    }
}

$recordSerializer = DefaultRecordSerializerFactory::get(
    getenv('SCHEMA_REGISTRY_HOST')
);

$avroSchemaJson = '{
  "type": "record",
  "name": "user",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}';

$user = new User('Thomas', 38);

$normalizer = new GetSetMethodNormalizer();
$encoder = new AvroSerDeEncoder($recordSerializer);

$symfonySerializer = new Serializer([$normalizer], [$encoder]);

$serialized = $symfonySerializer->serialize(
    $user,
    AvroSerDeEncoder::FORMAT_AVRO,
    [
        AvroSerDeEncoder::CONTEXT_ENCODE_SUBJECT => 'users-value',
        AvroSerDeEncoder::CONTEXT_ENCODE_WRITERS_SCHEMA => AvroSchema::parse($avroSchemaJson),
    ]
);

$deserializedUser = $symfonySerializer->deserialize(
    $serialized,
    User::class,
    AvroSerDeEncoder::FORMAT_AVRO
);

Assert::assertEquals($deserializedUser, $user);

名称转换器

有时您的属性名称可能与您的模式中的字段名称不同。解决此问题的一个选项是使用 自定义序列化器注解。但是,如果您使用此库提供的注解,您可以使用我们的 名称转换器,它解析这些注解并将模式字段名称与属性名称映射。

<?php

use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\AvroSerDeEncoder;
use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\NameConverter\AvroNameConverter;
use FlixTech\AvroSerializer\Objects\DefaultRecordSerializerFactory;
use Symfony\Component\Serializer\Normalizer\GetSetMethodNormalizer;
use Symfony\Component\Serializer\Serializer;
use Doctrine\Common\Annotations\AnnotationReader as DoctrineAnnotationReader;
use Doctrine\Common\Annotations\AnnotationRegistry;
use FlixTech\AvroSerializer\Objects\Schema\Generation\AnnotationReader;

$recordSerializer = DefaultRecordSerializerFactory::get(
    getenv('SCHEMA_REGISTRY_HOST')
);

AnnotationRegistry::registerLoader('class_exists');

$reader = new AnnotationReader(
    new DoctrineAnnotationReader()
);

$nameConverter = new AvroNameConverter($reader);

$normalizer = new GetSetMethodNormalizer(null, $nameConverter);
$encoder = new AvroSerDeEncoder($recordSerializer);

$symfonySerializer = new Serializer([$normalizer], [$encoder]);

模式构建器

此库还提供了使用 PHP 定义模式的方法,这与 Java SDK 提供的 SchemaBuilder API 非常相似

<?php

use FlixTech\AvroSerializer\Objects\Schema;
use FlixTech\AvroSerializer\Objects\Schema\Record\FieldOption;

Schema::record()
    ->name('object')
    ->namespace('org.acme')
    ->doc('A test object')
    ->aliases(['stdClass', 'array'])
    ->field('name', Schema::string(), FieldOption::doc('Name of the object'), FieldOption::orderDesc())
    ->field('answer', Schema::int(), FieldOption::default(42), FieldOption::orderAsc(), FieldOption::aliases('wrong', 'correct'))
    ->field('ignore', Schema::boolean(), FieldOption::orderIgnore())
    ->parse();

模式生成器

除了提供用于定义模式的流畅 API 之外,我们还提供了从类元数据(注解)生成模式的方法。为此,您必须安装 doctrine/annotations 包。

<?php

use FlixTech\AvroSerializer\Objects\DefaultSchemaGeneratorFactory;
use FlixTech\AvroSerializer\Objects\Schema\Generation\Annotations as SerDe;

/**
 * @SerDe\AvroType("record")
 * @SerDe\AvroName("user")
 */
class User
{
    /**
     * @SerDe\AvroType("string")
     * @var string
     */
    private $firstName;

    /**
     * @SerDe\AvroType("string")
     * @var string
     */
    private $lastName;

    /**
     * @SerDe\AvroType("int")
     * @var int
     */
    private $age;

    public function __construct(string $firstName, string $lastName, int $age)
    {
        $this->firstName = $firstName;
        $this->lastName = $lastName;
        $this->age = $age;
    }

    public function getFirstName(): string
    {
        return $this->firstName;
    }

    public function getLastName(): string
    {
        return $this->lastName;
    }

    public function getAge(): int
    {
        return $this->age;
    }
}

$generator = DefaultSchemaGeneratorFactory::get();

$schema = $generator->generate(User::class);
$avroSchema = $schema->parse();

有关可能注解的更多示例,请参阅 测试用例

示例

此库在 examples 文件夹中提供了一些可执行示例。您应该查看这些示例以了解此库的工作原理。