mattiabasone / avro-serde-php
一个使用 confluent schema registry 进行 Avro 记录序列化和反序列化的库
Requires
- php: ^8.1
- beberlei/assert: ^2.9.9|~3.0
- flix-tech/avro-php: ^5.0
- flix-tech/confluent-schema-registry-api: ^8.0
- guzzlehttp/promises: ^1.4.0|^2.0.0
- widmogrod/php-functional: ^6.0
Requires (Dev)
- doctrine/annotations: ^1.11
- phpbench/phpbench: 1.0.0-alpha2
- phpunit/phpunit: ^10.5
- rector/rector: ^1.2
- roave/security-advisories: dev-latest
- symfony/serializer: ^6.4|^7.0
- vlucas/phpdotenv: ~2.4
Suggests
- doctrine/annotations: To enable the generation of avro schemas from annotations
- symfony/serializer: To integrate avro-serde-php into symfony ecosystem
This package is auto-updated.
Last update: 2024-09-12 16:12:31 UTC
README
动机
在序列化和反序列化消息使用 Avro 序列化格式时,特别是当与 Confluent 平台集成时,您需要确保模式以不会影响下游消费者的方式演进。
因此,Confluent 开发了 Schema Registry,它负责根据可配置的兼容性策略验证给定的模式演进。
不幸的是,Confluent 没有为 PHP 提供官方的 Avro SerDe 包。这个库旨在为 PHP 提供一个 Avro SerDe 库,该库实现了 Confluent 的 wire format 并集成了 FlixTech 的 Schema Registry Client。
安装
这个库使用 composer 包管理器 进行 PHP。
composer require 'flix-tech/avro-serde-php:^1.6'
快速入门
注意
您应该始终使用缓存的 schema registry 客户端,否则您将为每个序列化或反序列化的消息发出 HTTP 请求。
1. 创建缓存的 Schema Registry 客户端
有关更多详细信息,请参阅 Schema Registry 客户端缓存文档。
<?php use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter; use FlixTech\SchemaRegistryApi\Registry\CachedRegistry; use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry; use GuzzleHttp\Client; $schemaRegistryClient = new CachedRegistry( new PromisingRegistry( new Client(['base_uri' => 'registry.example.com']) ), new AvroObjectCacheAdapter() );
2. 构建 RecordSerializer
实例
RecordSerializer
是您与这个库交互的主要方式。它提供了用于 SerDe 操作的 encodeRecord
和 decodeMessage
方法。
<?php use FlixTech\AvroSerializer\Objects\RecordSerializer; /** @var \FlixTech\SchemaRegistryApi\Registry $schemaRegistry */ $recordSerializer = new RecordSerializer( $schemaRegistry, [ // If you want to auto-register missing schemas set this to true RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => false, // If you want to auto-register missing subjects set this to true RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => false, ] );
3. 编码记录
这是一个如何使用 RecordSerializer
在 Confluent Avro wire format 中编码消息的简单示例。
<?php /** @var \FlixTech\AvroSerializer\Objects\RecordSerializer $recordSerializer */ $subject = 'my-topic-value'; $avroSchema = AvroSchema::parse('{"type": "string"}'); $record = 'Test message'; $encodedBinaryAvro = $recordSerializer->encodeRecord($subject, $avroSchema, $record); // Send this over the wire...
4. 解码消息
这是一个如何使用 RecordSerializer
解码消息的简单示例。
<?php /** @var \FlixTech\AvroSerializer\Objects\RecordSerializer $recordSerializer */ /** @var string $encodedBinaryAvro */ $record = $recordSerializer->decodeMessage($encodedBinaryAvro); echo $record; // 'Test message'
Schema Resolvers
Schema Resolvers 负责知道哪种 Avro 模式属于哪种类型的记录。如果您想将 Avro 模式分别保存在单独的文件中,这特别有用。Schema Resolvers 允许您将此库范围之外的任何模式管理概念集成进来。
Schema Resolvers 接受任何类型的 $record
并尝试为它解析一个匹配的 AvroSchema
实例。
FileResolver
在即使是中等复杂的应用程序中,您也想在 VCS 中管理您的模式,最可能的是作为 .avsc
文件。这些文件包含描述 Avro 模式的 JSON。
解析器接受一个 $baseDir
,您想在其中管理文件,以及一个 callable
弯曲器,这是一个简单函数,它接受记录作为第一个参数,以及一个表示弯曲是否针对键模式($isKey
)的布尔参数。
<?php namespace MyNamespace; use FlixTech\AvroSerializer\Objects\SchemaResolvers\FileResolver; use function get_class;use function is_object; use function str_replace; class MyRecord {} $record = new MyRecord(); $baseDir = __DIR__ . '/files'; $inflector = static function ($record, bool $isKey) { $ext = $isKey ? '.key.avsc' : '.avsc'; $fileName = is_object($record) ? str_replace('\\', '.', get_class($record)) : 'default'; return $fileName . $ext; }; echo $inflector($record, false); // MyNamespace.MyRecord.avsc echo $inflector($record, true); // MyNamespace.MyRecord.key.avsc $resolver = new FileResolver($baseDir, $inflector); $resolver->valueSchemaFor($record); // This will load from $baseDir . '/' . MyNamespace.MyRecord.avsc $resolver->keySchemaFor($record); // This will load from $baseDir . '/' . MyNamespace.MyRecord.key.avsc
CallableResolver
这是一个最简单但也最灵活的解析器。它只接受两个 callables
,分别负责获取值或键模式。键模式解析器是可选的。
<?php use FlixTech\AvroSerializer\Objects\SchemaResolvers\CallableResolver; use PHPUnit\Framework\Assert; use function Widmogrod\Functional\constt; $valueSchemaJson = ' { "type": "record", "name": "user", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": "int"} ] } '; $valueSchema = AvroSchema::parse($valueSchemaJson); $resolver = new CallableResolver( constt( AvroSchema::parse($valueSchemaJson) ) ); $record = [ 'foo' => 'bar' ]; $schema = $resolver->valueSchemaFor($record); Assert::assertEquals($schema, $valueSchema);
DefinitionInterfaceResolver
此库还提供了一个 HasSchemaDefinitionInterface
接口,该接口公开了两个静态方法
HasSchemaDefinitionInterface::valueSchemaJson
返回值的模式定义的 JSON 字符串HasSchemaDefinitionInterface::keySchemaJson
返回NULL
或键的模式定义的 JSON 字符串。
DefinitionInterfaceResolver
检查给定的记录是否实现该接口(如果没有,将抛出 InvalidArgumentException
)并通过静态方法解决模式。
<?php namespace MyNamespace; use FlixTech\AvroSerializer\Objects\HasSchemaDefinitionInterface; use FlixTech\AvroSerializer\Objects\SchemaResolvers\DefinitionInterfaceResolver; class MyRecord implements HasSchemaDefinitionInterface { public static function valueSchemaJson() : string { return ' { "type": "record", "name": "user", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": "int"} ] } '; } public static function keySchemaJson() : ?string { return '{"type": "string"}'; } } $record = new MyRecord(); $resolver = new DefinitionInterfaceResolver(); $resolver->valueSchemaFor($record); // Will resolve from $record::valueSchemaJson(); $resolver->keySchemaFor($record); // Will resolve from $record::keySchemaJson();
链式解析器
链式解析器是组合多个解析器的有用工具。第一个能够解决模式解析的解析器将获胜。如果链中的任何解析器都无法确定模式,将抛出 InvalidArgumentException
。
<?php namespace MyNamespace; use FlixTech\AvroSerializer\Objects\SchemaResolvers\ChainResolver; $record = ['foo' => 'bar']; /** @var \FlixTech\AvroSerializer\Objects\SchemaResolvers\FileResolver $fileResolver */ /** @var \FlixTech\AvroSerializer\Objects\SchemaResolvers\CallableResolver $callableResolver */ $resolver = new ChainResolver($fileResolver, $callableResolver); // or new ChainResolver(...[$fileResolver, $callableResolver]); $resolver->valueSchemaFor($record); // Will resolve $fileResolver, then $callableResolver $resolver->keySchemaFor($record); // Will resolve $fileResolver, then $callableResolver
Symfony 序列化器集成
此库提供了与 Symfony 序列化器组件 的集成。
<?php use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\AvroSerDeEncoder; use FlixTech\AvroSerializer\Objects\DefaultRecordSerializerFactory; use PHPUnit\Framework\Assert; use Symfony\Component\Serializer\Normalizer\GetSetMethodNormalizer; use Symfony\Component\Serializer\Serializer; class User { /** @var string */ private $name; /** @var int */ private $age; public function __construct(string $name, int $age) { $this->name = $name; $this->age = $age; } public function getName(): string { return $this->name; } public function setName(string $name): void { $this->name = $name; } public function getAge(): int { return $this->age; } public function setAge(int $age): void { $this->age = $age; } } $recordSerializer = DefaultRecordSerializerFactory::get( getenv('SCHEMA_REGISTRY_HOST') ); $avroSchemaJson = '{ "type": "record", "name": "user", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": "int"} ] }'; $user = new User('Thomas', 38); $normalizer = new GetSetMethodNormalizer(); $encoder = new AvroSerDeEncoder($recordSerializer); $symfonySerializer = new Serializer([$normalizer], [$encoder]); $serialized = $symfonySerializer->serialize( $user, AvroSerDeEncoder::FORMAT_AVRO, [ AvroSerDeEncoder::CONTEXT_ENCODE_SUBJECT => 'users-value', AvroSerDeEncoder::CONTEXT_ENCODE_WRITERS_SCHEMA => AvroSchema::parse($avroSchemaJson), ] ); $deserializedUser = $symfonySerializer->deserialize( $serialized, User::class, AvroSerDeEncoder::FORMAT_AVRO ); Assert::assertEquals($deserializedUser, $user);
名称转换器
有时您的属性名称可能与您的模式中的字段名称不同。解决此问题的一个选项是使用 自定义序列化器注解。但是,如果您使用此库提供的注解,您可以使用我们的 名称转换器,它解析这些注解并将模式字段名称与属性名称映射。
<?php use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\AvroSerDeEncoder; use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\NameConverter\AvroNameConverter; use FlixTech\AvroSerializer\Objects\DefaultRecordSerializerFactory; use Symfony\Component\Serializer\Normalizer\GetSetMethodNormalizer; use Symfony\Component\Serializer\Serializer; use Doctrine\Common\Annotations\AnnotationReader as DoctrineAnnotationReader; use Doctrine\Common\Annotations\AnnotationRegistry; use FlixTech\AvroSerializer\Objects\Schema\Generation\AnnotationReader; $recordSerializer = DefaultRecordSerializerFactory::get( getenv('SCHEMA_REGISTRY_HOST') ); AnnotationRegistry::registerLoader('class_exists'); $reader = new AnnotationReader( new DoctrineAnnotationReader() ); $nameConverter = new AvroNameConverter($reader); $normalizer = new GetSetMethodNormalizer(null, $nameConverter); $encoder = new AvroSerDeEncoder($recordSerializer); $symfonySerializer = new Serializer([$normalizer], [$encoder]);
模式构建器
此库还提供了使用 PHP 定义模式的方法,这与 Java SDK 提供的 SchemaBuilder API 非常相似
<?php use FlixTech\AvroSerializer\Objects\Schema; use FlixTech\AvroSerializer\Objects\Schema\Record\FieldOption; Schema::record() ->name('object') ->namespace('org.acme') ->doc('A test object') ->aliases(['stdClass', 'array']) ->field('name', Schema::string(), FieldOption::doc('Name of the object'), FieldOption::orderDesc()) ->field('answer', Schema::int(), FieldOption::default(42), FieldOption::orderAsc(), FieldOption::aliases('wrong', 'correct')) ->field('ignore', Schema::boolean(), FieldOption::orderIgnore()) ->parse();
模式生成器
除了提供用于定义模式的流畅 API 之外,我们还提供了从类元数据(注解)生成模式的方法。为此,您必须安装 doctrine/annotations
包。
<?php use FlixTech\AvroSerializer\Objects\DefaultSchemaGeneratorFactory; use FlixTech\AvroSerializer\Objects\Schema\Generation\Annotations as SerDe; /** * @SerDe\AvroType("record") * @SerDe\AvroName("user") */ class User { /** * @SerDe\AvroType("string") * @var string */ private $firstName; /** * @SerDe\AvroType("string") * @var string */ private $lastName; /** * @SerDe\AvroType("int") * @var int */ private $age; public function __construct(string $firstName, string $lastName, int $age) { $this->firstName = $firstName; $this->lastName = $lastName; $this->age = $age; } public function getFirstName(): string { return $this->firstName; } public function getLastName(): string { return $this->lastName; } public function getAge(): int { return $this->age; } } $generator = DefaultSchemaGeneratorFactory::get(); $schema = $generator->generate(User::class); $avroSchema = $schema->parse();
有关可能注解的更多示例,请参阅 测试用例。
示例
此库在 examples 文件夹中提供了一些可执行示例。您应该查看这些示例以了解此库的工作原理。